From 18526ee36264610048f48b07b5db50ce86852fd2 Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 28 Jun 2026 05:50:11 +0900 Subject: [PATCH] feat: connect runtime worker execution adapter --- Cargo.lock | 2 + crates/worker-runtime/src/runtime.rs | 65 +++ crates/worker/Cargo.toml | 5 + crates/worker/src/entrypoint.rs | 15 + crates/worker/src/lib.rs | 2 + crates/worker/src/runtime_adapter.rs | 672 ++++++++++++++++++++++++++ crates/workspace-server/Cargo.toml | 1 + crates/workspace-server/src/hosts.rs | 17 + crates/workspace-server/src/server.rs | 21 +- package.nix | 2 +- 10 files changed, 798 insertions(+), 4 deletions(-) create mode 100644 crates/worker/src/runtime_adapter.rs diff --git a/Cargo.lock b/Cargo.lock index 7d240f78..1f046047 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5932,6 +5932,7 @@ dependencies = [ "uuid", "wasmtime", "wat", + "worker-runtime", "workflow", "yoi-plugin-pdk", ] @@ -6054,6 +6055,7 @@ dependencies = [ "tower", "tracing", "uuid", + "worker", "worker-runtime", ] diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index 062005bb..c5c57e2e 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -722,7 +722,12 @@ impl Runtime { ) -> Result { let mut state = self.lock()?; state.ensure_worker_ref(worker_ref)?; + let transcript_sequence = state.project_protocol_event_to_transcript(worker_ref, &payload); let event = state.push_worker_observation_event(worker_ref.clone(), payload); + if let Some(sequence) = transcript_sequence { + state.persist_worker(&worker_ref.worker_id)?; + state.persist_transcript_entry(&worker_ref.worker_id, sequence)?; + } Ok(event) } @@ -1259,6 +1264,66 @@ impl RuntimeState { event } + #[cfg(feature = "ws-server")] + fn append_worker_transcript_entry( + &mut self, + worker_ref: &WorkerRef, + role: TranscriptRole, + content: impl Into, + ) -> Option { + let content = content.into(); + if content.trim().is_empty() { + return None; + } + let event_id = self.last_event_id(); + let worker = self.workers.get_mut(&worker_ref.worker_id)?; + let sequence = worker.next_transcript_sequence; + worker.next_transcript_sequence += 1; + worker.transcript.push(TranscriptEntry { + sequence, + worker_ref: worker_ref.clone(), + role, + content, + event_id, + }); + Some(sequence) + } + + #[cfg(feature = "ws-server")] + fn project_protocol_event_to_transcript( + &mut self, + worker_ref: &WorkerRef, + event: &protocol::Event, + ) -> Option { + match event { + protocol::Event::TextDone { text, .. } => self.append_worker_transcript_entry( + worker_ref, + TranscriptRole::Assistant, + text.clone(), + ), + protocol::Event::Error { message, .. } => self.append_worker_transcript_entry( + worker_ref, + TranscriptRole::System, + format!("error: {message}"), + ), + protocol::Event::ToolResult { + id, + summary, + is_error, + .. + } => self.append_worker_transcript_entry( + worker_ref, + TranscriptRole::System, + format!( + "tool result {id}: {}{}", + if *is_error { "error: " } else { "" }, + summary + ), + ), + _ => None, + } + } + fn push_diagnostic( &mut self, severity: DiagnosticSeverity, diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index d853cc1e..5b065cc3 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -5,6 +5,10 @@ edition.workspace = true license.workspace = true autobins = false +[features] +default = [] +runtime-adapter = ["dep:worker-runtime"] + [dependencies] async-trait = { workspace = true } clap = { version = "4.6.0", features = ["derive"] } @@ -17,6 +21,7 @@ protocol = { workspace = true } provider = { workspace = true } client = { workspace = true } pod-registry = { workspace = true } +worker-runtime = { workspace = true, features = ["ws-server"], optional = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } reqwest = { version = "0.13", default-features = false, features = ["blocking", "native-tls"] } diff --git a/crates/worker/src/entrypoint.rs b/crates/worker/src/entrypoint.rs index 2801a25b..1ea6f81c 100644 --- a/crates/worker/src/entrypoint.rs +++ b/crates/worker/src/entrypoint.rs @@ -225,6 +225,21 @@ fn load_profile( Ok((resolved.manifest, PromptLoader::builtins_only())) } +#[cfg(feature = "runtime-adapter")] +pub(crate) fn resolve_runtime_profile_manifest( + profile: Option<&str>, + workspace_root: &Path, + worker_name: &str, +) -> Result<(WorkerManifest, PromptLoader), String> { + let selector = profile + .map(ProfileSelector::parse_cli) + .unwrap_or(ProfileSelector::Default); + let (mut manifest, loader) = load_profile(&selector, workspace_root, worker_name)?; + apply_profile_launch_policy(&mut manifest, workspace_root, None)?; + apply_plugin_resolution_plan(&mut manifest, workspace_root); + Ok((manifest, loader)) +} + fn load_single_manifest( path: &Path, explicit_worker_name: Option<&str>, diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 399dfc4c..1f89097d 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -10,6 +10,8 @@ pub(crate) mod in_flight; pub mod ipc; pub mod prompt; pub mod runtime; +#[cfg(feature = "runtime-adapter")] +pub mod runtime_adapter; pub mod segment_log_sink; pub mod shared_state; mod shutdown_after_idle; diff --git a/crates/worker/src/runtime_adapter.rs b/crates/worker/src/runtime_adapter.rs new file mode 100644 index 00000000..34705657 --- /dev/null +++ b/crates/worker/src/runtime_adapter.rs @@ -0,0 +1,672 @@ +//! Adapter from `worker-runtime` execution backend boundary to the real +//! `worker` crate controller/run lifecycle. +//! +//! The adapter intentionally owns real `WorkerHandle`s internally and exposes +//! only the opaque `worker-runtime` execution handle to callers. Browser/API +//! projections therefore keep the existing runtime redaction boundary: no raw +//! socket paths, session paths, manifests, credentials, or handles leave this +//! module. + +use std::collections::HashMap; +use std::future::Future; +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Arc, Mutex, mpsc}; +use std::time::Duration; + +use async_trait::async_trait; +use manifest::paths; +use pod_store::{CombinedStore, FsWorkerStore}; +use protocol::{Method, Segment, WorkerStatus}; +use session_store::FsStore; +use tokio::runtime::Runtime; +use tokio::sync::broadcast; +use worker_runtime::execution::{ + WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult, + WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, +}; +use worker_runtime::interaction::{WorkerInput, WorkerInputKind}; + +use crate::{Worker, WorkerController, WorkerHandle}; + +const DEFAULT_BACKEND_ID: &str = "worker-crate"; +const RUNTIME_TASK_TIMEOUT: Duration = Duration::from_secs(10); + +/// Factory seam used by [`WorkerRuntimeExecutionBackend`] to construct a real +/// controller-backed Worker for a Runtime catalog entry. +#[async_trait] +pub trait RuntimeWorkerFactory: Send + Sync + 'static { + async fn spawn_controller( + &self, + request: WorkerExecutionSpawnRequest, + ) -> Result; +} + +/// Production factory that resolves a normal Worker profile and spawns it under +/// `WorkerController`. +#[derive(Debug, Clone)] +pub struct ProfileRuntimeWorkerFactory { + workspace_root: PathBuf, + cwd: PathBuf, + store_dir: Option, + pod_store_dir: Option, + profile: Option, + runtime_base_dir: Option, +} + +impl ProfileRuntimeWorkerFactory { + pub fn new(workspace_root: impl Into) -> Self { + let workspace_root = workspace_root.into(); + Self { + cwd: workspace_root.clone(), + workspace_root, + store_dir: None, + pod_store_dir: None, + profile: None, + runtime_base_dir: None, + } + } + + pub fn with_cwd(mut self, cwd: impl Into) -> Self { + self.cwd = cwd.into(); + self + } + + pub fn with_store_dir(mut self, store_dir: impl Into) -> Self { + self.store_dir = Some(store_dir.into()); + self + } + + pub fn with_pod_store_dir(mut self, pod_store_dir: impl Into) -> Self { + self.pod_store_dir = Some(pod_store_dir.into()); + self + } + + /// Set the profile selector used for Runtime-created Workers. When unset, + /// normal default profile discovery is used. + pub fn with_profile(mut self, profile: impl Into) -> Self { + self.profile = Some(profile.into()); + self + } + + pub fn with_runtime_base_dir(mut self, runtime_base_dir: impl Into) -> Self { + self.runtime_base_dir = Some(runtime_base_dir.into()); + self + } + + fn store_dir(&self) -> Result { + self.store_dir + .clone() + .or_else(paths::sessions_dir) + .ok_or_else(|| { + "could not resolve sessions directory (set YOI_HOME, YOI_DATA_DIR, or HOME)" + .to_string() + }) + } + + fn pod_store_dir(&self, store_dir: &std::path::Path) -> PathBuf { + self.pod_store_dir + .clone() + .or_else(|| paths::data_dir().map(|data_dir| data_dir.join("pods"))) + .or_else(|| store_dir.parent().map(|parent| parent.join("pods"))) + .unwrap_or_else(|| PathBuf::from("workers")) + } + + fn runtime_base_dir(&self) -> Result { + self.runtime_base_dir + .clone() + .or_else(|| crate::runtime::dir::default_base().ok()) + .ok_or_else(|| "could not resolve worker runtime directory".to_string()) + } + + fn runtime_worker_name(request: &WorkerExecutionSpawnRequest) -> String { + request.worker_ref.worker_id.to_string() + } + + fn runtime_profile<'a>( + &'a self, + request: &'a WorkerExecutionSpawnRequest, + ) -> Option> { + if let Some(profile) = self.profile.as_deref() { + return Some(std::borrow::Cow::Borrowed(profile)); + } + match &request.request.profile { + worker_runtime::catalog::ProfileSelector::RuntimeDefault => None, + worker_runtime::catalog::ProfileSelector::Named(name) => { + Some(std::borrow::Cow::Borrowed(name.as_str())) + } + worker_runtime::catalog::ProfileSelector::Builtin(name) => { + Some(std::borrow::Cow::Owned(format!("builtin:{name}"))) + } + } + } +} + +#[async_trait] +impl RuntimeWorkerFactory for ProfileRuntimeWorkerFactory { + async fn spawn_controller( + &self, + request: WorkerExecutionSpawnRequest, + ) -> Result { + let worker_name = Self::runtime_worker_name(&request); + let profile = self.runtime_profile(&request); + let (mut manifest, loader) = crate::entrypoint::resolve_runtime_profile_manifest( + profile.as_deref(), + &self.workspace_root, + &worker_name, + )?; + manifest.worker.name = worker_name; + + let store_dir = self.store_dir()?; + let session_store = FsStore::new(&store_dir).map_err(|err| { + format!( + "failed to initialize session store at {}: {err}", + store_dir.display() + ) + })?; + let pod_store_dir = self.pod_store_dir(&store_dir); + let pod_store = FsWorkerStore::new(&pod_store_dir).map_err(|err| { + format!( + "failed to initialize worker metadata store at {}: {err}", + pod_store_dir.display() + ) + })?; + let store = CombinedStore::new(session_store, pod_store); + + let worker = Worker::from_manifest_with_context( + manifest, + store, + loader, + self.workspace_root.clone(), + self.cwd.clone(), + ) + .await + .map_err(|err| format!("failed to create Worker from profile: {err}"))?; + + let runtime_base = self.runtime_base_dir()?; + let (handle, _shutdown_rx) = WorkerController::spawn(worker, &runtime_base) + .await + .map_err(|err| format!("failed to spawn Worker controller: {err}"))?; + Ok(handle) + } +} + +struct RuntimeWorkerExecution { + handle: WorkerHandle, + busy: Arc, +} + +/// `worker-runtime` execution backend backed by real `worker` crate Workers. +pub struct WorkerRuntimeExecutionBackend { + backend_id: String, + factory: Arc, + runtime: Mutex>, + workers: Mutex>, +} + +impl WorkerRuntimeExecutionBackend { + pub fn from_workspace(workspace_root: impl Into) -> Result { + Self::new(ProfileRuntimeWorkerFactory::new(workspace_root)) + } +} + +impl WorkerRuntimeExecutionBackend +where + F: RuntimeWorkerFactory, +{ + pub fn new(factory: F) -> Result { + let runtime = tokio::runtime::Builder::new_multi_thread() + .thread_name("yoi-runtime-worker-adapter") + .enable_all() + .build() + .map_err(|err| format!("failed to build worker adapter runtime: {err}"))?; + Ok(Self { + backend_id: DEFAULT_BACKEND_ID.to_string(), + factory: Arc::new(factory), + runtime: Mutex::new(Some(runtime)), + workers: Mutex::new(HashMap::new()), + }) + } + + pub fn with_backend_id(mut self, backend_id: impl Into) -> Self { + self.backend_id = backend_id.into(); + self + } + + fn wait_for_runtime_task(receiver: mpsc::Receiver>) -> Result { + receiver + .recv_timeout(RUNTIME_TASK_TIMEOUT) + .map_err(|err| format!("worker adapter task did not complete: {err}"))? + } + + fn spawn_on_adapter_runtime(&self, task: Fut) -> Result<(), String> + where + Fut: Future + Send + 'static, + { + let runtime = self + .runtime + .lock() + .map_err(|_| "worker adapter runtime lock is poisoned".to_string())?; + let runtime = runtime + .as_ref() + .ok_or_else(|| "worker adapter runtime is shutting down".to_string())?; + runtime.spawn(task); + Ok(()) + } + + fn run_on_adapter_runtime(&self, task: Fut) -> Result + where + T: Send + 'static, + Fut: Future> + Send + 'static, + { + let (tx, rx) = mpsc::sync_channel(1); + self.spawn_on_adapter_runtime(async move { + let _ = tx.send(task.await); + })?; + Self::wait_for_runtime_task(rx) + } + + fn get_execution( + &self, + handle: &WorkerExecutionHandle, + ) -> Result<(WorkerHandle, Arc), WorkerExecutionResult> { + if handle.backend_id() != self.backend_id() { + return Err(WorkerExecutionResult::rejected( + WorkerExecutionOperation::Input, + format!( + "execution handle belongs to backend {}, not {}", + handle.backend_id(), + self.backend_id() + ), + )); + } + let workers = self.workers.lock().map_err(|_| { + WorkerExecutionResult::errored( + WorkerExecutionOperation::Input, + "worker adapter registry lock is poisoned", + ) + })?; + workers + .get(handle.worker_ref()) + .map(|execution| (execution.handle.clone(), execution.busy.clone())) + .ok_or_else(|| { + WorkerExecutionResult::rejected( + WorkerExecutionOperation::Input, + "execution handle does not reference a live Worker", + ) + }) + } + + fn send_method( + &self, + operation: WorkerExecutionOperation, + worker: WorkerHandle, + method: Method, + ) -> WorkerExecutionResult { + self.run_on_adapter_runtime(async move { + worker + .send(method) + .await + .map_err(|err| format!("failed to send Worker method: {err}")) + }) + .map(|_| WorkerExecutionResult::accepted(operation, WorkerExecutionRunState::Idle)) + .unwrap_or_else(|message| WorkerExecutionResult::errored(operation, message)) + } +} + +impl Drop for WorkerRuntimeExecutionBackend { + fn drop(&mut self) { + if let Ok(mut runtime) = self.runtime.lock() + && let Some(runtime) = runtime.take() + { + let _ = std::thread::spawn(move || drop(runtime)).join(); + } + } +} + +impl WorkerExecutionBackend for WorkerRuntimeExecutionBackend +where + F: RuntimeWorkerFactory, +{ + fn backend_id(&self) -> &str { + &self.backend_id + } + + fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult { + if self + .workers + .lock() + .map(|workers| workers.contains_key(&request.worker_ref)) + .unwrap_or(false) + { + return WorkerExecutionSpawnResult::Rejected(WorkerExecutionResult::busy( + WorkerExecutionOperation::Spawn, + "Worker is already connected to execution backend", + )); + } + + let factory = self.factory.clone(); + let bridge_context = request.context.clone(); + let worker_ref = request.worker_ref.clone(); + let spawn_result = + self.run_on_adapter_runtime(async move { factory.spawn_controller(request).await }); + + let handle = match spawn_result { + Ok(handle) => handle, + Err(message) => { + return WorkerExecutionSpawnResult::Errored(WorkerExecutionResult::errored( + WorkerExecutionOperation::Spawn, + message, + )); + } + }; + + let mut events = handle.subscribe(); + let bridge_handle = handle.clone(); + let busy = Arc::new(AtomicBool::new(false)); + let bridge_busy = busy.clone(); + if let Err(message) = self.spawn_on_adapter_runtime(async move { + loop { + match events.recv().await { + Ok(event) => { + let _ = bridge_context.publish_protocol_event(event); + if bridge_handle.shared_state.get_status() == WorkerStatus::Idle { + bridge_busy.store(false, Ordering::SeqCst); + } + } + Err(broadcast::error::RecvError::Lagged(_)) => continue, + Err(broadcast::error::RecvError::Closed) => break, + } + } + }) { + return WorkerExecutionSpawnResult::Errored(WorkerExecutionResult::errored( + WorkerExecutionOperation::Spawn, + message, + )); + } + + let mut workers = match self.workers.lock() { + Ok(workers) => workers, + Err(_) => { + return WorkerExecutionSpawnResult::Errored(WorkerExecutionResult::errored( + WorkerExecutionOperation::Spawn, + "worker adapter registry lock is poisoned", + )); + } + }; + workers.insert(worker_ref.clone(), RuntimeWorkerExecution { handle, busy }); + + WorkerExecutionSpawnResult::Connected { + handle: WorkerExecutionHandle::new(worker_ref, self.backend_id()), + run_state: WorkerExecutionRunState::Idle, + } + } + + fn dispatch_input( + &self, + handle: &WorkerExecutionHandle, + input: WorkerInput, + ) -> WorkerExecutionResult { + let (worker, busy) = match self.get_execution(handle) { + Ok(execution) => execution, + Err(mut result) => { + result.operation = WorkerExecutionOperation::Input; + return result; + } + }; + + if worker.shared_state.get_status() != WorkerStatus::Idle + || busy + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + return WorkerExecutionResult::busy( + WorkerExecutionOperation::Input, + "Worker is already running; runtime adapter v0 does not queue input", + ); + } + + let WorkerInputKind::User = input.kind else { + busy.store(false, Ordering::SeqCst); + return WorkerExecutionResult::unsupported( + WorkerExecutionOperation::Input, + "runtime adapter currently dispatches user input only", + ); + }; + let content = input.content.trim().to_string(); + if content.is_empty() { + busy.store(false, Ordering::SeqCst); + return WorkerExecutionResult::rejected( + WorkerExecutionOperation::Input, + "runtime adapter rejects empty user input", + ); + } + + let result = self.send_method( + WorkerExecutionOperation::Input, + worker, + Method::Run { + input: vec![Segment::text(content)], + }, + ); + if result.outcome != worker_runtime::execution::WorkerExecutionOutcome::Accepted { + busy.store(false, Ordering::SeqCst); + } + result + } + + fn stop_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + let (worker, _busy) = match self.get_execution(handle) { + Ok(execution) => execution, + Err(mut result) => { + result.operation = WorkerExecutionOperation::Stop; + return result; + } + }; + self.send_method(WorkerExecutionOperation::Stop, worker, Method::Shutdown) + } + + fn cancel_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult { + let (worker, _busy) = match self.get_execution(handle) { + Ok(execution) => execution, + Err(mut result) => { + result.operation = WorkerExecutionOperation::Cancel; + return result; + } + }; + self.send_method(WorkerExecutionOperation::Cancel, worker, Method::Cancel) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::pin::Pin; + use std::sync::atomic::{AtomicUsize, Ordering}; + + use async_trait::async_trait; + use futures::Stream; + use llm_engine::Engine; + use llm_engine::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; + use llm_engine::llm_client::{ClientError, LlmClient, Request}; + use manifest::{Scope, WorkerManifest}; + use worker_runtime::Runtime as EmbeddedRuntime; + use worker_runtime::catalog::{CreateWorkerRequest, ProfileSelector, WorkerIntent}; + use worker_runtime::identity::RuntimeId; + use worker_runtime::management::RuntimeOptions; + use worker_runtime::observation::{TranscriptQuery, TranscriptRole}; + + #[derive(Clone)] + struct MockClient { + responses: Arc>>, + call_count: Arc, + captured: Arc>>, + } + + impl MockClient { + fn new(events: Vec) -> Self { + Self { + responses: Arc::new(vec![events]), + call_count: Arc::new(AtomicUsize::new(0)), + captured: Arc::new(Mutex::new(Vec::new())), + } + } + } + + #[async_trait] + impl LlmClient for MockClient { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + async fn stream( + &self, + request: Request, + ) -> Result> + Send>>, ClientError> + { + self.captured.lock().unwrap().push(request); + let idx = self.call_count.fetch_add(1, Ordering::SeqCst); + let events = self.responses.get(idx).cloned().unwrap_or_default(); + Ok(Box::pin(futures::stream::iter(events.into_iter().map(Ok)))) + } + } + + struct MockFactory { + client: MockClient, + runtime_base: PathBuf, + cwd: PathBuf, + store_dir: PathBuf, + pod_store_dir: PathBuf, + } + + #[async_trait] + impl RuntimeWorkerFactory for MockFactory { + async fn spawn_controller( + &self, + _request: WorkerExecutionSpawnRequest, + ) -> Result { + let manifest = WorkerManifest::from_toml( + r#" + [worker] + name = "runtime-adapter-test" + pwd = "./" + + [model] + scheme = "anthropic" + model_id = "test-model" + + [engine] + max_tokens = 100 + + [[scope.allow]] + target = "./" + permission = "write" + "#, + ) + .map_err(|err| err.to_string())?; + let store = CombinedStore::new( + FsStore::new(&self.store_dir).map_err(|err| err.to_string())?, + FsWorkerStore::new(&self.pod_store_dir).map_err(|err| err.to_string())?, + ); + let scope = Scope::writable(&self.cwd).map_err(|err| err.to_string())?; + let worker = Worker::new( + manifest, + Engine::new(self.client.clone()), + store, + self.cwd.clone(), + scope, + ) + .await + .map_err(|err| err.to_string())?; + let (handle, _shutdown_rx) = WorkerController::spawn(worker, &self.runtime_base) + .await + .map_err(|err| err.to_string())?; + Ok(handle) + } + } + + fn simple_text_events() -> Vec { + vec![ + LlmEvent::text_block_start(0), + LlmEvent::text_delta(0, "hello"), + LlmEvent::text_delta(0, " from worker"), + LlmEvent::text_block_stop(0, None), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ] + } + + fn create_request(name: &str) -> CreateWorkerRequest { + CreateWorkerRequest { + intent: WorkerIntent::Role { + role: name.to_string(), + purpose: None, + }, + profile: ProfileSelector::RuntimeDefault, + config_bundle: None, + requested_capabilities: Vec::new(), + workspace_refs: Vec::new(), + mount_refs: Vec::new(), + } + } + + #[test] + fn adapter_dispatches_user_input_through_worker_run_lifecycle() { + let client = MockClient::new(simple_text_events()); + let runtime_base = tempfile::tempdir().unwrap(); + let cwd = tempfile::tempdir().unwrap(); + let store = tempfile::tempdir().unwrap(); + let factory = MockFactory { + client: client.clone(), + runtime_base: runtime_base.path().to_path_buf(), + cwd: cwd.path().to_path_buf(), + store_dir: store.path().join("sessions"), + pod_store_dir: store.path().join("pods"), + }; + let backend = WorkerRuntimeExecutionBackend::new(factory).unwrap(); + let runtime = EmbeddedRuntime::with_execution_backend( + RuntimeOptions { + runtime_id: RuntimeId::new("embedded"), + ..RuntimeOptions::default() + }, + Arc::new(backend), + ) + .unwrap(); + let detail = runtime.create_worker(create_request("chat")).unwrap(); + + runtime + .send_input(&detail.worker_ref, WorkerInput::user("say hello")) + .unwrap(); + + let deadline = std::time::Instant::now() + Duration::from_secs(5); + loop { + let projection = runtime + .transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 10)) + .unwrap(); + if projection.items.iter().any(|item| { + item.role == TranscriptRole::Assistant && item.content == "hello from worker" + }) { + break; + } + assert!( + std::time::Instant::now() < deadline, + "timed out waiting for assistant transcript projection" + ); + std::thread::sleep(Duration::from_millis(20)); + } + + assert_eq!(client.captured.lock().unwrap().len(), 1); + let observations = runtime + .read_worker_observation_events( + &detail.worker_ref, + worker_runtime::observation::WorkerObservationCursor::zero(), + ) + .unwrap(); + assert!( + observations + .iter() + .any(|event| matches!(event.payload, protocol::Event::TextDone { .. })) + ); + } +} diff --git a/crates/workspace-server/Cargo.toml b/crates/workspace-server/Cargo.toml index 9e6c20cd..842975df 100644 --- a/crates/workspace-server/Cargo.toml +++ b/crates/workspace-server/Cargo.toml @@ -22,6 +22,7 @@ thiserror.workspace = true ticket.workspace = true tokio = { workspace = true, features = ["fs", "macros", "net", "rt-multi-thread", "sync"] } tokio-tungstenite.workspace = true +worker = { workspace = true, features = ["runtime-adapter"] } worker-runtime = { workspace = true, features = ["ws-server"] } toml.workspace = true tracing.workspace = true diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index b2ba5548..667d560c 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -917,6 +917,23 @@ impl EmbeddedWorkerRuntime { Self::from_runtime(workspace_id, runtime) } + pub fn new_memory_with_execution_backend( + workspace_id: impl AsRef, + backend: std::sync::Arc, + ) -> Result { + let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID) + .expect("embedded runtime id is a non-empty literal"); + let runtime = worker_runtime::Runtime::with_execution_backend( + EmbeddedRuntimeOptions { + runtime_id: Some(runtime_id), + display_name: Some("Workspace backend embedded Runtime".to_string()), + ..EmbeddedRuntimeOptions::default() + }, + backend, + )?; + Ok(Self::from_runtime(workspace_id, runtime)) + } + pub fn from_runtime(workspace_id: impl AsRef, runtime: worker_runtime::Runtime) -> Self { let runtime_id = runtime .runtime_id() diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index 8a38424e..d9ba75c5 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -11,6 +11,7 @@ use axum::{Json, Router}; use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; +use worker::runtime_adapter::WorkerRuntimeExecutionBackend; use crate::companion::{ CompanionCancelRequest, CompanionConsole, CompanionMessageRequest, CompanionMessageResponse, @@ -97,9 +98,23 @@ impl WorkspaceApi { updated_at: config.workspace_created_at.clone(), }) .await?; - let mut runtime = RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory( - config.workspace_id.clone(), - )); + let execution_backend = WorkerRuntimeExecutionBackend::from_workspace( + config.workspace_root.clone(), + ) + .map_err(|err| { + crate::Error::Store(format!( + "failed to initialize embedded Worker backend: {err}" + )) + })?; + let mut runtime = RuntimeRegistry::for_workspace( + EmbeddedWorkerRuntime::new_memory_with_execution_backend( + config.workspace_id.clone(), + Arc::new(execution_backend), + ) + .map_err(|err| { + crate::Error::Store(format!("invalid embedded Worker backend: {err}")) + })?, + ); for remote_config in config.remote_runtime_sources.iter().cloned() { runtime .register(RemoteWorkerRuntime::new(remote_config).map_err(|err| err.into_error())?); diff --git a/package.nix b/package.nix index 799b2b55..4ca9659d 100644 --- a/package.nix +++ b/package.nix @@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec { filter = sourceFilter; }; - cargoHash = "sha256-TPXVkDfy61HCWTfSr0boLKlFbvc6zdpRKQRUDXuPppU="; + cargoHash = "sha256-1jSDcivotZ0/v5AURQaetn9xjH5JyQNDeNlJ4AcwEUc="; depsExtraArgs = { # Older fetchCargoVendor utilities used crates.io's API download endpoint,