From 14bb4934a6374eea64591035e5342088ab0ccd09 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 29 Jun 2026 02:44:52 +0900 Subject: [PATCH] runtime: unify worker creation path --- crates/worker-runtime/src/catalog.rs | 119 +----- crates/worker-runtime/src/error.rs | 3 + crates/worker-runtime/src/http_server.rs | 136 ++++++- crates/worker-runtime/src/lib.rs | 1 + crates/worker-runtime/src/runtime.rs | 457 +++++++++++------------ crates/workspace-server/src/companion.rs | 44 +-- crates/workspace-server/src/hosts.rs | 225 +++++------ crates/workspace-server/src/server.rs | 140 ++++--- 8 files changed, 601 insertions(+), 524 deletions(-) diff --git a/crates/worker-runtime/src/catalog.rs b/crates/worker-runtime/src/catalog.rs index d32623d3..14f41649 100644 --- a/crates/worker-runtime/src/catalog.rs +++ b/crates/worker-runtime/src/catalog.rs @@ -1,32 +1,9 @@ use crate::execution::WorkerExecutionStatus; use crate::identity::{RuntimeId, WorkerId, WorkerRef}; +use crate::interaction::WorkerInput; use serde::{Deserialize, Serialize}; -/// Intent supplied when a Worker is created. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(tag = "kind", rename_all = "snake_case")] -pub enum WorkerIntent { - Assistant { - #[serde(default, skip_serializing_if = "Option::is_none")] - purpose: Option, - }, - Task { - objective: String, - }, - Role { - role: String, - #[serde(default, skip_serializing_if = "Option::is_none")] - purpose: Option, - }, -} - -impl Default for WorkerIntent { - fn default() -> Self { - Self::Assistant { purpose: None } - } -} - -/// Profile selector boundary. This is a selector, not a resolved config bundle. +/// Profile selector boundary. This is a selector, not a resolved config bundle. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(tag = "kind", content = "value", rename_all = "snake_case")] pub enum ProfileSelector { @@ -48,77 +25,21 @@ pub struct ConfigBundleRef { pub digest: String, } -/// Requested capability name plus optional human-readable reason. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct CapabilityRequest { - pub name: String, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub reason: Option, -} - -impl CapabilityRequest { - pub fn named(name: impl Into) -> Self { - Self { - name: name.into(), - reason: None, - } - } -} - -/// Opaque workspace reference supplied by a caller. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct WorkspaceRef { - pub name: String, - pub reference: String, -} - -/// Opaque mount reference supplied by a caller. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] -pub struct MountRef { - pub name: String, - pub reference: String, -} - -/// Worker creation request for the catalog/lifecycle API. +/// Canonical Runtime Worker creation request. +/// +/// Browser/product launch semantics are resolved by a backend before this +/// request is built. The request contains only durable Runtime identity inputs: +/// a backend-decided profile selector, a previously synced ConfigBundle identity, +/// and optional initial user input that is committed in the same transaction as +/// Worker catalog/transcript persistence. It carries no cwd/workspace path, tool +/// scope, credential, socket/session path, raw config body, or execution binding +/// internals. #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct CreateWorkerRequest { - pub intent: WorkerIntent, pub profile: ProfileSelector, + pub config_bundle: ConfigBundleRef, #[serde(default, skip_serializing_if = "Option::is_none")] - pub config_bundle: Option, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub requested_capabilities: Vec, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub workspace_refs: Vec, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub mount_refs: Vec, -} - -impl Default for CreateWorkerRequest { - fn default() -> Self { - Self { - intent: WorkerIntent::default(), - profile: ProfileSelector::default(), - config_bundle: None, - requested_capabilities: Vec::new(), - workspace_refs: Vec::new(), - mount_refs: Vec::new(), - } - } -} - -impl CreateWorkerRequest { - /// Create a tools-less Worker using runtime-local default resources. - pub fn tools_less(intent: WorkerIntent, profile: ProfileSelector) -> Self { - Self { - intent, - profile, - config_bundle: None, - requested_capabilities: Vec::new(), - workspace_refs: Vec::new(), - mount_refs: Vec::new(), - } - } + pub initial_input: Option, } /// Worker lifecycle status for the in-memory embedded runtime. @@ -144,10 +65,8 @@ pub struct WorkerSummary { pub worker_id: WorkerId, pub status: WorkerStatus, pub execution: WorkerExecutionStatus, - pub intent: WorkerIntent, pub profile: ProfileSelector, - pub requested_capability_count: usize, - pub has_config_bundle: bool, + pub config_bundle: ConfigBundleRef, pub transcript_len: usize, pub last_event_id: u64, } @@ -160,16 +79,8 @@ pub struct WorkerDetail { pub worker_id: WorkerId, pub status: WorkerStatus, pub execution: WorkerExecutionStatus, - pub intent: WorkerIntent, pub profile: ProfileSelector, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub config_bundle: Option, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub requested_capabilities: Vec, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub workspace_refs: Vec, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub mount_refs: Vec, + pub config_bundle: ConfigBundleRef, pub transcript_len: usize, pub last_event_id: u64, } diff --git a/crates/worker-runtime/src/error.rs b/crates/worker-runtime/src/error.rs index 1df15ba5..6fe917c5 100644 --- a/crates/worker-runtime/src/error.rs +++ b/crates/worker-runtime/src/error.rs @@ -35,6 +35,9 @@ pub enum RuntimeError { message: String, }, + #[error("worker creation has no execution backend: {message}")] + ExecutionBackendUnavailable { message: String }, + #[error("worker {worker_id} execution {operation:?} returned {outcome:?}: {message}")] WorkerExecutionRejected { worker_id: WorkerId, diff --git a/crates/worker-runtime/src/http_server.rs b/crates/worker-runtime/src/http_server.rs index 588a03a5..d53c6bde 100644 --- a/crates/worker-runtime/src/http_server.rs +++ b/crates/worker-runtime/src/http_server.rs @@ -824,6 +824,7 @@ fn status_for_runtime_error(error: &RuntimeError) -> StatusCode { } RuntimeError::RuntimeStopped { .. } | RuntimeError::WorkerExecutionUnavailable { .. } + | RuntimeError::ExecutionBackendUnavailable { .. } | RuntimeError::WorkerExecutionRejected { .. } => StatusCode::CONFLICT, RuntimeError::LimitTooLarge { .. } | RuntimeError::InvalidRequest(_) @@ -846,6 +847,7 @@ fn code_for_runtime_error(error: &RuntimeError) -> &'static str { RuntimeError::WrongRuntimeCursor { .. } => "wrong_runtime_cursor", RuntimeError::WorkerNotFound { .. } => "worker_not_found", RuntimeError::WorkerExecutionUnavailable { .. } => "worker_execution_unavailable", + RuntimeError::ExecutionBackendUnavailable { .. } => "execution_backend_unavailable", RuntimeError::WorkerExecutionRejected { .. } => "worker_execution_rejected", RuntimeError::LimitTooLarge { .. } => "limit_too_large", RuntimeError::InvalidRequest(_) => "invalid_request", @@ -872,7 +874,10 @@ pub enum RuntimeHttpServerError { #[cfg(test)] mod tests { use super::*; - use crate::catalog::{CapabilityRequest, ProfileSelector, WorkerIntent}; + use crate::catalog::{ConfigBundleRef, ProfileSelector}; + use crate::config_bundle::{ + ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor, + }; use crate::execution::{ WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest, @@ -883,16 +888,38 @@ mod tests { use axum::http::Method; use tower::ServiceExt; - fn task_request(objective: &str) -> CreateWorkerRequest { - CreateWorkerRequest { - intent: WorkerIntent::Task { - objective: objective.to_string(), + fn test_bundle(profile: ProfileSelector) -> ConfigBundle { + ConfigBundle { + metadata: ConfigBundleMetadata { + id: "http-test-bundle".to_string(), + digest: String::new(), + revision: "test".to_string(), + workspace_id: "test-workspace".to_string(), + created_at: "test".to_string(), + provenance: ConfigBundleProvenance { + source: "test".to_string(), + detail: None, + }, }, - profile: ProfileSelector::Builtin("builtin:coder".to_string()), - config_bundle: None, - requested_capabilities: vec![CapabilityRequest::named("read")], - workspace_refs: Vec::new(), - mount_refs: Vec::new(), + profiles: vec![ConfigProfileDescriptor { + selector: profile, + label: Some("test".to_string()), + }], + declarations: Vec::new(), + } + .with_computed_digest() + } + + fn task_request(_objective: &str) -> CreateWorkerRequest { + let profile = ProfileSelector::Builtin("builtin:coder".to_string()); + let bundle = test_bundle(profile.clone()); + CreateWorkerRequest { + profile, + config_bundle: ConfigBundleRef { + id: bundle.metadata.id, + digest: bundle.metadata.digest, + }, + initial_input: None, } } @@ -969,6 +996,11 @@ mod tests { let runtime = Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(AcceptingBackend)) .unwrap(); + runtime + .store_config_bundle(test_bundle(ProfileSelector::Builtin( + "builtin:coder".to_string(), + ))) + .unwrap(); let app = runtime_http_router(runtime.clone(), None); let response = json_request( @@ -1091,15 +1123,89 @@ mod tests { #[cfg(all(test, feature = "ws-server"))] mod ws_tests { use super::*; + use crate::catalog::{ConfigBundleRef, ProfileSelector}; + use crate::config_bundle::{ + ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor, + }; + use crate::execution::{ + WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation, + WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest, + WorkerExecutionSpawnResult, + }; + use crate::interaction::WorkerInput; use futures::{SinkExt, StreamExt}; + use std::sync::Arc; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; + struct WsBackend; + + impl WorkerExecutionBackend for WsBackend { + fn backend_id(&self) -> &str { + "ws-test" + } + + fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult { + WorkerExecutionSpawnResult::Connected { + handle: WorkerExecutionHandle::new(request.worker_ref, self.backend_id()), + run_state: WorkerExecutionRunState::Idle, + } + } + + fn dispatch_input( + &self, + _handle: &WorkerExecutionHandle, + _input: WorkerInput, + ) -> WorkerExecutionResult { + WorkerExecutionResult::accepted( + WorkerExecutionOperation::Input, + WorkerExecutionRunState::Idle, + ) + } + } + + fn ws_test_bundle(profile: ProfileSelector) -> ConfigBundle { + ConfigBundle { + metadata: ConfigBundleMetadata { + id: "ws-test-bundle".to_string(), + digest: String::new(), + revision: "test".to_string(), + workspace_id: "test".to_string(), + created_at: "test".to_string(), + provenance: ConfigBundleProvenance { + source: "test".to_string(), + detail: None, + }, + }, + profiles: vec![ConfigProfileDescriptor { + selector: profile, + label: Some("ws".to_string()), + }], + declarations: Vec::new(), + } + .with_computed_digest() + } + + fn ws_create_request() -> CreateWorkerRequest { + let bundle = ws_test_bundle(ProfileSelector::RuntimeDefault); + CreateWorkerRequest { + profile: ProfileSelector::RuntimeDefault, + config_bundle: ConfigBundleRef { + id: bundle.metadata.id, + digest: bundle.metadata.digest, + }, + initial_input: None, + } + } + async fn spawn_runtime_server() -> (Runtime, WorkerRef, String) { - let runtime = Runtime::new_memory(); - let worker = runtime - .create_worker(CreateWorkerRequest::default()) + let runtime = + Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(WsBackend)) + .unwrap(); + runtime + .store_config_bundle(ws_test_bundle(ProfileSelector::RuntimeDefault)) .unwrap(); + let worker = runtime.create_worker(ws_create_request()).unwrap(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); tokio::spawn({ @@ -1169,9 +1275,7 @@ mod ws_tests { #[tokio::test] async fn runtime_ws_cursor_resume_is_duplicate_safe_and_filters_workers() { let (runtime, worker_ref, url) = spawn_runtime_server().await; - let other = runtime - .create_worker(CreateWorkerRequest::default()) - .unwrap(); + let other = runtime.create_worker(ws_create_request()).unwrap(); let first = runtime .observe_worker_event( &worker_ref, diff --git a/crates/worker-runtime/src/lib.rs b/crates/worker-runtime/src/lib.rs index 97a29e00..52df2450 100644 --- a/crates/worker-runtime/src/lib.rs +++ b/crates/worker-runtime/src/lib.rs @@ -24,4 +24,5 @@ mod runtime; #[cfg(feature = "fs-store")] pub use fs_store::{FsRuntimeStore, FsRuntimeStoreOptions}; +pub use management::{RuntimeLimits, RuntimeOptions}; pub use runtime::Runtime; diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index 83f51cd1..83f5022e 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -4,9 +4,9 @@ use crate::catalog::{ }; use crate::config_bundle::{ ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary, validate_config_bundle, - validate_config_bundle_ref, validate_profile_selector, + validate_config_bundle_ref, }; -use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic}; +use crate::diagnostics::RuntimeDiagnostic; use crate::error::RuntimeError; use crate::execution::{ WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef, @@ -231,55 +231,116 @@ impl Runtime { Ok(event_id) } - /// Create a Worker in the embedded catalog. + /// Create a Worker through the canonical ConfigBundle + execution backend path. pub fn create_worker( &self, request: CreateWorkerRequest, ) -> Result { - let mut state = self.lock()?; - state.ensure_running()?; - validate_create_worker_request(&request)?; - state.validate_worker_config_boundary(&request)?; + let (backend, worker_ref, spawn_request) = { + let mut state = self.lock()?; + state.ensure_running()?; + validate_create_worker_request(&request)?; + state.validate_worker_config_boundary(&request)?; + let backend = state.execution_backend.clone().ok_or_else(|| { + RuntimeError::ExecutionBackendUnavailable { + message: "worker creation requires an execution backend".to_string(), + } + })?; - let worker_id = WorkerId::generated(state.next_worker_sequence); - state.next_worker_sequence += 1; - let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone()); - let backend = state.execution_backend.clone(); - let spawn_request = backend.as_ref().map(|_| WorkerExecutionSpawnRequest { - worker_ref: worker_ref.clone(), - request: request.clone(), - context: self.execution_context(worker_ref.clone()), - }); - let event_id = state.push_event( - Some(worker_ref.clone()), - RuntimeEventKind::WorkerCreated, - format!("worker {worker_id} created"), - ); + let worker_id = WorkerId::generated(state.next_worker_sequence); + state.next_worker_sequence += 1; + let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone()); + let event_id = state.push_event( + Some(worker_ref.clone()), + RuntimeEventKind::WorkerCreated, + format!("worker {worker_id} created"), + ); - let record = WorkerRecord { - worker_ref: worker_ref.clone(), - worker_id: worker_id.clone(), - status: WorkerStatus::Running, - request, - execution: WorkerExecutionStatus::unconnected(), - execution_handle: None, - transcript: Vec::new(), - next_transcript_sequence: 1, - last_event_id: event_id, + let mut transcript = Vec::new(); + let mut next_transcript_sequence = 1; + if let Some(input) = request.initial_input.clone() { + let role = match input.kind { + WorkerInputKind::User => TranscriptRole::User, + WorkerInputKind::System => TranscriptRole::System, + }; + transcript.push(TranscriptEntry { + sequence: next_transcript_sequence, + worker_ref: worker_ref.clone(), + role, + content: input.content, + event_id, + }); + next_transcript_sequence += 1; + } + + let record = WorkerRecord { + worker_ref: worker_ref.clone(), + worker_id: worker_id.clone(), + status: WorkerStatus::Running, + request: request.clone(), + execution: WorkerExecutionStatus::unconnected(), + execution_handle: None, + transcript, + next_transcript_sequence, + last_event_id: event_id, + }; + state.workers.insert(worker_id, record); + let spawn_request = WorkerExecutionSpawnRequest { + worker_ref: worker_ref.clone(), + request, + context: self.execution_context(worker_ref.clone()), + }; + (backend, worker_ref, spawn_request) }; - let detail = record.detail(&state.runtime_id); - state.emit_create_diagnostics(&detail); - state.workers.insert(worker_id.clone(), record); - state.persist_runtime_snapshot()?; - state.persist_worker(&worker_id)?; - state.persist_event_by_id(event_id)?; - drop(state); - if let (Some(backend), Some(spawn_request)) = (backend, spawn_request) { - let result = backend.spawn_worker(spawn_request); - self.apply_spawn_result(&worker_ref, result) + let spawn_result = backend.spawn_worker(spawn_request); + let (handle, run_state) = match spawn_result { + WorkerExecutionSpawnResult::Connected { handle, run_state } => (handle, run_state), + WorkerExecutionSpawnResult::Rejected(result) + | WorkerExecutionSpawnResult::Errored(result) => { + self.rollback_failed_create(&worker_ref)?; + return Err(RuntimeError::WorkerExecutionRejected { + worker_id: worker_ref.worker_id.clone(), + operation: result.operation, + outcome: result.outcome, + message: result.message_or_default(), + result, + }); + } + }; + + if let Some(initial_input) = { + let state = self.lock()?; + state.worker(&worker_ref)?.request.initial_input.clone() + } { + let dispatch_result = backend.dispatch_input(&handle, initial_input); + if !dispatch_result.is_accepted() { + let _ = backend.stop_worker(&handle); + self.rollback_failed_create(&worker_ref)?; + return Err(RuntimeError::WorkerExecutionRejected { + worker_id: worker_ref.worker_id.clone(), + operation: dispatch_result.operation, + outcome: dispatch_result.outcome, + message: dispatch_result.message_or_default(), + result: dispatch_result, + }); + } + self.commit_created_worker( + &worker_ref, + handle, + WorkerExecutionRunState::Busy, + WorkerExecutionResult::accepted( + WorkerExecutionOperation::Input, + WorkerExecutionRunState::Busy, + ), + ) } else { - Ok(detail) + self.commit_created_worker( + &worker_ref, + handle, + run_state, + WorkerExecutionResult::accepted(WorkerExecutionOperation::Spawn, run_state), + ) } } @@ -414,38 +475,41 @@ impl Runtime { }) } - fn apply_spawn_result( + fn commit_created_worker( &self, worker_ref: &WorkerRef, - result: WorkerExecutionSpawnResult, + handle: WorkerExecutionHandle, + run_state: WorkerExecutionRunState, + result: WorkerExecutionResult, ) -> Result { let mut state = self.lock()?; let runtime_id = state.runtime_id.clone(); let detail = { let worker = state.worker_mut(worker_ref)?; - match result { - WorkerExecutionSpawnResult::Connected { handle, run_state } => { - worker.execution_handle = Some(handle); - worker.execution = WorkerExecutionStatus::connected(run_state).with_result( - WorkerExecutionResult::accepted(WorkerExecutionOperation::Spawn, run_state), - ); - } - WorkerExecutionSpawnResult::Rejected(result) - | WorkerExecutionSpawnResult::Errored(result) => { - worker.execution_handle = None; - worker.execution = WorkerExecutionStatus { - backend: WorkerExecutionBackendKind::Connected, - run_state: result.run_state, - last_result: Some(result), - }; - } - } + worker.execution_handle = Some(handle); + worker.execution = WorkerExecutionStatus::connected(run_state).with_result(result); worker.detail(&runtime_id) }; + state.persist_runtime_snapshot()?; state.persist_worker(&worker_ref.worker_id)?; + let worker = state.worker(worker_ref)?; + for entry in &worker.transcript { + state.persist_transcript_entry(&worker_ref.worker_id, entry.sequence)?; + } + state.persist_event_by_id(detail.last_event_id)?; Ok(detail) } + fn rollback_failed_create(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> { + let mut state = self.lock()?; + if let Some(record) = state.workers.remove(&worker_ref.worker_id) { + state.events.retain(|event| { + event.id != record.last_event_id || event.worker_ref.as_ref() != Some(worker_ref) + }); + } + Ok(()) + } + fn record_execution_result( &self, worker_ref: &WorkerRef, @@ -827,7 +891,6 @@ struct RuntimeState { execution_backend: Option, next_worker_sequence: u64, next_event_id: u64, - next_diagnostic_id: u64, workers: BTreeMap, config_bundles: BTreeMap, events: Vec, @@ -852,7 +915,6 @@ impl RuntimeState { execution_backend: None, next_worker_sequence: 1, next_event_id: 1, - next_diagnostic_id: 1, workers: BTreeMap::new(), config_bundles: BTreeMap::new(), events: Vec::new(), @@ -883,7 +945,6 @@ impl RuntimeState { execution_backend: None, next_worker_sequence: 1, next_event_id: 1, - next_diagnostic_id: 1, workers: BTreeMap::new(), config_bundles: BTreeMap::new(), events: Vec::new(), @@ -1131,36 +1192,22 @@ impl RuntimeState { &self, request: &CreateWorkerRequest, ) -> Result<(), RuntimeError> { - match &request.config_bundle { - Some(reference) => { - let availability = self.check_config_bundle_ref(reference)?; - let bundle = self - .config_bundles - .get(&availability.reference.id) - .ok_or_else(|| RuntimeError::ConfigBundleMissing { - bundle_id: availability.reference.id.clone(), - })?; - if !bundle.contains_profile(&request.profile) { - return Err(RuntimeError::InvalidProfileSelector { - profile: profile_label(&request.profile), - bundle_id: Some(reference.id.clone()), - message: "profile selector is not declared by synced config bundle" - .to_string(), - }); - } - Ok(()) - } - None => match &request.profile { - ProfileSelector::RuntimeDefault | ProfileSelector::Builtin(_) => { - validate_profile_selector(request.profile.clone(), None) - } - ProfileSelector::Named(_) => Err(RuntimeError::InvalidProfileSelector { - profile: profile_label(&request.profile), - bundle_id: None, - message: "named profiles require a synced config bundle reference".to_string(), - }), - }, + let reference = &request.config_bundle; + let availability = self.check_config_bundle_ref(reference)?; + let bundle = self + .config_bundles + .get(&availability.reference.id) + .ok_or_else(|| RuntimeError::ConfigBundleMissing { + bundle_id: availability.reference.id.clone(), + })?; + if !bundle.contains_profile(&request.profile) { + return Err(RuntimeError::InvalidProfileSelector { + profile: profile_label(&request.profile), + bundle_id: Some(reference.id.clone()), + message: "profile selector is not declared by synced config bundle".to_string(), + }); } + Ok(()) } fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> { @@ -1373,43 +1420,6 @@ impl RuntimeState { status.run_state = next_run_state; true } - - fn push_diagnostic( - &mut self, - severity: DiagnosticSeverity, - code: impl Into, - message: impl Into, - worker_ref: Option, - ) { - let id = self.next_diagnostic_id; - self.next_diagnostic_id += 1; - self.diagnostics.push(RuntimeDiagnostic { - id, - severity, - code: code.into(), - message: message.into(), - worker_ref, - }); - } - - fn emit_create_diagnostics(&mut self, detail: &WorkerDetail) { - if detail.config_bundle.is_none() { - self.push_diagnostic( - DiagnosticSeverity::Info, - "runtime.local_default_resources", - "worker created without ConfigBundleRef; runtime-local defaults are assumed", - Some(detail.worker_ref.clone()), - ); - } - if detail.requested_capabilities.is_empty() { - self.push_diagnostic( - DiagnosticSeverity::Info, - "worker.tools_less", - "worker created without requested tool capabilities", - Some(detail.worker_ref.clone()), - ); - } - } } #[derive(Debug)] @@ -1433,10 +1443,8 @@ impl WorkerRecord { worker_id: self.worker_id.clone(), status: self.status, execution: self.execution.clone(), - intent: self.request.intent.clone(), profile: self.request.profile.clone(), - requested_capability_count: self.request.requested_capabilities.len(), - has_config_bundle: self.request.config_bundle.is_some(), + config_bundle: self.request.config_bundle.clone(), transcript_len: self.transcript.len(), last_event_id: self.last_event_id, } @@ -1449,12 +1457,8 @@ impl WorkerRecord { worker_id: self.worker_id.clone(), status: self.status, execution: self.execution.clone(), - intent: self.request.intent.clone(), profile: self.request.profile.clone(), config_bundle: self.request.config_bundle.clone(), - requested_capabilities: self.request.requested_capabilities.clone(), - workspace_refs: self.request.workspace_refs.clone(), - mount_refs: self.request.mount_refs.clone(), transcript_len: self.transcript.len(), last_event_id: self.last_event_id, } @@ -1483,17 +1487,20 @@ fn profile_label(selector: &ProfileSelector) -> String { } fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> { - if let crate::catalog::WorkerIntent::Task { objective } = &request.intent { - if objective.trim().is_empty() { - return Err(RuntimeError::InvalidRequest( - "task objective must not be empty".to_string(), - )); - } + if request.config_bundle.id.trim().is_empty() { + return Err(RuntimeError::InvalidRequest( + "config_bundle.id must not be empty".to_string(), + )); } - for capability in &request.requested_capabilities { - if capability.name.trim().is_empty() { + if request.config_bundle.digest.trim().is_empty() { + return Err(RuntimeError::InvalidRequest( + "config_bundle.digest must not be empty".to_string(), + )); + } + if let Some(input) = &request.initial_input { + if input.content.trim().is_empty() { return Err(RuntimeError::InvalidRequest( - "capability name must not be empty".to_string(), + "initial_input.content must not be empty".to_string(), )); } } @@ -1512,7 +1519,7 @@ fn validate_worker_input(input: &WorkerInput) -> Result<(), RuntimeError> { #[cfg(test)] mod tests { use super::*; - use crate::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector, WorkerIntent}; + use crate::catalog::{ConfigBundleRef, ProfileSelector}; use crate::config_bundle::{ ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigDeclaration, ConfigDeclarationKind, ConfigProfileDescriptor, @@ -1524,19 +1531,45 @@ mod tests { use std::collections::BTreeMap; use std::sync::{Arc, Mutex}; - fn task_request(objective: &str) -> CreateWorkerRequest { + fn task_request(_objective: &str) -> CreateWorkerRequest { + let profile = ProfileSelector::Builtin("builtin:coder".to_string()); + let bundle = test_bundle_for_profile(profile.clone()); CreateWorkerRequest { - intent: WorkerIntent::Task { - objective: objective.to_string(), + profile, + config_bundle: ConfigBundleRef { + id: bundle.metadata.id, + digest: bundle.metadata.digest, }, - profile: ProfileSelector::Builtin("builtin:coder".to_string()), - config_bundle: None, - requested_capabilities: vec![CapabilityRequest::named("read")], - workspace_refs: Vec::new(), - mount_refs: Vec::new(), + initial_input: None, } } + fn test_bundle_for_profile(profile: ProfileSelector) -> ConfigBundle { + ConfigBundle { + metadata: ConfigBundleMetadata { + id: "bundle-1".to_string(), + digest: String::new(), + revision: "rev-1".to_string(), + workspace_id: "workspace-1".to_string(), + created_at: "2026-06-26T00:00:00Z".to_string(), + provenance: ConfigBundleProvenance { + source: "workspace-backend".to_string(), + detail: Some("profile-sync".to_string()), + }, + }, + profiles: vec![ConfigProfileDescriptor { + selector: profile, + label: Some("Coder".to_string()), + }], + declarations: vec![ConfigDeclaration { + kind: ConfigDeclarationKind::CapabilityGrant, + name: "read".to_string(), + reference: "capability:read".to_string(), + }], + } + .with_computed_digest() + } + #[derive(Default)] struct TestExecutionBackend { dispatch_result: Mutex>, @@ -1609,77 +1642,58 @@ mod tests { } fn runtime_with_backend() -> Runtime { - Runtime::with_execution_backend( + let runtime = Runtime::with_execution_backend( RuntimeOptions::default(), Arc::new(TestExecutionBackend::default()), ) - .unwrap() + .unwrap(); + runtime.store_config_bundle(test_bundle()).unwrap(); + runtime } fn runtime_and_backend() -> (Runtime, Arc) { let backend = Arc::new(TestExecutionBackend::default()); let runtime = Runtime::with_execution_backend(RuntimeOptions::default(), backend.clone()).unwrap(); + runtime.store_config_bundle(test_bundle()).unwrap(); (runtime, backend) } fn test_bundle() -> ConfigBundle { - ConfigBundle { - metadata: ConfigBundleMetadata { - id: "bundle-1".to_string(), - digest: String::new(), - revision: "rev-1".to_string(), - workspace_id: "workspace-1".to_string(), - created_at: "2026-06-26T00:00:00Z".to_string(), - provenance: ConfigBundleProvenance { - source: "workspace-backend".to_string(), - detail: Some("profile-sync".to_string()), - }, - }, - profiles: vec![ConfigProfileDescriptor { - selector: ProfileSelector::Builtin("builtin:coder".to_string()), - label: Some("Coder".to_string()), - }], - declarations: vec![ConfigDeclaration { - kind: ConfigDeclarationKind::CapabilityGrant, - name: "read".to_string(), - reference: "capability:read".to_string(), - }], - } - .with_computed_digest() + test_bundle_for_profile(ProfileSelector::Builtin("builtin:coder".to_string())) } fn bundled_task_request(objective: &str, bundle: &ConfigBundle) -> CreateWorkerRequest { let mut request = task_request(objective); - request.config_bundle = Some(ConfigBundleRef { + request.config_bundle = ConfigBundleRef { id: bundle.metadata.id.clone(), digest: bundle.metadata.digest.clone(), - }); + }; request } #[test] fn create_list_and_detail_preserve_runtime_worker_authority() { - let runtime = Runtime::new_memory(); + let runtime = runtime_with_backend(); let detail = runtime.create_worker(task_request("implement v0")).unwrap(); assert_eq!(detail.worker_ref.runtime_id, runtime.runtime_id().unwrap()); assert_eq!(detail.status, WorkerStatus::Running); - assert!(detail.config_bundle.is_none()); + assert_eq!(detail.config_bundle.id, "bundle-1"); let list = runtime.list_workers().unwrap(); assert_eq!(list.len(), 1); assert_eq!(list[0].worker_ref, detail.worker_ref); - assert_eq!(list[0].requested_capability_count, 1); + assert_eq!(list[0].config_bundle, detail.config_bundle); let fetched = runtime.worker_detail(&detail.worker_ref).unwrap(); assert_eq!(fetched.worker_id, detail.worker_id); - assert_eq!(fetched.intent, detail.intent); + assert_eq!(fetched.profile, detail.profile); } #[test] fn synced_config_bundle_is_stored_checked_and_used_for_worker_creation() { - let runtime = Runtime::new_memory(); + let runtime = runtime_with_backend(); let bundle = test_bundle(); let availability = runtime.store_config_bundle(bundle.clone()).unwrap(); assert_eq!(availability.reference.id, "bundle-1"); @@ -1697,7 +1711,7 @@ mod tests { let detail = runtime .create_worker(bundled_task_request("synced", &bundle)) .unwrap(); - assert_eq!(detail.config_bundle, Some(availability.reference)); + assert_eq!(detail.config_bundle, availability.reference); } #[test] @@ -1746,8 +1760,8 @@ mod tests { #[test] fn rejects_worker_refs_from_another_runtime() { - let runtime_a = Runtime::new_memory(); - let runtime_b = Runtime::new_memory(); + let runtime_a = runtime_with_backend(); + let runtime_b = runtime_with_backend(); let detail = runtime_a.create_worker(task_request("runtime a")).unwrap(); let err = runtime_b.worker_detail(&detail.worker_ref).unwrap_err(); @@ -1755,52 +1769,27 @@ mod tests { } #[test] - fn tools_less_worker_without_config_bundle_uses_local_defaults_and_diagnostics() { + fn create_worker_without_execution_backend_is_rejected_and_not_persisted() { let runtime = Runtime::new_memory(); - let detail = runtime - .create_worker(CreateWorkerRequest::tools_less( - WorkerIntent::default(), - ProfileSelector::RuntimeDefault, - )) - .unwrap(); - - assert!(detail.config_bundle.is_none()); - assert!(detail.requested_capabilities.is_empty()); - let diagnostics = runtime.diagnostics().unwrap(); - assert_eq!(diagnostics.len(), 2); - assert!( - diagnostics - .iter() - .any(|diagnostic| diagnostic.code == "runtime.local_default_resources") - ); - assert!( - diagnostics - .iter() - .any(|diagnostic| diagnostic.code == "worker.tools_less") - ); + runtime.store_config_bundle(test_bundle()).unwrap(); + let error = runtime + .create_worker(task_request("no backend")) + .unwrap_err(); + assert!(matches!( + error, + RuntimeError::ExecutionBackendUnavailable { .. } + )); + assert!(runtime.list_workers().unwrap().is_empty()); } #[test] - fn backend_unconnected_worker_input_is_rejected_and_not_transcribed() { + fn create_worker_missing_config_bundle_is_rejected_before_backend() { let runtime = Runtime::new_memory(); - let detail = runtime.create_worker(task_request("placeholder")).unwrap(); - assert_eq!( - detail.execution.backend, - WorkerExecutionBackendKind::Unconnected - ); - - let err = runtime - .send_input(&detail.worker_ref, WorkerInput::user("must reject")) + let error = runtime + .create_worker(task_request("missing bundle")) .unwrap_err(); - assert!(matches!( - err, - RuntimeError::WorkerExecutionUnavailable { .. } - )); - - let projection = runtime - .transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 1)) - .unwrap(); - assert_eq!(projection.total_items, 0); + assert!(matches!(error, RuntimeError::ConfigBundleMissing { .. })); + assert!(runtime.list_workers().unwrap().is_empty()); } #[test] @@ -1885,6 +1874,7 @@ mod tests { let runtime = Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(InputOnlyBackend)) .unwrap(); + runtime.store_config_bundle(test_bundle()).unwrap(); let detail = runtime.create_worker(task_request("no stop")).unwrap(); let err = runtime @@ -1916,6 +1906,7 @@ mod tests { Arc::new(TestExecutionBackend::default()), ) .unwrap(); + runtime.store_config_bundle(test_bundle()).unwrap(); let detail = runtime.create_worker(task_request("chat")).unwrap(); let first = runtime @@ -1946,7 +1937,7 @@ mod tests { #[test] fn stop_and_cancel_workers_update_summary() { - let runtime = Runtime::new_memory(); + let runtime = runtime_with_backend(); let stopped = runtime.create_worker(task_request("stop me")).unwrap(); let cancelled = runtime.create_worker(task_request("cancel me")).unwrap(); @@ -1969,7 +1960,7 @@ mod tests { #[test] fn stop_then_cancel_preserves_stopped_terminal_state() { - let runtime = Runtime::new_memory(); + let runtime = runtime_with_backend(); let cursor = runtime.event_cursor_from_start().unwrap(); let worker = runtime .create_worker(task_request("stable stopped")) @@ -2014,7 +2005,7 @@ mod tests { #[test] fn cancel_then_stop_preserves_cancelled_terminal_state() { - let runtime = Runtime::new_memory(); + let runtime = runtime_with_backend(); let cursor = runtime.event_cursor_from_start().unwrap(); let worker = runtime .create_worker(task_request("stable cancelled")) diff --git a/crates/workspace-server/src/companion.rs b/crates/workspace-server/src/companion.rs index ff40c01e..70443e27 100644 --- a/crates/workspace-server/src/companion.rs +++ b/crates/workspace-server/src/companion.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex}; use chrono::Utc; use serde::{Deserialize, Serialize}; -use worker_runtime::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector}; +use worker_runtime::catalog::ProfileSelector; use worker_runtime::config_bundle::{ ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor, }; @@ -367,10 +367,6 @@ fn spawn_companion_worker(runtime: &RuntimeRegistry) -> CompanionWorkerState { let selector = companion_profile_selector(); let mut diagnostics = Vec::new(); let config_bundle = companion_config_bundle(); - let config_ref = ConfigBundleRef { - id: config_bundle.metadata.id.clone(), - digest: config_bundle.metadata.digest.clone(), - }; match runtime.sync_config_bundle(COMPANION_RUNTIME_ID, config_bundle) { Ok(result) => diagnostics.extend(result.diagnostics), @@ -390,8 +386,7 @@ fn spawn_companion_worker(runtime: &RuntimeRegistry) -> CompanionWorkerState { expected_segments: 0, }, profile: Some(selector), - config_bundle: Some(config_ref), - requested_capabilities: vec![CapabilityRequest::named("worker.input.user")], + initial_input: None, }, ); @@ -611,36 +606,37 @@ mod tests { } #[test] - fn companion_spawns_worker_with_companion_profile_and_diagnostic_when_not_input_capable() { - let registry = - RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory("local:test")); + fn companion_spawns_worker_with_companion_profile_through_runtime_backend() { + let registry = RuntimeRegistry::for_workspace( + EmbeddedWorkerRuntime::new_memory_with_execution_backend( + "local:test", + Arc::new(DeterministicExecutionBackend::default()), + ) + .unwrap(), + ); let registry = Arc::new(registry); let companion = CompanionConsole::new(registry.clone()); let status = companion.status(); let worker = status.worker.clone().expect("companion worker"); assert_eq!(worker.runtime_id, COMPANION_RUNTIME_ID); - assert_eq!(worker.role.as_deref(), Some("workspace_companion")); - assert!(!worker.capabilities.can_accept_input); - assert_eq!(status.transport.completion, "not_input_capable"); - assert!( - status - .diagnostics - .iter() - .any(|diagnostic| diagnostic.code == "companion_worker_not_input_capable") - ); + assert_eq!(worker.role.as_deref(), Some(COMPANION_PROFILE_ID)); + assert!(worker.capabilities.can_accept_input); + assert_eq!(status.transport.completion, "connected"); + assert!(status.diagnostics.is_empty()); let response = companion.send_message(CompanionMessageRequest { content: "hello".to_string(), }); - assert_eq!(response.state, CompanionState::Rejected); + assert_eq!(response.state, CompanionState::Accepted); + assert!(response.diagnostics.is_empty()); assert!( - !response - .diagnostics + response + .transcript + .items .iter() - .any(|diagnostic| diagnostic.code == "companion_llm_not_connected") + .any(|entry| entry.role == "user" && entry.content == "hello") ); - assert!(response.transcript.items.is_empty()); let worker_detail = registry .worker(COMPANION_RUNTIME_ID, &worker.worker_id) diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index 0f754821..955539d3 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -8,10 +8,13 @@ use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use std::{sync::Arc, time::Duration}; use worker_runtime::catalog::{ - CapabilityRequest, ConfigBundleRef, CreateWorkerRequest, ProfileSelector, - WorkerDetail as EmbeddedWorkerDetail, WorkerIntent, WorkerStatus as EmbeddedWorkerStatus, + ConfigBundleRef, CreateWorkerRequest, ProfileSelector, WorkerDetail as EmbeddedWorkerDetail, + WorkerStatus as EmbeddedWorkerStatus, +}; +use worker_runtime::config_bundle::{ + ConfigBundle, ConfigBundleAvailability, ConfigBundleMetadata, ConfigBundleProvenance, + ConfigBundleSummary, ConfigProfileDescriptor, }; -use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary}; use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; use worker_runtime::execution::WorkerExecutionRunState; use worker_runtime::http_server::{ @@ -236,11 +239,12 @@ pub struct WorkerLookupResult { /// Browser-safe worker spawn request shape. /// -/// The request intentionally carries only workspace policy intents, stable -/// worker identifiers, optional profile selectors, config bundle refs, and -/// requested capability names. Raw workspace roots, child cwd, executable path, -/// Runtime endpoints/credentials, raw bundle storage paths, and host-local -/// resolved WorkerSpec content are resolved by the runtime service and never +/// The request carries Browser-facing launch semantics only: workspace intent, +/// optional display identity, acceptance policy, optional profile selector, and +/// optional initial input. Runtime execution authority is resolved by the host +/// into a synced ConfigBundle before the canonical Runtime create request is +/// built. Raw workspace roots, child cwd, executable paths, tool scope, +/// credentials, raw config stores, sockets, sessions, and storage paths are not /// accepted from Workspace API callers. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerSpawnRequest { @@ -251,9 +255,7 @@ pub struct WorkerSpawnRequest { #[serde(default, skip_serializing_if = "Option::is_none")] pub profile: Option, #[serde(default, skip_serializing_if = "Option::is_none")] - pub config_bundle: Option, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub requested_capabilities: Vec, + pub initial_input: Option, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -994,7 +996,7 @@ impl EmbeddedWorkerRuntime { worker_id: summary.worker_ref.worker_id.as_str().to_string(), host_id: self.host_id.clone(), label: safe_display_hint(summary.worker_ref.worker_id.as_str()), - role: embedded_intent_label(&summary.intent), + role: embedded_profile_label(&summary.profile), profile: embedded_profile_label(&summary.profile), workspace: WorkerWorkspaceSummary { visibility: "backend_internal".to_string(), @@ -1027,7 +1029,7 @@ impl EmbeddedWorkerRuntime { worker_id: detail.worker_id.as_str().to_string(), host_id: self.host_id.clone(), label: safe_display_hint(detail.worker_id.as_str()), - role: embedded_intent_label(&detail.intent), + role: embedded_profile_label(&detail.profile), profile: embedded_profile_label(&detail.profile), workspace: WorkerWorkspaceSummary { visibility: "backend_internal".to_string(), @@ -1195,26 +1197,35 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { if matches!(request.acceptance, WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments } if expected_segments > 0) { diagnostics.push(diagnostic( - "embedded_runtime_tools_less", + "embedded_runtime_acceptance_projection", DiagnosticSeverity::Info, - "Embedded Runtime v0 creates a tools-less catalog Worker and does not spawn provider segments".to_string(), + "Embedded Runtime accepts creation through a runtime execution backend; provider segment counts are observed after execution, not faked at create time".to_string(), )); } + let profile = request + .profile + .clone() + .unwrap_or_else(|| embedded_profile_selector(&request.intent)); + let config_bundle = match self + .runtime + .store_config_bundle(default_embedded_config_bundle(&profile)) + { + Ok(availability) => availability.reference, + Err(error) => { + diagnostics.push(embedded_runtime_diagnostic(&error)); + return WorkerSpawnResult { + state: WorkerOperationState::Rejected, + worker: None, + acceptance_evidence: Vec::new(), + diagnostics, + }; + } + }; let create_request = CreateWorkerRequest { - intent: embedded_create_intent(&request.intent), - profile: request - .profile - .clone() - .unwrap_or_else(|| embedded_profile_selector(&request.intent)), - config_bundle: request.config_bundle.clone(), - requested_capabilities: if request.requested_capabilities.is_empty() { - vec![CapabilityRequest::named("read")] - } else { - request.requested_capabilities.clone() - }, - workspace_refs: Vec::new(), - mount_refs: Vec::new(), + profile, + config_bundle, + initial_input: request.initial_input.clone(), }; match self.runtime.create_worker(create_request) { Ok(detail) => { @@ -1675,7 +1686,7 @@ impl RemoteWorkerRuntime { worker_id: summary.worker_ref.worker_id.as_str().to_string(), host_id: self.host_id.clone(), label: safe_display_hint(summary.worker_ref.worker_id.as_str()), - role: embedded_intent_label(&summary.intent), + role: None, profile: embedded_profile_label(&summary.profile), workspace: WorkerWorkspaceSummary { visibility: "remote_runtime".to_string(), @@ -1707,7 +1718,7 @@ impl RemoteWorkerRuntime { worker_id: detail.worker_id.as_str().to_string(), host_id: self.host_id.clone(), label: safe_display_hint(detail.worker_id.as_str()), - role: embedded_intent_label(&detail.intent), + role: None, profile: embedded_profile_label(&detail.profile), workspace: WorkerWorkspaceSummary { visibility: "remote_runtime".to_string(), @@ -1875,20 +1886,24 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime { )], }; } + let profile = request + .profile + .clone() + .unwrap_or_else(|| embedded_profile_selector(&request.intent)); + let sync = self.sync_config_bundle(default_embedded_config_bundle(&profile)); + let Some(config_bundle) = sync.availability.map(|availability| availability.reference) + else { + return WorkerSpawnResult { + state: WorkerOperationState::Rejected, + worker: None, + acceptance_evidence: Vec::new(), + diagnostics: sync.diagnostics, + }; + }; let create = CreateWorkerRequest { - intent: embedded_create_intent(&request.intent), - profile: request - .profile - .clone() - .unwrap_or_else(|| embedded_profile_selector(&request.intent)), - config_bundle: request.config_bundle.clone(), - requested_capabilities: if request.requested_capabilities.is_empty() { - vec![CapabilityRequest::named("read")] - } else { - request.requested_capabilities.clone() - }, - workspace_refs: Vec::new(), - mount_refs: Vec::new(), + profile, + config_bundle, + initial_input: request.initial_input.clone(), }; match self.post_json::<_, RuntimeHttpWorkerResponse>("/v1/workers", &create) { Ok(response) => WorkerSpawnResult { @@ -2138,21 +2153,32 @@ fn embedded_worker_execution_status_label( } } -fn embedded_create_intent(intent: &WorkerSpawnIntent) -> WorkerIntent { - match intent { - WorkerSpawnIntent::WorkspaceCompanion => WorkerIntent::Role { - role: "workspace_companion".to_string(), - purpose: Some("workspace backend internal companion".to_string()), - }, - WorkerSpawnIntent::WorkspaceOrchestrator => WorkerIntent::Role { - role: "workspace_orchestrator".to_string(), - purpose: Some("workspace backend internal orchestration".to_string()), - }, - WorkerSpawnIntent::TicketRole { ticket_id, role } => WorkerIntent::Role { - role: ticket_role_profile_slug(role).to_string(), - purpose: Some(format!("ticket {ticket_id}")), +fn default_embedded_config_bundle(profile: &ProfileSelector) -> ConfigBundle { + let id = format!( + "workspace-runtime-{}", + embedded_profile_label(profile) + .unwrap_or_else(|| "default".to_string()) + .replace([':', '/', ' '], "-") + ); + ConfigBundle { + metadata: ConfigBundleMetadata { + id, + digest: String::new(), + revision: "workspace-runtime-v0".to_string(), + workspace_id: "workspace-server".to_string(), + created_at: "runtime-generated".to_string(), + provenance: ConfigBundleProvenance { + source: "workspace-server".to_string(), + detail: Some("backend-resolved launch bundle".to_string()), + }, }, + profiles: vec![ConfigProfileDescriptor { + selector: profile.clone(), + label: embedded_profile_label(profile), + }], + declarations: Vec::new(), } + .with_computed_digest() } fn embedded_profile_selector(intent: &WorkerSpawnIntent) -> ProfileSelector { @@ -2176,16 +2202,6 @@ fn ticket_role_profile_slug(role: &TicketWorkerRole) -> &'static str { } } -fn embedded_intent_label(intent: &WorkerIntent) -> Option { - match intent { - WorkerIntent::Assistant { purpose } => { - purpose.clone().or_else(|| Some("assistant".to_string())) - } - WorkerIntent::Task { objective } => Some(safe_display_hint(objective)), - WorkerIntent::Role { role, .. } => Some(safe_display_hint(role)), - } -} - fn embedded_profile_label(profile: &ProfileSelector) -> Option { Some(match profile { ProfileSelector::RuntimeDefault => "runtime_default".to_string(), @@ -2324,7 +2340,8 @@ fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnosti DiagnosticSeverity::Warning, "Embedded Runtime worker was not found".to_string(), ), - EmbeddedRuntimeError::WorkerExecutionUnavailable { .. } => diagnostic( + EmbeddedRuntimeError::WorkerExecutionUnavailable { .. } + | EmbeddedRuntimeError::ExecutionBackendUnavailable { .. } => diagnostic( "embedded_worker_execution_unavailable", DiagnosticSeverity::Warning, "Embedded Worker has no execution backend attached".to_string(), @@ -2962,8 +2979,7 @@ mod tests { expected_segments: 0, }, profile: None, - config_bundle: None, - requested_capabilities: Vec::new(), + initial_input: None, } } @@ -2978,13 +2994,10 @@ mod tests { assert_eq!(spawned.state, WorkerOperationState::Rejected); assert!(spawned.acceptance_evidence.is_empty()); assert!(spawned.diagnostics.iter().any(|diagnostic| { - diagnostic.code == "embedded_worker_execution_spawn_errored" + diagnostic.code == "embedded_worker_execution_rejected" && !diagnostic.message.contains("/tmp/secret-provider-config") })); - let worker = spawned.worker.expect("failed execution is still projected"); - assert_eq!(worker.status, "errored"); - assert!(!worker.capabilities.can_accept_input); - assert!(!worker.capabilities.can_stop); + assert!(spawned.worker.is_none()); } #[test] @@ -3035,8 +3048,13 @@ mod tests { #[test] fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() { - let registry = - RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory("local:test")); + let registry = RuntimeRegistry::for_workspace( + EmbeddedWorkerRuntime::new_memory_with_execution_backend( + "local:test", + Arc::new(AcceptingExecutionBackend::default()), + ) + .expect("test backend should connect"), + ); let runtimes = registry.list_runtimes(10); let embedded_summary = runtimes @@ -3050,7 +3068,7 @@ mod tests { ); assert_eq!(embedded_summary.source.status, RuntimeSourceStatus::Active); assert!(embedded_summary.capabilities.can_spawn_worker); - assert!(!embedded_summary.capabilities.can_accept_input); + assert!(embedded_summary.capabilities.can_accept_input); let spawned = registry .spawn_worker( @@ -3065,8 +3083,7 @@ mod tests { expected_segments: 0, }, profile: None, - config_bundle: None, - requested_capabilities: Vec::new(), + initial_input: None, }, ) .unwrap(); @@ -3083,7 +3100,7 @@ mod tests { assert_eq!(worker.workspace.identity, "runtime_registry_worker"); assert_eq!(worker.implementation.kind, "embedded_worker_runtime"); assert_eq!(worker.profile.as_deref(), Some("builtin:coder")); - assert!(!worker.capabilities.can_accept_input); + assert!(worker.capabilities.can_accept_input); let input = registry .send_input( @@ -3095,21 +3112,20 @@ mod tests { }, ) .unwrap(); - assert_eq!(input.state, WorkerOperationState::Rejected); + assert_eq!(input.state, WorkerOperationState::Accepted); assert_eq!(input.runtime_id, EMBEDDED_RUNTIME_ID); assert_eq!(input.worker_id, worker.worker_id); - assert!( - input - .diagnostics - .iter() - .any(|diagnostic| diagnostic.code == "embedded_worker_execution_unavailable") - ); let transcript = registry .transcript(EMBEDDED_RUNTIME_ID, &worker.worker_id, 0, 10) .unwrap(); assert_eq!(transcript.state, WorkerOperationState::Accepted); - assert!(transcript.items.is_empty()); + assert!( + transcript + .items + .iter() + .any(|entry| entry.role == "user" && entry.content == "hello embedded runtime") + ); let json = serde_json::to_string(&(embedded_summary, worker, transcript)).unwrap(); for forbidden in [ @@ -3132,9 +3148,13 @@ mod tests { #[test] fn embedded_backend_syncs_config_bundle_and_spawns_with_bundle_ref() { - let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( - "local:test", - ))]); + let registry = RuntimeRegistry::new(vec![Arc::new( + EmbeddedWorkerRuntime::new_memory_with_execution_backend( + "local:test", + Arc::new(AcceptingExecutionBackend::default()), + ) + .unwrap(), + )]); let bundle = test_config_bundle(); let sync = registry .sync_config_bundle(EMBEDDED_RUNTIME_ID, bundle.clone()) @@ -3162,8 +3182,7 @@ mod tests { expected_segments: 0, }, profile: Some(ProfileSelector::Builtin("builtin:coder".to_string())), - config_bundle: Some(reference), - requested_capabilities: vec![CapabilityRequest::named("read")], + initial_input: None, }, ) .unwrap(); @@ -3176,9 +3195,13 @@ mod tests { #[test] fn embedded_runtime_rejects_socket_ready_acceptance_without_socket_identity() { - let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( - "local:test", - ))]); + let registry = RuntimeRegistry::new(vec![Arc::new( + EmbeddedWorkerRuntime::new_memory_with_execution_backend( + "local:test", + Arc::new(AcceptingExecutionBackend::default()), + ) + .unwrap(), + )]); let result = registry .spawn_worker( EMBEDDED_RUNTIME_ID, @@ -3187,8 +3210,7 @@ mod tests { requested_worker_name: None, acceptance: WorkerSpawnAcceptanceRequirement::SocketReady, profile: None, - config_bundle: None, - requested_capabilities: Vec::new(), + initial_input: None, }, ) .unwrap(); @@ -3480,12 +3502,7 @@ mod tests { "execution": { "backend": "connected", "run_state": "idle" }, "intent": { "kind": "role", "role": "coder", "purpose": "remote test" }, "profile": { "kind": "builtin", "value": "coder" }, - "config_bundle": null, - "requested_capabilities": [], - "workspace_refs": [], - "mount_refs": [], - "requested_capability_count": 0, - "has_config_bundle": false, + "config_bundle": { "id": "remote-bundle", "digest": "remote-digest" }, "transcript_len": 0, "last_event_id": 0 }) diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index 55c5fe25..79fcf849 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -1063,6 +1063,7 @@ mod tests { use axum::http::Request; use futures::{SinkExt, StreamExt}; use serde_json::{Value, json}; + use std::sync::Arc; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; use tower::ServiceExt; @@ -1140,6 +1141,51 @@ mod tests { } } + fn runtime_test_bundle() -> worker_runtime::config_bundle::ConfigBundle { + worker_runtime::config_bundle::ConfigBundle { + metadata: worker_runtime::config_bundle::ConfigBundleMetadata { + id: "server-test-bundle".to_string(), + digest: String::new(), + revision: "test".to_string(), + workspace_id: "test".to_string(), + created_at: "test".to_string(), + provenance: worker_runtime::config_bundle::ConfigBundleProvenance { + source: "test".to_string(), + detail: None, + }, + }, + profiles: vec![worker_runtime::config_bundle::ConfigProfileDescriptor { + selector: worker_runtime::catalog::ProfileSelector::RuntimeDefault, + label: Some("server-test".to_string()), + }], + declarations: Vec::new(), + } + .with_computed_digest() + } + + fn runtime_create_request() -> worker_runtime::catalog::CreateWorkerRequest { + let bundle = runtime_test_bundle(); + worker_runtime::catalog::CreateWorkerRequest { + profile: worker_runtime::catalog::ProfileSelector::RuntimeDefault, + config_bundle: worker_runtime::catalog::ConfigBundleRef { + id: bundle.metadata.id, + digest: bundle.metadata.digest, + }, + initial_input: None, + } + } + + fn runtime_with_worker() -> (worker_runtime::Runtime, worker_runtime::identity::WorkerRef) { + let runtime = worker_runtime::Runtime::with_execution_backend( + worker_runtime::RuntimeOptions::default(), + Arc::new(DeterministicExecutionBackend::default()), + ) + .unwrap(); + runtime.store_config_bundle(runtime_test_bundle()).unwrap(); + let worker = runtime.create_worker(runtime_create_request()).unwrap(); + (runtime, worker.worker_ref) + } + #[tokio::test] async fn serves_bounded_read_apis_and_static_spa_separately() { let dir = tempfile::tempdir().unwrap(); @@ -1153,7 +1199,13 @@ mod tests { let store = SqliteWorkspaceStore::in_memory().unwrap(); let mut config = ServerConfig::local_dev(dir.path(), test_identity()); config.static_assets_dir = Some(static_dir); - let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); + let api = WorkspaceApi::new_with_execution_backend( + config, + Arc::new(store), + Arc::new(DeterministicExecutionBackend::default()), + ) + .await + .unwrap(); let app = build_router(api); let workspace = get_json(app.clone(), "/api/workspace").await; @@ -1260,7 +1312,7 @@ mod tests { let worker_items = workers["items"].as_array().unwrap(); let companion_worker = worker_items .iter() - .find(|worker| worker["role"] == "workspace_companion") + .find(|worker| worker["role"] == "builtin:companion") .expect("companion worker is visible through runtime worker API"); assert_eq!(companion_worker["runtime_id"], "embedded-worker-runtime"); assert!(companion_worker["capabilities"]["can_stop"].is_boolean()); @@ -1270,7 +1322,7 @@ mod tests { companion_status["state"].as_str(), Some("ready") | Some("error") )); - assert_eq!(companion_status["worker"]["role"], "workspace_companion"); + assert_eq!(companion_status["worker"]["role"], "builtin:companion"); assert_eq!( companion_status["transport"]["kind"], "embedded_worker_runtime" @@ -1305,7 +1357,7 @@ mod tests { .as_array() .unwrap() .iter() - .any(|worker| worker["role"] == "workspace_companion") + .any(|worker| worker["role"] == "builtin:companion") ); let runs_response = app @@ -1477,7 +1529,13 @@ mod tests { let dir = tempfile::tempdir().unwrap(); let store = SqliteWorkspaceStore::in_memory().unwrap(); let config = ServerConfig::local_dev(dir.path(), test_identity()); - let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); + let api = WorkspaceApi::new_with_execution_backend( + config, + Arc::new(store), + Arc::new(DeterministicExecutionBackend::default()), + ) + .await + .unwrap(); let app = build_router(api); let runtimes = get_json(app.clone(), "/api/runtimes").await; @@ -1515,20 +1573,14 @@ mod tests { }), ) .await; - assert_eq!(spawned["state"], "rejected"); - assert!( - spawned["diagnostics"] - .as_array() - .unwrap() - .iter() - .any(|diagnostic| { - diagnostic["code"] == "embedded_worker_execution_spawn_errored" - && !diagnostic["message"] - .as_str() - .unwrap() - .contains("/workspace/demo") - }) - ); + assert_eq!(spawned["state"], "accepted"); + let diagnostics = spawned["diagnostics"].as_array().unwrap(); + assert!(diagnostics.iter().all(|diagnostic| { + !diagnostic["message"] + .as_str() + .unwrap_or_default() + .contains("/workspace/demo") + })); let worker_id = spawned["worker"]["worker_id"].as_str().unwrap().to_string(); assert_eq!(spawned["worker"]["runtime_id"], "embedded-worker-runtime"); assert_eq!( @@ -1557,16 +1609,10 @@ mod tests { }), ) .await; - assert_eq!(accepted["state"], "rejected"); + assert_eq!(accepted["state"], "accepted"); assert_eq!(accepted["runtime_id"], "embedded-worker-runtime"); assert_eq!(accepted["worker_id"], worker_id); - assert!( - accepted["diagnostics"] - .as_array() - .unwrap() - .iter() - .any(|diagnostic| diagnostic["code"] == "embedded_worker_execution_unavailable") - ); + assert!(accepted["diagnostics"].as_array().unwrap().is_empty()); let transcript = get_json( app.clone(), @@ -1574,7 +1620,9 @@ mod tests { ) .await; assert_eq!(transcript["state"], "accepted"); - assert!(transcript["items"].as_array().unwrap().is_empty()); + assert!(transcript["items"].as_array().unwrap().iter().any( + |item| item["role"] == "user" && item["content"] == "hello from browser-facing api" + )); let wrong_runtime = app .clone() @@ -1620,10 +1668,7 @@ mod tests { #[tokio::test] async fn proxies_worker_observation_ws_with_backend_cursors_and_diagnostics() { - let runtime = worker_runtime::Runtime::new_memory(); - let worker = runtime - .create_worker(worker_runtime::catalog::CreateWorkerRequest::default()) - .unwrap(); + let (runtime, worker_ref) = runtime_with_worker(); let runtime_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let runtime_addr = runtime_listener.local_addr().unwrap(); tokio::spawn({ @@ -1645,11 +1690,17 @@ mod tests { worker_id: "worker-a".into(), endpoint: format!( "ws://{runtime_addr}/v1/workers/{}/events/ws", - worker.worker_ref.worker_id + worker_ref.worker_id ), bearer_token: None, }); - let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); + let api = WorkspaceApi::new_with_execution_backend( + config, + Arc::new(store), + Arc::new(DeterministicExecutionBackend::default()), + ) + .await + .unwrap(); let app_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let app_addr = app_listener.local_addr().unwrap(); tokio::spawn(async move { axum::serve(app_listener, build_router(api)).await.unwrap() }); @@ -1666,7 +1717,7 @@ mod tests { runtime .observe_worker_event( - &worker.worker_ref, + &worker_ref, protocol::Event::TextDelta { text: "live".into(), }, @@ -1686,7 +1737,7 @@ mod tests { let _snapshot = next_client_frame(&mut resumed).await; runtime .observe_worker_event( - &worker.worker_ref, + &worker_ref, protocol::Event::TextDone { text: "done".into(), }, @@ -1822,10 +1873,7 @@ mod tests { worker_runtime::identity::WorkerRef, String, ) { - let runtime = worker_runtime::Runtime::new_memory(); - let worker = runtime - .create_worker(worker_runtime::catalog::CreateWorkerRequest::default()) - .unwrap(); + let (runtime, worker_ref) = runtime_with_worker(); let runtime_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let runtime_addr = runtime_listener.local_addr().unwrap(); tokio::spawn({ @@ -1838,9 +1886,9 @@ mod tests { }); let endpoint = format!( "ws://{runtime_addr}/v1/workers/{}/events/ws", - worker.worker_ref.worker_id + worker_ref.worker_id ); - (runtime, worker.worker_ref, endpoint) + (runtime, worker_ref, endpoint) } async fn spawn_workspace_proxy( @@ -1852,7 +1900,13 @@ mod tests { let runtime_id = source.runtime_id.clone(); let worker_id = source.worker_id.clone(); config.runtime_event_sources.push(source); - let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); + let api = WorkspaceApi::new_with_execution_backend( + config, + Arc::new(store), + Arc::new(DeterministicExecutionBackend::default()), + ) + .await + .unwrap(); let app_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let app_addr = app_listener.local_addr().unwrap(); tokio::spawn(async move { axum::serve(app_listener, build_router(api)).await.unwrap() });