From 2d5971738478f832ba9a135601ea11dda60c565d Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 28 Jun 2026 04:33:01 +0900 Subject: [PATCH 1/2] feat: add worker execution backend boundary --- crates/worker-runtime/src/catalog.rs | 3 + crates/worker-runtime/src/error.rs | 16 + crates/worker-runtime/src/execution.rs | 337 ++++++++++++++++ crates/worker-runtime/src/http_server.rs | 49 ++- crates/worker-runtime/src/lib.rs | 1 + crates/worker-runtime/src/runtime.rs | 480 +++++++++++++++++++++-- crates/workspace-server/src/hosts.rs | 19 +- crates/workspace-server/src/server.rs | 2 +- 8 files changed, 877 insertions(+), 30 deletions(-) create mode 100644 crates/worker-runtime/src/execution.rs diff --git a/crates/worker-runtime/src/catalog.rs b/crates/worker-runtime/src/catalog.rs index 85c45d37..d32623d3 100644 --- a/crates/worker-runtime/src/catalog.rs +++ b/crates/worker-runtime/src/catalog.rs @@ -1,3 +1,4 @@ +use crate::execution::WorkerExecutionStatus; use crate::identity::{RuntimeId, WorkerId, WorkerRef}; use serde::{Deserialize, Serialize}; @@ -142,6 +143,7 @@ pub struct WorkerSummary { pub runtime_id: RuntimeId, pub worker_id: WorkerId, pub status: WorkerStatus, + pub execution: WorkerExecutionStatus, pub intent: WorkerIntent, pub profile: ProfileSelector, pub requested_capability_count: usize, @@ -157,6 +159,7 @@ pub struct WorkerDetail { pub runtime_id: RuntimeId, pub worker_id: WorkerId, pub status: WorkerStatus, + pub execution: WorkerExecutionStatus, pub intent: WorkerIntent, pub profile: ProfileSelector, #[serde(default, skip_serializing_if = "Option::is_none")] diff --git a/crates/worker-runtime/src/error.rs b/crates/worker-runtime/src/error.rs index 54484d64..1df15ba5 100644 --- a/crates/worker-runtime/src/error.rs +++ b/crates/worker-runtime/src/error.rs @@ -1,3 +1,4 @@ +use crate::execution::WorkerExecutionResult; use crate::identity::{RuntimeId, WorkerId}; use std::path::PathBuf; @@ -28,6 +29,21 @@ pub enum RuntimeError { worker_id: WorkerId, }, + #[error("worker {worker_id} has no execution backend: {message}")] + WorkerExecutionUnavailable { + worker_id: WorkerId, + message: String, + }, + + #[error("worker {worker_id} execution {operation:?} returned {outcome:?}: {message}")] + WorkerExecutionRejected { + worker_id: WorkerId, + operation: crate::execution::WorkerExecutionOperation, + outcome: crate::execution::WorkerExecutionOutcome, + message: String, + result: WorkerExecutionResult, + }, + #[error("limit {requested} exceeds maximum {max}")] LimitTooLarge { requested: usize, max: usize }, diff --git a/crates/worker-runtime/src/execution.rs b/crates/worker-runtime/src/execution.rs new file mode 100644 index 00000000..46d9bba3 --- /dev/null +++ b/crates/worker-runtime/src/execution.rs @@ -0,0 +1,337 @@ +use crate::catalog::CreateWorkerRequest; +use crate::error::RuntimeError; +use crate::identity::WorkerRef; +use crate::interaction::WorkerInput; +#[cfg(feature = "ws-server")] +use crate::observation::WorkerObservationEvent; +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::sync::Arc; + +/// Coarse execution attachment visible through Worker catalog/detail responses. +/// +/// This deliberately does not expose backend handles, process paths, sockets, +/// credentials, session files, or manifest paths. It only says whether Runtime +/// has an execution backend attached for the Worker. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkerExecutionBackendKind { + #[default] + Unconnected, + Connected, +} + +/// Current execution-side run state for a Worker. +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkerExecutionRunState { + #[default] + Unconnected, + Idle, + Busy, + Rejected, + Errored, + Stopped, +} + +/// Execution operation that produced a result. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkerExecutionOperation { + Spawn, + Input, + Stop, + Cancel, +} + +/// Typed execution result class. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkerExecutionOutcome { + Accepted, + Busy, + Rejected, + Errored, + Unsupported, +} + +/// Backend result for a Worker execution operation. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkerExecutionResult { + pub operation: WorkerExecutionOperation, + pub outcome: WorkerExecutionOutcome, + pub run_state: WorkerExecutionRunState, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub message: Option, +} + +impl WorkerExecutionResult { + pub fn accepted( + operation: WorkerExecutionOperation, + run_state: WorkerExecutionRunState, + ) -> Self { + Self { + operation, + outcome: WorkerExecutionOutcome::Accepted, + run_state, + message: None, + } + } + + pub fn busy(operation: WorkerExecutionOperation, message: impl Into) -> Self { + Self { + operation, + outcome: WorkerExecutionOutcome::Busy, + run_state: WorkerExecutionRunState::Busy, + message: Some(message.into()), + } + } + + pub fn rejected(operation: WorkerExecutionOperation, message: impl Into) -> Self { + Self { + operation, + outcome: WorkerExecutionOutcome::Rejected, + run_state: WorkerExecutionRunState::Rejected, + message: Some(message.into()), + } + } + + pub fn errored(operation: WorkerExecutionOperation, message: impl Into) -> Self { + Self { + operation, + outcome: WorkerExecutionOutcome::Errored, + run_state: WorkerExecutionRunState::Errored, + message: Some(message.into()), + } + } + + pub fn unsupported(operation: WorkerExecutionOperation, message: impl Into) -> Self { + Self { + operation, + outcome: WorkerExecutionOutcome::Unsupported, + run_state: WorkerExecutionRunState::Rejected, + message: Some(message.into()), + } + } + + pub fn is_accepted(&self) -> bool { + self.outcome == WorkerExecutionOutcome::Accepted + } + + pub fn message_or_default(&self) -> String { + self.message + .clone() + .unwrap_or_else(|| format!("{:?} {:?}", self.operation, self.outcome)) + } +} + +/// Execution status surfaced in Worker summary/detail responses. +#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkerExecutionStatus { + pub backend: WorkerExecutionBackendKind, + pub run_state: WorkerExecutionRunState, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_result: Option, +} + +impl WorkerExecutionStatus { + pub fn unconnected() -> Self { + Self::default() + } + + pub fn connected(run_state: WorkerExecutionRunState) -> Self { + Self { + backend: WorkerExecutionBackendKind::Connected, + run_state, + last_result: None, + } + } + + pub fn with_result(mut self, result: WorkerExecutionResult) -> Self { + self.run_state = result.run_state; + self.last_result = Some(result); + self + } +} + +/// Opaque per-Worker execution handle returned by a backend. +/// +/// The handle is a typed token for routing calls back into the same backend. It +/// intentionally contains no socket path, process id, credential, manifest path, +/// or session path. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkerExecutionHandle { + worker_ref: WorkerRef, + backend_id: String, +} + +impl WorkerExecutionHandle { + pub fn new(worker_ref: WorkerRef, backend_id: impl Into) -> Self { + Self { + worker_ref, + backend_id: backend_id.into(), + } + } + + pub fn worker_ref(&self) -> &WorkerRef { + &self.worker_ref + } + + pub fn backend_id(&self) -> &str { + &self.backend_id + } +} + +/// Runtime hooks available to an execution backend for one Worker. +#[derive(Clone)] +pub struct WorkerExecutionContext { + worker_ref: WorkerRef, + #[cfg(feature = "ws-server")] + observation_publisher: Arc< + dyn Fn(WorkerRef, protocol::Event) -> Result + + Send + + Sync, + >, +} + +impl WorkerExecutionContext { + #[cfg(feature = "ws-server")] + pub(crate) fn new( + worker_ref: WorkerRef, + observation_publisher: Arc< + dyn Fn(WorkerRef, protocol::Event) -> Result + + Send + + Sync, + >, + ) -> Self { + Self { + worker_ref, + observation_publisher, + } + } + + #[cfg(not(feature = "ws-server"))] + pub(crate) fn new(worker_ref: WorkerRef) -> Self { + Self { worker_ref } + } + + pub fn worker_ref(&self) -> &WorkerRef { + &self.worker_ref + } + + /// Publish a protocol event into the Runtime observation bus. + #[cfg(feature = "ws-server")] + pub fn publish_protocol_event( + &self, + payload: protocol::Event, + ) -> Result { + (self.observation_publisher)(self.worker_ref.clone(), payload) + } +} + +impl fmt::Debug for WorkerExecutionContext { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WorkerExecutionContext") + .field("worker_ref", &self.worker_ref) + .finish_non_exhaustive() + } +} + +/// Spawn/initialization request passed to an execution backend. +#[derive(Clone, Debug)] +pub struct WorkerExecutionSpawnRequest { + pub worker_ref: WorkerRef, + pub request: CreateWorkerRequest, + pub context: WorkerExecutionContext, +} + +/// Result of backend Worker spawn/initialization. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum WorkerExecutionSpawnResult { + Connected { + handle: WorkerExecutionHandle, + run_state: WorkerExecutionRunState, + }, + Rejected(WorkerExecutionResult), + Errored(WorkerExecutionResult), +} + +/// Backend boundary for Worker execution. +/// +/// Runtime owns Worker catalog, transcript, observation, and lifecycle state. A +/// backend owns concrete execution. The default Runtime has no backend, so input +/// to those Workers is rejected instead of producing providerless responses. +pub trait WorkerExecutionBackend: Send + Sync + 'static { + fn backend_id(&self) -> &str; + + fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult; + + fn dispatch_input( + &self, + handle: &WorkerExecutionHandle, + input: WorkerInput, + ) -> WorkerExecutionResult; + + fn stop_worker(&self, _handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + WorkerExecutionResult::unsupported( + WorkerExecutionOperation::Stop, + "execution backend does not support stopping workers", + ) + } + + fn cancel_worker(&self, _handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + WorkerExecutionResult::unsupported( + WorkerExecutionOperation::Cancel, + "execution backend does not support cancelling workers", + ) + } +} + +#[derive(Clone)] +pub(crate) struct WorkerExecutionBackendRef { + id: String, + backend: Arc, +} + +impl WorkerExecutionBackendRef { + pub(crate) fn new(backend: Arc) -> Result { + let id = backend.backend_id().trim().to_string(); + if id.is_empty() { + return Err(RuntimeError::InvalidRequest( + "execution backend id must not be empty".to_string(), + )); + } + Ok(Self { id, backend }) + } + + pub(crate) fn spawn_worker( + &self, + request: WorkerExecutionSpawnRequest, + ) -> WorkerExecutionSpawnResult { + self.backend.spawn_worker(request) + } + + pub(crate) fn dispatch_input( + &self, + handle: &WorkerExecutionHandle, + input: WorkerInput, + ) -> WorkerExecutionResult { + self.backend.dispatch_input(handle, input) + } + + pub(crate) fn stop_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + self.backend.stop_worker(handle) + } + + pub(crate) fn cancel_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + self.backend.cancel_worker(handle) + } +} + +impl fmt::Debug for WorkerExecutionBackendRef { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WorkerExecutionBackendRef") + .field("id", &self.id) + .finish_non_exhaustive() + } +} diff --git a/crates/worker-runtime/src/http_server.rs b/crates/worker-runtime/src/http_server.rs index 7bd2bdfb..588a03a5 100644 --- a/crates/worker-runtime/src/http_server.rs +++ b/crates/worker-runtime/src/http_server.rs @@ -822,7 +822,9 @@ fn status_for_runtime_error(error: &RuntimeError) -> StatusCode { RuntimeError::WorkerNotFound { .. } | RuntimeError::ConfigBundleMissing { .. } => { StatusCode::NOT_FOUND } - RuntimeError::RuntimeStopped { .. } => StatusCode::CONFLICT, + RuntimeError::RuntimeStopped { .. } + | RuntimeError::WorkerExecutionUnavailable { .. } + | RuntimeError::WorkerExecutionRejected { .. } => StatusCode::CONFLICT, RuntimeError::LimitTooLarge { .. } | RuntimeError::InvalidRequest(_) | RuntimeError::ConfigBundleDigestMismatch { .. } @@ -843,6 +845,8 @@ fn code_for_runtime_error(error: &RuntimeError) -> &'static str { RuntimeError::WrongRuntime { .. } => "wrong_runtime", RuntimeError::WrongRuntimeCursor { .. } => "wrong_runtime_cursor", RuntimeError::WorkerNotFound { .. } => "worker_not_found", + RuntimeError::WorkerExecutionUnavailable { .. } => "worker_execution_unavailable", + RuntimeError::WorkerExecutionRejected { .. } => "worker_execution_rejected", RuntimeError::LimitTooLarge { .. } => "limit_too_large", RuntimeError::InvalidRequest(_) => "invalid_request", RuntimeError::ConfigBundleMissing { .. } => "config_bundle_missing", @@ -869,6 +873,12 @@ pub enum RuntimeHttpServerError { mod tests { use super::*; use crate::catalog::{CapabilityRequest, ProfileSelector, WorkerIntent}; + use crate::execution::{ + WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation, + WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest, + WorkerExecutionSpawnResult, + }; + use crate::management::RuntimeOptions; use axum::body::to_bytes; use axum::http::Method; use tower::ServiceExt; @@ -886,6 +896,39 @@ mod tests { } } + struct AcceptingBackend; + + impl WorkerExecutionBackend for AcceptingBackend { + fn backend_id(&self) -> &str { + "http-test" + } + + fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult { + WorkerExecutionSpawnResult::Connected { + handle: WorkerExecutionHandle::new(request.worker_ref, self.backend_id()), + run_state: WorkerExecutionRunState::Idle, + } + } + + fn dispatch_input( + &self, + _handle: &WorkerExecutionHandle, + _input: WorkerInput, + ) -> WorkerExecutionResult { + WorkerExecutionResult::accepted( + WorkerExecutionOperation::Input, + WorkerExecutionRunState::Idle, + ) + } + + fn stop_worker(&self, _handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + WorkerExecutionResult::accepted( + WorkerExecutionOperation::Stop, + WorkerExecutionRunState::Stopped, + ) + } + } + async fn json_request( app: Router, method: Method, @@ -923,7 +966,9 @@ mod tests { #[tokio::test] async fn rest_command_api_delegates_to_runtime() { - let runtime = Runtime::new_memory(); + let runtime = + Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(AcceptingBackend)) + .unwrap(); let app = runtime_http_router(runtime.clone(), None); let response = json_request( diff --git a/crates/worker-runtime/src/lib.rs b/crates/worker-runtime/src/lib.rs index e6fa362d..97a29e00 100644 --- a/crates/worker-runtime/src/lib.rs +++ b/crates/worker-runtime/src/lib.rs @@ -11,6 +11,7 @@ pub mod catalog; pub mod config_bundle; pub mod diagnostics; pub mod error; +pub mod execution; #[cfg(feature = "fs-store")] pub mod fs_store; #[cfg(feature = "http-server")] diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index cde65c15..16b1283a 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -8,6 +8,11 @@ use crate::config_bundle::{ }; use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic}; use crate::error::RuntimeError; +use crate::execution::{ + WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef, + WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult, + WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, WorkerExecutionStatus, +}; #[cfg(feature = "fs-store")] use crate::fs_store::{ FsRuntimeStore, FsRuntimeStoreOptions, PersistedRuntimeState, PersistedWorkerRecord, @@ -63,6 +68,16 @@ impl Runtime { } } + /// Create a memory-backed Runtime with an attached execution backend. + pub fn with_execution_backend( + options: RuntimeOptions, + backend: Arc, + ) -> Result { + let runtime = Self::with_options(options); + runtime.install_execution_backend(backend)?; + Ok(runtime) + } + /// Create or restore a filesystem-backed Runtime. /// /// The store is scoped by typed Runtime identity under `options.root`; if the @@ -71,11 +86,28 @@ impl Runtime { /// created before the Runtime is returned. #[cfg(feature = "fs-store")] pub fn with_fs_store(options: FsRuntimeStoreOptions) -> Result { + Self::with_fs_store_inner(options, None) + } + + /// Create or restore a filesystem-backed Runtime with an execution backend. + #[cfg(feature = "fs-store")] + pub fn with_fs_store_and_execution_backend( + options: FsRuntimeStoreOptions, + backend: Arc, + ) -> Result { + Self::with_fs_store_inner(options, Some(WorkerExecutionBackendRef::new(backend)?)) + } + + #[cfg(feature = "fs-store")] + fn with_fs_store_inner( + options: FsRuntimeStoreOptions, + execution_backend: Option, + ) -> Result { let runtime_id = options.runtime_id.unwrap_or_else(|| { RuntimeId::generated(NEXT_RUNTIME_SEQUENCE.fetch_add(1, Ordering::Relaxed)) }); let opened = FsRuntimeStore::open_or_create(options.root, runtime_id.clone())?; - let state = if let Some(persisted) = opened.state { + let mut state = if let Some(persisted) = opened.state { RuntimeState::from_persisted(persisted, opened.store)? } else { let mut state = RuntimeState::new_fs_backed( @@ -90,6 +122,7 @@ impl Runtime { state.persist_event_by_id(event_id)?; state }; + state.execution_backend = execution_backend; Ok(Self { inner: Arc::new(Mutex::new(state)), }) @@ -210,6 +243,12 @@ impl Runtime { let worker_id = WorkerId::generated(state.next_worker_sequence); state.next_worker_sequence += 1; let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone()); + let backend = state.execution_backend.clone(); + let spawn_request = backend.as_ref().map(|_| WorkerExecutionSpawnRequest { + worker_ref: worker_ref.clone(), + request: request.clone(), + context: self.execution_context(worker_ref.clone()), + }); let event_id = state.push_event( Some(worker_ref.clone()), RuntimeEventKind::WorkerCreated, @@ -217,10 +256,12 @@ impl Runtime { ); let record = WorkerRecord { - worker_ref, + worker_ref: worker_ref.clone(), worker_id: worker_id.clone(), status: WorkerStatus::Running, request, + execution: WorkerExecutionStatus::unconnected(), + execution_handle: None, transcript: Vec::new(), next_transcript_sequence: 1, last_event_id: event_id, @@ -231,7 +272,14 @@ impl Runtime { state.persist_runtime_snapshot()?; state.persist_worker(&worker_id)?; state.persist_event_by_id(event_id)?; - Ok(detail) + drop(state); + + if let (Some(backend), Some(spawn_request)) = (backend, spawn_request) { + let result = backend.spawn_worker(spawn_request); + self.apply_spawn_result(&worker_ref, result) + } else { + Ok(detail) + } } /// List Workers known to this Runtime. @@ -257,11 +305,11 @@ impl Runtime { worker_ref: &WorkerRef, input: WorkerInput, ) -> Result { - let mut state = self.lock()?; - state.ensure_running()?; - validate_worker_input(&input)?; - state.ensure_worker_ref(worker_ref)?; - { + let (backend, handle) = { + let mut state = self.lock()?; + state.ensure_running()?; + validate_worker_input(&input)?; + state.ensure_worker_ref(worker_ref)?; let worker = state.worker(worker_ref)?; if !worker.status.is_active() { return Err(RuntimeError::InvalidRequest(format!( @@ -269,8 +317,40 @@ impl Runtime { worker_ref.worker_id ))); } + let backend = state.execution_backend.clone(); + let handle = worker.execution_handle.clone(); + match (backend, handle) { + (Some(backend), Some(handle)) => (backend, handle), + _ => { + let result = WorkerExecutionResult::rejected( + WorkerExecutionOperation::Input, + "worker has no execution backend", + ); + let worker = state.worker_mut(worker_ref)?; + worker.execution = WorkerExecutionStatus::unconnected().with_result(result); + state.persist_worker(&worker_ref.worker_id)?; + return Err(RuntimeError::WorkerExecutionUnavailable { + worker_id: worker_ref.worker_id.clone(), + message: "worker has no execution backend".to_string(), + }); + } + } + }; + + let dispatch_result = backend.dispatch_input(&handle, input.clone()); + if !dispatch_result.is_accepted() { + self.record_execution_result(worker_ref, dispatch_result.clone())?; + return Err(RuntimeError::WorkerExecutionRejected { + worker_id: worker_ref.worker_id.clone(), + operation: dispatch_result.operation, + outcome: dispatch_result.outcome, + message: dispatch_result.message_or_default(), + result: dispatch_result, + }); } + let mut state = self.lock()?; + state.ensure_running()?; let event_id = state.push_event( Some(worker_ref.clone()), RuntimeEventKind::WorkerInputAccepted, @@ -286,6 +366,11 @@ impl Runtime { let transcript_sequence = worker.next_transcript_sequence; worker.next_transcript_sequence += 1; worker.last_event_id = event_id; + worker.execution = WorkerExecutionStatus { + backend: WorkerExecutionBackendKind::Connected, + run_state: dispatch_result.run_state, + last_result: Some(dispatch_result), + }; worker.transcript.push(TranscriptEntry { sequence: transcript_sequence, worker_ref: worker_ref.clone(), @@ -328,12 +413,103 @@ impl Runtime { }) } + fn apply_spawn_result( + &self, + worker_ref: &WorkerRef, + result: WorkerExecutionSpawnResult, + ) -> Result { + let mut state = self.lock()?; + let runtime_id = state.runtime_id.clone(); + let detail = { + let worker = state.worker_mut(worker_ref)?; + match result { + WorkerExecutionSpawnResult::Connected { handle, run_state } => { + worker.execution_handle = Some(handle); + worker.execution = WorkerExecutionStatus::connected(run_state).with_result( + WorkerExecutionResult::accepted(WorkerExecutionOperation::Spawn, run_state), + ); + } + WorkerExecutionSpawnResult::Rejected(result) + | WorkerExecutionSpawnResult::Errored(result) => { + worker.execution_handle = None; + worker.execution = WorkerExecutionStatus { + backend: WorkerExecutionBackendKind::Connected, + run_state: result.run_state, + last_result: Some(result), + }; + } + } + worker.detail(&runtime_id) + }; + state.persist_worker(&worker_ref.worker_id)?; + Ok(detail) + } + + fn record_execution_result( + &self, + worker_ref: &WorkerRef, + result: WorkerExecutionResult, + ) -> Result<(), RuntimeError> { + let mut state = self.lock()?; + let worker = state.worker_mut(worker_ref)?; + worker.execution = WorkerExecutionStatus { + backend: WorkerExecutionBackendKind::Connected, + run_state: result.run_state, + last_result: Some(result), + }; + state.persist_worker(&worker_ref.worker_id)?; + Ok(()) + } + + fn dispatch_lifecycle_to_backend( + &self, + worker_ref: &WorkerRef, + operation: WorkerExecutionOperation, + ) -> Result<(), RuntimeError> { + let Some((backend, handle)) = ({ + let state = self.lock()?; + state.ensure_worker_ref(worker_ref)?; + let worker = state.worker(worker_ref)?; + if !worker.status.is_active() { + return Ok(()); + } + match ( + state.execution_backend.clone(), + worker.execution_handle.clone(), + ) { + (Some(backend), Some(handle)) => Some((backend, handle)), + _ => None, + } + }) else { + return Ok(()); + }; + + let result = match operation { + WorkerExecutionOperation::Stop => backend.stop_worker(&handle), + WorkerExecutionOperation::Cancel => backend.cancel_worker(&handle), + WorkerExecutionOperation::Spawn | WorkerExecutionOperation::Input => return Ok(()), + }; + if result.is_accepted() { + self.record_execution_result(worker_ref, result)?; + return Ok(()); + } + self.record_execution_result(worker_ref, result.clone())?; + Err(RuntimeError::WorkerExecutionRejected { + worker_id: worker_ref.worker_id.clone(), + operation: result.operation, + outcome: result.outcome, + message: result.message_or_default(), + result, + }) + } + /// Stop a Worker. Repeated stops are idempotent and return the last event id. pub fn stop_worker( &self, worker_ref: &WorkerRef, reason: Option, ) -> Result { + self.dispatch_lifecycle_to_backend(worker_ref, WorkerExecutionOperation::Stop)?; self.transition_worker( worker_ref, WorkerStatus::Stopped, @@ -348,6 +524,7 @@ impl Runtime { worker_ref: &WorkerRef, reason: Option, ) -> Result { + self.dispatch_lifecycle_to_backend(worker_ref, WorkerExecutionOperation::Cancel)?; self.transition_worker( worker_ref, WorkerStatus::Cancelled, @@ -591,6 +768,30 @@ impl Runtime { }) } + fn install_execution_backend( + &self, + backend: Arc, + ) -> Result<(), RuntimeError> { + let backend = WorkerExecutionBackendRef::new(backend)?; + let mut state = self.lock()?; + state.execution_backend = Some(backend); + Ok(()) + } + + #[cfg(feature = "ws-server")] + fn execution_context(&self, worker_ref: WorkerRef) -> crate::execution::WorkerExecutionContext { + let runtime = self.clone(); + crate::execution::WorkerExecutionContext::new( + worker_ref, + Arc::new(move |worker_ref, payload| runtime.observe_worker_event(&worker_ref, payload)), + ) + } + + #[cfg(not(feature = "ws-server"))] + fn execution_context(&self, worker_ref: WorkerRef) -> crate::execution::WorkerExecutionContext { + crate::execution::WorkerExecutionContext::new(worker_ref) + } + fn lock(&self) -> Result, RuntimeError> { self.inner.lock().map_err(|_| RuntimeError::StatePoisoned) } @@ -613,6 +814,7 @@ struct RuntimeState { persistence: RuntimePersistence, status: RuntimeStatus, limits: RuntimeLimits, + execution_backend: Option, next_worker_sequence: u64, next_event_id: u64, next_diagnostic_id: u64, @@ -637,6 +839,7 @@ impl RuntimeState { persistence: RuntimePersistence::Memory, status: RuntimeStatus::Running, limits, + execution_backend: None, next_worker_sequence: 1, next_event_id: 1, next_diagnostic_id: 1, @@ -667,6 +870,7 @@ impl RuntimeState { persistence: RuntimePersistence::Fs(store), status: RuntimeStatus::Running, limits, + execution_backend: None, next_worker_sequence: 1, next_event_id: 1, next_diagnostic_id: 1, @@ -709,6 +913,8 @@ impl RuntimeState { worker_id: worker.worker_id, status: worker.status, request: worker.request, + execution: WorkerExecutionStatus::unconnected(), + execution_handle: None, transcript: worker.transcript, next_transcript_sequence: worker.next_transcript_sequence, last_event_id: worker.last_event_id, @@ -723,6 +929,7 @@ impl RuntimeState { persistence: RuntimePersistence::Fs(store), status: persisted.status, limits: persisted.limits, + execution_backend: None, next_worker_sequence: persisted.next_worker_sequence, next_event_id: persisted.next_event_id, next_diagnostic_id: persisted.next_diagnostic_id, @@ -1090,6 +1297,8 @@ struct WorkerRecord { worker_id: WorkerId, status: WorkerStatus, request: CreateWorkerRequest, + execution: WorkerExecutionStatus, + execution_handle: Option, transcript: Vec, next_transcript_sequence: u64, last_event_id: u64, @@ -1102,6 +1311,7 @@ impl WorkerRecord { runtime_id: runtime_id.clone(), worker_id: self.worker_id.clone(), status: self.status, + execution: self.execution.clone(), intent: self.request.intent.clone(), profile: self.request.profile.clone(), requested_capability_count: self.request.requested_capabilities.len(), @@ -1117,6 +1327,7 @@ impl WorkerRecord { runtime_id: runtime_id.clone(), worker_id: self.worker_id.clone(), status: self.status, + execution: self.execution.clone(), intent: self.request.intent.clone(), profile: self.request.profile.clone(), config_bundle: self.request.config_bundle.clone(), @@ -1185,7 +1396,12 @@ mod tests { ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigDeclaration, ConfigDeclarationKind, ConfigProfileDescriptor, }; - use crate::management::RuntimeLimits; + use crate::execution::{ + WorkerExecutionBackend, WorkerExecutionContext, WorkerExecutionHandle, + WorkerExecutionRunState, + }; + use std::collections::BTreeMap; + use std::sync::{Arc, Mutex}; fn task_request(objective: &str) -> CreateWorkerRequest { CreateWorkerRequest { @@ -1200,6 +1416,92 @@ mod tests { } } + #[derive(Default)] + struct TestExecutionBackend { + dispatch_result: Mutex>, + contexts: Mutex>, + } + + impl TestExecutionBackend { + fn set_dispatch_result(&self, result: WorkerExecutionResult) { + *self.dispatch_result.lock().unwrap() = Some(result); + } + + #[cfg(feature = "ws-server")] + fn publish_text_delta( + &self, + worker_ref: &WorkerRef, + text: &str, + ) -> Result { + let contexts = self.contexts.lock().unwrap(); + let context = contexts.get(&worker_ref.worker_id).expect("context stored"); + context.publish_protocol_event(protocol::Event::TextDelta { text: text.into() }) + } + } + + impl WorkerExecutionBackend for TestExecutionBackend { + fn backend_id(&self) -> &str { + "test-execution-backend" + } + + fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult { + self.contexts + .lock() + .unwrap() + .insert(request.worker_ref.worker_id.clone(), request.context); + WorkerExecutionSpawnResult::Connected { + handle: WorkerExecutionHandle::new(request.worker_ref, self.backend_id()), + run_state: WorkerExecutionRunState::Idle, + } + } + + fn dispatch_input( + &self, + _handle: &WorkerExecutionHandle, + _input: WorkerInput, + ) -> WorkerExecutionResult { + self.dispatch_result + .lock() + .unwrap() + .clone() + .unwrap_or_else(|| { + WorkerExecutionResult::accepted( + WorkerExecutionOperation::Input, + WorkerExecutionRunState::Idle, + ) + }) + } + + fn stop_worker(&self, _handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + WorkerExecutionResult::accepted( + WorkerExecutionOperation::Stop, + WorkerExecutionRunState::Stopped, + ) + } + + fn cancel_worker(&self, _handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + WorkerExecutionResult::accepted( + WorkerExecutionOperation::Cancel, + WorkerExecutionRunState::Stopped, + ) + } + } + + fn runtime_with_backend() -> Runtime { + Runtime::with_execution_backend( + RuntimeOptions::default(), + Arc::new(TestExecutionBackend::default()), + ) + .unwrap() + } + + fn runtime_and_backend() -> (Runtime, Arc) { + let backend = Arc::new(TestExecutionBackend::default()); + let runtime = + Runtime::with_execution_backend(RuntimeOptions::default(), backend.clone()).unwrap(); + (runtime, backend) + } + fn test_bundle() -> ConfigBundle { ConfigBundle { metadata: ConfigBundleMetadata { @@ -1357,15 +1659,142 @@ mod tests { ); } + #[test] + fn backend_unconnected_worker_input_is_rejected_and_not_transcribed() { + let runtime = Runtime::new_memory(); + let detail = runtime.create_worker(task_request("placeholder")).unwrap(); + assert_eq!( + detail.execution.backend, + WorkerExecutionBackendKind::Unconnected + ); + + let err = runtime + .send_input(&detail.worker_ref, WorkerInput::user("must reject")) + .unwrap_err(); + assert!(matches!( + err, + RuntimeError::WorkerExecutionUnavailable { .. } + )); + + let projection = runtime + .transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 1)) + .unwrap(); + assert_eq!(projection.total_items, 0); + } + + #[test] + fn connected_backend_busy_dispatch_is_typed_and_not_transcribed() { + let (runtime, backend) = runtime_and_backend(); + backend.set_dispatch_result(WorkerExecutionResult::busy( + WorkerExecutionOperation::Input, + "worker is already running", + )); + let detail = runtime.create_worker(task_request("busy")).unwrap(); + + let err = runtime + .send_input(&detail.worker_ref, WorkerInput::user("wait")) + .unwrap_err(); + assert!(matches!( + err, + RuntimeError::WorkerExecutionRejected { + outcome: crate::execution::WorkerExecutionOutcome::Busy, + .. + } + )); + let refreshed = runtime.worker_detail(&detail.worker_ref).unwrap(); + assert_eq!(refreshed.execution.run_state, WorkerExecutionRunState::Busy); + assert_eq!( + runtime + .transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 1)) + .unwrap() + .total_items, + 0 + ); + } + + #[cfg(feature = "ws-server")] + #[test] + fn backend_protocol_publish_hook_writes_observation_bus() { + let (runtime, backend) = runtime_and_backend(); + let detail = runtime.create_worker(task_request("observe")).unwrap(); + + let observation = backend + .publish_text_delta(&detail.worker_ref, "from backend") + .unwrap(); + assert_eq!(observation.worker_ref, detail.worker_ref); + + let observations = runtime + .read_worker_observation_events(&detail.worker_ref, WorkerObservationCursor::zero()) + .unwrap(); + assert_eq!(observations.len(), 1); + assert!(matches!( + observations[0].payload, + protocol::Event::TextDelta { .. } + )); + } + + struct InputOnlyBackend; + + impl WorkerExecutionBackend for InputOnlyBackend { + fn backend_id(&self) -> &str { + "input-only" + } + + fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult { + WorkerExecutionSpawnResult::Connected { + handle: WorkerExecutionHandle::new(request.worker_ref, self.backend_id()), + run_state: WorkerExecutionRunState::Idle, + } + } + + fn dispatch_input( + &self, + _handle: &WorkerExecutionHandle, + _input: WorkerInput, + ) -> WorkerExecutionResult { + WorkerExecutionResult::accepted( + WorkerExecutionOperation::Input, + WorkerExecutionRunState::Idle, + ) + } + } + + #[test] + fn connected_backend_stop_unsupported_is_typed_rejection() { + let runtime = + Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(InputOnlyBackend)) + .unwrap(); + let detail = runtime.create_worker(task_request("no stop")).unwrap(); + + let err = runtime + .stop_worker(&detail.worker_ref, Some("stop".to_string())) + .unwrap_err(); + assert!(matches!( + err, + RuntimeError::WorkerExecutionRejected { + outcome: crate::execution::WorkerExecutionOutcome::Unsupported, + .. + } + )); + assert_eq!( + runtime.worker_detail(&detail.worker_ref).unwrap().status, + WorkerStatus::Running + ); + } + #[test] fn send_input_and_project_bounded_transcript() { - let runtime = Runtime::with_options(RuntimeOptions { - limits: RuntimeLimits { - max_transcript_projection_items: 2, - max_event_batch_items: 16, + let runtime = Runtime::with_execution_backend( + RuntimeOptions { + limits: RuntimeLimits { + max_transcript_projection_items: 2, + max_event_batch_items: 16, + }, + ..RuntimeOptions::default() }, - ..RuntimeOptions::default() - }); + Arc::new(TestExecutionBackend::default()), + ) + .unwrap(); let detail = runtime.create_worker(task_request("chat")).unwrap(); let first = runtime @@ -1509,7 +1938,7 @@ mod tests { #[test] fn event_cursor_and_poll_only_subscription_are_bounded_placeholders() { - let runtime = Runtime::new_memory(); + let runtime = runtime_with_backend(); let cursor = runtime.event_cursor_from_start().unwrap(); let subscription = runtime.subscribe_events(cursor.clone()).unwrap(); assert_eq!(subscription.mode, EventSubscriptionMode::PollOnly); @@ -1559,15 +1988,18 @@ mod tests { fn fs_store_restores_workers_events_and_transcripts() { let root = fs_store_root("restore"); let runtime_id = RuntimeId::new("runtime-fs-authority").unwrap(); - let runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { - root: root.clone(), - runtime_id: Some(runtime_id.clone()), - display_name: Some("filesystem runtime".to_string()), - limits: RuntimeLimits { - max_transcript_projection_items: 2, - max_event_batch_items: 2, + let runtime = Runtime::with_fs_store_and_execution_backend( + crate::fs_store::FsRuntimeStoreOptions { + root: root.clone(), + runtime_id: Some(runtime_id.clone()), + display_name: Some("filesystem runtime".to_string()), + limits: RuntimeLimits { + max_transcript_projection_items: 2, + max_event_batch_items: 2, + }, }, - }) + Arc::new(TestExecutionBackend::default()), + ) .unwrap(); assert_eq!( runtime.summary().unwrap().backend, diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index f7ee4ea2..b2ba5548 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -1260,9 +1260,9 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { &self.runtime_id, worker_id, diagnostic( - "embedded_worker_llm_not_connected", + "embedded_worker_execution_unavailable", DiagnosticSeverity::Error, - "Embedded Worker input is disabled until actual Worker/LLM execution is connected" + "Embedded Worker input is disabled until an execution backend is connected" .to_string(), ), ) @@ -2054,6 +2054,16 @@ fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnosti DiagnosticSeverity::Warning, "Embedded Runtime worker was not found".to_string(), ), + EmbeddedRuntimeError::WorkerExecutionUnavailable { .. } => diagnostic( + "embedded_worker_execution_unavailable", + DiagnosticSeverity::Warning, + "Embedded Worker has no execution backend attached".to_string(), + ), + EmbeddedRuntimeError::WorkerExecutionRejected { .. } => diagnostic( + "embedded_worker_execution_rejected", + DiagnosticSeverity::Warning, + "Embedded Worker execution backend rejected the operation".to_string(), + ), EmbeddedRuntimeError::LimitTooLarge { requested, max } => diagnostic( "embedded_runtime_limit_too_large", DiagnosticSeverity::Warning, @@ -2640,7 +2650,7 @@ mod tests { input .diagnostics .iter() - .any(|diagnostic| diagnostic.code == "embedded_worker_llm_not_connected") + .any(|diagnostic| diagnostic.code == "embedded_worker_execution_unavailable") ); let transcript = registry @@ -2658,6 +2668,8 @@ mod tests { "token", "credential", "provider", + "can_stream_events", + "can_read_bounded_transcript", ] { assert!( !json.contains(forbidden), @@ -3013,6 +3025,7 @@ mod tests { "runtime_id": runtime_id, "worker_id": worker_id, "status": "running", + "execution": { "backend": "connected", "run_state": "idle" }, "intent": { "kind": "role", "role": "coder", "purpose": "remote test" }, "profile": { "kind": "builtin", "value": "coder" }, "config_bundle": null, diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index cca62427..8a38424e 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -1337,7 +1337,7 @@ mod tests { .as_array() .unwrap() .iter() - .any(|diagnostic| diagnostic["code"] == "embedded_worker_llm_not_connected") + .any(|diagnostic| diagnostic["code"] == "embedded_worker_execution_unavailable") ); let transcript = get_json( From 761b60c85750d03c119733a088fb5073f9b37e9a Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 28 Jun 2026 04:46:08 +0900 Subject: [PATCH 2/2] fix: initialize restored worker observations --- crates/worker-runtime/src/runtime.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index 16b1283a..062005bb 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -937,6 +937,12 @@ impl RuntimeState { config_bundles: persisted.config_bundles, events: persisted.events, diagnostics: persisted.diagnostics, + #[cfg(feature = "ws-server")] + next_observation_sequence: 1, + #[cfg(feature = "ws-server")] + observation_events: VecDeque::new(), + #[cfg(feature = "ws-server")] + observation_tx: broadcast::channel(256).0, }) } @@ -2051,6 +2057,24 @@ mod tests { .unwrap(); assert_eq!(direct_transcript.items[0].content, "second"); + #[cfg(feature = "ws-server")] + { + let observation = restored + .observe_worker_event( + &worker.worker_ref, + protocol::Event::TextDelta { + text: "restored observation bus".to_string(), + }, + ) + .unwrap(); + assert_eq!(observation.sequence, 1); + let observations = restored + .read_worker_observation_events(&worker.worker_ref, WorkerObservationCursor::zero()) + .unwrap(); + assert_eq!(observations.len(), 1); + assert_eq!(observations[0].cursor, observation.cursor); + } + let _ = std::fs::remove_dir_all(root); }