diff --git a/crates/workspace-server/src/companion.rs b/crates/workspace-server/src/companion.rs index 1054cf9a..ff40c01e 100644 --- a/crates/workspace-server/src/companion.rs +++ b/crates/workspace-server/src/companion.rs @@ -1,14 +1,21 @@ use std::sync::{Arc, Mutex}; +use chrono::Utc; use serde::{Deserialize, Serialize}; -use worker_runtime::catalog::{CapabilityRequest, ProfileSelector}; +use worker_runtime::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector}; +use worker_runtime::config_bundle::{ + ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor, +}; use crate::hosts::{ - DiagnosticSeverity, RuntimeDiagnostic, RuntimeRegistry, WorkerOperationState, - WorkerSpawnAcceptanceRequirement, WorkerSpawnIntent, WorkerSpawnRequest, WorkerSummary, + DiagnosticSeverity, RuntimeDiagnostic, RuntimeRegistry, WorkerInputKind, WorkerInputRequest, + WorkerOperationState, WorkerSpawnAcceptanceRequirement, WorkerSpawnIntent, WorkerSpawnRequest, + WorkerSummary, WorkerTranscriptItem as RuntimeTranscriptItem, WorkerTranscriptProjection, }; const COMPANION_RUNTIME_ID: &str = "embedded-worker-runtime"; +const COMPANION_PROFILE_ID: &str = "builtin:companion"; +const COMPANION_CONFIG_BUNDLE_ID: &str = "workspace-companion-config"; const MAX_MESSAGE_CHARS: usize = 8_000; #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] @@ -85,12 +92,6 @@ pub struct CompanionTranscriptItem { pub status: String, } -#[derive(Debug, Default)] -struct CompanionTranscript { - items: Vec, - next_sequence: u64, -} - #[derive(Debug)] struct CompanionWorkerState { state: CompanionState, @@ -99,63 +100,86 @@ struct CompanionWorkerState { } pub struct CompanionConsole { + runtime: Arc, worker: Mutex, - transcript: Mutex, } impl CompanionConsole { pub fn new(runtime: Arc) -> Self { let initial = spawn_companion_worker(&runtime); Self { + runtime, worker: Mutex::new(initial), - transcript: Mutex::new(CompanionTranscript::default()), } } pub fn status(&self) -> CompanionStatusResponse { - let worker = match self.worker.lock() { - Ok(worker) => worker, - Err(_) => { - return CompanionStatusResponse { - state: CompanionState::Error, - worker: None, - transport: companion_transport(), - diagnostics: vec![diagnostic( - "companion_state_unavailable", - DiagnosticSeverity::Error, - "Companion state is unavailable", - )], - }; - } - }; - CompanionStatusResponse { - state: worker.state, - worker: worker.worker.clone(), - transport: companion_transport(), - diagnostics: worker.diagnostics.clone(), + match self.refresh_worker_state() { + Ok(worker) => CompanionStatusResponse { + state: worker.state, + worker: worker.worker.clone(), + transport: companion_transport(worker.worker.as_ref()), + diagnostics: worker.diagnostics.clone(), + }, + Err(diagnostic) => CompanionStatusResponse { + state: CompanionState::Error, + worker: None, + transport: companion_transport(None), + diagnostics: vec![diagnostic], + }, } } pub fn transcript(&self, start: usize, limit: usize) -> CompanionTranscriptProjection { - let transcript = match self.transcript.lock() { - Ok(transcript) => transcript, - Err(_) => { - return CompanionTranscriptProjection { - state: CompanionState::Error, - start, - limit, - total_items: 0, - next_start: None, - items: Vec::new(), - diagnostics: vec![diagnostic( - "companion_transcript_unavailable", - DiagnosticSeverity::Error, - "Companion transcript is unavailable", - )], - }; + match self.current_worker() { + Ok(Some(worker)) => { + match self + .runtime + .transcript(COMPANION_RUNTIME_ID, &worker.worker_id, start, limit) + { + Ok(transcript) => project_runtime_transcript( + &transcript, + companion_state_for_worker(&worker), + Vec::new(), + ), + Err(error) => CompanionTranscriptProjection { + state: CompanionState::Error, + start, + limit, + total_items: 0, + next_start: None, + items: Vec::new(), + diagnostics: vec![diagnostic( + "companion_transcript_unavailable", + DiagnosticSeverity::Error, + format!("Companion Worker transcript is unavailable: {error:?}"), + )], + }, + } } - }; - project_transcript(&transcript, CompanionState::Ready, start, limit, Vec::new()) + Ok(None) => CompanionTranscriptProjection { + state: CompanionState::Error, + start, + limit, + total_items: 0, + next_start: None, + items: Vec::new(), + diagnostics: vec![diagnostic( + "companion_worker_unavailable", + DiagnosticSeverity::Error, + "Workspace Companion Worker is unavailable", + )], + }, + Err(diagnostic) => CompanionTranscriptProjection { + state: CompanionState::Error, + start, + limit, + total_items: 0, + next_start: None, + items: Vec::new(), + diagnostics: vec![diagnostic], + }, + } } pub fn send_message(&self, request: CompanionMessageRequest) -> CompanionMessageResponse { @@ -175,11 +199,66 @@ impl CompanionConsole { )); } - self.rejected_message_response(diagnostic( - "companion_llm_not_connected", - DiagnosticSeverity::Error, - "Workspace Companion input is disabled until it is connected to actual Worker/LLM execution", - )) + let worker = match self.current_worker() { + Ok(Some(worker)) => worker, + Ok(None) => { + return self.rejected_message_response(diagnostic( + "companion_worker_unavailable", + DiagnosticSeverity::Error, + "Workspace Companion Worker is unavailable", + )); + } + Err(diagnostic) => return self.rejected_message_response(diagnostic), + }; + + let response = self.runtime.send_input( + COMPANION_RUNTIME_ID, + &worker.worker_id, + WorkerInputRequest { + kind: WorkerInputKind::User, + content: content.clone(), + }, + ); + + match response { + Ok(result) => { + let state = match result.state { + WorkerOperationState::Accepted => CompanionState::Accepted, + WorkerOperationState::Unsupported | WorkerOperationState::Rejected => { + CompanionState::Rejected + } + }; + let diagnostics = if result.diagnostics.is_empty() { + Vec::new() + } else { + result.diagnostics.clone() + }; + let projection = self.transcript(0, 200); + CompanionMessageResponse { + state, + worker: projection_worker(&self.status()), + user_item: projection + .items + .iter() + .rev() + .find(|item| item.role == "user" && item.content == content) + .cloned(), + assistant_item: projection + .items + .iter() + .rev() + .find(|item| item.role == "assistant") + .cloned(), + transcript: projection, + diagnostics, + } + } + Err(error) => self.rejected_message_response(diagnostic( + "companion_worker_input_failed", + DiagnosticSeverity::Error, + format!("Companion Worker input dispatch failed: {error:?}"), + )), + } } pub fn cancel(&self, _request: CompanionCancelRequest) -> CompanionMessageResponse { @@ -188,157 +267,267 @@ impl CompanionConsole { DiagnosticSeverity::Info, "Workspace Companion has no active generation to cancel", )]; - match self.transcript.lock() { - Ok(transcript) => response_from_locked_transcript( - &transcript, - CompanionState::Cancelled, - self.status().worker, - None, - None, - 0, - 200, - diagnostics, - ), - Err(_) => CompanionMessageResponse { - state: CompanionState::Error, - worker: self.status().worker, - user_item: None, - assistant_item: None, - transcript: CompanionTranscriptProjection { - state: CompanionState::Error, - start: 0, - limit: 200, - total_items: 0, - next_start: None, - items: Vec::new(), - diagnostics: vec![diagnostic( - "companion_transcript_unavailable", - DiagnosticSeverity::Error, - "Companion transcript is unavailable", - )], - }, - diagnostics, - }, + let status = self.status(); + let projection = self.transcript(0, 200); + CompanionMessageResponse { + state: CompanionState::Cancelled, + worker: status.worker, + user_item: None, + assistant_item: projection + .items + .iter() + .rev() + .find(|item| item.role == "assistant") + .cloned(), + transcript: projection, + diagnostics, } } fn rejected_message_response(&self, diagnostic: RuntimeDiagnostic) -> CompanionMessageResponse { - match self.transcript.lock() { - Ok(transcript) => response_from_locked_transcript( - &transcript, - CompanionState::Rejected, - self.status().worker, - None, - None, - 0, - 200, - vec![diagnostic], - ), - Err(_) => CompanionMessageResponse { - state: CompanionState::Rejected, - worker: self.status().worker, - user_item: None, - assistant_item: None, - transcript: CompanionTranscriptProjection { - state: CompanionState::Error, - start: 0, - limit: 200, - total_items: 0, - next_start: None, - items: Vec::new(), - diagnostics: vec![diagnostic.clone()], - }, - diagnostics: vec![diagnostic], - }, + let status = self.status(); + let projection = self.transcript(0, 200); + CompanionMessageResponse { + state: CompanionState::Rejected, + worker: status.worker, + user_item: None, + assistant_item: projection + .items + .iter() + .rev() + .find(|item| item.role == "assistant") + .cloned(), + transcript: projection, + diagnostics: vec![diagnostic], } } + + fn current_worker(&self) -> Result, RuntimeDiagnostic> { + self.refresh_worker_state() + .map(|state| state.worker.clone()) + } + + fn refresh_worker_state(&self) -> Result { + let mut state = self.worker.lock().map_err(|_| { + diagnostic( + "companion_state_unavailable", + DiagnosticSeverity::Error, + "Companion state is unavailable", + ) + })?; + let Some(worker_id) = state.worker.as_ref().map(|worker| worker.worker_id.clone()) else { + return Ok(CompanionWorkerState { + state: state.state, + worker: None, + diagnostics: state.diagnostics.clone(), + }); + }; + + match self.runtime.worker(COMPANION_RUNTIME_ID, &worker_id) { + Ok(worker) => { + let mut diagnostics = if worker.capabilities.can_accept_input { + Vec::new() + } else { + state.diagnostics.clone() + }; + if !worker.capabilities.can_accept_input + && !diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "companion_worker_not_input_capable") + { + diagnostics.push(companion_not_input_capable_diagnostic(&worker)); + } + state.state = companion_state_for_worker(&worker); + state.worker = Some(worker); + state.diagnostics = diagnostics; + } + Err(error) => { + state.state = CompanionState::Error; + state.diagnostics = vec![diagnostic( + "companion_worker_lookup_failed", + DiagnosticSeverity::Error, + format!("Companion Worker lookup failed: {error:?}"), + )]; + } + } + + Ok(CompanionWorkerState { + state: state.state, + worker: state.worker.clone(), + diagnostics: state.diagnostics.clone(), + }) + } +} + +fn projection_worker(status: &CompanionStatusResponse) -> Option { + status.worker.clone() } fn spawn_companion_worker(runtime: &RuntimeRegistry) -> CompanionWorkerState { - let request = WorkerSpawnRequest { - intent: WorkerSpawnIntent::WorkspaceCompanion, - requested_worker_name: Some("workspace-companion".to_string()), - acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { - expected_segments: 0, - }, - profile: Some(ProfileSelector::RuntimeDefault), - config_bundle: None, - requested_capabilities: vec![CapabilityRequest::named("conversation")], + let selector = companion_profile_selector(); + let mut diagnostics = Vec::new(); + let config_bundle = companion_config_bundle(); + let config_ref = ConfigBundleRef { + id: config_bundle.metadata.id.clone(), + digest: config_bundle.metadata.digest.clone(), }; - match runtime.spawn_worker(COMPANION_RUNTIME_ID, request) { - Ok(result) if result.state == WorkerOperationState::Accepted => CompanionWorkerState { - state: CompanionState::Ready, - worker: result.worker, - diagnostics: result.diagnostics, - }, - Ok(result) => CompanionWorkerState { - state: CompanionState::Error, - worker: result.worker, - diagnostics: result.diagnostics, + + match runtime.sync_config_bundle(COMPANION_RUNTIME_ID, config_bundle) { + Ok(result) => diagnostics.extend(result.diagnostics), + Err(error) => diagnostics.push(diagnostic( + "companion_config_bundle_sync_failed", + DiagnosticSeverity::Error, + format!("Workspace Companion config bundle sync failed: {error:?}"), + )), + } + + let response = runtime.spawn_worker( + COMPANION_RUNTIME_ID, + WorkerSpawnRequest { + intent: WorkerSpawnIntent::WorkspaceCompanion, + requested_worker_name: Some("workspace-companion".to_string()), + acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { + expected_segments: 0, + }, + profile: Some(selector), + config_bundle: Some(config_ref), + requested_capabilities: vec![CapabilityRequest::named("worker.input.user")], }, + ); + + match response { + Ok(response) => { + diagnostics.extend(response.diagnostics); + if let Some(worker) = response.worker { + if !worker.capabilities.can_accept_input { + diagnostics.push(companion_not_input_capable_diagnostic(&worker)); + } + CompanionWorkerState { + state: companion_state_for_worker(&worker), + worker: Some(worker), + diagnostics, + } + } else { + diagnostics.push(diagnostic( + "companion_worker_missing", + DiagnosticSeverity::Error, + "Workspace Companion Worker spawn did not return a Worker projection", + )); + CompanionWorkerState { + state: CompanionState::Error, + worker: None, + diagnostics, + } + } + } Err(error) => CompanionWorkerState { state: CompanionState::Error, worker: None, diagnostics: vec![diagnostic( "companion_worker_spawn_failed", DiagnosticSeverity::Error, - format!("Companion Worker spawn failed: {error:?}"), + format!("Workspace Companion Worker spawn failed: {error:?}"), )], }, } } -fn response_from_locked_transcript( - transcript: &CompanionTranscript, - state: CompanionState, - worker: Option, - user_item: Option, - assistant_item: Option, - start: usize, - limit: usize, - diagnostics: Vec, -) -> CompanionMessageResponse { - CompanionMessageResponse { - state, - worker, - user_item, - assistant_item, - transcript: project_transcript(transcript, state, start, limit, diagnostics.clone()), - diagnostics, +fn companion_profile_selector() -> ProfileSelector { + ProfileSelector::Builtin(COMPANION_PROFILE_ID.to_string()) +} + +fn companion_config_bundle() -> ConfigBundle { + ConfigBundle { + metadata: ConfigBundleMetadata { + id: COMPANION_CONFIG_BUNDLE_ID.to_string(), + digest: String::new(), + revision: "1".to_string(), + workspace_id: "workspace-companion".to_string(), + created_at: Utc::now().to_rfc3339(), + provenance: ConfigBundleProvenance { + source: "workspace-server".to_string(), + detail: Some("workspace-companion".to_string()), + }, + }, + profiles: vec![ConfigProfileDescriptor { + selector: companion_profile_selector(), + label: Some("Workspace Companion".to_string()), + }], + declarations: Vec::new(), + } + .with_computed_digest() +} + +fn companion_state_for_worker(worker: &WorkerSummary) -> CompanionState { + if !worker.capabilities.can_accept_input { + return CompanionState::Error; + } + match worker.status.as_str() { + "busy" | "running" | "stopping" => CompanionState::Busy, + "errored" | "error" | "stopped" | "unavailable" => CompanionState::Error, + _ => CompanionState::Ready, } } -fn project_transcript( - transcript: &CompanionTranscript, +fn companion_not_input_capable_diagnostic(worker: &WorkerSummary) -> RuntimeDiagnostic { + diagnostic( + "companion_worker_not_input_capable", + DiagnosticSeverity::Error, + format!( + "Workspace Companion Worker '{}' is not input-capable; check profile, provider, secret, and authority diagnostics", + worker.worker_id + ), + ) +} + +fn project_runtime_transcript( + transcript: &WorkerTranscriptProjection, state: CompanionState, - start: usize, - limit: usize, diagnostics: Vec, ) -> CompanionTranscriptProjection { - let limit = limit.min(200); - let total_items = transcript.items.len(); - let end = start.saturating_add(limit).min(total_items); - let items = if start < total_items { - transcript.items[start..end].to_vec() - } else { - Vec::new() - }; CompanionTranscriptProjection { state, - start, - limit, - total_items, - next_start: (end < total_items).then_some(end), - items, + start: transcript.start, + limit: transcript.limit, + total_items: transcript.total_items, + next_start: transcript.next_start, + items: transcript + .items + .iter() + .map(project_runtime_transcript_item) + .collect(), diagnostics, } } -fn companion_transport() -> CompanionTransportSummary { - CompanionTransportSummary { - kind: "embedded_worker_runtime".to_string(), - completion: "not_connected".to_string(), - limitation: "Workspace Companion is visible as an embedded Worker, but browser input is disabled until actual Worker/LLM execution is connected.".to_string(), +fn project_runtime_transcript_item(item: &RuntimeTranscriptItem) -> CompanionTranscriptItem { + CompanionTranscriptItem { + sequence: item.sequence, + role: item.role.clone(), + content: item.content.clone(), + created_at: format!("runtime_sequence:{}", item.sequence), + source: "worker_runtime".to_string(), + status: "committed".to_string(), + } +} + +fn companion_transport(worker: Option<&WorkerSummary>) -> CompanionTransportSummary { + if worker.is_some_and(|worker| worker.capabilities.can_accept_input) { + CompanionTransportSummary { + kind: "embedded_worker_runtime".to_string(), + completion: "connected".to_string(), + limitation: + "Workspace Companion input is dispatched through the normal Worker runtime path." + .to_string(), + } + } else { + CompanionTransportSummary { + kind: "embedded_worker_runtime".to_string(), + completion: "not_input_capable".to_string(), + limitation: + "Workspace Companion is a Worker but is not input-capable; inspect typed diagnostics for missing profile, provider, secret, or authority." + .to_string(), + } } } @@ -358,52 +547,114 @@ fn diagnostic( mod tests { use super::*; use crate::hosts::{EmbeddedWorkerRuntime, RuntimeRegistry}; + use std::collections::HashMap; + use std::sync::Mutex as StdMutex; + use std::thread; + use std::time::{Duration, Instant}; + use worker_runtime::execution::{ + WorkerExecutionBackend, WorkerExecutionContext, WorkerExecutionHandle, + WorkerExecutionOperation, WorkerExecutionResult, WorkerExecutionRunState, + WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, + }; + use worker_runtime::identity::WorkerRef; + use worker_runtime::interaction::WorkerInput; + + #[derive(Default)] + struct DeterministicExecutionBackend { + contexts: StdMutex>, + } + + impl WorkerExecutionBackend for DeterministicExecutionBackend { + fn backend_id(&self) -> &str { + "deterministic-companion-test" + } + + fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult { + self.contexts + .lock() + .unwrap() + .insert(request.worker_ref.clone(), request.context); + WorkerExecutionSpawnResult::Connected { + handle: WorkerExecutionHandle::new( + request.worker_ref.clone(), + "deterministic-companion-test", + ), + run_state: WorkerExecutionRunState::Idle, + } + } + + fn dispatch_input( + &self, + handle: &WorkerExecutionHandle, + request: WorkerInput, + ) -> WorkerExecutionResult { + let worker = handle.worker_ref().clone(); + let context = self + .contexts + .lock() + .unwrap() + .get(&worker) + .cloned() + .expect("execution context"); + let content = request.content.clone(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(25)); + let _ = context.publish_protocol_event(protocol::Event::TextDone { + text: format!("companion echoed: {content}"), + }); + }); + WorkerExecutionResult::accepted( + WorkerExecutionOperation::Input, + WorkerExecutionRunState::Idle, + ) + } + } #[test] - fn companion_spawns_visible_worker_without_fake_turn() { + fn companion_spawns_worker_with_companion_profile_and_diagnostic_when_not_input_capable() { let registry = RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory("local:test")); let registry = Arc::new(registry); let companion = CompanionConsole::new(registry.clone()); let status = companion.status(); - assert_eq!(status.state, CompanionState::Ready); let worker = status.worker.clone().expect("companion worker"); assert_eq!(worker.runtime_id, COMPANION_RUNTIME_ID); assert_eq!(worker.role.as_deref(), Some("workspace_companion")); - assert!(!worker.capabilities.can_stop); - - let workers = registry.list_workers(10); + assert!(!worker.capabilities.can_accept_input); + assert_eq!(status.transport.completion, "not_input_capable"); assert!( - workers - .items + status + .diagnostics .iter() - .any(|item| item.worker_id == worker.worker_id) + .any(|diagnostic| diagnostic.code == "companion_worker_not_input_capable") ); let response = companion.send_message(CompanionMessageRequest { content: "hello".to_string(), }); assert_eq!(response.state, CompanionState::Rejected); - assert!(response.transcript.items.is_empty()); assert!( - response + !response .diagnostics .iter() .any(|diagnostic| diagnostic.code == "companion_llm_not_connected") ); + assert!(response.transcript.items.is_empty()); - let runtime_transcript = registry - .transcript(COMPANION_RUNTIME_ID, &worker.worker_id, 0, 10) - .unwrap(); - assert!(runtime_transcript.items.is_empty()); + let worker_detail = registry + .worker(COMPANION_RUNTIME_ID, &worker.worker_id) + .expect("worker detail"); + assert_eq!(worker_detail.profile.as_deref(), Some(COMPANION_PROFILE_ID)); - let browser_payload = serde_json::to_string(&(status, response)).unwrap(); + let browser_payload = serde_json::to_string(&(status, response, worker_detail)).unwrap(); for forbidden in [ "/workspace/project", "metadata.json", ".jsonl", "/run/user/", + "session", + "manifest", ] { assert!( !browser_payload.contains(forbidden), @@ -411,4 +662,101 @@ mod tests { ); } } + + #[test] + fn companion_dispatches_input_and_projects_assistant_output_from_worker_runtime() { + let registry = RuntimeRegistry::for_workspace( + EmbeddedWorkerRuntime::new_memory_with_execution_backend( + "local:test", + Arc::new(DeterministicExecutionBackend::default()), + ) + .expect("embedded runtime"), + ); + let registry = Arc::new(registry); + let companion = CompanionConsole::new(registry.clone()); + let status = companion.status(); + let worker = status.worker.clone().expect("companion worker"); + assert_eq!(status.transport.completion, "connected"); + assert_eq!(worker.profile.as_deref(), Some(COMPANION_PROFILE_ID)); + assert!(worker.capabilities.can_accept_input); + + let source = registry + .observation_source(COMPANION_RUNTIME_ID, &worker.worker_id) + .expect("observation source"); + let crate::observation::RuntimeObservationSource::Embedded(source) = source else { + panic!("expected embedded observation source"); + }; + let cursor = source + .runtime + .worker_observation_cursor_now(&source.worker_ref) + .expect("observation cursor"); + + let response = companion.send_message(CompanionMessageRequest { + content: "hello runtime".to_string(), + }); + assert_eq!(response.state, CompanionState::Accepted); + assert!( + response + .user_item + .as_ref() + .is_some_and(|item| item.role == "user" && item.content == "hello runtime") + ); + assert!(response.diagnostics.is_empty()); + + let deadline = Instant::now() + Duration::from_secs(2); + let observed = loop { + let observed = source + .runtime + .read_worker_observation_events(&source.worker_ref, cursor) + .expect("observation events"); + if observed.iter().any(|event| { + serde_json::to_string(event) + .unwrap() + .contains("companion echoed: hello runtime") + }) { + break observed; + } + assert!( + Instant::now() < deadline, + "timed out waiting for observation event" + ); + thread::sleep(Duration::from_millis(20)); + }; + let observed_json = serde_json::to_string(&observed).unwrap(); + assert!(observed_json.contains("companion echoed: hello runtime")); + + let deadline = Instant::now() + Duration::from_secs(2); + let transcript = loop { + let transcript = companion.transcript(0, 20); + if transcript.items.iter().any(|item| { + item.role == "assistant" && item.content == "companion echoed: hello runtime" + }) { + break transcript; + } + assert!( + Instant::now() < deadline, + "timed out waiting for companion assistant output: {transcript:?}" + ); + thread::sleep(Duration::from_millis(20)); + }; + + assert!( + transcript + .items + .iter() + .any(|item| item.role == "user" && item.content == "hello runtime") + ); + assert!(transcript.items.iter().any(|item| { + item.role == "assistant" + && item.source == "worker_runtime" + && item.status == "committed" + })); + + let runtime_transcript = registry + .transcript(COMPANION_RUNTIME_ID, &worker.worker_id, 0, 20) + .expect("runtime transcript"); + assert!(runtime_transcript.items.iter().any(|item| { + item.role == "assistant" && item.content == "companion echoed: hello runtime" + })); + } } diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index a193ed62..0f754821 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -2160,9 +2160,10 @@ fn embedded_profile_selector(intent: &WorkerSpawnIntent) -> ProfileSelector { WorkerSpawnIntent::TicketRole { role, .. } => { ProfileSelector::Builtin(format!("builtin:{}", ticket_role_profile_slug(role))) } - WorkerSpawnIntent::WorkspaceCompanion | WorkerSpawnIntent::WorkspaceOrchestrator => { - ProfileSelector::RuntimeDefault + WorkerSpawnIntent::WorkspaceCompanion => { + ProfileSelector::Builtin("builtin:companion".to_string()) } + WorkerSpawnIntent::WorkspaceOrchestrator => ProfileSelector::RuntimeDefault, } } diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index 6b7944f0..2aff2549 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -89,6 +89,22 @@ pub struct WorkspaceApi { impl WorkspaceApi { pub async fn new(config: ServerConfig, store: Arc) -> Result { + let execution_backend = WorkerRuntimeExecutionBackend::from_workspace( + config.workspace_root.clone(), + ) + .map_err(|err| { + crate::Error::Store(format!( + "failed to initialize embedded Worker backend: {err}" + )) + })?; + Self::new_with_execution_backend(config, store, Arc::new(execution_backend)).await + } + + async fn new_with_execution_backend( + config: ServerConfig, + store: Arc, + execution_backend: Arc, + ) -> Result { store .upsert_workspace(&WorkspaceRecord { workspace_id: config.workspace_id.clone(), @@ -98,18 +114,10 @@ impl WorkspaceApi { updated_at: config.workspace_created_at.clone(), }) .await?; - let execution_backend = WorkerRuntimeExecutionBackend::from_workspace( - config.workspace_root.clone(), - ) - .map_err(|err| { - crate::Error::Store(format!( - "failed to initialize embedded Worker backend: {err}" - )) - })?; let mut runtime = RuntimeRegistry::for_workspace( EmbeddedWorkerRuntime::new_memory_with_execution_backend( config.workspace_id.clone(), - Arc::new(execution_backend), + execution_backend, ) .map_err(|err| { crate::Error::Store(format!("invalid embedded Worker backend: {err}")) @@ -228,7 +236,6 @@ pub async fn serve( pub struct WorkspaceResponse { pub workspace_id: String, pub display_name: String, - pub local_root: PathBuf, pub record_authority: String, pub schema_version: i64, pub auth: AuthConfig, @@ -335,7 +342,6 @@ async fn get_workspace(State(api): State) -> ApiResult, + >, + } + + impl worker_runtime::execution::WorkerExecutionBackend for DeterministicExecutionBackend { + fn backend_id(&self) -> &str { + "deterministic-workspace-server-test" + } + + fn spawn_worker( + &self, + request: worker_runtime::execution::WorkerExecutionSpawnRequest, + ) -> worker_runtime::execution::WorkerExecutionSpawnResult { + self.contexts + .lock() + .unwrap() + .insert(request.worker_ref.clone(), request.context); + worker_runtime::execution::WorkerExecutionSpawnResult::Connected { + handle: worker_runtime::execution::WorkerExecutionHandle::new( + request.worker_ref, + self.backend_id(), + ), + run_state: worker_runtime::execution::WorkerExecutionRunState::Idle, + } + } + + fn dispatch_input( + &self, + handle: &worker_runtime::execution::WorkerExecutionHandle, + input: worker_runtime::interaction::WorkerInput, + ) -> worker_runtime::execution::WorkerExecutionResult { + let context = self + .contexts + .lock() + .unwrap() + .get(handle.worker_ref()) + .cloned() + .expect("execution context"); + let content = input.content.clone(); + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(25)); + let _ = context.publish_protocol_event(protocol::Event::TextDone { + text: format!("server companion echoed: {content}"), + }); + }); + worker_runtime::execution::WorkerExecutionResult::accepted( + worker_runtime::execution::WorkerExecutionOperation::Input, + worker_runtime::execution::WorkerExecutionRunState::Idle, + ) + } + } + fn test_identity() -> WorkspaceIdentity { WorkspaceIdentity { workspace_id: TEST_WORKSPACE_ID.to_string(), @@ -1161,6 +1225,7 @@ mod tests { companion_status["transport"]["kind"], "embedded_worker_runtime" ); + assert_ne!(companion_status["transport"]["completion"], "not_connected"); assert!(!companion_status.to_string().contains("/workspace/demo")); let companion_message = post_json( @@ -1177,11 +1242,17 @@ mod tests { .is_empty() ); assert!( - companion_message["diagnostics"] + !companion_message + .to_string() + .contains("companion_llm_not_connected"), + "legacy non-execution diagnostic leaked: {companion_message}" + ); + assert!( + !companion_message["diagnostics"] .as_array() .unwrap() - .iter() - .any(|diagnostic| diagnostic["code"] == "companion_llm_not_connected") + .is_empty(), + "missing typed diagnostic for non-input-capable Companion: {companion_message}" ); assert!(!companion_message.to_string().contains("/workspace/demo")); @@ -1275,6 +1346,76 @@ mod tests { ); } + #[tokio::test] + async fn legacy_companion_messages_route_dispatches_through_worker_runtime() { + let temp = tempfile::tempdir().unwrap(); + let config = ServerConfig::local_dev(temp.path().join("workspace"), test_identity()); + let api = WorkspaceApi::new_with_execution_backend( + config, + Arc::new(SqliteWorkspaceStore::in_memory().unwrap()), + Arc::new(DeterministicExecutionBackend::default()), + ) + .await + .unwrap(); + let app = build_router(api); + + let status = get_json(app.clone(), "/api/companion/status").await; + assert_eq!(status["transport"]["completion"], "connected"); + let worker_id = status["worker"]["worker_id"].as_str().unwrap().to_string(); + assert_eq!(status["worker"]["profile"], "builtin:companion"); + + let response = post_json( + app.clone(), + "/api/companion/messages", + serde_json::json!({ "content": "from legacy route" }), + ) + .await; + assert_eq!(response["state"], "accepted"); + assert_eq!(response["user_item"]["content"], "from legacy route"); + assert!(!response.to_string().contains("companion_llm_not_connected")); + + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); + let transcript = loop { + let transcript = get_json(app.clone(), "/api/companion/transcript").await; + let has_assistant = transcript["items"].as_array().unwrap().iter().any(|item| { + item["role"] == "assistant" + && item["content"] == "server companion echoed: from legacy route" + && item["source"] == "worker_runtime" + }); + if has_assistant { + break transcript; + } + assert!( + std::time::Instant::now() < deadline, + "timed out waiting for server companion transcript: {transcript}" + ); + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + }; + assert!( + transcript["items"] + .as_array() + .unwrap() + .iter() + .any(|item| { item["role"] == "user" && item["content"] == "from legacy route" }) + ); + + let worker_transcript = get_json( + app, + &format!("/api/runtimes/embedded-worker-runtime/workers/{worker_id}/transcript"), + ) + .await; + assert!( + worker_transcript["items"] + .as_array() + .unwrap() + .iter() + .any(|item| { + item["role"] == "assistant" + && item["content"] == "server companion echoed: from legacy route" + }) + ); + } + #[tokio::test] async fn embedded_runtime_api_routes_by_runtime_and_worker_ids_without_leaking_internals() { let dir = tempfile::tempdir().unwrap();