diff --git a/Cargo.lock b/Cargo.lock index 7d240f78..1f046047 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5932,6 +5932,7 @@ dependencies = [ "uuid", "wasmtime", "wat", + "worker-runtime", "workflow", "yoi-plugin-pdk", ] @@ -6054,6 +6055,7 @@ dependencies = [ "tower", "tracing", "uuid", + "worker", "worker-runtime", ] diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index 062005bb..83f51cd1 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -11,7 +11,8 @@ use crate::error::RuntimeError; use crate::execution::{ WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef, WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult, - WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, WorkerExecutionStatus, + WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, + WorkerExecutionStatus, }; #[cfg(feature = "fs-store")] use crate::fs_store::{ @@ -722,7 +723,16 @@ impl Runtime { ) -> Result { let mut state = self.lock()?; state.ensure_worker_ref(worker_ref)?; + let transcript_sequence = state.project_protocol_event_to_transcript(worker_ref, &payload); + let execution_state_changed = + state.project_protocol_event_to_execution(worker_ref, &payload); let event = state.push_worker_observation_event(worker_ref.clone(), payload); + if transcript_sequence.is_some() || execution_state_changed { + state.persist_worker(&worker_ref.worker_id)?; + } + if let Some(sequence) = transcript_sequence { + state.persist_transcript_entry(&worker_ref.worker_id, sequence)?; + } Ok(event) } @@ -1259,6 +1269,111 @@ impl RuntimeState { event } + #[cfg(feature = "ws-server")] + fn append_worker_transcript_entry( + &mut self, + worker_ref: &WorkerRef, + role: TranscriptRole, + content: impl Into, + ) -> Option { + let content = content.into(); + if content.trim().is_empty() { + return None; + } + let event_id = self.last_event_id(); + let worker = self.workers.get_mut(&worker_ref.worker_id)?; + let sequence = worker.next_transcript_sequence; + worker.next_transcript_sequence += 1; + worker.transcript.push(TranscriptEntry { + sequence, + worker_ref: worker_ref.clone(), + role, + content, + event_id, + }); + Some(sequence) + } + + #[cfg(feature = "ws-server")] + fn project_protocol_event_to_transcript( + &mut self, + worker_ref: &WorkerRef, + event: &protocol::Event, + ) -> Option { + match event { + protocol::Event::TextDone { text, .. } => self.append_worker_transcript_entry( + worker_ref, + TranscriptRole::Assistant, + text.clone(), + ), + protocol::Event::Error { message, .. } => self.append_worker_transcript_entry( + worker_ref, + TranscriptRole::System, + format!("error: {message}"), + ), + protocol::Event::ToolResult { + id, + summary, + is_error, + .. + } => self.append_worker_transcript_entry( + worker_ref, + TranscriptRole::System, + format!( + "tool result {id}: {}{}", + if *is_error { "error: " } else { "" }, + summary + ), + ), + _ => None, + } + } + + #[cfg(feature = "ws-server")] + fn project_protocol_event_to_execution( + &mut self, + worker_ref: &WorkerRef, + event: &protocol::Event, + ) -> bool { + let Some(worker) = self.workers.get_mut(&worker_ref.worker_id) else { + return false; + }; + let status = &mut worker.execution; + let next_run_state = match event { + protocol::Event::Status { + status: protocol::WorkerStatus::Running, + } => Some(WorkerExecutionRunState::Busy), + protocol::Event::Status { + status: protocol::WorkerStatus::Idle, + } => Some(WorkerExecutionRunState::Idle), + protocol::Event::Status { + status: protocol::WorkerStatus::Paused, + } => Some(WorkerExecutionRunState::Busy), + protocol::Event::Snapshot { status, .. } => match status { + protocol::WorkerStatus::Running => Some(WorkerExecutionRunState::Busy), + protocol::WorkerStatus::Idle => Some(WorkerExecutionRunState::Idle), + protocol::WorkerStatus::Paused => Some(WorkerExecutionRunState::Busy), + }, + protocol::Event::RunEnd { result } => match result { + protocol::RunResult::Finished | protocol::RunResult::RolledBack => { + Some(WorkerExecutionRunState::Idle) + } + protocol::RunResult::Paused => Some(WorkerExecutionRunState::Busy), + protocol::RunResult::LimitReached => Some(WorkerExecutionRunState::Errored), + }, + protocol::Event::Error { .. } => Some(WorkerExecutionRunState::Errored), + _ => None, + }; + let Some(next_run_state) = next_run_state else { + return false; + }; + if status.run_state == next_run_state { + return false; + } + status.run_state = next_run_state; + true + } + fn push_diagnostic( &mut self, severity: DiagnosticSeverity, diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index d853cc1e..5b065cc3 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -5,6 +5,10 @@ edition.workspace = true license.workspace = true autobins = false +[features] +default = [] +runtime-adapter = ["dep:worker-runtime"] + [dependencies] async-trait = { workspace = true } clap = { version = "4.6.0", features = ["derive"] } @@ -17,6 +21,7 @@ protocol = { workspace = true } provider = { workspace = true } client = { workspace = true } pod-registry = { workspace = true } +worker-runtime = { workspace = true, features = ["ws-server"], optional = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } reqwest = { version = "0.13", default-features = false, features = ["blocking", "native-tls"] } diff --git a/crates/worker/src/entrypoint.rs b/crates/worker/src/entrypoint.rs index 2801a25b..1ea6f81c 100644 --- a/crates/worker/src/entrypoint.rs +++ b/crates/worker/src/entrypoint.rs @@ -225,6 +225,21 @@ fn load_profile( Ok((resolved.manifest, PromptLoader::builtins_only())) } +#[cfg(feature = "runtime-adapter")] +pub(crate) fn resolve_runtime_profile_manifest( + profile: Option<&str>, + workspace_root: &Path, + worker_name: &str, +) -> Result<(WorkerManifest, PromptLoader), String> { + let selector = profile + .map(ProfileSelector::parse_cli) + .unwrap_or(ProfileSelector::Default); + let (mut manifest, loader) = load_profile(&selector, workspace_root, worker_name)?; + apply_profile_launch_policy(&mut manifest, workspace_root, None)?; + apply_plugin_resolution_plan(&mut manifest, workspace_root); + Ok((manifest, loader)) +} + fn load_single_manifest( path: &Path, explicit_worker_name: Option<&str>, diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 399dfc4c..1f89097d 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -10,6 +10,8 @@ pub(crate) mod in_flight; pub mod ipc; pub mod prompt; pub mod runtime; +#[cfg(feature = "runtime-adapter")] +pub mod runtime_adapter; pub mod segment_log_sink; pub mod shared_state; mod shutdown_after_idle; diff --git a/crates/worker/src/runtime_adapter.rs b/crates/worker/src/runtime_adapter.rs new file mode 100644 index 00000000..22854447 --- /dev/null +++ b/crates/worker/src/runtime_adapter.rs @@ -0,0 +1,712 @@ +//! Adapter from `worker-runtime` execution backend boundary to the real +//! `worker` crate controller/run lifecycle. +//! +//! The adapter intentionally owns real `WorkerHandle`s internally and exposes +//! only the opaque `worker-runtime` execution handle to callers. Browser/API +//! projections therefore keep the existing runtime redaction boundary: no raw +//! socket paths, session paths, manifests, credentials, or handles leave this +//! module. + +use std::collections::HashMap; +use std::future::Future; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, mpsc}; +use std::time::Duration; + +use async_trait::async_trait; +use manifest::paths; +use pod_store::{CombinedStore, FsWorkerStore}; +use protocol::{Method, Segment, WorkerStatus}; +use session_store::FsStore; +use tokio::runtime::Runtime; +use tokio::sync::broadcast; +use worker_runtime::execution::{ + WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult, + WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, +}; +use worker_runtime::interaction::{WorkerInput, WorkerInputKind}; + +use crate::{Worker, WorkerController, WorkerHandle}; + +const DEFAULT_BACKEND_ID: &str = "worker-crate"; +const RUNTIME_TASK_TIMEOUT: Duration = Duration::from_secs(10); + +/// Factory seam used by [`WorkerRuntimeExecutionBackend`] to construct a real +/// controller-backed Worker for a Runtime catalog entry. +#[async_trait] +pub trait RuntimeWorkerFactory: Send + Sync + 'static { + async fn spawn_controller( + &self, + request: WorkerExecutionSpawnRequest, + ) -> Result; +} + +/// Production factory that resolves a normal Worker profile and spawns it under +/// `WorkerController`. +#[derive(Debug, Clone)] +pub struct ProfileRuntimeWorkerFactory { + workspace_root: PathBuf, + cwd: PathBuf, + store_dir: Option, + pod_store_dir: Option, + profile: Option, + runtime_base_dir: Option, +} + +impl ProfileRuntimeWorkerFactory { + pub fn new(workspace_root: impl Into) -> Self { + let workspace_root = workspace_root.into(); + Self { + cwd: workspace_root.clone(), + workspace_root, + store_dir: None, + pod_store_dir: None, + profile: None, + runtime_base_dir: None, + } + } + + pub fn with_cwd(mut self, cwd: impl Into) -> Self { + self.cwd = cwd.into(); + self + } + + pub fn with_store_dir(mut self, store_dir: impl Into) -> Self { + self.store_dir = Some(store_dir.into()); + self + } + + pub fn with_pod_store_dir(mut self, pod_store_dir: impl Into) -> Self { + self.pod_store_dir = Some(pod_store_dir.into()); + self + } + + /// Set the profile selector used for Runtime-created Workers. When unset, + /// normal default profile discovery is used. + pub fn with_profile(mut self, profile: impl Into) -> Self { + self.profile = Some(profile.into()); + self + } + + pub fn with_runtime_base_dir(mut self, runtime_base_dir: impl Into) -> Self { + self.runtime_base_dir = Some(runtime_base_dir.into()); + self + } + + fn store_dir(&self) -> Result { + self.store_dir + .clone() + .or_else(paths::sessions_dir) + .ok_or_else(|| { + "could not resolve sessions directory (set YOI_HOME, YOI_DATA_DIR, or HOME)" + .to_string() + }) + } + + fn pod_store_dir(&self, store_dir: &std::path::Path) -> PathBuf { + self.pod_store_dir + .clone() + .or_else(|| paths::data_dir().map(|data_dir| data_dir.join("pods"))) + .or_else(|| store_dir.parent().map(|parent| parent.join("pods"))) + .unwrap_or_else(|| PathBuf::from("workers")) + } + + fn runtime_base_dir(&self) -> Result { + self.runtime_base_dir + .clone() + .or_else(|| crate::runtime::dir::default_base().ok()) + .ok_or_else(|| "could not resolve worker runtime directory".to_string()) + } + + fn runtime_worker_name(request: &WorkerExecutionSpawnRequest) -> String { + request.worker_ref.worker_id.to_string() + } + + fn runtime_profile_value( + profile: &worker_runtime::catalog::ProfileSelector, + ) -> Option> { + match profile { + worker_runtime::catalog::ProfileSelector::RuntimeDefault => None, + worker_runtime::catalog::ProfileSelector::Named(name) => { + Some(std::borrow::Cow::Borrowed(name.as_str())) + } + worker_runtime::catalog::ProfileSelector::Builtin(name) => { + if name.starts_with("builtin:") { + Some(std::borrow::Cow::Borrowed(name.as_str())) + } else { + Some(std::borrow::Cow::Owned(format!("builtin:{name}"))) + } + } + } + } + + fn runtime_profile<'a>( + &'a self, + request: &'a WorkerExecutionSpawnRequest, + ) -> Option> { + if let Some(profile) = self.profile.as_deref() { + return Some(std::borrow::Cow::Borrowed(profile)); + } + Self::runtime_profile_value(&request.request.profile) + } +} + +#[async_trait] +impl RuntimeWorkerFactory for ProfileRuntimeWorkerFactory { + async fn spawn_controller( + &self, + request: WorkerExecutionSpawnRequest, + ) -> Result { + let worker_name = Self::runtime_worker_name(&request); + let profile = self.runtime_profile(&request); + let (mut manifest, loader) = crate::entrypoint::resolve_runtime_profile_manifest( + profile.as_deref(), + &self.workspace_root, + &worker_name, + )?; + manifest.worker.name = worker_name; + + let store_dir = self.store_dir()?; + let session_store = FsStore::new(&store_dir).map_err(|err| { + format!( + "failed to initialize session store at {}: {err}", + store_dir.display() + ) + })?; + let pod_store_dir = self.pod_store_dir(&store_dir); + let pod_store = FsWorkerStore::new(&pod_store_dir).map_err(|err| { + format!( + "failed to initialize worker metadata store at {}: {err}", + pod_store_dir.display() + ) + })?; + let store = CombinedStore::new(session_store, pod_store); + + let worker = Worker::from_manifest_with_context( + manifest, + store, + loader, + self.workspace_root.clone(), + self.cwd.clone(), + ) + .await + .map_err(|err| format!("failed to create Worker from profile: {err}"))?; + + let runtime_base = self.runtime_base_dir()?; + let (handle, _shutdown_rx) = WorkerController::spawn(worker, &runtime_base) + .await + .map_err(|err| format!("failed to spawn Worker controller: {err}"))?; + Ok(handle) + } +} + +struct RuntimeWorkerExecution { + handle: WorkerHandle, + busy: Arc, +} + +/// `worker-runtime` execution backend backed by real `worker` crate Workers. +pub struct WorkerRuntimeExecutionBackend { + backend_id: String, + factory: Arc, + runtime: Mutex>, + workers: Mutex>, +} + +impl WorkerRuntimeExecutionBackend { + pub fn from_workspace(workspace_root: impl Into) -> Result { + Self::new(ProfileRuntimeWorkerFactory::new(workspace_root)) + } +} + +impl WorkerRuntimeExecutionBackend +where + F: RuntimeWorkerFactory, +{ + pub fn new(factory: F) -> Result { + let runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("yoi-runtime-worker-adapter") + .enable_all() + .build() + .map_err(|err| format!("failed to build worker adapter runtime: {err}"))?; + Ok(Self { + backend_id: DEFAULT_BACKEND_ID.to_string(), + factory: Arc::new(factory), + runtime: Mutex::new(Some(runtime)), + workers: Mutex::new(HashMap::new()), + }) + } + + pub fn with_backend_id(mut self, backend_id: impl Into) -> Self { + self.backend_id = backend_id.into(); + self + } + + fn wait_for_runtime_task(receiver: mpsc::Receiver>) -> Result { + receiver + .recv_timeout(RUNTIME_TASK_TIMEOUT) + .map_err(|err| format!("worker adapter task did not complete: {err}"))? + } + + fn spawn_on_adapter_runtime(&self, task: Fut) -> Result<(), String> + where + Fut: Future + Send + 'static, + { + let runtime = self + .runtime + .lock() + .map_err(|_| "worker adapter runtime lock is poisoned".to_string())?; + let runtime = runtime + .as_ref() + .ok_or_else(|| "worker adapter runtime is shutting down".to_string())?; + runtime.spawn(task); + Ok(()) + } + + fn run_on_adapter_runtime(&self, task: Fut) -> Result + where + T: Send + 'static, + Fut: Future> + Send + 'static, + { + let (tx, rx) = mpsc::sync_channel(1); + self.spawn_on_adapter_runtime(async move { + let _ = tx.send(task.await); + })?; + Self::wait_for_runtime_task(rx) + } + + fn get_execution( + &self, + handle: &WorkerExecutionHandle, + ) -> Result<(WorkerHandle, Arc), WorkerExecutionResult> { + if handle.backend_id() != self.backend_id() { + return Err(WorkerExecutionResult::rejected( + WorkerExecutionOperation::Input, + format!( + "execution handle belongs to backend {}, not {}", + handle.backend_id(), + self.backend_id() + ), + )); + } + let workers = self.workers.lock().map_err(|_| { + WorkerExecutionResult::errored( + WorkerExecutionOperation::Input, + "worker adapter registry lock is poisoned", + ) + })?; + workers + .get(handle.worker_ref()) + .map(|execution| (execution.handle.clone(), execution.busy.clone())) + .ok_or_else(|| { + WorkerExecutionResult::rejected( + WorkerExecutionOperation::Input, + "execution handle does not reference a live Worker", + ) + }) + } + + fn send_method( + &self, + operation: WorkerExecutionOperation, + worker: WorkerHandle, + method: Method, + accepted_run_state: WorkerExecutionRunState, + ) -> WorkerExecutionResult { + self.run_on_adapter_runtime(async move { + worker + .send(method) + .await + .map_err(|err| format!("failed to send Worker method: {err}")) + }) + .map(|_| WorkerExecutionResult::accepted(operation, accepted_run_state)) + .unwrap_or_else(|message| WorkerExecutionResult::errored(operation, message)) + } +} + +impl Drop for WorkerRuntimeExecutionBackend { + fn drop(&mut self) { + if let Ok(mut runtime) = self.runtime.lock() + && let Some(runtime) = runtime.take() + { + let _ = std::thread::spawn(move || drop(runtime)).join(); + } + } +} + +impl WorkerExecutionBackend for WorkerRuntimeExecutionBackend +where + F: RuntimeWorkerFactory, +{ + fn backend_id(&self) -> &str { + &self.backend_id + } + + fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult { + if self + .workers + .lock() + .map(|workers| workers.contains_key(&request.worker_ref)) + .unwrap_or(false) + { + return WorkerExecutionSpawnResult::Rejected(WorkerExecutionResult::busy( + WorkerExecutionOperation::Spawn, + "Worker is already connected to execution backend", + )); + } + + let factory = self.factory.clone(); + let bridge_context = request.context.clone(); + let worker_ref = request.worker_ref.clone(); + let spawn_result = + self.run_on_adapter_runtime(async move { factory.spawn_controller(request).await }); + + let handle = match spawn_result { + Ok(handle) => handle, + Err(message) => { + return WorkerExecutionSpawnResult::Errored(WorkerExecutionResult::errored( + WorkerExecutionOperation::Spawn, + message, + )); + } + }; + + let mut events = handle.subscribe(); + let bridge_handle = handle.clone(); + let busy = Arc::new(AtomicBool::new(false)); + let bridge_busy = busy.clone(); + if let Err(message) = self.spawn_on_adapter_runtime(async move { + loop { + match events.recv().await { + Ok(event) => { + let _ = bridge_context.publish_protocol_event(event); + if bridge_handle.shared_state.get_status() == WorkerStatus::Idle { + bridge_busy.store(false, Ordering::SeqCst); + } + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + }) { + return WorkerExecutionSpawnResult::Errored(WorkerExecutionResult::errored( + WorkerExecutionOperation::Spawn, + message, + )); + } + + let mut workers = match self.workers.lock() { + Ok(workers) => workers, + Err(_) => { + return WorkerExecutionSpawnResult::Errored(WorkerExecutionResult::errored( + WorkerExecutionOperation::Spawn, + "worker adapter registry lock is poisoned", + )); + } + }; + workers.insert(worker_ref.clone(), RuntimeWorkerExecution { handle, busy }); + + WorkerExecutionSpawnResult::Connected { + handle: WorkerExecutionHandle::new(worker_ref, self.backend_id()), + run_state: WorkerExecutionRunState::Idle, + } + } + + fn dispatch_input( + &self, + handle: &WorkerExecutionHandle, + input: WorkerInput, + ) -> WorkerExecutionResult { + let (worker, busy) = match self.get_execution(handle) { + Ok(execution) => execution, + Err(mut result) => { + result.operation = WorkerExecutionOperation::Input; + return result; + } + }; + + if worker.shared_state.get_status() != WorkerStatus::Idle + || busy + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + return WorkerExecutionResult::busy( + WorkerExecutionOperation::Input, + "Worker is already running; runtime adapter v0 does not queue input", + ); + } + + let WorkerInputKind::User = input.kind else { + busy.store(false, Ordering::SeqCst); + return WorkerExecutionResult::unsupported( + WorkerExecutionOperation::Input, + "runtime adapter currently dispatches user input only", + ); + }; + let content = input.content.trim().to_string(); + if content.is_empty() { + busy.store(false, Ordering::SeqCst); + return WorkerExecutionResult::rejected( + WorkerExecutionOperation::Input, + "runtime adapter rejects empty user input", + ); + } + + let result = self.send_method( + WorkerExecutionOperation::Input, + worker, + Method::Run { + input: vec![Segment::text(content)], + }, + WorkerExecutionRunState::Busy, + ); + if result.outcome != worker_runtime::execution::WorkerExecutionOutcome::Accepted { + busy.store(false, Ordering::SeqCst); + } + result + } + + fn stop_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + let (worker, _busy) = match self.get_execution(handle) { + Ok(execution) => execution, + Err(mut result) => { + result.operation = WorkerExecutionOperation::Stop; + return result; + } + }; + self.send_method( + WorkerExecutionOperation::Stop, + worker, + Method::Shutdown, + WorkerExecutionRunState::Stopped, + ) + } + + fn cancel_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + let (worker, _busy) = match self.get_execution(handle) { + Ok(execution) => execution, + Err(mut result) => { + result.operation = WorkerExecutionOperation::Cancel; + return result; + } + }; + self.send_method( + WorkerExecutionOperation::Cancel, + worker, + Method::Cancel, + WorkerExecutionRunState::Idle, + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::pin::Pin; + use std::sync::atomic::{AtomicUsize, Ordering}; + + use async_trait::async_trait; + use futures::Stream; + use llm_engine::Engine; + use llm_engine::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; + use llm_engine::llm_client::{ClientError, LlmClient, Request}; + use manifest::{Scope, WorkerManifest}; + use worker_runtime::Runtime as EmbeddedRuntime; + use worker_runtime::catalog::{CreateWorkerRequest, ProfileSelector, WorkerIntent}; + use worker_runtime::identity::RuntimeId; + use worker_runtime::management::RuntimeOptions; + use worker_runtime::observation::{TranscriptQuery, TranscriptRole}; + + #[derive(Clone)] + struct MockClient { + responses: Arc>>, + call_count: Arc, + captured: Arc>>, + } + + impl MockClient { + fn new(events: Vec) -> Self { + Self { + responses: Arc::new(vec![events]), + call_count: Arc::new(AtomicUsize::new(0)), + captured: Arc::new(Mutex::new(Vec::new())), + } + } + } + + #[async_trait] + impl LlmClient for MockClient { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + async fn stream( + &self, + request: Request, + ) -> Result> + Send>>, ClientError> + { + self.captured.lock().unwrap().push(request); + let idx = self.call_count.fetch_add(1, Ordering::SeqCst); + let events = self.responses.get(idx).cloned().unwrap_or_default(); + Ok(Box::pin(futures::stream::iter(events.into_iter().map(Ok)))) + } + } + + struct MockFactory { + client: MockClient, + runtime_base: PathBuf, + cwd: PathBuf, + store_dir: PathBuf, + pod_store_dir: PathBuf, + } + + #[async_trait] + impl RuntimeWorkerFactory for MockFactory { + async fn spawn_controller( + &self, + _request: WorkerExecutionSpawnRequest, + ) -> Result { + let manifest = WorkerManifest::from_toml( + r#" + [worker] + name = "runtime-adapter-test" + pwd = "./" + + [model] + scheme = "anthropic" + model_id = "test-model" + + [engine] + max_tokens = 100 + + [[scope.allow]] + target = "./" + permission = "write" + "#, + ) + .map_err(|err| err.to_string())?; + let store = CombinedStore::new( + FsStore::new(&self.store_dir).map_err(|err| err.to_string())?, + FsWorkerStore::new(&self.pod_store_dir).map_err(|err| err.to_string())?, + ); + let scope = Scope::writable(&self.cwd).map_err(|err| err.to_string())?; + let worker = Worker::new( + manifest, + Engine::new(self.client.clone()), + store, + self.cwd.clone(), + scope, + ) + .await + .map_err(|err| err.to_string())?; + let (handle, _shutdown_rx) = WorkerController::spawn(worker, &self.runtime_base) + .await + .map_err(|err| err.to_string())?; + Ok(handle) + } + } + + fn simple_text_events() -> Vec { + vec![ + LlmEvent::text_block_start(0), + LlmEvent::text_delta(0, "hello"), + LlmEvent::text_delta(0, " from worker"), + LlmEvent::text_block_stop(0, None), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ] + } + + fn create_request(name: &str) -> CreateWorkerRequest { + CreateWorkerRequest { + intent: WorkerIntent::Role { + role: name.to_string(), + purpose: None, + }, + profile: ProfileSelector::RuntimeDefault, + config_bundle: None, + requested_capabilities: Vec::new(), + workspace_refs: Vec::new(), + mount_refs: Vec::new(), + } + } + + #[test] + fn builtin_profile_selector_is_not_double_prefixed() { + assert_eq!( + ProfileRuntimeWorkerFactory::runtime_profile_value( + &worker_runtime::catalog::ProfileSelector::Builtin("coder".to_string()) + ) + .as_deref(), + Some("builtin:coder") + ); + assert_eq!( + ProfileRuntimeWorkerFactory::runtime_profile_value( + &worker_runtime::catalog::ProfileSelector::Builtin("builtin:coder".to_string()) + ) + .as_deref(), + Some("builtin:coder") + ); + } + + #[test] + fn adapter_dispatches_user_input_through_worker_run_lifecycle() { + let client = MockClient::new(simple_text_events()); + let runtime_base = tempfile::tempdir().unwrap(); + let cwd = tempfile::tempdir().unwrap(); + let store = tempfile::tempdir().unwrap(); + let factory = MockFactory { + client: client.clone(), + runtime_base: runtime_base.path().to_path_buf(), + cwd: cwd.path().to_path_buf(), + store_dir: store.path().join("sessions"), + pod_store_dir: store.path().join("pods"), + }; + let backend = WorkerRuntimeExecutionBackend::new(factory).unwrap(); + let runtime = EmbeddedRuntime::with_execution_backend( + RuntimeOptions { + runtime_id: RuntimeId::new("embedded"), + ..RuntimeOptions::default() + }, + Arc::new(backend), + ) + .unwrap(); + let detail = runtime.create_worker(create_request("chat")).unwrap(); + + runtime + .send_input(&detail.worker_ref, WorkerInput::user("say hello")) + .unwrap(); + + let deadline = std::time::Instant::now() + Duration::from_secs(5); + loop { + let projection = runtime + .transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 10)) + .unwrap(); + if projection.items.iter().any(|item| { + item.role == TranscriptRole::Assistant && item.content == "hello from worker" + }) { + break; + } + assert!( + std::time::Instant::now() < deadline, + "timed out waiting for assistant transcript projection" + ); + std::thread::sleep(Duration::from_millis(20)); + } + + assert_eq!(client.captured.lock().unwrap().len(), 1); + let observations = runtime + .read_worker_observation_events( + &detail.worker_ref, + worker_runtime::observation::WorkerObservationCursor::zero(), + ) + .unwrap(); + assert!( + observations + .iter() + .any(|event| matches!(event.payload, protocol::Event::TextDone { .. })) + ); + } +} diff --git a/crates/workspace-server/Cargo.toml b/crates/workspace-server/Cargo.toml index 9e6c20cd..842975df 100644 --- a/crates/workspace-server/Cargo.toml +++ b/crates/workspace-server/Cargo.toml @@ -22,6 +22,7 @@ thiserror.workspace = true ticket.workspace = true tokio = { workspace = true, features = ["fs", "macros", "net", "rt-multi-thread", "sync"] } tokio-tungstenite.workspace = true +worker = { workspace = true, features = ["runtime-adapter"] } worker-runtime = { workspace = true, features = ["ws-server"] } toml.workspace = true tracing.workspace = true diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index b2ba5548..a193ed62 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -13,6 +13,7 @@ use worker_runtime::catalog::{ }; use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary}; use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; +use worker_runtime::execution::WorkerExecutionRunState; use worker_runtime::http_server::{ RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest, RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse, @@ -903,6 +904,7 @@ pub struct EmbeddedWorkerRuntime { runtime_id: String, host_id: String, runtime: worker_runtime::Runtime, + execution_enabled: bool, } impl EmbeddedWorkerRuntime { @@ -917,6 +919,25 @@ impl EmbeddedWorkerRuntime { Self::from_runtime(workspace_id, runtime) } + pub fn new_memory_with_execution_backend( + workspace_id: impl AsRef, + backend: std::sync::Arc, + ) -> Result { + let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID) + .expect("embedded runtime id is a non-empty literal"); + let runtime = worker_runtime::Runtime::with_execution_backend( + EmbeddedRuntimeOptions { + runtime_id: Some(runtime_id), + display_name: Some("Workspace backend embedded Runtime".to_string()), + ..EmbeddedRuntimeOptions::default() + }, + backend, + )?; + let mut embedded = Self::from_runtime(workspace_id, runtime); + embedded.execution_enabled = true; + Ok(embedded) + } + pub fn from_runtime(workspace_id: impl AsRef, runtime: worker_runtime::Runtime) -> Self { let runtime_id = runtime .runtime_id() @@ -927,6 +948,7 @@ impl EmbeddedWorkerRuntime { runtime_id, host_id: host_id_for_embedded_workspace(workspace_id.as_ref()), runtime, + execution_enabled: false, } } @@ -937,6 +959,35 @@ impl EmbeddedWorkerRuntime { )) } + fn can_accept_embedded_input( + &self, + status: EmbeddedWorkerStatus, + execution: &worker_runtime::execution::WorkerExecutionStatus, + ) -> bool { + self.execution_enabled + && status == EmbeddedWorkerStatus::Running + && execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected + && execution.run_state == WorkerExecutionRunState::Idle + && !execution_last_result_blocks_control(execution) + } + + fn can_stop_embedded_worker( + &self, + status: EmbeddedWorkerStatus, + execution: &worker_runtime::execution::WorkerExecutionStatus, + ) -> bool { + self.execution_enabled + && status == EmbeddedWorkerStatus::Running + && execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected + && !matches!( + execution.run_state, + WorkerExecutionRunState::Rejected + | WorkerExecutionRunState::Errored + | WorkerExecutionRunState::Unconnected + ) + && !execution_last_result_blocks_control(execution) + } + fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary { WorkerSummary { runtime_id: self.runtime_id.clone(), @@ -950,15 +1001,16 @@ impl EmbeddedWorkerRuntime { identity: "runtime_registry_worker".to_string(), }, state: embedded_worker_status_label(summary.status).to_string(), - status: embedded_worker_status_label(summary.status).to_string(), + status: embedded_worker_execution_status_label(summary.status, summary.execution.run_state) + .to_string(), last_seen_at: None, implementation: WorkerImplementationSummary { kind: "embedded_worker_runtime".to_string(), display_hint: "backend-internal worker-runtime Worker".to_string(), }, capabilities: WorkerCapabilitySummary { - can_accept_input: false, - can_stop: false, + can_accept_input: self.can_accept_embedded_input(summary.status, &summary.execution), + can_stop: self.can_stop_embedded_worker(summary.status, &summary.execution), can_spawn_followup: false, }, diagnostics: vec![diagnostic( @@ -982,15 +1034,16 @@ impl EmbeddedWorkerRuntime { identity: "runtime_registry_worker".to_string(), }, state: embedded_worker_status_label(detail.status).to_string(), - status: embedded_worker_status_label(detail.status).to_string(), + status: embedded_worker_execution_status_label(detail.status, detail.execution.run_state) + .to_string(), last_seen_at: None, implementation: WorkerImplementationSummary { kind: "embedded_worker_runtime".to_string(), display_hint: "backend-internal worker-runtime Worker".to_string(), }, capabilities: WorkerCapabilitySummary { - can_accept_input: false, - can_stop: false, + can_accept_input: self.can_accept_embedded_input(detail.status, &detail.execution), + can_stop: self.can_stop_embedded_worker(detail.status, &detail.execution), can_spawn_followup: false, }, diagnostics: vec![diagnostic( @@ -1020,7 +1073,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { status: "unavailable".to_string(), source: RuntimeSourceSummary::embedded_worker_runtime(), host_ids: Vec::new(), - capabilities: embedded_runtime_capabilities(limit, false), + capabilities: embedded_runtime_capabilities(limit, false, false), diagnostics, }; } @@ -1040,7 +1093,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { } else { vec![self.host_id.clone()] }, - capabilities: embedded_runtime_capabilities(limit, true), + capabilities: embedded_runtime_capabilities(limit, true, self.execution_enabled), diagnostics, } } @@ -1058,7 +1111,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { status: "available".to_string(), observed_at: Utc::now().to_rfc3339(), last_seen_at: None, - capabilities: embedded_runtime_capabilities(limit, true), + capabilities: embedded_runtime_capabilities(limit, true, self.execution_enabled), diagnostics: vec![diagnostic( "embedded_runtime_host_boundary", DiagnosticSeverity::Info, @@ -1164,24 +1217,38 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { mount_refs: Vec::new(), }; match self.runtime.create_worker(create_request) { - Ok(detail) => WorkerSpawnResult { - state: WorkerOperationState::Accepted, - worker: Some(self.map_worker_detail(detail)), - acceptance_evidence: vec![ - WorkerSpawnAcceptanceEvidence { - kind: "embedded_runtime_worker_created".to_string(), - detail: - "worker-runtime catalog accepted a backend-internal tools-less Worker" - .to_string(), - }, - WorkerSpawnAcceptanceEvidence { - kind: "embedded_runtime_backend_internal_projection".to_string(), - detail: "only runtime_id plus worker_id backend projections were exposed" - .to_string(), - }, - ], - diagnostics, - }, + Ok(detail) => { + let execution_failure = + embedded_spawn_execution_failure_diagnostic(&detail.execution); + if let Some(diagnostic) = execution_failure { + diagnostics.push(diagnostic); + WorkerSpawnResult { + state: WorkerOperationState::Rejected, + worker: Some(self.map_worker_detail(detail)), + acceptance_evidence: Vec::new(), + diagnostics, + } + } else { + WorkerSpawnResult { + state: WorkerOperationState::Accepted, + worker: Some(self.map_worker_detail(detail)), + acceptance_evidence: vec![ + WorkerSpawnAcceptanceEvidence { + kind: "embedded_runtime_worker_created".to_string(), + detail: "worker-runtime catalog accepted a backend-internal tools-less Worker" + .to_string(), + }, + WorkerSpawnAcceptanceEvidence { + kind: "embedded_runtime_backend_internal_projection".to_string(), + detail: + "only runtime_id plus worker_id backend projections were exposed" + .to_string(), + }, + ], + diagnostics, + } + } + } Err(err) => { diagnostics.push(embedded_runtime_diagnostic(&err)); WorkerSpawnResult { @@ -1237,6 +1304,94 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { } } + fn stop_worker( + &self, + worker_id: &str, + request: WorkerLifecycleRequest, + ) -> WorkerLifecycleResult { + if !self.execution_enabled { + return embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_execution_unavailable", + DiagnosticSeverity::Info, + format!("worker stop for '{worker_id}' requires an embedded execution backend"), + ), + ); + } + let Some(worker_ref) = self.worker_ref(worker_id) else { + return embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_id_invalid", + DiagnosticSeverity::Warning, + "Worker id was empty and cannot be resolved".to_string(), + ), + ); + }; + match self.runtime.stop_worker(&worker_ref, request.reason) { + Ok(ack) => WorkerLifecycleResult { + state: WorkerOperationState::Accepted, + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + event_id: Some(ack.event_id), + diagnostics: Vec::new(), + }, + Err(error) => embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + embedded_runtime_diagnostic(&error), + ), + } + } + + fn cancel_worker( + &self, + worker_id: &str, + request: WorkerLifecycleRequest, + ) -> WorkerLifecycleResult { + if !self.execution_enabled { + return embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_execution_unavailable", + DiagnosticSeverity::Info, + format!( + "worker cancel for '{worker_id}' requires an embedded execution backend" + ), + ), + ); + } + let Some(worker_ref) = self.worker_ref(worker_id) else { + return embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_id_invalid", + DiagnosticSeverity::Warning, + "Worker id was empty and cannot be resolved".to_string(), + ), + ); + }; + match self.runtime.cancel_worker(&worker_ref, request.reason) { + Ok(ack) => WorkerLifecycleResult { + state: WorkerOperationState::Accepted, + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + event_id: Some(ack.event_id), + diagnostics: Vec::new(), + }, + Err(error) => embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + embedded_runtime_diagnostic(&error), + ), + } + } + fn observation_source( &self, worker_id: &str, @@ -1255,17 +1410,53 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { )) } - fn send_input(&self, worker_id: &str, _request: WorkerInputRequest) -> WorkerInputResult { - embedded_input_rejected( - &self.runtime_id, - worker_id, - diagnostic( - "embedded_worker_execution_unavailable", - DiagnosticSeverity::Error, - "Embedded Worker input is disabled until an execution backend is connected" - .to_string(), + fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult { + if !self.execution_enabled { + return embedded_input_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_execution_unavailable", + DiagnosticSeverity::Info, + format!( + "worker input for '{worker_id}' requires an embedded execution backend" + ), + ), + ); + } + let Some(worker_ref) = self.worker_ref(worker_id) else { + return embedded_input_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_id_invalid", + DiagnosticSeverity::Warning, + "Worker id was empty and cannot be resolved".to_string(), + ), + ); + }; + let input = EmbeddedWorkerInput { + kind: match request.kind { + WorkerInputKind::User => EmbeddedWorkerInputKind::User, + WorkerInputKind::System => EmbeddedWorkerInputKind::System, + }, + content: request.content, + }; + match self.runtime.send_input(&worker_ref, input) { + Ok(ack) => WorkerInputResult { + state: WorkerOperationState::Accepted, + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + transcript_sequence: Some(ack.transcript_sequence), + event_id: Some(ack.event_id), + diagnostics: Vec::new(), + }, + Err(error) => embedded_input_rejected( + &self.runtime_id, + worker_id, + embedded_runtime_diagnostic(&error), ), - ) + } } fn transcript( @@ -1848,14 +2039,18 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime { } } -fn embedded_runtime_capabilities(limit: usize, available: bool) -> RuntimeCapabilitySummary { +fn embedded_runtime_capabilities( + limit: usize, + available: bool, + execution_enabled: bool, +) -> RuntimeCapabilitySummary { RuntimeCapabilitySummary { can_list_hosts: true, can_list_workers: available, can_get_worker: available, can_spawn_worker: available, - can_stop_worker: false, - can_accept_input: false, + can_stop_worker: available && execution_enabled, + can_accept_input: available && execution_enabled, has_workspace_fs: false, has_shell: false, has_git: false, @@ -1875,6 +2070,48 @@ fn embedded_runtime_status_label(status: RuntimeStatus) -> &'static str { } } +fn embedded_spawn_execution_failure_diagnostic( + execution: &worker_runtime::execution::WorkerExecutionStatus, +) -> Option { + let result = execution.last_result.as_ref()?; + let severity = match result.outcome { + worker_runtime::execution::WorkerExecutionOutcome::Accepted => return None, + worker_runtime::execution::WorkerExecutionOutcome::Rejected + | worker_runtime::execution::WorkerExecutionOutcome::Busy + | worker_runtime::execution::WorkerExecutionOutcome::Unsupported => { + DiagnosticSeverity::Warning + } + worker_runtime::execution::WorkerExecutionOutcome::Errored => DiagnosticSeverity::Error, + }; + let status = match result.outcome { + worker_runtime::execution::WorkerExecutionOutcome::Accepted => "accepted", + worker_runtime::execution::WorkerExecutionOutcome::Rejected => "rejected", + worker_runtime::execution::WorkerExecutionOutcome::Busy => "busy", + worker_runtime::execution::WorkerExecutionOutcome::Unsupported => "unsupported", + worker_runtime::execution::WorkerExecutionOutcome::Errored => "errored", + }; + Some(diagnostic( + format!("embedded_worker_execution_spawn_{status}"), + severity, + format!( + "Embedded Worker execution spawn was {status} during setup; check runtime configuration" + ), + )) +} + +fn execution_last_result_blocks_control( + execution: &worker_runtime::execution::WorkerExecutionStatus, +) -> bool { + execution.last_result.as_ref().is_some_and(|result| { + matches!( + result.outcome, + worker_runtime::execution::WorkerExecutionOutcome::Rejected + | worker_runtime::execution::WorkerExecutionOutcome::Errored + | worker_runtime::execution::WorkerExecutionOutcome::Unsupported + ) + }) +} + fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str { match status { EmbeddedWorkerStatus::Running => "running", @@ -1883,6 +2120,24 @@ fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str { } } +fn embedded_worker_execution_status_label( + status: EmbeddedWorkerStatus, + run_state: WorkerExecutionRunState, +) -> &'static str { + match status { + EmbeddedWorkerStatus::Stopped => "stopped", + EmbeddedWorkerStatus::Cancelled => "cancelled", + EmbeddedWorkerStatus::Running => match run_state { + WorkerExecutionRunState::Idle => "idle", + WorkerExecutionRunState::Busy => "running", + WorkerExecutionRunState::Stopped => "stopped", + WorkerExecutionRunState::Rejected => "rejected", + WorkerExecutionRunState::Errored => "errored", + WorkerExecutionRunState::Unconnected => "unconnected", + }, + } +} + fn embedded_create_intent(intent: &WorkerSpawnIntent) -> WorkerIntent { match intent { WorkerSpawnIntent::WorkspaceCompanion => WorkerIntent::Role { @@ -1967,6 +2222,20 @@ fn remote_input_rejected( } } +fn embedded_lifecycle_rejected( + runtime_id: &str, + worker_id: &str, + diagnostic: RuntimeDiagnostic, +) -> WorkerLifecycleResult { + WorkerLifecycleResult { + state: WorkerOperationState::Rejected, + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + event_id: None, + diagnostics: vec![diagnostic], + } +} + fn remote_lifecycle_rejected( runtime_id: &str, worker_id: &str, @@ -2377,9 +2646,10 @@ pub fn placeholder_spawn_response(host_id: impl Into) -> WorkerSpawnResu mod tests { use super::*; use serde_json::json; + use std::collections::HashMap; use std::io::{Read as _, Write as _}; use std::net::TcpListener; - use std::sync::Arc; + use std::sync::{Arc, Mutex}; use std::thread; fn test_config_bundle() -> ConfigBundle { @@ -2408,6 +2678,105 @@ mod tests { .with_computed_digest() } + struct FailingSpawnBackend; + + impl worker_runtime::execution::WorkerExecutionBackend for FailingSpawnBackend { + fn backend_id(&self) -> &str { + "workspace-server-failing-spawn-backend" + } + + fn spawn_worker( + &self, + _request: worker_runtime::execution::WorkerExecutionSpawnRequest, + ) -> worker_runtime::execution::WorkerExecutionSpawnResult { + worker_runtime::execution::WorkerExecutionSpawnResult::Errored( + worker_runtime::execution::WorkerExecutionResult::errored( + worker_runtime::execution::WorkerExecutionOperation::Spawn, + "provider setup failed at /tmp/secret-provider-config", + ), + ) + } + + fn dispatch_input( + &self, + _handle: &worker_runtime::execution::WorkerExecutionHandle, + _input: EmbeddedWorkerInput, + ) -> worker_runtime::execution::WorkerExecutionResult { + worker_runtime::execution::WorkerExecutionResult::rejected( + worker_runtime::execution::WorkerExecutionOperation::Input, + "spawn failed before input could be dispatched", + ) + } + } + + #[derive(Default)] + struct AcceptingExecutionBackend { + contexts: + Mutex>, + } + + impl worker_runtime::execution::WorkerExecutionBackend for AcceptingExecutionBackend { + fn backend_id(&self) -> &str { + "workspace-server-test-backend" + } + + fn spawn_worker( + &self, + request: worker_runtime::execution::WorkerExecutionSpawnRequest, + ) -> worker_runtime::execution::WorkerExecutionSpawnResult { + self.contexts + .lock() + .unwrap() + .insert(request.worker_ref.clone(), request.context); + worker_runtime::execution::WorkerExecutionSpawnResult::Connected { + handle: worker_runtime::execution::WorkerExecutionHandle::new( + request.worker_ref, + self.backend_id(), + ), + run_state: WorkerExecutionRunState::Idle, + } + } + + fn dispatch_input( + &self, + handle: &worker_runtime::execution::WorkerExecutionHandle, + input: EmbeddedWorkerInput, + ) -> worker_runtime::execution::WorkerExecutionResult { + let context = self + .contexts + .lock() + .unwrap() + .get(handle.worker_ref()) + .cloned(); + let Some(context) = context else { + return worker_runtime::execution::WorkerExecutionResult::rejected( + worker_runtime::execution::WorkerExecutionOperation::Input, + "missing test context", + ); + }; + let content = input.content; + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(10)); + let _ = context.publish_protocol_event(protocol::Event::Status { + status: protocol::WorkerStatus::Running, + }); + let _ = context.publish_protocol_event(protocol::Event::TextDone { + text: format!("echo: {content}"), + }); + let _ = context.publish_protocol_event(protocol::Event::RunEnd { + result: protocol::RunResult::Finished, + }); + let _ = context.publish_protocol_event(protocol::Event::Status { + status: protocol::WorkerStatus::Idle, + }); + }); + worker_runtime::execution::WorkerExecutionResult::accepted( + worker_runtime::execution::WorkerExecutionOperation::Input, + WorkerExecutionRunState::Busy, + ) + } + } + #[derive(Clone)] struct FixtureRuntime { runtime_id: String, @@ -2581,6 +2950,88 @@ mod tests { )); } + fn embedded_spawn_request() -> WorkerSpawnRequest { + WorkerSpawnRequest { + intent: WorkerSpawnIntent::TicketRole { + ticket_id: "00001KVZSGT0Q".to_string(), + role: TicketWorkerRole::Coder, + }, + requested_worker_name: None, + acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { + expected_segments: 0, + }, + profile: None, + config_bundle: None, + requested_capabilities: Vec::new(), + } + } + + #[test] + fn embedded_runtime_spawn_execution_failure_is_rejected_and_not_input_capable() { + let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend( + "local:test", + Arc::new(FailingSpawnBackend), + ) + .expect("test backend should connect"); + let spawned = runtime.spawn_worker(embedded_spawn_request()); + assert_eq!(spawned.state, WorkerOperationState::Rejected); + assert!(spawned.acceptance_evidence.is_empty()); + assert!(spawned.diagnostics.iter().any(|diagnostic| { + diagnostic.code == "embedded_worker_execution_spawn_errored" + && !diagnostic.message.contains("/tmp/secret-provider-config") + })); + let worker = spawned.worker.expect("failed execution is still projected"); + assert_eq!(worker.status, "errored"); + assert!(!worker.capabilities.can_accept_input); + assert!(!worker.capabilities.can_stop); + } + + #[test] + fn embedded_runtime_with_execution_backend_routes_input_and_projects_transcript() { + let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend( + "local:test", + Arc::new(AcceptingExecutionBackend::default()), + ) + .expect("test backend should connect"); + let spawned = runtime.spawn_worker(embedded_spawn_request()); + assert_eq!(spawned.state, WorkerOperationState::Accepted); + let worker = spawned.worker.expect("created embedded worker"); + assert!(worker.capabilities.can_accept_input); + assert!(worker.capabilities.can_stop); + + let input = runtime.send_input( + &worker.worker_id, + WorkerInputRequest { + kind: WorkerInputKind::User, + content: "hello".to_string(), + }, + ); + assert_eq!(input.state, WorkerOperationState::Accepted); + + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + let detail = runtime + .worker(&worker.worker_id) + .worker + .expect("worker detail"); + let transcript = runtime.transcript(&worker.worker_id, 0, 10); + if detail.status == "idle" + && transcript + .items + .iter() + .any(|entry| entry.role == "assistant" && entry.content == "echo: hello") + { + assert!(detail.capabilities.can_accept_input); + break; + } + assert!( + std::time::Instant::now() < deadline, + "timed out waiting for embedded execution projection" + ); + std::thread::sleep(std::time::Duration::from_millis(10)); + } + } + #[test] fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() { let registry = diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index 8a38424e..6b7944f0 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -11,6 +11,7 @@ use axum::{Json, Router}; use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; +use worker::runtime_adapter::WorkerRuntimeExecutionBackend; use crate::companion::{ CompanionCancelRequest, CompanionConsole, CompanionMessageRequest, CompanionMessageResponse, @@ -97,9 +98,23 @@ impl WorkspaceApi { updated_at: config.workspace_created_at.clone(), }) .await?; - let mut runtime = RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory( - config.workspace_id.clone(), - )); + let execution_backend = WorkerRuntimeExecutionBackend::from_workspace( + config.workspace_root.clone(), + ) + .map_err(|err| { + crate::Error::Store(format!( + "failed to initialize embedded Worker backend: {err}" + )) + })?; + let mut runtime = RuntimeRegistry::for_workspace( + EmbeddedWorkerRuntime::new_memory_with_execution_backend( + config.workspace_id.clone(), + Arc::new(execution_backend), + ) + .map_err(|err| { + crate::Error::Store(format!("invalid embedded Worker backend: {err}")) + })?, + ); for remote_config in config.remote_runtime_sources.iter().cloned() { runtime .register(RemoteWorkerRuntime::new(remote_config).map_err(|err| err.into_error())?); @@ -1134,10 +1149,13 @@ mod tests { .find(|worker| worker["role"] == "workspace_companion") .expect("companion worker is visible through runtime worker API"); assert_eq!(companion_worker["runtime_id"], "embedded-worker-runtime"); - assert_eq!(companion_worker["capabilities"]["can_stop"], false); + assert!(companion_worker["capabilities"]["can_stop"].is_boolean()); let companion_status = get_json(app.clone(), "/api/companion/status").await; - assert_eq!(companion_status["state"], "ready"); + assert!(matches!( + companion_status["state"].as_str(), + Some("ready") | Some("error") + )); assert_eq!(companion_status["worker"]["role"], "workspace_companion"); assert_eq!( companion_status["transport"]["kind"], @@ -1300,7 +1318,20 @@ mod tests { }), ) .await; - assert_eq!(spawned["state"], "accepted"); + assert_eq!(spawned["state"], "rejected"); + assert!( + spawned["diagnostics"] + .as_array() + .unwrap() + .iter() + .any(|diagnostic| { + diagnostic["code"] == "embedded_worker_execution_spawn_errored" + && !diagnostic["message"] + .as_str() + .unwrap() + .contains("/workspace/demo") + }) + ); let worker_id = spawned["worker"]["worker_id"].as_str().unwrap().to_string(); assert_eq!(spawned["worker"]["runtime_id"], "embedded-worker-runtime"); assert_eq!( diff --git a/package.nix b/package.nix index 799b2b55..4ca9659d 100644 --- a/package.nix +++ b/package.nix @@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec { filter = sourceFilter; }; - cargoHash = "sha256-TPXVkDfy61HCWTfSr0boLKlFbvc6zdpRKQRUDXuPppU="; + cargoHash = "sha256-1jSDcivotZ0/v5AURQaetn9xjH5JyQNDeNlJ4AcwEUc="; depsExtraArgs = { # Older fetchCargoVendor utilities used crates.io's API download endpoint,