runtime: unify worker creation path

This commit is contained in:
Keisuke Hirata 2026-06-29 02:44:52 +09:00
parent 9c8fb7dea6
commit 14bb4934a6
No known key found for this signature in database
8 changed files with 601 additions and 524 deletions

View File

@ -1,31 +1,8 @@
use crate::execution::WorkerExecutionStatus; use crate::execution::WorkerExecutionStatus;
use crate::identity::{RuntimeId, WorkerId, WorkerRef}; use crate::identity::{RuntimeId, WorkerId, WorkerRef};
use crate::interaction::WorkerInput;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Intent supplied when a Worker is created.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WorkerIntent {
Assistant {
#[serde(default, skip_serializing_if = "Option::is_none")]
purpose: Option<String>,
},
Task {
objective: String,
},
Role {
role: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
purpose: Option<String>,
},
}
impl Default for WorkerIntent {
fn default() -> Self {
Self::Assistant { purpose: None }
}
}
/// Profile selector boundary. This is a selector, not a resolved config bundle. /// Profile selector boundary. This is a selector, not a resolved config bundle.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")] #[serde(tag = "kind", content = "value", rename_all = "snake_case")]
@ -48,77 +25,21 @@ pub struct ConfigBundleRef {
pub digest: String, pub digest: String,
} }
/// Requested capability name plus optional human-readable reason. /// Canonical Runtime Worker creation request.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] ///
pub struct CapabilityRequest { /// Browser/product launch semantics are resolved by a backend before this
pub name: String, /// request is built. The request contains only durable Runtime identity inputs:
#[serde(default, skip_serializing_if = "Option::is_none")] /// a backend-decided profile selector, a previously synced ConfigBundle identity,
pub reason: Option<String>, /// and optional initial user input that is committed in the same transaction as
} /// Worker catalog/transcript persistence. It carries no cwd/workspace path, tool
/// scope, credential, socket/session path, raw config body, or execution binding
impl CapabilityRequest { /// internals.
pub fn named(name: impl Into<String>) -> Self {
Self {
name: name.into(),
reason: None,
}
}
}
/// Opaque workspace reference supplied by a caller.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkspaceRef {
pub name: String,
pub reference: String,
}
/// Opaque mount reference supplied by a caller.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MountRef {
pub name: String,
pub reference: String,
}
/// Worker creation request for the catalog/lifecycle API.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CreateWorkerRequest { pub struct CreateWorkerRequest {
pub intent: WorkerIntent,
pub profile: ProfileSelector, pub profile: ProfileSelector,
pub config_bundle: ConfigBundleRef,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub config_bundle: Option<ConfigBundleRef>, pub initial_input: Option<WorkerInput>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub requested_capabilities: Vec<CapabilityRequest>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub workspace_refs: Vec<WorkspaceRef>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub mount_refs: Vec<MountRef>,
}
impl Default for CreateWorkerRequest {
fn default() -> Self {
Self {
intent: WorkerIntent::default(),
profile: ProfileSelector::default(),
config_bundle: None,
requested_capabilities: Vec::new(),
workspace_refs: Vec::new(),
mount_refs: Vec::new(),
}
}
}
impl CreateWorkerRequest {
/// Create a tools-less Worker using runtime-local default resources.
pub fn tools_less(intent: WorkerIntent, profile: ProfileSelector) -> Self {
Self {
intent,
profile,
config_bundle: None,
requested_capabilities: Vec::new(),
workspace_refs: Vec::new(),
mount_refs: Vec::new(),
}
}
} }
/// Worker lifecycle status for the in-memory embedded runtime. /// Worker lifecycle status for the in-memory embedded runtime.
@ -144,10 +65,8 @@ pub struct WorkerSummary {
pub worker_id: WorkerId, pub worker_id: WorkerId,
pub status: WorkerStatus, pub status: WorkerStatus,
pub execution: WorkerExecutionStatus, pub execution: WorkerExecutionStatus,
pub intent: WorkerIntent,
pub profile: ProfileSelector, pub profile: ProfileSelector,
pub requested_capability_count: usize, pub config_bundle: ConfigBundleRef,
pub has_config_bundle: bool,
pub transcript_len: usize, pub transcript_len: usize,
pub last_event_id: u64, pub last_event_id: u64,
} }
@ -160,16 +79,8 @@ pub struct WorkerDetail {
pub worker_id: WorkerId, pub worker_id: WorkerId,
pub status: WorkerStatus, pub status: WorkerStatus,
pub execution: WorkerExecutionStatus, pub execution: WorkerExecutionStatus,
pub intent: WorkerIntent,
pub profile: ProfileSelector, pub profile: ProfileSelector,
#[serde(default, skip_serializing_if = "Option::is_none")] pub config_bundle: ConfigBundleRef,
pub config_bundle: Option<ConfigBundleRef>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub requested_capabilities: Vec<CapabilityRequest>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub workspace_refs: Vec<WorkspaceRef>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub mount_refs: Vec<MountRef>,
pub transcript_len: usize, pub transcript_len: usize,
pub last_event_id: u64, pub last_event_id: u64,
} }

View File

@ -35,6 +35,9 @@ pub enum RuntimeError {
message: String, message: String,
}, },
#[error("worker creation has no execution backend: {message}")]
ExecutionBackendUnavailable { message: String },
#[error("worker {worker_id} execution {operation:?} returned {outcome:?}: {message}")] #[error("worker {worker_id} execution {operation:?} returned {outcome:?}: {message}")]
WorkerExecutionRejected { WorkerExecutionRejected {
worker_id: WorkerId, worker_id: WorkerId,

View File

@ -824,6 +824,7 @@ fn status_for_runtime_error(error: &RuntimeError) -> StatusCode {
} }
RuntimeError::RuntimeStopped { .. } RuntimeError::RuntimeStopped { .. }
| RuntimeError::WorkerExecutionUnavailable { .. } | RuntimeError::WorkerExecutionUnavailable { .. }
| RuntimeError::ExecutionBackendUnavailable { .. }
| RuntimeError::WorkerExecutionRejected { .. } => StatusCode::CONFLICT, | RuntimeError::WorkerExecutionRejected { .. } => StatusCode::CONFLICT,
RuntimeError::LimitTooLarge { .. } RuntimeError::LimitTooLarge { .. }
| RuntimeError::InvalidRequest(_) | RuntimeError::InvalidRequest(_)
@ -846,6 +847,7 @@ fn code_for_runtime_error(error: &RuntimeError) -> &'static str {
RuntimeError::WrongRuntimeCursor { .. } => "wrong_runtime_cursor", RuntimeError::WrongRuntimeCursor { .. } => "wrong_runtime_cursor",
RuntimeError::WorkerNotFound { .. } => "worker_not_found", RuntimeError::WorkerNotFound { .. } => "worker_not_found",
RuntimeError::WorkerExecutionUnavailable { .. } => "worker_execution_unavailable", RuntimeError::WorkerExecutionUnavailable { .. } => "worker_execution_unavailable",
RuntimeError::ExecutionBackendUnavailable { .. } => "execution_backend_unavailable",
RuntimeError::WorkerExecutionRejected { .. } => "worker_execution_rejected", RuntimeError::WorkerExecutionRejected { .. } => "worker_execution_rejected",
RuntimeError::LimitTooLarge { .. } => "limit_too_large", RuntimeError::LimitTooLarge { .. } => "limit_too_large",
RuntimeError::InvalidRequest(_) => "invalid_request", RuntimeError::InvalidRequest(_) => "invalid_request",
@ -872,7 +874,10 @@ pub enum RuntimeHttpServerError {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::catalog::{CapabilityRequest, ProfileSelector, WorkerIntent}; use crate::catalog::{ConfigBundleRef, ProfileSelector};
use crate::config_bundle::{
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor,
};
use crate::execution::{ use crate::execution::{
WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation,
WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest,
@ -883,16 +888,38 @@ mod tests {
use axum::http::Method; use axum::http::Method;
use tower::ServiceExt; use tower::ServiceExt;
fn task_request(objective: &str) -> CreateWorkerRequest { fn test_bundle(profile: ProfileSelector) -> ConfigBundle {
CreateWorkerRequest { ConfigBundle {
intent: WorkerIntent::Task { metadata: ConfigBundleMetadata {
objective: objective.to_string(), id: "http-test-bundle".to_string(),
digest: String::new(),
revision: "test".to_string(),
workspace_id: "test-workspace".to_string(),
created_at: "test".to_string(),
provenance: ConfigBundleProvenance {
source: "test".to_string(),
detail: None,
}, },
profile: ProfileSelector::Builtin("builtin:coder".to_string()), },
config_bundle: None, profiles: vec![ConfigProfileDescriptor {
requested_capabilities: vec![CapabilityRequest::named("read")], selector: profile,
workspace_refs: Vec::new(), label: Some("test".to_string()),
mount_refs: Vec::new(), }],
declarations: Vec::new(),
}
.with_computed_digest()
}
fn task_request(_objective: &str) -> CreateWorkerRequest {
let profile = ProfileSelector::Builtin("builtin:coder".to_string());
let bundle = test_bundle(profile.clone());
CreateWorkerRequest {
profile,
config_bundle: ConfigBundleRef {
id: bundle.metadata.id,
digest: bundle.metadata.digest,
},
initial_input: None,
} }
} }
@ -969,6 +996,11 @@ mod tests {
let runtime = let runtime =
Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(AcceptingBackend)) Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(AcceptingBackend))
.unwrap(); .unwrap();
runtime
.store_config_bundle(test_bundle(ProfileSelector::Builtin(
"builtin:coder".to_string(),
)))
.unwrap();
let app = runtime_http_router(runtime.clone(), None); let app = runtime_http_router(runtime.clone(), None);
let response = json_request( let response = json_request(
@ -1091,15 +1123,89 @@ mod tests {
#[cfg(all(test, feature = "ws-server"))] #[cfg(all(test, feature = "ws-server"))]
mod ws_tests { mod ws_tests {
use super::*; use super::*;
use crate::catalog::{ConfigBundleRef, ProfileSelector};
use crate::config_bundle::{
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor,
};
use crate::execution::{
WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation,
WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest,
WorkerExecutionSpawnResult,
};
use crate::interaction::WorkerInput;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use std::sync::Arc;
use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
struct WsBackend;
impl WorkerExecutionBackend for WsBackend {
fn backend_id(&self) -> &str {
"ws-test"
}
fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult {
WorkerExecutionSpawnResult::Connected {
handle: WorkerExecutionHandle::new(request.worker_ref, self.backend_id()),
run_state: WorkerExecutionRunState::Idle,
}
}
fn dispatch_input(
&self,
_handle: &WorkerExecutionHandle,
_input: WorkerInput,
) -> WorkerExecutionResult {
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Input,
WorkerExecutionRunState::Idle,
)
}
}
fn ws_test_bundle(profile: ProfileSelector) -> ConfigBundle {
ConfigBundle {
metadata: ConfigBundleMetadata {
id: "ws-test-bundle".to_string(),
digest: String::new(),
revision: "test".to_string(),
workspace_id: "test".to_string(),
created_at: "test".to_string(),
provenance: ConfigBundleProvenance {
source: "test".to_string(),
detail: None,
},
},
profiles: vec![ConfigProfileDescriptor {
selector: profile,
label: Some("ws".to_string()),
}],
declarations: Vec::new(),
}
.with_computed_digest()
}
fn ws_create_request() -> CreateWorkerRequest {
let bundle = ws_test_bundle(ProfileSelector::RuntimeDefault);
CreateWorkerRequest {
profile: ProfileSelector::RuntimeDefault,
config_bundle: ConfigBundleRef {
id: bundle.metadata.id,
digest: bundle.metadata.digest,
},
initial_input: None,
}
}
async fn spawn_runtime_server() -> (Runtime, WorkerRef, String) { async fn spawn_runtime_server() -> (Runtime, WorkerRef, String) {
let runtime = Runtime::new_memory(); let runtime =
let worker = runtime Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(WsBackend))
.create_worker(CreateWorkerRequest::default())
.unwrap(); .unwrap();
runtime
.store_config_bundle(ws_test_bundle(ProfileSelector::RuntimeDefault))
.unwrap();
let worker = runtime.create_worker(ws_create_request()).unwrap();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
tokio::spawn({ tokio::spawn({
@ -1169,9 +1275,7 @@ mod ws_tests {
#[tokio::test] #[tokio::test]
async fn runtime_ws_cursor_resume_is_duplicate_safe_and_filters_workers() { async fn runtime_ws_cursor_resume_is_duplicate_safe_and_filters_workers() {
let (runtime, worker_ref, url) = spawn_runtime_server().await; let (runtime, worker_ref, url) = spawn_runtime_server().await;
let other = runtime let other = runtime.create_worker(ws_create_request()).unwrap();
.create_worker(CreateWorkerRequest::default())
.unwrap();
let first = runtime let first = runtime
.observe_worker_event( .observe_worker_event(
&worker_ref, &worker_ref,

View File

@ -24,4 +24,5 @@ mod runtime;
#[cfg(feature = "fs-store")] #[cfg(feature = "fs-store")]
pub use fs_store::{FsRuntimeStore, FsRuntimeStoreOptions}; pub use fs_store::{FsRuntimeStore, FsRuntimeStoreOptions};
pub use management::{RuntimeLimits, RuntimeOptions};
pub use runtime::Runtime; pub use runtime::Runtime;

View File

@ -4,9 +4,9 @@ use crate::catalog::{
}; };
use crate::config_bundle::{ use crate::config_bundle::{
ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary, validate_config_bundle, ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary, validate_config_bundle,
validate_config_bundle_ref, validate_profile_selector, validate_config_bundle_ref,
}; };
use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic}; use crate::diagnostics::RuntimeDiagnostic;
use crate::error::RuntimeError; use crate::error::RuntimeError;
use crate::execution::{ use crate::execution::{
WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef, WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef,
@ -231,55 +231,116 @@ impl Runtime {
Ok(event_id) Ok(event_id)
} }
/// Create a Worker in the embedded catalog. /// Create a Worker through the canonical ConfigBundle + execution backend path.
pub fn create_worker( pub fn create_worker(
&self, &self,
request: CreateWorkerRequest, request: CreateWorkerRequest,
) -> Result<WorkerDetail, RuntimeError> { ) -> Result<WorkerDetail, RuntimeError> {
let (backend, worker_ref, spawn_request) = {
let mut state = self.lock()?; let mut state = self.lock()?;
state.ensure_running()?; state.ensure_running()?;
validate_create_worker_request(&request)?; validate_create_worker_request(&request)?;
state.validate_worker_config_boundary(&request)?; state.validate_worker_config_boundary(&request)?;
let backend = state.execution_backend.clone().ok_or_else(|| {
RuntimeError::ExecutionBackendUnavailable {
message: "worker creation requires an execution backend".to_string(),
}
})?;
let worker_id = WorkerId::generated(state.next_worker_sequence); let worker_id = WorkerId::generated(state.next_worker_sequence);
state.next_worker_sequence += 1; state.next_worker_sequence += 1;
let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone()); let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone());
let backend = state.execution_backend.clone();
let spawn_request = backend.as_ref().map(|_| WorkerExecutionSpawnRequest {
worker_ref: worker_ref.clone(),
request: request.clone(),
context: self.execution_context(worker_ref.clone()),
});
let event_id = state.push_event( let event_id = state.push_event(
Some(worker_ref.clone()), Some(worker_ref.clone()),
RuntimeEventKind::WorkerCreated, RuntimeEventKind::WorkerCreated,
format!("worker {worker_id} created"), format!("worker {worker_id} created"),
); );
let mut transcript = Vec::new();
let mut next_transcript_sequence = 1;
if let Some(input) = request.initial_input.clone() {
let role = match input.kind {
WorkerInputKind::User => TranscriptRole::User,
WorkerInputKind::System => TranscriptRole::System,
};
transcript.push(TranscriptEntry {
sequence: next_transcript_sequence,
worker_ref: worker_ref.clone(),
role,
content: input.content,
event_id,
});
next_transcript_sequence += 1;
}
let record = WorkerRecord { let record = WorkerRecord {
worker_ref: worker_ref.clone(), worker_ref: worker_ref.clone(),
worker_id: worker_id.clone(), worker_id: worker_id.clone(),
status: WorkerStatus::Running, status: WorkerStatus::Running,
request, request: request.clone(),
execution: WorkerExecutionStatus::unconnected(), execution: WorkerExecutionStatus::unconnected(),
execution_handle: None, execution_handle: None,
transcript: Vec::new(), transcript,
next_transcript_sequence: 1, next_transcript_sequence,
last_event_id: event_id, last_event_id: event_id,
}; };
let detail = record.detail(&state.runtime_id); state.workers.insert(worker_id, record);
state.emit_create_diagnostics(&detail); let spawn_request = WorkerExecutionSpawnRequest {
state.workers.insert(worker_id.clone(), record); worker_ref: worker_ref.clone(),
state.persist_runtime_snapshot()?; request,
state.persist_worker(&worker_id)?; context: self.execution_context(worker_ref.clone()),
state.persist_event_by_id(event_id)?; };
drop(state); (backend, worker_ref, spawn_request)
};
if let (Some(backend), Some(spawn_request)) = (backend, spawn_request) { let spawn_result = backend.spawn_worker(spawn_request);
let result = backend.spawn_worker(spawn_request); let (handle, run_state) = match spawn_result {
self.apply_spawn_result(&worker_ref, result) WorkerExecutionSpawnResult::Connected { handle, run_state } => (handle, run_state),
WorkerExecutionSpawnResult::Rejected(result)
| WorkerExecutionSpawnResult::Errored(result) => {
self.rollback_failed_create(&worker_ref)?;
return Err(RuntimeError::WorkerExecutionRejected {
worker_id: worker_ref.worker_id.clone(),
operation: result.operation,
outcome: result.outcome,
message: result.message_or_default(),
result,
});
}
};
if let Some(initial_input) = {
let state = self.lock()?;
state.worker(&worker_ref)?.request.initial_input.clone()
} {
let dispatch_result = backend.dispatch_input(&handle, initial_input);
if !dispatch_result.is_accepted() {
let _ = backend.stop_worker(&handle);
self.rollback_failed_create(&worker_ref)?;
return Err(RuntimeError::WorkerExecutionRejected {
worker_id: worker_ref.worker_id.clone(),
operation: dispatch_result.operation,
outcome: dispatch_result.outcome,
message: dispatch_result.message_or_default(),
result: dispatch_result,
});
}
self.commit_created_worker(
&worker_ref,
handle,
WorkerExecutionRunState::Busy,
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Input,
WorkerExecutionRunState::Busy,
),
)
} else { } else {
Ok(detail) self.commit_created_worker(
&worker_ref,
handle,
run_state,
WorkerExecutionResult::accepted(WorkerExecutionOperation::Spawn, run_state),
)
} }
} }
@ -414,38 +475,41 @@ impl Runtime {
}) })
} }
fn apply_spawn_result( fn commit_created_worker(
&self, &self,
worker_ref: &WorkerRef, worker_ref: &WorkerRef,
result: WorkerExecutionSpawnResult, handle: WorkerExecutionHandle,
run_state: WorkerExecutionRunState,
result: WorkerExecutionResult,
) -> Result<WorkerDetail, RuntimeError> { ) -> Result<WorkerDetail, RuntimeError> {
let mut state = self.lock()?; let mut state = self.lock()?;
let runtime_id = state.runtime_id.clone(); let runtime_id = state.runtime_id.clone();
let detail = { let detail = {
let worker = state.worker_mut(worker_ref)?; let worker = state.worker_mut(worker_ref)?;
match result {
WorkerExecutionSpawnResult::Connected { handle, run_state } => {
worker.execution_handle = Some(handle); worker.execution_handle = Some(handle);
worker.execution = WorkerExecutionStatus::connected(run_state).with_result( worker.execution = WorkerExecutionStatus::connected(run_state).with_result(result);
WorkerExecutionResult::accepted(WorkerExecutionOperation::Spawn, run_state),
);
}
WorkerExecutionSpawnResult::Rejected(result)
| WorkerExecutionSpawnResult::Errored(result) => {
worker.execution_handle = None;
worker.execution = WorkerExecutionStatus {
backend: WorkerExecutionBackendKind::Connected,
run_state: result.run_state,
last_result: Some(result),
};
}
}
worker.detail(&runtime_id) worker.detail(&runtime_id)
}; };
state.persist_runtime_snapshot()?;
state.persist_worker(&worker_ref.worker_id)?; state.persist_worker(&worker_ref.worker_id)?;
let worker = state.worker(worker_ref)?;
for entry in &worker.transcript {
state.persist_transcript_entry(&worker_ref.worker_id, entry.sequence)?;
}
state.persist_event_by_id(detail.last_event_id)?;
Ok(detail) Ok(detail)
} }
fn rollback_failed_create(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> {
let mut state = self.lock()?;
if let Some(record) = state.workers.remove(&worker_ref.worker_id) {
state.events.retain(|event| {
event.id != record.last_event_id || event.worker_ref.as_ref() != Some(worker_ref)
});
}
Ok(())
}
fn record_execution_result( fn record_execution_result(
&self, &self,
worker_ref: &WorkerRef, worker_ref: &WorkerRef,
@ -827,7 +891,6 @@ struct RuntimeState {
execution_backend: Option<WorkerExecutionBackendRef>, execution_backend: Option<WorkerExecutionBackendRef>,
next_worker_sequence: u64, next_worker_sequence: u64,
next_event_id: u64, next_event_id: u64,
next_diagnostic_id: u64,
workers: BTreeMap<WorkerId, WorkerRecord>, workers: BTreeMap<WorkerId, WorkerRecord>,
config_bundles: BTreeMap<String, ConfigBundle>, config_bundles: BTreeMap<String, ConfigBundle>,
events: Vec<RuntimeEvent>, events: Vec<RuntimeEvent>,
@ -852,7 +915,6 @@ impl RuntimeState {
execution_backend: None, execution_backend: None,
next_worker_sequence: 1, next_worker_sequence: 1,
next_event_id: 1, next_event_id: 1,
next_diagnostic_id: 1,
workers: BTreeMap::new(), workers: BTreeMap::new(),
config_bundles: BTreeMap::new(), config_bundles: BTreeMap::new(),
events: Vec::new(), events: Vec::new(),
@ -883,7 +945,6 @@ impl RuntimeState {
execution_backend: None, execution_backend: None,
next_worker_sequence: 1, next_worker_sequence: 1,
next_event_id: 1, next_event_id: 1,
next_diagnostic_id: 1,
workers: BTreeMap::new(), workers: BTreeMap::new(),
config_bundles: BTreeMap::new(), config_bundles: BTreeMap::new(),
events: Vec::new(), events: Vec::new(),
@ -1131,8 +1192,7 @@ impl RuntimeState {
&self, &self,
request: &CreateWorkerRequest, request: &CreateWorkerRequest,
) -> Result<(), RuntimeError> { ) -> Result<(), RuntimeError> {
match &request.config_bundle { let reference = &request.config_bundle;
Some(reference) => {
let availability = self.check_config_bundle_ref(reference)?; let availability = self.check_config_bundle_ref(reference)?;
let bundle = self let bundle = self
.config_bundles .config_bundles
@ -1144,24 +1204,11 @@ impl RuntimeState {
return Err(RuntimeError::InvalidProfileSelector { return Err(RuntimeError::InvalidProfileSelector {
profile: profile_label(&request.profile), profile: profile_label(&request.profile),
bundle_id: Some(reference.id.clone()), bundle_id: Some(reference.id.clone()),
message: "profile selector is not declared by synced config bundle" message: "profile selector is not declared by synced config bundle".to_string(),
.to_string(),
}); });
} }
Ok(()) Ok(())
} }
None => match &request.profile {
ProfileSelector::RuntimeDefault | ProfileSelector::Builtin(_) => {
validate_profile_selector(request.profile.clone(), None)
}
ProfileSelector::Named(_) => Err(RuntimeError::InvalidProfileSelector {
profile: profile_label(&request.profile),
bundle_id: None,
message: "named profiles require a synced config bundle reference".to_string(),
}),
},
}
}
fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> { fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> {
if worker_ref.runtime_id != self.runtime_id { if worker_ref.runtime_id != self.runtime_id {
@ -1373,43 +1420,6 @@ impl RuntimeState {
status.run_state = next_run_state; status.run_state = next_run_state;
true true
} }
fn push_diagnostic(
&mut self,
severity: DiagnosticSeverity,
code: impl Into<String>,
message: impl Into<String>,
worker_ref: Option<WorkerRef>,
) {
let id = self.next_diagnostic_id;
self.next_diagnostic_id += 1;
self.diagnostics.push(RuntimeDiagnostic {
id,
severity,
code: code.into(),
message: message.into(),
worker_ref,
});
}
fn emit_create_diagnostics(&mut self, detail: &WorkerDetail) {
if detail.config_bundle.is_none() {
self.push_diagnostic(
DiagnosticSeverity::Info,
"runtime.local_default_resources",
"worker created without ConfigBundleRef; runtime-local defaults are assumed",
Some(detail.worker_ref.clone()),
);
}
if detail.requested_capabilities.is_empty() {
self.push_diagnostic(
DiagnosticSeverity::Info,
"worker.tools_less",
"worker created without requested tool capabilities",
Some(detail.worker_ref.clone()),
);
}
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -1433,10 +1443,8 @@ impl WorkerRecord {
worker_id: self.worker_id.clone(), worker_id: self.worker_id.clone(),
status: self.status, status: self.status,
execution: self.execution.clone(), execution: self.execution.clone(),
intent: self.request.intent.clone(),
profile: self.request.profile.clone(), profile: self.request.profile.clone(),
requested_capability_count: self.request.requested_capabilities.len(), config_bundle: self.request.config_bundle.clone(),
has_config_bundle: self.request.config_bundle.is_some(),
transcript_len: self.transcript.len(), transcript_len: self.transcript.len(),
last_event_id: self.last_event_id, last_event_id: self.last_event_id,
} }
@ -1449,12 +1457,8 @@ impl WorkerRecord {
worker_id: self.worker_id.clone(), worker_id: self.worker_id.clone(),
status: self.status, status: self.status,
execution: self.execution.clone(), execution: self.execution.clone(),
intent: self.request.intent.clone(),
profile: self.request.profile.clone(), profile: self.request.profile.clone(),
config_bundle: self.request.config_bundle.clone(), config_bundle: self.request.config_bundle.clone(),
requested_capabilities: self.request.requested_capabilities.clone(),
workspace_refs: self.request.workspace_refs.clone(),
mount_refs: self.request.mount_refs.clone(),
transcript_len: self.transcript.len(), transcript_len: self.transcript.len(),
last_event_id: self.last_event_id, last_event_id: self.last_event_id,
} }
@ -1483,17 +1487,20 @@ fn profile_label(selector: &ProfileSelector) -> String {
} }
fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> { fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> {
if let crate::catalog::WorkerIntent::Task { objective } = &request.intent { if request.config_bundle.id.trim().is_empty() {
if objective.trim().is_empty() {
return Err(RuntimeError::InvalidRequest( return Err(RuntimeError::InvalidRequest(
"task objective must not be empty".to_string(), "config_bundle.id must not be empty".to_string(),
)); ));
} }
} if request.config_bundle.digest.trim().is_empty() {
for capability in &request.requested_capabilities {
if capability.name.trim().is_empty() {
return Err(RuntimeError::InvalidRequest( return Err(RuntimeError::InvalidRequest(
"capability name must not be empty".to_string(), "config_bundle.digest must not be empty".to_string(),
));
}
if let Some(input) = &request.initial_input {
if input.content.trim().is_empty() {
return Err(RuntimeError::InvalidRequest(
"initial_input.content must not be empty".to_string(),
)); ));
} }
} }
@ -1512,7 +1519,7 @@ fn validate_worker_input(input: &WorkerInput) -> Result<(), RuntimeError> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector, WorkerIntent}; use crate::catalog::{ConfigBundleRef, ProfileSelector};
use crate::config_bundle::{ use crate::config_bundle::{
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigDeclaration, ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigDeclaration,
ConfigDeclarationKind, ConfigProfileDescriptor, ConfigDeclarationKind, ConfigProfileDescriptor,
@ -1524,19 +1531,45 @@ mod tests {
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
fn task_request(objective: &str) -> CreateWorkerRequest { fn task_request(_objective: &str) -> CreateWorkerRequest {
let profile = ProfileSelector::Builtin("builtin:coder".to_string());
let bundle = test_bundle_for_profile(profile.clone());
CreateWorkerRequest { CreateWorkerRequest {
intent: WorkerIntent::Task { profile,
objective: objective.to_string(), config_bundle: ConfigBundleRef {
id: bundle.metadata.id,
digest: bundle.metadata.digest,
}, },
profile: ProfileSelector::Builtin("builtin:coder".to_string()), initial_input: None,
config_bundle: None,
requested_capabilities: vec![CapabilityRequest::named("read")],
workspace_refs: Vec::new(),
mount_refs: Vec::new(),
} }
} }
fn test_bundle_for_profile(profile: ProfileSelector) -> ConfigBundle {
ConfigBundle {
metadata: ConfigBundleMetadata {
id: "bundle-1".to_string(),
digest: String::new(),
revision: "rev-1".to_string(),
workspace_id: "workspace-1".to_string(),
created_at: "2026-06-26T00:00:00Z".to_string(),
provenance: ConfigBundleProvenance {
source: "workspace-backend".to_string(),
detail: Some("profile-sync".to_string()),
},
},
profiles: vec![ConfigProfileDescriptor {
selector: profile,
label: Some("Coder".to_string()),
}],
declarations: vec![ConfigDeclaration {
kind: ConfigDeclarationKind::CapabilityGrant,
name: "read".to_string(),
reference: "capability:read".to_string(),
}],
}
.with_computed_digest()
}
#[derive(Default)] #[derive(Default)]
struct TestExecutionBackend { struct TestExecutionBackend {
dispatch_result: Mutex<Option<WorkerExecutionResult>>, dispatch_result: Mutex<Option<WorkerExecutionResult>>,
@ -1609,77 +1642,58 @@ mod tests {
} }
fn runtime_with_backend() -> Runtime { fn runtime_with_backend() -> Runtime {
Runtime::with_execution_backend( let runtime = Runtime::with_execution_backend(
RuntimeOptions::default(), RuntimeOptions::default(),
Arc::new(TestExecutionBackend::default()), Arc::new(TestExecutionBackend::default()),
) )
.unwrap() .unwrap();
runtime.store_config_bundle(test_bundle()).unwrap();
runtime
} }
fn runtime_and_backend() -> (Runtime, Arc<TestExecutionBackend>) { fn runtime_and_backend() -> (Runtime, Arc<TestExecutionBackend>) {
let backend = Arc::new(TestExecutionBackend::default()); let backend = Arc::new(TestExecutionBackend::default());
let runtime = let runtime =
Runtime::with_execution_backend(RuntimeOptions::default(), backend.clone()).unwrap(); Runtime::with_execution_backend(RuntimeOptions::default(), backend.clone()).unwrap();
runtime.store_config_bundle(test_bundle()).unwrap();
(runtime, backend) (runtime, backend)
} }
fn test_bundle() -> ConfigBundle { fn test_bundle() -> ConfigBundle {
ConfigBundle { test_bundle_for_profile(ProfileSelector::Builtin("builtin:coder".to_string()))
metadata: ConfigBundleMetadata {
id: "bundle-1".to_string(),
digest: String::new(),
revision: "rev-1".to_string(),
workspace_id: "workspace-1".to_string(),
created_at: "2026-06-26T00:00:00Z".to_string(),
provenance: ConfigBundleProvenance {
source: "workspace-backend".to_string(),
detail: Some("profile-sync".to_string()),
},
},
profiles: vec![ConfigProfileDescriptor {
selector: ProfileSelector::Builtin("builtin:coder".to_string()),
label: Some("Coder".to_string()),
}],
declarations: vec![ConfigDeclaration {
kind: ConfigDeclarationKind::CapabilityGrant,
name: "read".to_string(),
reference: "capability:read".to_string(),
}],
}
.with_computed_digest()
} }
fn bundled_task_request(objective: &str, bundle: &ConfigBundle) -> CreateWorkerRequest { fn bundled_task_request(objective: &str, bundle: &ConfigBundle) -> CreateWorkerRequest {
let mut request = task_request(objective); let mut request = task_request(objective);
request.config_bundle = Some(ConfigBundleRef { request.config_bundle = ConfigBundleRef {
id: bundle.metadata.id.clone(), id: bundle.metadata.id.clone(),
digest: bundle.metadata.digest.clone(), digest: bundle.metadata.digest.clone(),
}); };
request request
} }
#[test] #[test]
fn create_list_and_detail_preserve_runtime_worker_authority() { fn create_list_and_detail_preserve_runtime_worker_authority() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let detail = runtime.create_worker(task_request("implement v0")).unwrap(); let detail = runtime.create_worker(task_request("implement v0")).unwrap();
assert_eq!(detail.worker_ref.runtime_id, runtime.runtime_id().unwrap()); assert_eq!(detail.worker_ref.runtime_id, runtime.runtime_id().unwrap());
assert_eq!(detail.status, WorkerStatus::Running); assert_eq!(detail.status, WorkerStatus::Running);
assert!(detail.config_bundle.is_none()); assert_eq!(detail.config_bundle.id, "bundle-1");
let list = runtime.list_workers().unwrap(); let list = runtime.list_workers().unwrap();
assert_eq!(list.len(), 1); assert_eq!(list.len(), 1);
assert_eq!(list[0].worker_ref, detail.worker_ref); assert_eq!(list[0].worker_ref, detail.worker_ref);
assert_eq!(list[0].requested_capability_count, 1); assert_eq!(list[0].config_bundle, detail.config_bundle);
let fetched = runtime.worker_detail(&detail.worker_ref).unwrap(); let fetched = runtime.worker_detail(&detail.worker_ref).unwrap();
assert_eq!(fetched.worker_id, detail.worker_id); assert_eq!(fetched.worker_id, detail.worker_id);
assert_eq!(fetched.intent, detail.intent); assert_eq!(fetched.profile, detail.profile);
} }
#[test] #[test]
fn synced_config_bundle_is_stored_checked_and_used_for_worker_creation() { fn synced_config_bundle_is_stored_checked_and_used_for_worker_creation() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let bundle = test_bundle(); let bundle = test_bundle();
let availability = runtime.store_config_bundle(bundle.clone()).unwrap(); let availability = runtime.store_config_bundle(bundle.clone()).unwrap();
assert_eq!(availability.reference.id, "bundle-1"); assert_eq!(availability.reference.id, "bundle-1");
@ -1697,7 +1711,7 @@ mod tests {
let detail = runtime let detail = runtime
.create_worker(bundled_task_request("synced", &bundle)) .create_worker(bundled_task_request("synced", &bundle))
.unwrap(); .unwrap();
assert_eq!(detail.config_bundle, Some(availability.reference)); assert_eq!(detail.config_bundle, availability.reference);
} }
#[test] #[test]
@ -1746,8 +1760,8 @@ mod tests {
#[test] #[test]
fn rejects_worker_refs_from_another_runtime() { fn rejects_worker_refs_from_another_runtime() {
let runtime_a = Runtime::new_memory(); let runtime_a = runtime_with_backend();
let runtime_b = Runtime::new_memory(); let runtime_b = runtime_with_backend();
let detail = runtime_a.create_worker(task_request("runtime a")).unwrap(); let detail = runtime_a.create_worker(task_request("runtime a")).unwrap();
let err = runtime_b.worker_detail(&detail.worker_ref).unwrap_err(); let err = runtime_b.worker_detail(&detail.worker_ref).unwrap_err();
@ -1755,52 +1769,27 @@ mod tests {
} }
#[test] #[test]
fn tools_less_worker_without_config_bundle_uses_local_defaults_and_diagnostics() { fn create_worker_without_execution_backend_is_rejected_and_not_persisted() {
let runtime = Runtime::new_memory(); let runtime = Runtime::new_memory();
let detail = runtime runtime.store_config_bundle(test_bundle()).unwrap();
.create_worker(CreateWorkerRequest::tools_less( let error = runtime
WorkerIntent::default(), .create_worker(task_request("no backend"))
ProfileSelector::RuntimeDefault, .unwrap_err();
)) assert!(matches!(
.unwrap(); error,
RuntimeError::ExecutionBackendUnavailable { .. }
assert!(detail.config_bundle.is_none()); ));
assert!(detail.requested_capabilities.is_empty()); assert!(runtime.list_workers().unwrap().is_empty());
let diagnostics = runtime.diagnostics().unwrap();
assert_eq!(diagnostics.len(), 2);
assert!(
diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "runtime.local_default_resources")
);
assert!(
diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "worker.tools_less")
);
} }
#[test] #[test]
fn backend_unconnected_worker_input_is_rejected_and_not_transcribed() { fn create_worker_missing_config_bundle_is_rejected_before_backend() {
let runtime = Runtime::new_memory(); let runtime = Runtime::new_memory();
let detail = runtime.create_worker(task_request("placeholder")).unwrap(); let error = runtime
assert_eq!( .create_worker(task_request("missing bundle"))
detail.execution.backend,
WorkerExecutionBackendKind::Unconnected
);
let err = runtime
.send_input(&detail.worker_ref, WorkerInput::user("must reject"))
.unwrap_err(); .unwrap_err();
assert!(matches!( assert!(matches!(error, RuntimeError::ConfigBundleMissing { .. }));
err, assert!(runtime.list_workers().unwrap().is_empty());
RuntimeError::WorkerExecutionUnavailable { .. }
));
let projection = runtime
.transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 1))
.unwrap();
assert_eq!(projection.total_items, 0);
} }
#[test] #[test]
@ -1885,6 +1874,7 @@ mod tests {
let runtime = let runtime =
Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(InputOnlyBackend)) Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(InputOnlyBackend))
.unwrap(); .unwrap();
runtime.store_config_bundle(test_bundle()).unwrap();
let detail = runtime.create_worker(task_request("no stop")).unwrap(); let detail = runtime.create_worker(task_request("no stop")).unwrap();
let err = runtime let err = runtime
@ -1916,6 +1906,7 @@ mod tests {
Arc::new(TestExecutionBackend::default()), Arc::new(TestExecutionBackend::default()),
) )
.unwrap(); .unwrap();
runtime.store_config_bundle(test_bundle()).unwrap();
let detail = runtime.create_worker(task_request("chat")).unwrap(); let detail = runtime.create_worker(task_request("chat")).unwrap();
let first = runtime let first = runtime
@ -1946,7 +1937,7 @@ mod tests {
#[test] #[test]
fn stop_and_cancel_workers_update_summary() { fn stop_and_cancel_workers_update_summary() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let stopped = runtime.create_worker(task_request("stop me")).unwrap(); let stopped = runtime.create_worker(task_request("stop me")).unwrap();
let cancelled = runtime.create_worker(task_request("cancel me")).unwrap(); let cancelled = runtime.create_worker(task_request("cancel me")).unwrap();
@ -1969,7 +1960,7 @@ mod tests {
#[test] #[test]
fn stop_then_cancel_preserves_stopped_terminal_state() { fn stop_then_cancel_preserves_stopped_terminal_state() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let cursor = runtime.event_cursor_from_start().unwrap(); let cursor = runtime.event_cursor_from_start().unwrap();
let worker = runtime let worker = runtime
.create_worker(task_request("stable stopped")) .create_worker(task_request("stable stopped"))
@ -2014,7 +2005,7 @@ mod tests {
#[test] #[test]
fn cancel_then_stop_preserves_cancelled_terminal_state() { fn cancel_then_stop_preserves_cancelled_terminal_state() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let cursor = runtime.event_cursor_from_start().unwrap(); let cursor = runtime.event_cursor_from_start().unwrap();
let worker = runtime let worker = runtime
.create_worker(task_request("stable cancelled")) .create_worker(task_request("stable cancelled"))

View File

@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
use chrono::Utc; use chrono::Utc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use worker_runtime::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector}; use worker_runtime::catalog::ProfileSelector;
use worker_runtime::config_bundle::{ use worker_runtime::config_bundle::{
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor, ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor,
}; };
@ -367,10 +367,6 @@ fn spawn_companion_worker(runtime: &RuntimeRegistry) -> CompanionWorkerState {
let selector = companion_profile_selector(); let selector = companion_profile_selector();
let mut diagnostics = Vec::new(); let mut diagnostics = Vec::new();
let config_bundle = companion_config_bundle(); let config_bundle = companion_config_bundle();
let config_ref = ConfigBundleRef {
id: config_bundle.metadata.id.clone(),
digest: config_bundle.metadata.digest.clone(),
};
match runtime.sync_config_bundle(COMPANION_RUNTIME_ID, config_bundle) { match runtime.sync_config_bundle(COMPANION_RUNTIME_ID, config_bundle) {
Ok(result) => diagnostics.extend(result.diagnostics), Ok(result) => diagnostics.extend(result.diagnostics),
@ -390,8 +386,7 @@ fn spawn_companion_worker(runtime: &RuntimeRegistry) -> CompanionWorkerState {
expected_segments: 0, expected_segments: 0,
}, },
profile: Some(selector), profile: Some(selector),
config_bundle: Some(config_ref), initial_input: None,
requested_capabilities: vec![CapabilityRequest::named("worker.input.user")],
}, },
); );
@ -611,36 +606,37 @@ mod tests {
} }
#[test] #[test]
fn companion_spawns_worker_with_companion_profile_and_diagnostic_when_not_input_capable() { fn companion_spawns_worker_with_companion_profile_through_runtime_backend() {
let registry = let registry = RuntimeRegistry::for_workspace(
RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory("local:test")); EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test",
Arc::new(DeterministicExecutionBackend::default()),
)
.unwrap(),
);
let registry = Arc::new(registry); let registry = Arc::new(registry);
let companion = CompanionConsole::new(registry.clone()); let companion = CompanionConsole::new(registry.clone());
let status = companion.status(); let status = companion.status();
let worker = status.worker.clone().expect("companion worker"); let worker = status.worker.clone().expect("companion worker");
assert_eq!(worker.runtime_id, COMPANION_RUNTIME_ID); assert_eq!(worker.runtime_id, COMPANION_RUNTIME_ID);
assert_eq!(worker.role.as_deref(), Some("workspace_companion")); assert_eq!(worker.role.as_deref(), Some(COMPANION_PROFILE_ID));
assert!(!worker.capabilities.can_accept_input); assert!(worker.capabilities.can_accept_input);
assert_eq!(status.transport.completion, "not_input_capable"); assert_eq!(status.transport.completion, "connected");
assert!( assert!(status.diagnostics.is_empty());
status
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "companion_worker_not_input_capable")
);
let response = companion.send_message(CompanionMessageRequest { let response = companion.send_message(CompanionMessageRequest {
content: "hello".to_string(), content: "hello".to_string(),
}); });
assert_eq!(response.state, CompanionState::Rejected); assert_eq!(response.state, CompanionState::Accepted);
assert!(response.diagnostics.is_empty());
assert!( assert!(
!response response
.diagnostics .transcript
.items
.iter() .iter()
.any(|diagnostic| diagnostic.code == "companion_llm_not_connected") .any(|entry| entry.role == "user" && entry.content == "hello")
); );
assert!(response.transcript.items.is_empty());
let worker_detail = registry let worker_detail = registry
.worker(COMPANION_RUNTIME_ID, &worker.worker_id) .worker(COMPANION_RUNTIME_ID, &worker.worker_id)

View File

@ -8,10 +8,13 @@ use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use worker_runtime::catalog::{ use worker_runtime::catalog::{
CapabilityRequest, ConfigBundleRef, CreateWorkerRequest, ProfileSelector, ConfigBundleRef, CreateWorkerRequest, ProfileSelector, WorkerDetail as EmbeddedWorkerDetail,
WorkerDetail as EmbeddedWorkerDetail, WorkerIntent, WorkerStatus as EmbeddedWorkerStatus, WorkerStatus as EmbeddedWorkerStatus,
};
use worker_runtime::config_bundle::{
ConfigBundle, ConfigBundleAvailability, ConfigBundleMetadata, ConfigBundleProvenance,
ConfigBundleSummary, ConfigProfileDescriptor,
}; };
use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary};
use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; use worker_runtime::error::RuntimeError as EmbeddedRuntimeError;
use worker_runtime::execution::WorkerExecutionRunState; use worker_runtime::execution::WorkerExecutionRunState;
use worker_runtime::http_server::{ use worker_runtime::http_server::{
@ -236,11 +239,12 @@ pub struct WorkerLookupResult {
/// Browser-safe worker spawn request shape. /// Browser-safe worker spawn request shape.
/// ///
/// The request intentionally carries only workspace policy intents, stable /// The request carries Browser-facing launch semantics only: workspace intent,
/// worker identifiers, optional profile selectors, config bundle refs, and /// optional display identity, acceptance policy, optional profile selector, and
/// requested capability names. Raw workspace roots, child cwd, executable path, /// optional initial input. Runtime execution authority is resolved by the host
/// Runtime endpoints/credentials, raw bundle storage paths, and host-local /// into a synced ConfigBundle before the canonical Runtime create request is
/// resolved WorkerSpec content are resolved by the runtime service and never /// built. Raw workspace roots, child cwd, executable paths, tool scope,
/// credentials, raw config stores, sockets, sessions, and storage paths are not
/// accepted from Workspace API callers. /// accepted from Workspace API callers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkerSpawnRequest { pub struct WorkerSpawnRequest {
@ -251,9 +255,7 @@ pub struct WorkerSpawnRequest {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub profile: Option<ProfileSelector>, pub profile: Option<ProfileSelector>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub config_bundle: Option<ConfigBundleRef>, pub initial_input: Option<EmbeddedWorkerInput>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub requested_capabilities: Vec<CapabilityRequest>,
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@ -994,7 +996,7 @@ impl EmbeddedWorkerRuntime {
worker_id: summary.worker_ref.worker_id.as_str().to_string(), worker_id: summary.worker_ref.worker_id.as_str().to_string(),
host_id: self.host_id.clone(), host_id: self.host_id.clone(),
label: safe_display_hint(summary.worker_ref.worker_id.as_str()), label: safe_display_hint(summary.worker_ref.worker_id.as_str()),
role: embedded_intent_label(&summary.intent), role: embedded_profile_label(&summary.profile),
profile: embedded_profile_label(&summary.profile), profile: embedded_profile_label(&summary.profile),
workspace: WorkerWorkspaceSummary { workspace: WorkerWorkspaceSummary {
visibility: "backend_internal".to_string(), visibility: "backend_internal".to_string(),
@ -1027,7 +1029,7 @@ impl EmbeddedWorkerRuntime {
worker_id: detail.worker_id.as_str().to_string(), worker_id: detail.worker_id.as_str().to_string(),
host_id: self.host_id.clone(), host_id: self.host_id.clone(),
label: safe_display_hint(detail.worker_id.as_str()), label: safe_display_hint(detail.worker_id.as_str()),
role: embedded_intent_label(&detail.intent), role: embedded_profile_label(&detail.profile),
profile: embedded_profile_label(&detail.profile), profile: embedded_profile_label(&detail.profile),
workspace: WorkerWorkspaceSummary { workspace: WorkerWorkspaceSummary {
visibility: "backend_internal".to_string(), visibility: "backend_internal".to_string(),
@ -1195,26 +1197,35 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
if matches!(request.acceptance, WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments } if expected_segments > 0) if matches!(request.acceptance, WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments } if expected_segments > 0)
{ {
diagnostics.push(diagnostic( diagnostics.push(diagnostic(
"embedded_runtime_tools_less", "embedded_runtime_acceptance_projection",
DiagnosticSeverity::Info, DiagnosticSeverity::Info,
"Embedded Runtime v0 creates a tools-less catalog Worker and does not spawn provider segments".to_string(), "Embedded Runtime accepts creation through a runtime execution backend; provider segment counts are observed after execution, not faked at create time".to_string(),
)); ));
} }
let create_request = CreateWorkerRequest { let profile = request
intent: embedded_create_intent(&request.intent),
profile: request
.profile .profile
.clone() .clone()
.unwrap_or_else(|| embedded_profile_selector(&request.intent)), .unwrap_or_else(|| embedded_profile_selector(&request.intent));
config_bundle: request.config_bundle.clone(), let config_bundle = match self
requested_capabilities: if request.requested_capabilities.is_empty() { .runtime
vec![CapabilityRequest::named("read")] .store_config_bundle(default_embedded_config_bundle(&profile))
} else { {
request.requested_capabilities.clone() Ok(availability) => availability.reference,
}, Err(error) => {
workspace_refs: Vec::new(), diagnostics.push(embedded_runtime_diagnostic(&error));
mount_refs: Vec::new(), return WorkerSpawnResult {
state: WorkerOperationState::Rejected,
worker: None,
acceptance_evidence: Vec::new(),
diagnostics,
};
}
};
let create_request = CreateWorkerRequest {
profile,
config_bundle,
initial_input: request.initial_input.clone(),
}; };
match self.runtime.create_worker(create_request) { match self.runtime.create_worker(create_request) {
Ok(detail) => { Ok(detail) => {
@ -1675,7 +1686,7 @@ impl RemoteWorkerRuntime {
worker_id: summary.worker_ref.worker_id.as_str().to_string(), worker_id: summary.worker_ref.worker_id.as_str().to_string(),
host_id: self.host_id.clone(), host_id: self.host_id.clone(),
label: safe_display_hint(summary.worker_ref.worker_id.as_str()), label: safe_display_hint(summary.worker_ref.worker_id.as_str()),
role: embedded_intent_label(&summary.intent), role: None,
profile: embedded_profile_label(&summary.profile), profile: embedded_profile_label(&summary.profile),
workspace: WorkerWorkspaceSummary { workspace: WorkerWorkspaceSummary {
visibility: "remote_runtime".to_string(), visibility: "remote_runtime".to_string(),
@ -1707,7 +1718,7 @@ impl RemoteWorkerRuntime {
worker_id: detail.worker_id.as_str().to_string(), worker_id: detail.worker_id.as_str().to_string(),
host_id: self.host_id.clone(), host_id: self.host_id.clone(),
label: safe_display_hint(detail.worker_id.as_str()), label: safe_display_hint(detail.worker_id.as_str()),
role: embedded_intent_label(&detail.intent), role: None,
profile: embedded_profile_label(&detail.profile), profile: embedded_profile_label(&detail.profile),
workspace: WorkerWorkspaceSummary { workspace: WorkerWorkspaceSummary {
visibility: "remote_runtime".to_string(), visibility: "remote_runtime".to_string(),
@ -1875,20 +1886,24 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime {
)], )],
}; };
} }
let create = CreateWorkerRequest { let profile = request
intent: embedded_create_intent(&request.intent),
profile: request
.profile .profile
.clone() .clone()
.unwrap_or_else(|| embedded_profile_selector(&request.intent)), .unwrap_or_else(|| embedded_profile_selector(&request.intent));
config_bundle: request.config_bundle.clone(), let sync = self.sync_config_bundle(default_embedded_config_bundle(&profile));
requested_capabilities: if request.requested_capabilities.is_empty() { let Some(config_bundle) = sync.availability.map(|availability| availability.reference)
vec![CapabilityRequest::named("read")] else {
} else { return WorkerSpawnResult {
request.requested_capabilities.clone() state: WorkerOperationState::Rejected,
}, worker: None,
workspace_refs: Vec::new(), acceptance_evidence: Vec::new(),
mount_refs: Vec::new(), diagnostics: sync.diagnostics,
};
};
let create = CreateWorkerRequest {
profile,
config_bundle,
initial_input: request.initial_input.clone(),
}; };
match self.post_json::<_, RuntimeHttpWorkerResponse>("/v1/workers", &create) { match self.post_json::<_, RuntimeHttpWorkerResponse>("/v1/workers", &create) {
Ok(response) => WorkerSpawnResult { Ok(response) => WorkerSpawnResult {
@ -2138,21 +2153,32 @@ fn embedded_worker_execution_status_label(
} }
} }
fn embedded_create_intent(intent: &WorkerSpawnIntent) -> WorkerIntent { fn default_embedded_config_bundle(profile: &ProfileSelector) -> ConfigBundle {
match intent { let id = format!(
WorkerSpawnIntent::WorkspaceCompanion => WorkerIntent::Role { "workspace-runtime-{}",
role: "workspace_companion".to_string(), embedded_profile_label(profile)
purpose: Some("workspace backend internal companion".to_string()), .unwrap_or_else(|| "default".to_string())
.replace([':', '/', ' '], "-")
);
ConfigBundle {
metadata: ConfigBundleMetadata {
id,
digest: String::new(),
revision: "workspace-runtime-v0".to_string(),
workspace_id: "workspace-server".to_string(),
created_at: "runtime-generated".to_string(),
provenance: ConfigBundleProvenance {
source: "workspace-server".to_string(),
detail: Some("backend-resolved launch bundle".to_string()),
}, },
WorkerSpawnIntent::WorkspaceOrchestrator => WorkerIntent::Role {
role: "workspace_orchestrator".to_string(),
purpose: Some("workspace backend internal orchestration".to_string()),
},
WorkerSpawnIntent::TicketRole { ticket_id, role } => WorkerIntent::Role {
role: ticket_role_profile_slug(role).to_string(),
purpose: Some(format!("ticket {ticket_id}")),
}, },
profiles: vec![ConfigProfileDescriptor {
selector: profile.clone(),
label: embedded_profile_label(profile),
}],
declarations: Vec::new(),
} }
.with_computed_digest()
} }
fn embedded_profile_selector(intent: &WorkerSpawnIntent) -> ProfileSelector { fn embedded_profile_selector(intent: &WorkerSpawnIntent) -> ProfileSelector {
@ -2176,16 +2202,6 @@ fn ticket_role_profile_slug(role: &TicketWorkerRole) -> &'static str {
} }
} }
fn embedded_intent_label(intent: &WorkerIntent) -> Option<String> {
match intent {
WorkerIntent::Assistant { purpose } => {
purpose.clone().or_else(|| Some("assistant".to_string()))
}
WorkerIntent::Task { objective } => Some(safe_display_hint(objective)),
WorkerIntent::Role { role, .. } => Some(safe_display_hint(role)),
}
}
fn embedded_profile_label(profile: &ProfileSelector) -> Option<String> { fn embedded_profile_label(profile: &ProfileSelector) -> Option<String> {
Some(match profile { Some(match profile {
ProfileSelector::RuntimeDefault => "runtime_default".to_string(), ProfileSelector::RuntimeDefault => "runtime_default".to_string(),
@ -2324,7 +2340,8 @@ fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnosti
DiagnosticSeverity::Warning, DiagnosticSeverity::Warning,
"Embedded Runtime worker was not found".to_string(), "Embedded Runtime worker was not found".to_string(),
), ),
EmbeddedRuntimeError::WorkerExecutionUnavailable { .. } => diagnostic( EmbeddedRuntimeError::WorkerExecutionUnavailable { .. }
| EmbeddedRuntimeError::ExecutionBackendUnavailable { .. } => diagnostic(
"embedded_worker_execution_unavailable", "embedded_worker_execution_unavailable",
DiagnosticSeverity::Warning, DiagnosticSeverity::Warning,
"Embedded Worker has no execution backend attached".to_string(), "Embedded Worker has no execution backend attached".to_string(),
@ -2962,8 +2979,7 @@ mod tests {
expected_segments: 0, expected_segments: 0,
}, },
profile: None, profile: None,
config_bundle: None, initial_input: None,
requested_capabilities: Vec::new(),
} }
} }
@ -2978,13 +2994,10 @@ mod tests {
assert_eq!(spawned.state, WorkerOperationState::Rejected); assert_eq!(spawned.state, WorkerOperationState::Rejected);
assert!(spawned.acceptance_evidence.is_empty()); assert!(spawned.acceptance_evidence.is_empty());
assert!(spawned.diagnostics.iter().any(|diagnostic| { assert!(spawned.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "embedded_worker_execution_spawn_errored" diagnostic.code == "embedded_worker_execution_rejected"
&& !diagnostic.message.contains("/tmp/secret-provider-config") && !diagnostic.message.contains("/tmp/secret-provider-config")
})); }));
let worker = spawned.worker.expect("failed execution is still projected"); assert!(spawned.worker.is_none());
assert_eq!(worker.status, "errored");
assert!(!worker.capabilities.can_accept_input);
assert!(!worker.capabilities.can_stop);
} }
#[test] #[test]
@ -3035,8 +3048,13 @@ mod tests {
#[test] #[test]
fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() { fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() {
let registry = let registry = RuntimeRegistry::for_workspace(
RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory("local:test")); EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test",
Arc::new(AcceptingExecutionBackend::default()),
)
.expect("test backend should connect"),
);
let runtimes = registry.list_runtimes(10); let runtimes = registry.list_runtimes(10);
let embedded_summary = runtimes let embedded_summary = runtimes
@ -3050,7 +3068,7 @@ mod tests {
); );
assert_eq!(embedded_summary.source.status, RuntimeSourceStatus::Active); assert_eq!(embedded_summary.source.status, RuntimeSourceStatus::Active);
assert!(embedded_summary.capabilities.can_spawn_worker); assert!(embedded_summary.capabilities.can_spawn_worker);
assert!(!embedded_summary.capabilities.can_accept_input); assert!(embedded_summary.capabilities.can_accept_input);
let spawned = registry let spawned = registry
.spawn_worker( .spawn_worker(
@ -3065,8 +3083,7 @@ mod tests {
expected_segments: 0, expected_segments: 0,
}, },
profile: None, profile: None,
config_bundle: None, initial_input: None,
requested_capabilities: Vec::new(),
}, },
) )
.unwrap(); .unwrap();
@ -3083,7 +3100,7 @@ mod tests {
assert_eq!(worker.workspace.identity, "runtime_registry_worker"); assert_eq!(worker.workspace.identity, "runtime_registry_worker");
assert_eq!(worker.implementation.kind, "embedded_worker_runtime"); assert_eq!(worker.implementation.kind, "embedded_worker_runtime");
assert_eq!(worker.profile.as_deref(), Some("builtin:coder")); assert_eq!(worker.profile.as_deref(), Some("builtin:coder"));
assert!(!worker.capabilities.can_accept_input); assert!(worker.capabilities.can_accept_input);
let input = registry let input = registry
.send_input( .send_input(
@ -3095,21 +3112,20 @@ mod tests {
}, },
) )
.unwrap(); .unwrap();
assert_eq!(input.state, WorkerOperationState::Rejected); assert_eq!(input.state, WorkerOperationState::Accepted);
assert_eq!(input.runtime_id, EMBEDDED_RUNTIME_ID); assert_eq!(input.runtime_id, EMBEDDED_RUNTIME_ID);
assert_eq!(input.worker_id, worker.worker_id); assert_eq!(input.worker_id, worker.worker_id);
assert!(
input
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "embedded_worker_execution_unavailable")
);
let transcript = registry let transcript = registry
.transcript(EMBEDDED_RUNTIME_ID, &worker.worker_id, 0, 10) .transcript(EMBEDDED_RUNTIME_ID, &worker.worker_id, 0, 10)
.unwrap(); .unwrap();
assert_eq!(transcript.state, WorkerOperationState::Accepted); assert_eq!(transcript.state, WorkerOperationState::Accepted);
assert!(transcript.items.is_empty()); assert!(
transcript
.items
.iter()
.any(|entry| entry.role == "user" && entry.content == "hello embedded runtime")
);
let json = serde_json::to_string(&(embedded_summary, worker, transcript)).unwrap(); let json = serde_json::to_string(&(embedded_summary, worker, transcript)).unwrap();
for forbidden in [ for forbidden in [
@ -3132,9 +3148,13 @@ mod tests {
#[test] #[test]
fn embedded_backend_syncs_config_bundle_and_spawns_with_bundle_ref() { fn embedded_backend_syncs_config_bundle_and_spawns_with_bundle_ref() {
let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( let registry = RuntimeRegistry::new(vec![Arc::new(
EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test", "local:test",
))]); Arc::new(AcceptingExecutionBackend::default()),
)
.unwrap(),
)]);
let bundle = test_config_bundle(); let bundle = test_config_bundle();
let sync = registry let sync = registry
.sync_config_bundle(EMBEDDED_RUNTIME_ID, bundle.clone()) .sync_config_bundle(EMBEDDED_RUNTIME_ID, bundle.clone())
@ -3162,8 +3182,7 @@ mod tests {
expected_segments: 0, expected_segments: 0,
}, },
profile: Some(ProfileSelector::Builtin("builtin:coder".to_string())), profile: Some(ProfileSelector::Builtin("builtin:coder".to_string())),
config_bundle: Some(reference), initial_input: None,
requested_capabilities: vec![CapabilityRequest::named("read")],
}, },
) )
.unwrap(); .unwrap();
@ -3176,9 +3195,13 @@ mod tests {
#[test] #[test]
fn embedded_runtime_rejects_socket_ready_acceptance_without_socket_identity() { fn embedded_runtime_rejects_socket_ready_acceptance_without_socket_identity() {
let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( let registry = RuntimeRegistry::new(vec![Arc::new(
EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test", "local:test",
))]); Arc::new(AcceptingExecutionBackend::default()),
)
.unwrap(),
)]);
let result = registry let result = registry
.spawn_worker( .spawn_worker(
EMBEDDED_RUNTIME_ID, EMBEDDED_RUNTIME_ID,
@ -3187,8 +3210,7 @@ mod tests {
requested_worker_name: None, requested_worker_name: None,
acceptance: WorkerSpawnAcceptanceRequirement::SocketReady, acceptance: WorkerSpawnAcceptanceRequirement::SocketReady,
profile: None, profile: None,
config_bundle: None, initial_input: None,
requested_capabilities: Vec::new(),
}, },
) )
.unwrap(); .unwrap();
@ -3480,12 +3502,7 @@ mod tests {
"execution": { "backend": "connected", "run_state": "idle" }, "execution": { "backend": "connected", "run_state": "idle" },
"intent": { "kind": "role", "role": "coder", "purpose": "remote test" }, "intent": { "kind": "role", "role": "coder", "purpose": "remote test" },
"profile": { "kind": "builtin", "value": "coder" }, "profile": { "kind": "builtin", "value": "coder" },
"config_bundle": null, "config_bundle": { "id": "remote-bundle", "digest": "remote-digest" },
"requested_capabilities": [],
"workspace_refs": [],
"mount_refs": [],
"requested_capability_count": 0,
"has_config_bundle": false,
"transcript_len": 0, "transcript_len": 0,
"last_event_id": 0 "last_event_id": 0
}) })

View File

@ -1063,6 +1063,7 @@ mod tests {
use axum::http::Request; use axum::http::Request;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use serde_json::{Value, json}; use serde_json::{Value, json};
use std::sync::Arc;
use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
use tower::ServiceExt; use tower::ServiceExt;
@ -1140,6 +1141,51 @@ mod tests {
} }
} }
fn runtime_test_bundle() -> worker_runtime::config_bundle::ConfigBundle {
worker_runtime::config_bundle::ConfigBundle {
metadata: worker_runtime::config_bundle::ConfigBundleMetadata {
id: "server-test-bundle".to_string(),
digest: String::new(),
revision: "test".to_string(),
workspace_id: "test".to_string(),
created_at: "test".to_string(),
provenance: worker_runtime::config_bundle::ConfigBundleProvenance {
source: "test".to_string(),
detail: None,
},
},
profiles: vec![worker_runtime::config_bundle::ConfigProfileDescriptor {
selector: worker_runtime::catalog::ProfileSelector::RuntimeDefault,
label: Some("server-test".to_string()),
}],
declarations: Vec::new(),
}
.with_computed_digest()
}
fn runtime_create_request() -> worker_runtime::catalog::CreateWorkerRequest {
let bundle = runtime_test_bundle();
worker_runtime::catalog::CreateWorkerRequest {
profile: worker_runtime::catalog::ProfileSelector::RuntimeDefault,
config_bundle: worker_runtime::catalog::ConfigBundleRef {
id: bundle.metadata.id,
digest: bundle.metadata.digest,
},
initial_input: None,
}
}
fn runtime_with_worker() -> (worker_runtime::Runtime, worker_runtime::identity::WorkerRef) {
let runtime = worker_runtime::Runtime::with_execution_backend(
worker_runtime::RuntimeOptions::default(),
Arc::new(DeterministicExecutionBackend::default()),
)
.unwrap();
runtime.store_config_bundle(runtime_test_bundle()).unwrap();
let worker = runtime.create_worker(runtime_create_request()).unwrap();
(runtime, worker.worker_ref)
}
#[tokio::test] #[tokio::test]
async fn serves_bounded_read_apis_and_static_spa_separately() { async fn serves_bounded_read_apis_and_static_spa_separately() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
@ -1153,7 +1199,13 @@ mod tests {
let store = SqliteWorkspaceStore::in_memory().unwrap(); let store = SqliteWorkspaceStore::in_memory().unwrap();
let mut config = ServerConfig::local_dev(dir.path(), test_identity()); let mut config = ServerConfig::local_dev(dir.path(), test_identity());
config.static_assets_dir = Some(static_dir); config.static_assets_dir = Some(static_dir);
let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(store),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.unwrap();
let app = build_router(api); let app = build_router(api);
let workspace = get_json(app.clone(), "/api/workspace").await; let workspace = get_json(app.clone(), "/api/workspace").await;
@ -1260,7 +1312,7 @@ mod tests {
let worker_items = workers["items"].as_array().unwrap(); let worker_items = workers["items"].as_array().unwrap();
let companion_worker = worker_items let companion_worker = worker_items
.iter() .iter()
.find(|worker| worker["role"] == "workspace_companion") .find(|worker| worker["role"] == "builtin:companion")
.expect("companion worker is visible through runtime worker API"); .expect("companion worker is visible through runtime worker API");
assert_eq!(companion_worker["runtime_id"], "embedded-worker-runtime"); assert_eq!(companion_worker["runtime_id"], "embedded-worker-runtime");
assert!(companion_worker["capabilities"]["can_stop"].is_boolean()); assert!(companion_worker["capabilities"]["can_stop"].is_boolean());
@ -1270,7 +1322,7 @@ mod tests {
companion_status["state"].as_str(), companion_status["state"].as_str(),
Some("ready") | Some("error") Some("ready") | Some("error")
)); ));
assert_eq!(companion_status["worker"]["role"], "workspace_companion"); assert_eq!(companion_status["worker"]["role"], "builtin:companion");
assert_eq!( assert_eq!(
companion_status["transport"]["kind"], companion_status["transport"]["kind"],
"embedded_worker_runtime" "embedded_worker_runtime"
@ -1305,7 +1357,7 @@ mod tests {
.as_array() .as_array()
.unwrap() .unwrap()
.iter() .iter()
.any(|worker| worker["role"] == "workspace_companion") .any(|worker| worker["role"] == "builtin:companion")
); );
let runs_response = app let runs_response = app
@ -1477,7 +1529,13 @@ mod tests {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let store = SqliteWorkspaceStore::in_memory().unwrap(); let store = SqliteWorkspaceStore::in_memory().unwrap();
let config = ServerConfig::local_dev(dir.path(), test_identity()); let config = ServerConfig::local_dev(dir.path(), test_identity());
let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(store),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.unwrap();
let app = build_router(api); let app = build_router(api);
let runtimes = get_json(app.clone(), "/api/runtimes").await; let runtimes = get_json(app.clone(), "/api/runtimes").await;
@ -1515,20 +1573,14 @@ mod tests {
}), }),
) )
.await; .await;
assert_eq!(spawned["state"], "rejected"); assert_eq!(spawned["state"], "accepted");
assert!( let diagnostics = spawned["diagnostics"].as_array().unwrap();
spawned["diagnostics"] assert!(diagnostics.iter().all(|diagnostic| {
.as_array() !diagnostic["message"]
.unwrap()
.iter()
.any(|diagnostic| {
diagnostic["code"] == "embedded_worker_execution_spawn_errored"
&& !diagnostic["message"]
.as_str() .as_str()
.unwrap() .unwrap_or_default()
.contains("/workspace/demo") .contains("/workspace/demo")
}) }));
);
let worker_id = spawned["worker"]["worker_id"].as_str().unwrap().to_string(); let worker_id = spawned["worker"]["worker_id"].as_str().unwrap().to_string();
assert_eq!(spawned["worker"]["runtime_id"], "embedded-worker-runtime"); assert_eq!(spawned["worker"]["runtime_id"], "embedded-worker-runtime");
assert_eq!( assert_eq!(
@ -1557,16 +1609,10 @@ mod tests {
}), }),
) )
.await; .await;
assert_eq!(accepted["state"], "rejected"); assert_eq!(accepted["state"], "accepted");
assert_eq!(accepted["runtime_id"], "embedded-worker-runtime"); assert_eq!(accepted["runtime_id"], "embedded-worker-runtime");
assert_eq!(accepted["worker_id"], worker_id); assert_eq!(accepted["worker_id"], worker_id);
assert!( assert!(accepted["diagnostics"].as_array().unwrap().is_empty());
accepted["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "embedded_worker_execution_unavailable")
);
let transcript = get_json( let transcript = get_json(
app.clone(), app.clone(),
@ -1574,7 +1620,9 @@ mod tests {
) )
.await; .await;
assert_eq!(transcript["state"], "accepted"); assert_eq!(transcript["state"], "accepted");
assert!(transcript["items"].as_array().unwrap().is_empty()); assert!(transcript["items"].as_array().unwrap().iter().any(
|item| item["role"] == "user" && item["content"] == "hello from browser-facing api"
));
let wrong_runtime = app let wrong_runtime = app
.clone() .clone()
@ -1620,10 +1668,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn proxies_worker_observation_ws_with_backend_cursors_and_diagnostics() { async fn proxies_worker_observation_ws_with_backend_cursors_and_diagnostics() {
let runtime = worker_runtime::Runtime::new_memory(); let (runtime, worker_ref) = runtime_with_worker();
let worker = runtime
.create_worker(worker_runtime::catalog::CreateWorkerRequest::default())
.unwrap();
let runtime_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let runtime_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let runtime_addr = runtime_listener.local_addr().unwrap(); let runtime_addr = runtime_listener.local_addr().unwrap();
tokio::spawn({ tokio::spawn({
@ -1645,11 +1690,17 @@ mod tests {
worker_id: "worker-a".into(), worker_id: "worker-a".into(),
endpoint: format!( endpoint: format!(
"ws://{runtime_addr}/v1/workers/{}/events/ws", "ws://{runtime_addr}/v1/workers/{}/events/ws",
worker.worker_ref.worker_id worker_ref.worker_id
), ),
bearer_token: None, bearer_token: None,
}); });
let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(store),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.unwrap();
let app_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let app_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let app_addr = app_listener.local_addr().unwrap(); let app_addr = app_listener.local_addr().unwrap();
tokio::spawn(async move { axum::serve(app_listener, build_router(api)).await.unwrap() }); tokio::spawn(async move { axum::serve(app_listener, build_router(api)).await.unwrap() });
@ -1666,7 +1717,7 @@ mod tests {
runtime runtime
.observe_worker_event( .observe_worker_event(
&worker.worker_ref, &worker_ref,
protocol::Event::TextDelta { protocol::Event::TextDelta {
text: "live".into(), text: "live".into(),
}, },
@ -1686,7 +1737,7 @@ mod tests {
let _snapshot = next_client_frame(&mut resumed).await; let _snapshot = next_client_frame(&mut resumed).await;
runtime runtime
.observe_worker_event( .observe_worker_event(
&worker.worker_ref, &worker_ref,
protocol::Event::TextDone { protocol::Event::TextDone {
text: "done".into(), text: "done".into(),
}, },
@ -1822,10 +1873,7 @@ mod tests {
worker_runtime::identity::WorkerRef, worker_runtime::identity::WorkerRef,
String, String,
) { ) {
let runtime = worker_runtime::Runtime::new_memory(); let (runtime, worker_ref) = runtime_with_worker();
let worker = runtime
.create_worker(worker_runtime::catalog::CreateWorkerRequest::default())
.unwrap();
let runtime_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let runtime_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let runtime_addr = runtime_listener.local_addr().unwrap(); let runtime_addr = runtime_listener.local_addr().unwrap();
tokio::spawn({ tokio::spawn({
@ -1838,9 +1886,9 @@ mod tests {
}); });
let endpoint = format!( let endpoint = format!(
"ws://{runtime_addr}/v1/workers/{}/events/ws", "ws://{runtime_addr}/v1/workers/{}/events/ws",
worker.worker_ref.worker_id worker_ref.worker_id
); );
(runtime, worker.worker_ref, endpoint) (runtime, worker_ref, endpoint)
} }
async fn spawn_workspace_proxy( async fn spawn_workspace_proxy(
@ -1852,7 +1900,13 @@ mod tests {
let runtime_id = source.runtime_id.clone(); let runtime_id = source.runtime_id.clone();
let worker_id = source.worker_id.clone(); let worker_id = source.worker_id.clone();
config.runtime_event_sources.push(source); config.runtime_event_sources.push(source);
let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(store),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.unwrap();
let app_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let app_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let app_addr = app_listener.local_addr().unwrap(); let app_addr = app_listener.local_addr().unwrap();
tokio::spawn(async move { axum::serve(app_listener, build_router(api)).await.unwrap() }); tokio::spawn(async move { axum::serve(app_listener, build_router(api)).await.unwrap() });