diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index c5c57e2e..83f51cd1 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -11,7 +11,8 @@ use crate::error::RuntimeError; use crate::execution::{ WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef, WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult, - WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, WorkerExecutionStatus, + WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, + WorkerExecutionStatus, }; #[cfg(feature = "fs-store")] use crate::fs_store::{ @@ -723,9 +724,13 @@ impl Runtime { let mut state = self.lock()?; state.ensure_worker_ref(worker_ref)?; let transcript_sequence = state.project_protocol_event_to_transcript(worker_ref, &payload); + let execution_state_changed = + state.project_protocol_event_to_execution(worker_ref, &payload); let event = state.push_worker_observation_event(worker_ref.clone(), payload); - if let Some(sequence) = transcript_sequence { + if transcript_sequence.is_some() || execution_state_changed { state.persist_worker(&worker_ref.worker_id)?; + } + if let Some(sequence) = transcript_sequence { state.persist_transcript_entry(&worker_ref.worker_id, sequence)?; } Ok(event) @@ -1324,6 +1329,51 @@ impl RuntimeState { } } + #[cfg(feature = "ws-server")] + fn project_protocol_event_to_execution( + &mut self, + worker_ref: &WorkerRef, + event: &protocol::Event, + ) -> bool { + let Some(worker) = self.workers.get_mut(&worker_ref.worker_id) else { + return false; + }; + let status = &mut worker.execution; + let next_run_state = match event { + protocol::Event::Status { + status: protocol::WorkerStatus::Running, + } => Some(WorkerExecutionRunState::Busy), + protocol::Event::Status { + status: protocol::WorkerStatus::Idle, + } => Some(WorkerExecutionRunState::Idle), + protocol::Event::Status { + status: protocol::WorkerStatus::Paused, + } => Some(WorkerExecutionRunState::Busy), + protocol::Event::Snapshot { status, .. } => match status { + protocol::WorkerStatus::Running => Some(WorkerExecutionRunState::Busy), + protocol::WorkerStatus::Idle => Some(WorkerExecutionRunState::Idle), + protocol::WorkerStatus::Paused => Some(WorkerExecutionRunState::Busy), + }, + protocol::Event::RunEnd { result } => match result { + protocol::RunResult::Finished | protocol::RunResult::RolledBack => { + Some(WorkerExecutionRunState::Idle) + } + protocol::RunResult::Paused => Some(WorkerExecutionRunState::Busy), + protocol::RunResult::LimitReached => Some(WorkerExecutionRunState::Errored), + }, + protocol::Event::Error { .. } => Some(WorkerExecutionRunState::Errored), + _ => None, + }; + let Some(next_run_state) = next_run_state else { + return false; + }; + if status.run_state == next_run_state { + return false; + } + status.run_state = next_run_state; + true + } + fn push_diagnostic( &mut self, severity: DiagnosticSeverity, diff --git a/crates/worker/src/runtime_adapter.rs b/crates/worker/src/runtime_adapter.rs index 34705657..22854447 100644 --- a/crates/worker/src/runtime_adapter.rs +++ b/crates/worker/src/runtime_adapter.rs @@ -123,6 +123,24 @@ impl ProfileRuntimeWorkerFactory { request.worker_ref.worker_id.to_string() } + fn runtime_profile_value( + profile: &worker_runtime::catalog::ProfileSelector, + ) -> Option> { + match profile { + worker_runtime::catalog::ProfileSelector::RuntimeDefault => None, + worker_runtime::catalog::ProfileSelector::Named(name) => { + Some(std::borrow::Cow::Borrowed(name.as_str())) + } + worker_runtime::catalog::ProfileSelector::Builtin(name) => { + if name.starts_with("builtin:") { + Some(std::borrow::Cow::Borrowed(name.as_str())) + } else { + Some(std::borrow::Cow::Owned(format!("builtin:{name}"))) + } + } + } + } + fn runtime_profile<'a>( &'a self, request: &'a WorkerExecutionSpawnRequest, @@ -130,15 +148,7 @@ impl ProfileRuntimeWorkerFactory { if let Some(profile) = self.profile.as_deref() { return Some(std::borrow::Cow::Borrowed(profile)); } - match &request.request.profile { - worker_runtime::catalog::ProfileSelector::RuntimeDefault => None, - worker_runtime::catalog::ProfileSelector::Named(name) => { - Some(std::borrow::Cow::Borrowed(name.as_str())) - } - worker_runtime::catalog::ProfileSelector::Builtin(name) => { - Some(std::borrow::Cow::Owned(format!("builtin:{name}"))) - } - } + Self::runtime_profile_value(&request.request.profile) } } @@ -302,6 +312,7 @@ where operation: WorkerExecutionOperation, worker: WorkerHandle, method: Method, + accepted_run_state: WorkerExecutionRunState, ) -> WorkerExecutionResult { self.run_on_adapter_runtime(async move { worker @@ -309,7 +320,7 @@ where .await .map_err(|err| format!("failed to send Worker method: {err}")) }) - .map(|_| WorkerExecutionResult::accepted(operation, WorkerExecutionRunState::Idle)) + .map(|_| WorkerExecutionResult::accepted(operation, accepted_run_state)) .unwrap_or_else(|message| WorkerExecutionResult::errored(operation, message)) } } @@ -448,6 +459,7 @@ where Method::Run { input: vec![Segment::text(content)], }, + WorkerExecutionRunState::Busy, ); if result.outcome != worker_runtime::execution::WorkerExecutionOutcome::Accepted { busy.store(false, Ordering::SeqCst); @@ -463,7 +475,12 @@ where return result; } }; - self.send_method(WorkerExecutionOperation::Stop, worker, Method::Shutdown) + self.send_method( + WorkerExecutionOperation::Stop, + worker, + Method::Shutdown, + WorkerExecutionRunState::Stopped, + ) } fn cancel_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult { @@ -474,7 +491,12 @@ where return result; } }; - self.send_method(WorkerExecutionOperation::Cancel, worker, Method::Cancel) + self.send_method( + WorkerExecutionOperation::Cancel, + worker, + Method::Cancel, + WorkerExecutionRunState::Idle, + ) } } @@ -611,6 +633,24 @@ mod tests { } } + #[test] + fn builtin_profile_selector_is_not_double_prefixed() { + assert_eq!( + ProfileRuntimeWorkerFactory::runtime_profile_value( + &worker_runtime::catalog::ProfileSelector::Builtin("coder".to_string()) + ) + .as_deref(), + Some("builtin:coder") + ); + assert_eq!( + ProfileRuntimeWorkerFactory::runtime_profile_value( + &worker_runtime::catalog::ProfileSelector::Builtin("builtin:coder".to_string()) + ) + .as_deref(), + Some("builtin:coder") + ); + } + #[test] fn adapter_dispatches_user_input_through_worker_run_lifecycle() { let client = MockClient::new(simple_text_events()); diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index 667d560c..62d0ac65 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -13,6 +13,7 @@ use worker_runtime::catalog::{ }; use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary}; use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; +use worker_runtime::execution::WorkerExecutionRunState; use worker_runtime::http_server::{ RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest, RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse, @@ -903,6 +904,7 @@ pub struct EmbeddedWorkerRuntime { runtime_id: String, host_id: String, runtime: worker_runtime::Runtime, + execution_enabled: bool, } impl EmbeddedWorkerRuntime { @@ -931,7 +933,9 @@ impl EmbeddedWorkerRuntime { }, backend, )?; - Ok(Self::from_runtime(workspace_id, runtime)) + let mut embedded = Self::from_runtime(workspace_id, runtime); + embedded.execution_enabled = true; + Ok(embedded) } pub fn from_runtime(workspace_id: impl AsRef, runtime: worker_runtime::Runtime) -> Self { @@ -944,6 +948,7 @@ impl EmbeddedWorkerRuntime { runtime_id, host_id: host_id_for_embedded_workspace(workspace_id.as_ref()), runtime, + execution_enabled: false, } } @@ -954,6 +959,20 @@ impl EmbeddedWorkerRuntime { )) } + fn can_accept_embedded_input( + &self, + status: EmbeddedWorkerStatus, + run_state: WorkerExecutionRunState, + ) -> bool { + self.execution_enabled + && status == EmbeddedWorkerStatus::Running + && run_state != WorkerExecutionRunState::Busy + } + + fn can_stop_embedded_worker(&self, status: EmbeddedWorkerStatus) -> bool { + self.execution_enabled && status == EmbeddedWorkerStatus::Running + } + fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary { WorkerSummary { runtime_id: self.runtime_id.clone(), @@ -967,15 +986,16 @@ impl EmbeddedWorkerRuntime { identity: "runtime_registry_worker".to_string(), }, state: embedded_worker_status_label(summary.status).to_string(), - status: embedded_worker_status_label(summary.status).to_string(), + status: embedded_worker_execution_status_label(summary.status, summary.execution.run_state) + .to_string(), last_seen_at: None, implementation: WorkerImplementationSummary { kind: "embedded_worker_runtime".to_string(), display_hint: "backend-internal worker-runtime Worker".to_string(), }, capabilities: WorkerCapabilitySummary { - can_accept_input: false, - can_stop: false, + can_accept_input: self.can_accept_embedded_input(summary.status, summary.execution.run_state), + can_stop: self.can_stop_embedded_worker(summary.status), can_spawn_followup: false, }, diagnostics: vec![diagnostic( @@ -999,15 +1019,16 @@ impl EmbeddedWorkerRuntime { identity: "runtime_registry_worker".to_string(), }, state: embedded_worker_status_label(detail.status).to_string(), - status: embedded_worker_status_label(detail.status).to_string(), + status: embedded_worker_execution_status_label(detail.status, detail.execution.run_state) + .to_string(), last_seen_at: None, implementation: WorkerImplementationSummary { kind: "embedded_worker_runtime".to_string(), display_hint: "backend-internal worker-runtime Worker".to_string(), }, capabilities: WorkerCapabilitySummary { - can_accept_input: false, - can_stop: false, + can_accept_input: self.can_accept_embedded_input(detail.status, detail.execution.run_state), + can_stop: self.can_stop_embedded_worker(detail.status), can_spawn_followup: false, }, diagnostics: vec![diagnostic( @@ -1037,7 +1058,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { status: "unavailable".to_string(), source: RuntimeSourceSummary::embedded_worker_runtime(), host_ids: Vec::new(), - capabilities: embedded_runtime_capabilities(limit, false), + capabilities: embedded_runtime_capabilities(limit, false, false), diagnostics, }; } @@ -1057,7 +1078,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { } else { vec![self.host_id.clone()] }, - capabilities: embedded_runtime_capabilities(limit, true), + capabilities: embedded_runtime_capabilities(limit, true, self.execution_enabled), diagnostics, } } @@ -1075,7 +1096,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { status: "available".to_string(), observed_at: Utc::now().to_rfc3339(), last_seen_at: None, - capabilities: embedded_runtime_capabilities(limit, true), + capabilities: embedded_runtime_capabilities(limit, true, self.execution_enabled), diagnostics: vec![diagnostic( "embedded_runtime_host_boundary", DiagnosticSeverity::Info, @@ -1254,6 +1275,94 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { } } + fn stop_worker( + &self, + worker_id: &str, + request: WorkerLifecycleRequest, + ) -> WorkerLifecycleResult { + if !self.execution_enabled { + return embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_execution_unavailable", + DiagnosticSeverity::Info, + format!("worker stop for '{worker_id}' requires an embedded execution backend"), + ), + ); + } + let Some(worker_ref) = self.worker_ref(worker_id) else { + return embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_id_invalid", + DiagnosticSeverity::Warning, + "Worker id was empty and cannot be resolved".to_string(), + ), + ); + }; + match self.runtime.stop_worker(&worker_ref, request.reason) { + Ok(ack) => WorkerLifecycleResult { + state: WorkerOperationState::Accepted, + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + event_id: Some(ack.event_id), + diagnostics: Vec::new(), + }, + Err(error) => embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + embedded_runtime_diagnostic(&error), + ), + } + } + + fn cancel_worker( + &self, + worker_id: &str, + request: WorkerLifecycleRequest, + ) -> WorkerLifecycleResult { + if !self.execution_enabled { + return embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_execution_unavailable", + DiagnosticSeverity::Info, + format!( + "worker cancel for '{worker_id}' requires an embedded execution backend" + ), + ), + ); + } + let Some(worker_ref) = self.worker_ref(worker_id) else { + return embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_id_invalid", + DiagnosticSeverity::Warning, + "Worker id was empty and cannot be resolved".to_string(), + ), + ); + }; + match self.runtime.cancel_worker(&worker_ref, request.reason) { + Ok(ack) => WorkerLifecycleResult { + state: WorkerOperationState::Accepted, + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + event_id: Some(ack.event_id), + diagnostics: Vec::new(), + }, + Err(error) => embedded_lifecycle_rejected( + &self.runtime_id, + worker_id, + embedded_runtime_diagnostic(&error), + ), + } + } + fn observation_source( &self, worker_id: &str, @@ -1272,17 +1381,53 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { )) } - fn send_input(&self, worker_id: &str, _request: WorkerInputRequest) -> WorkerInputResult { - embedded_input_rejected( - &self.runtime_id, - worker_id, - diagnostic( - "embedded_worker_execution_unavailable", - DiagnosticSeverity::Error, - "Embedded Worker input is disabled until an execution backend is connected" - .to_string(), + fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult { + if !self.execution_enabled { + return embedded_input_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_execution_unavailable", + DiagnosticSeverity::Info, + format!( + "worker input for '{worker_id}' requires an embedded execution backend" + ), + ), + ); + } + let Some(worker_ref) = self.worker_ref(worker_id) else { + return embedded_input_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_id_invalid", + DiagnosticSeverity::Warning, + "Worker id was empty and cannot be resolved".to_string(), + ), + ); + }; + let input = EmbeddedWorkerInput { + kind: match request.kind { + WorkerInputKind::User => EmbeddedWorkerInputKind::User, + WorkerInputKind::System => EmbeddedWorkerInputKind::System, + }, + content: request.content, + }; + match self.runtime.send_input(&worker_ref, input) { + Ok(ack) => WorkerInputResult { + state: WorkerOperationState::Accepted, + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + transcript_sequence: Some(ack.transcript_sequence), + event_id: Some(ack.event_id), + diagnostics: Vec::new(), + }, + Err(error) => embedded_input_rejected( + &self.runtime_id, + worker_id, + embedded_runtime_diagnostic(&error), ), - ) + } } fn transcript( @@ -1865,14 +2010,18 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime { } } -fn embedded_runtime_capabilities(limit: usize, available: bool) -> RuntimeCapabilitySummary { +fn embedded_runtime_capabilities( + limit: usize, + available: bool, + execution_enabled: bool, +) -> RuntimeCapabilitySummary { RuntimeCapabilitySummary { can_list_hosts: true, can_list_workers: available, can_get_worker: available, can_spawn_worker: available, - can_stop_worker: false, - can_accept_input: false, + can_stop_worker: available && execution_enabled, + can_accept_input: available && execution_enabled, has_workspace_fs: false, has_shell: false, has_git: false, @@ -1900,6 +2049,24 @@ fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str { } } +fn embedded_worker_execution_status_label( + status: EmbeddedWorkerStatus, + run_state: WorkerExecutionRunState, +) -> &'static str { + match status { + EmbeddedWorkerStatus::Stopped => "stopped", + EmbeddedWorkerStatus::Cancelled => "cancelled", + EmbeddedWorkerStatus::Running => match run_state { + WorkerExecutionRunState::Idle => "idle", + WorkerExecutionRunState::Busy => "running", + WorkerExecutionRunState::Stopped => "stopped", + WorkerExecutionRunState::Rejected => "rejected", + WorkerExecutionRunState::Errored => "errored", + WorkerExecutionRunState::Unconnected => "unconnected", + }, + } +} + fn embedded_create_intent(intent: &WorkerSpawnIntent) -> WorkerIntent { match intent { WorkerSpawnIntent::WorkspaceCompanion => WorkerIntent::Role { @@ -1984,6 +2151,20 @@ fn remote_input_rejected( } } +fn embedded_lifecycle_rejected( + runtime_id: &str, + worker_id: &str, + diagnostic: RuntimeDiagnostic, +) -> WorkerLifecycleResult { + WorkerLifecycleResult { + state: WorkerOperationState::Rejected, + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + event_id: None, + diagnostics: vec![diagnostic], + } +} + fn remote_lifecycle_rejected( runtime_id: &str, worker_id: &str, @@ -2394,9 +2575,10 @@ pub fn placeholder_spawn_response(host_id: impl Into) -> WorkerSpawnResu mod tests { use super::*; use serde_json::json; + use std::collections::HashMap; use std::io::{Read as _, Write as _}; use std::net::TcpListener; - use std::sync::Arc; + use std::sync::{Arc, Mutex}; use std::thread; fn test_config_bundle() -> ConfigBundle { @@ -2425,6 +2607,74 @@ mod tests { .with_computed_digest() } + #[derive(Default)] + struct AcceptingExecutionBackend { + contexts: + Mutex>, + } + + impl worker_runtime::execution::WorkerExecutionBackend for AcceptingExecutionBackend { + fn backend_id(&self) -> &str { + "workspace-server-test-backend" + } + + fn spawn_worker( + &self, + request: worker_runtime::execution::WorkerExecutionSpawnRequest, + ) -> worker_runtime::execution::WorkerExecutionSpawnResult { + self.contexts + .lock() + .unwrap() + .insert(request.worker_ref.clone(), request.context); + worker_runtime::execution::WorkerExecutionSpawnResult::Connected { + handle: worker_runtime::execution::WorkerExecutionHandle::new( + request.worker_ref, + self.backend_id(), + ), + run_state: WorkerExecutionRunState::Idle, + } + } + + fn dispatch_input( + &self, + handle: &worker_runtime::execution::WorkerExecutionHandle, + input: EmbeddedWorkerInput, + ) -> worker_runtime::execution::WorkerExecutionResult { + let context = self + .contexts + .lock() + .unwrap() + .get(handle.worker_ref()) + .cloned(); + let Some(context) = context else { + return worker_runtime::execution::WorkerExecutionResult::rejected( + worker_runtime::execution::WorkerExecutionOperation::Input, + "missing test context", + ); + }; + let content = input.content; + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(10)); + let _ = context.publish_protocol_event(protocol::Event::Status { + status: protocol::WorkerStatus::Running, + }); + let _ = context.publish_protocol_event(protocol::Event::TextDone { + text: format!("echo: {content}"), + }); + let _ = context.publish_protocol_event(protocol::Event::RunEnd { + result: protocol::RunResult::Finished, + }); + let _ = context.publish_protocol_event(protocol::Event::Status { + status: protocol::WorkerStatus::Idle, + }); + }); + worker_runtime::execution::WorkerExecutionResult::accepted( + worker_runtime::execution::WorkerExecutionOperation::Input, + WorkerExecutionRunState::Busy, + ) + } + } + #[derive(Clone)] struct FixtureRuntime { runtime_id: String, @@ -2598,6 +2848,64 @@ mod tests { )); } + #[test] + fn embedded_runtime_with_execution_backend_routes_input_and_projects_transcript() { + let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend( + "local:test", + Arc::new(AcceptingExecutionBackend::default()), + ) + .expect("test backend should connect"); + let spawned = runtime.spawn_worker(WorkerSpawnRequest { + intent: WorkerSpawnIntent::TicketRole { + ticket_id: "00001KVZSGT0Q".to_string(), + role: TicketWorkerRole::Coder, + }, + requested_worker_name: None, + acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { + expected_segments: 0, + }, + profile: None, + config_bundle: None, + requested_capabilities: Vec::new(), + }); + assert_eq!(spawned.state, WorkerOperationState::Accepted); + let worker = spawned.worker.expect("created embedded worker"); + assert!(worker.capabilities.can_accept_input); + assert!(worker.capabilities.can_stop); + + let input = runtime.send_input( + &worker.worker_id, + WorkerInputRequest { + kind: WorkerInputKind::User, + content: "hello".to_string(), + }, + ); + assert_eq!(input.state, WorkerOperationState::Accepted); + + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + let detail = runtime + .worker(&worker.worker_id) + .worker + .expect("worker detail"); + let transcript = runtime.transcript(&worker.worker_id, 0, 10); + if detail.status == "idle" + && transcript + .items + .iter() + .any(|entry| entry.role == "assistant" && entry.content == "echo: hello") + { + assert!(detail.capabilities.can_accept_input); + break; + } + assert!( + std::time::Instant::now() < deadline, + "timed out waiting for embedded execution projection" + ); + std::thread::sleep(std::time::Duration::from_millis(10)); + } + } + #[test] fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() { let registry = diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index d9ba75c5..359f1da8 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -1149,7 +1149,7 @@ mod tests { .find(|worker| worker["role"] == "workspace_companion") .expect("companion worker is visible through runtime worker API"); assert_eq!(companion_worker["runtime_id"], "embedded-worker-runtime"); - assert_eq!(companion_worker["capabilities"]["can_stop"], false); + assert_eq!(companion_worker["capabilities"]["can_stop"], true); let companion_status = get_json(app.clone(), "/api/companion/status").await; assert_eq!(companion_status["state"], "ready");