diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index d09ed546..7a184794 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -10,9 +10,26 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; +use worker_runtime::catalog::{ + CreateWorkerRequest, ProfileSelector, WorkerDetail as EmbeddedWorkerDetail, WorkerIntent, + WorkerStatus as EmbeddedWorkerStatus, +}; +use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; +use worker_runtime::identity::{ + RuntimeId as EmbeddedRuntimeId, WorkerId as EmbeddedWorkerId, WorkerRef as EmbeddedWorkerRef, +}; +use worker_runtime::interaction::{ + WorkerInput as EmbeddedWorkerInput, WorkerInputKind as EmbeddedWorkerInputKind, +}; +use worker_runtime::management::{RuntimeOptions as EmbeddedRuntimeOptions, RuntimeStatus}; +use worker_runtime::observation::{ + TranscriptProjection as EmbeddedTranscriptProjection, TranscriptQuery, TranscriptRole, +}; const LOCAL_RUNTIME_ID: &str = "local-worker-runtime"; +const EMBEDDED_RUNTIME_ID: &str = "embedded-worker-runtime"; const LOCAL_HOST_KIND: &str = "local-worker-host"; +const EMBEDDED_HOST_KIND: &str = "embedded-worker-runtime-host"; const MAX_DIAGNOSTICS: usize = 16; const MAX_HOST_SCAN: usize = 256; const MAX_IDENTIFIER_LEN: usize = 120; @@ -88,12 +105,21 @@ impl RuntimeSourceSummary { } } + pub fn embedded_worker_runtime() -> Self { + Self { + kind: RuntimeSourceKind::EmbeddedWorkerRuntime, + status: RuntimeSourceStatus::Active, + identity_authority: RuntimeIdentityAuthority::RuntimeRegistryProjection, + note: "backend-internal embedded worker-runtime Runtime exposed only through runtime_id plus worker_id projections".to_string(), + } + } + pub fn embedded_worker_runtime_reserved() -> Self { Self { kind: RuntimeSourceKind::EmbeddedWorkerRuntime, status: RuntimeSourceStatus::Reserved, identity_authority: RuntimeIdentityAuthority::RuntimeRegistryProjection, - note: "reserved boundary for a future embedded worker-runtime adapter; not connected in this registry foundation".to_string(), + note: "reserved boundary for an embedded worker-runtime adapter; not connected by this fixture source".to_string(), } } @@ -296,6 +322,54 @@ pub struct WorkerStopResult { pub diagnostics: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum WorkerInputKind { + User, + System, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerInputRequest { + #[serde(default = "default_worker_input_kind")] + pub kind: WorkerInputKind, + pub content: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerInputResult { + pub state: WorkerOperationState, + pub runtime_id: String, + pub worker_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub transcript_sequence: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub event_id: Option, + pub diagnostics: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerTranscriptItem { + pub sequence: u64, + pub role: String, + pub content: String, + pub event_id: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerTranscriptProjection { + pub state: WorkerOperationState, + pub runtime_id: String, + pub worker_id: String, + pub start: usize, + pub limit: usize, + pub total_items: usize, + #[serde(skip_serializing_if = "Option::is_none")] + pub next_start: Option, + pub items: Vec, + pub diagnostics: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerProxyConnectPoint { pub kind: String, @@ -337,6 +411,10 @@ impl RuntimeRegistryError { } } +fn default_worker_input_kind() -> WorkerInputKind { + WorkerInputKind::User +} + pub trait WorkspaceWorkerRuntime: Send + Sync { fn runtime_id(&self) -> &str; @@ -378,6 +456,48 @@ pub trait WorkspaceWorkerRuntime: Send + Sync { } } + fn send_input(&self, worker_id: &str, _request: WorkerInputRequest) -> WorkerInputResult { + WorkerInputResult { + state: WorkerOperationState::Unsupported, + runtime_id: self.runtime_id().to_string(), + worker_id: worker_id.to_string(), + transcript_sequence: None, + event_id: None, + diagnostics: vec![diagnostic( + "worker_input_pending", + DiagnosticSeverity::Info, + format!( + "worker input for '{worker_id}' is reserved for the runtime service boundary and is not implemented by this registry source" + ), + )], + } + } + + fn transcript( + &self, + worker_id: &str, + start: usize, + limit: usize, + ) -> WorkerTranscriptProjection { + WorkerTranscriptProjection { + state: WorkerOperationState::Unsupported, + runtime_id: self.runtime_id().to_string(), + worker_id: worker_id.to_string(), + start, + limit, + total_items: 0, + next_start: None, + items: Vec::new(), + diagnostics: vec![diagnostic( + "worker_transcript_pending", + DiagnosticSeverity::Info, + format!( + "bounded transcript for '{worker_id}' is not implemented by this registry source" + ), + )], + } + } + fn proxy_connect_points(&self, worker_id: &str) -> Vec { vec![WorkerProxyConnectPoint { kind: "stream_proxy".to_string(), @@ -408,6 +528,20 @@ impl RuntimeRegistry { Self::new(vec![Arc::new(runtime)]) } + pub fn for_workspace( + local_runtime: LocalWorkerRuntime, + embedded_runtime: EmbeddedWorkerRuntime, + ) -> Self { + Self::new(vec![Arc::new(local_runtime), Arc::new(embedded_runtime)]) + } + + pub fn register(&mut self, runtime: R) + where + R: WorkspaceWorkerRuntime + 'static, + { + self.runtimes.push(Arc::new(runtime)); + } + pub fn list_runtimes(&self, limit: usize) -> RuntimeList { let mut diagnostics = Vec::new(); let mut items = Vec::new(); @@ -495,11 +629,7 @@ impl RuntimeRegistry { ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; validate_backend_identifier("worker_id", worker_id)?; - let runtime = self - .runtimes - .iter() - .find(|runtime| runtime.runtime_id() == runtime_id) - .ok_or_else(|| RuntimeRegistryError::UnknownRuntime(runtime_id.to_string()))?; + let runtime = self.runtime(runtime_id)?; let lookup = runtime.worker(worker_id); lookup .worker @@ -508,6 +638,425 @@ impl RuntimeRegistry { worker_id: worker_id.to_string(), }) } + + pub fn spawn_worker( + &self, + runtime_id: &str, + request: WorkerSpawnRequest, + ) -> Result { + validate_backend_identifier("runtime_id", runtime_id)?; + let runtime = self.runtime(runtime_id)?; + Ok(runtime.spawn_worker(request)) + } + + pub fn send_input( + &self, + runtime_id: &str, + worker_id: &str, + request: WorkerInputRequest, + ) -> Result { + validate_backend_identifier("runtime_id", runtime_id)?; + validate_backend_identifier("worker_id", worker_id)?; + let runtime = self.runtime(runtime_id)?; + if runtime.worker(worker_id).worker.is_none() { + return Err(RuntimeRegistryError::UnknownWorker { + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + }); + } + Ok(runtime.send_input(worker_id, request)) + } + + pub fn transcript( + &self, + runtime_id: &str, + worker_id: &str, + start: usize, + limit: usize, + ) -> Result { + validate_backend_identifier("runtime_id", runtime_id)?; + validate_backend_identifier("worker_id", worker_id)?; + let runtime = self.runtime(runtime_id)?; + if runtime.worker(worker_id).worker.is_none() { + return Err(RuntimeRegistryError::UnknownWorker { + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + }); + } + Ok(runtime.transcript(worker_id, start, limit)) + } + + fn runtime( + &self, + runtime_id: &str, + ) -> Result<&Arc, RuntimeRegistryError> { + self.runtimes + .iter() + .find(|runtime| runtime.runtime_id() == runtime_id) + .ok_or_else(|| RuntimeRegistryError::UnknownRuntime(runtime_id.to_string())) + } +} + +#[derive(Clone)] +pub struct EmbeddedWorkerRuntime { + runtime_id: String, + host_id: String, + runtime: worker_runtime::Runtime, +} + +impl EmbeddedWorkerRuntime { + pub fn new_memory(workspace_id: impl AsRef) -> Self { + let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID) + .expect("embedded runtime id is a non-empty literal"); + let runtime = worker_runtime::Runtime::with_options(EmbeddedRuntimeOptions { + runtime_id: Some(runtime_id), + display_name: Some("Workspace backend embedded Runtime".to_string()), + ..EmbeddedRuntimeOptions::default() + }); + Self::from_runtime(workspace_id, runtime) + } + + pub fn from_runtime(workspace_id: impl AsRef, runtime: worker_runtime::Runtime) -> Self { + let runtime_id = runtime + .runtime_id() + .ok() + .map(|id| id.as_str().to_string()) + .unwrap_or_else(|| EMBEDDED_RUNTIME_ID.to_string()); + Self { + runtime_id, + host_id: host_id_for_embedded_workspace(workspace_id.as_ref()), + runtime, + } + } + + fn worker_ref(&self, worker_id: &str) -> Option { + Some(EmbeddedWorkerRef::new( + EmbeddedRuntimeId::new(self.runtime_id.clone())?, + EmbeddedWorkerId::new(worker_id.to_string())?, + )) + } + + fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary { + WorkerSummary { + runtime_id: self.runtime_id.clone(), + worker_id: summary.worker_ref.worker_id.as_str().to_string(), + host_id: self.host_id.clone(), + label: safe_display_hint(summary.worker_ref.worker_id.as_str()), + role: embedded_intent_label(&summary.intent), + profile: embedded_profile_label(&summary.profile), + workspace: WorkerWorkspaceSummary { + visibility: "backend_internal".to_string(), + identity: "runtime_registry_worker".to_string(), + }, + state: embedded_worker_status_label(summary.status).to_string(), + status: embedded_worker_status_label(summary.status).to_string(), + last_seen_at: None, + implementation: WorkerImplementationSummary { + kind: "embedded_worker_runtime".to_string(), + display_hint: "backend-internal worker-runtime Worker".to_string(), + }, + capabilities: WorkerCapabilitySummary { + can_accept_input: true, + can_stream_events: false, + can_stop: false, + can_spawn_followup: false, + can_read_bounded_transcript: true, + }, + diagnostics: vec![diagnostic( + "embedded_runtime_projection", + DiagnosticSeverity::Info, + "Worker identity is projected only as runtime_id plus worker_id; embedded runtime internals remain backend-private".to_string(), + )], + } + } + + fn map_worker_detail(&self, detail: EmbeddedWorkerDetail) -> WorkerSummary { + WorkerSummary { + runtime_id: self.runtime_id.clone(), + worker_id: detail.worker_id.as_str().to_string(), + host_id: self.host_id.clone(), + label: safe_display_hint(detail.worker_id.as_str()), + role: embedded_intent_label(&detail.intent), + profile: embedded_profile_label(&detail.profile), + workspace: WorkerWorkspaceSummary { + visibility: "backend_internal".to_string(), + identity: "runtime_registry_worker".to_string(), + }, + state: embedded_worker_status_label(detail.status).to_string(), + status: embedded_worker_status_label(detail.status).to_string(), + last_seen_at: None, + implementation: WorkerImplementationSummary { + kind: "embedded_worker_runtime".to_string(), + display_hint: "backend-internal worker-runtime Worker".to_string(), + }, + capabilities: WorkerCapabilitySummary { + can_accept_input: true, + can_stream_events: false, + can_stop: false, + can_spawn_followup: false, + can_read_bounded_transcript: true, + }, + diagnostics: vec![diagnostic( + "embedded_runtime_projection", + DiagnosticSeverity::Info, + "Worker identity is projected only as runtime_id plus worker_id; embedded runtime internals remain backend-private".to_string(), + )], + } + } +} + +impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { + fn runtime_id(&self) -> &str { + &self.runtime_id + } + + fn runtime_summary(&self, limit: usize) -> RuntimeSummary { + let mut diagnostics = Vec::new(); + let summary = match self.runtime.summary() { + Ok(summary) => summary, + Err(err) => { + diagnostics.push(embedded_runtime_diagnostic(&err)); + return RuntimeSummary { + runtime_id: self.runtime_id.clone(), + label: "Embedded backend Runtime".to_string(), + kind: "embedded_worker_runtime".to_string(), + status: "unavailable".to_string(), + source: RuntimeSourceSummary::embedded_worker_runtime(), + host_ids: Vec::new(), + capabilities: embedded_runtime_capabilities(limit, false), + diagnostics, + }; + } + }; + + RuntimeSummary { + runtime_id: self.runtime_id.clone(), + label: summary + .display_name + .clone() + .unwrap_or_else(|| "Embedded backend Runtime".to_string()), + kind: "embedded_worker_runtime".to_string(), + status: embedded_runtime_status_label(summary.status).to_string(), + source: RuntimeSourceSummary::embedded_worker_runtime(), + host_ids: if limit == 0 { + Vec::new() + } else { + vec![self.host_id.clone()] + }, + capabilities: embedded_runtime_capabilities(limit, true), + diagnostics, + } + } + + fn list_hosts(&self, limit: usize) -> RuntimeList { + if limit == 0 { + return RuntimeList::new(Vec::new(), Vec::new()); + } + RuntimeList::new( + vec![HostSummary { + runtime_id: self.runtime_id.clone(), + host_id: self.host_id.clone(), + label: "Workspace backend embedded Runtime".to_string(), + kind: EMBEDDED_HOST_KIND.to_string(), + status: "available".to_string(), + observed_at: Utc::now().to_rfc3339(), + last_seen_at: None, + capabilities: embedded_runtime_capabilities(limit, true), + diagnostics: vec![diagnostic( + "embedded_runtime_host_boundary", + DiagnosticSeverity::Info, + "Backend-internal host exposes only bounded runtime and worker projections" + .to_string(), + )], + }], + Vec::new(), + ) + } + + fn list_workers(&self, limit: usize) -> RuntimeList { + if limit == 0 { + return RuntimeList::new(Vec::new(), Vec::new()); + } + match self.runtime.list_workers() { + Ok(workers) => RuntimeList::new( + workers + .into_iter() + .take(limit) + .map(|worker| self.map_worker_summary(worker)) + .collect(), + Vec::new(), + ), + Err(err) => RuntimeList::new(Vec::new(), vec![embedded_runtime_diagnostic(&err)]), + } + } + + fn worker(&self, worker_id: &str) -> WorkerLookupResult { + let Some(worker_ref) = self.worker_ref(worker_id) else { + return WorkerLookupResult { + worker: None, + diagnostics: vec![diagnostic( + "embedded_worker_id_invalid", + DiagnosticSeverity::Warning, + "Worker id was empty and cannot be resolved".to_string(), + )], + }; + }; + match self.runtime.worker_detail(&worker_ref) { + Ok(detail) => WorkerLookupResult { + worker: Some(self.map_worker_detail(detail)), + diagnostics: Vec::new(), + }, + Err(EmbeddedRuntimeError::WorkerNotFound { .. }) => WorkerLookupResult { + worker: None, + diagnostics: Vec::new(), + }, + Err(err) => WorkerLookupResult { + worker: None, + diagnostics: vec![embedded_runtime_diagnostic(&err)], + }, + } + } + + fn spawn_worker(&self, request: WorkerSpawnRequest) -> WorkerSpawnResult { + let mut diagnostics = Vec::new(); + if matches!( + request.acceptance, + WorkerSpawnAcceptanceRequirement::SocketReady + ) { + diagnostics.push(diagnostic( + "embedded_runtime_no_socket", + DiagnosticSeverity::Warning, + "Embedded backend Runtime is transportless; use run_accepted/create acceptance for backend-internal Workers".to_string(), + )); + return WorkerSpawnResult { + state: WorkerOperationState::Rejected, + worker: None, + acceptance_evidence: Vec::new(), + diagnostics, + }; + } + if request.requested_worker_name.is_some() { + diagnostics.push(diagnostic( + "embedded_worker_name_ignored", + DiagnosticSeverity::Info, + "Embedded Runtime v0 allocates opaque runtime-local worker ids; requested display names are not authority".to_string(), + )); + } + if matches!(request.acceptance, WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments } if expected_segments > 0) + { + diagnostics.push(diagnostic( + "embedded_runtime_tools_less", + DiagnosticSeverity::Info, + "Embedded Runtime v0 creates a tools-less catalog Worker and does not spawn provider segments".to_string(), + )); + } + + let create_request = CreateWorkerRequest::tools_less( + embedded_create_intent(&request.intent), + embedded_profile_selector(&request.intent), + ); + match self.runtime.create_worker(create_request) { + Ok(detail) => WorkerSpawnResult { + state: WorkerOperationState::Accepted, + worker: Some(self.map_worker_detail(detail)), + acceptance_evidence: vec![ + WorkerSpawnAcceptanceEvidence { + kind: "embedded_runtime_worker_created".to_string(), + detail: + "worker-runtime catalog accepted a backend-internal tools-less Worker" + .to_string(), + }, + WorkerSpawnAcceptanceEvidence { + kind: "embedded_runtime_backend_internal_projection".to_string(), + detail: "only runtime_id plus worker_id backend projections were exposed" + .to_string(), + }, + ], + diagnostics, + }, + Err(err) => { + diagnostics.push(embedded_runtime_diagnostic(&err)); + WorkerSpawnResult { + state: WorkerOperationState::Rejected, + worker: None, + acceptance_evidence: Vec::new(), + diagnostics, + } + } + } + } + + fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult { + let Some(worker_ref) = self.worker_ref(worker_id) else { + return embedded_input_rejected( + &self.runtime_id, + worker_id, + diagnostic( + "embedded_worker_id_invalid", + DiagnosticSeverity::Warning, + "Worker id was empty and cannot be resolved".to_string(), + ), + ); + }; + let input = EmbeddedWorkerInput { + kind: match request.kind { + WorkerInputKind::User => EmbeddedWorkerInputKind::User, + WorkerInputKind::System => EmbeddedWorkerInputKind::System, + }, + content: request.content, + }; + match self.runtime.send_input(&worker_ref, input) { + Ok(ack) => WorkerInputResult { + state: WorkerOperationState::Accepted, + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + transcript_sequence: Some(ack.transcript_sequence), + event_id: Some(ack.event_id), + diagnostics: Vec::new(), + }, + Err(err) => embedded_input_rejected( + &self.runtime_id, + worker_id, + embedded_runtime_diagnostic(&err), + ), + } + } + + fn transcript( + &self, + worker_id: &str, + start: usize, + limit: usize, + ) -> WorkerTranscriptProjection { + let Some(worker_ref) = self.worker_ref(worker_id) else { + return embedded_transcript_rejected( + &self.runtime_id, + worker_id, + start, + limit, + diagnostic( + "embedded_worker_id_invalid", + DiagnosticSeverity::Warning, + "Worker id was empty and cannot be resolved".to_string(), + ), + ); + }; + match self + .runtime + .transcript_projection(&worker_ref, TranscriptQuery::new(start, limit)) + { + Ok(projection) => { + embedded_transcript_projection(&self.runtime_id, worker_id, projection) + } + Err(err) => embedded_transcript_rejected( + &self.runtime_id, + worker_id, + start, + limit, + embedded_runtime_diagnostic(&err), + ), + } + } } #[derive(Clone)] @@ -804,6 +1353,214 @@ enum WorkerReadOutcome { Diagnostic(RuntimeDiagnostic), } +fn embedded_runtime_capabilities(limit: usize, available: bool) -> RuntimeCapabilitySummary { + RuntimeCapabilitySummary { + can_list_hosts: true, + can_list_workers: available, + can_get_worker: available, + can_spawn_worker: available, + can_stop_worker: false, + can_accept_input: available, + can_stream_events: false, + can_read_bounded_transcript: available, + has_workspace_fs: false, + has_shell: false, + has_git: false, + supports_worktrees: false, + supports_backend_internal_tools: true, + local_pod_inspection: "not_applicable".to_string(), + workspace_scope: "backend_internal".to_string(), + max_workers: limit, + os: std::env::consts::OS.to_string(), + arch: std::env::consts::ARCH.to_string(), + } +} + +fn embedded_runtime_status_label(status: RuntimeStatus) -> &'static str { + match status { + RuntimeStatus::Running => "running", + RuntimeStatus::Stopped => "stopped", + } +} + +fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str { + match status { + EmbeddedWorkerStatus::Running => "running", + EmbeddedWorkerStatus::Stopped => "stopped", + EmbeddedWorkerStatus::Cancelled => "cancelled", + } +} + +fn embedded_create_intent(intent: &WorkerSpawnIntent) -> WorkerIntent { + match intent { + WorkerSpawnIntent::WorkspaceCompanion => WorkerIntent::Role { + role: "workspace_companion".to_string(), + purpose: Some("workspace backend internal companion".to_string()), + }, + WorkerSpawnIntent::WorkspaceOrchestrator => WorkerIntent::Role { + role: "workspace_orchestrator".to_string(), + purpose: Some("workspace backend internal orchestration".to_string()), + }, + WorkerSpawnIntent::TicketRole { ticket_id, role } => WorkerIntent::Role { + role: ticket_role_profile_slug(role).to_string(), + purpose: Some(format!("ticket {ticket_id}")), + }, + } +} + +fn embedded_profile_selector(intent: &WorkerSpawnIntent) -> ProfileSelector { + match intent { + WorkerSpawnIntent::TicketRole { role, .. } => { + ProfileSelector::Builtin(format!("builtin:{}", ticket_role_profile_slug(role))) + } + WorkerSpawnIntent::WorkspaceCompanion | WorkerSpawnIntent::WorkspaceOrchestrator => { + ProfileSelector::RuntimeDefault + } + } +} + +fn ticket_role_profile_slug(role: &TicketWorkerRole) -> &'static str { + match role { + TicketWorkerRole::Intake => "intake", + TicketWorkerRole::Orchestrator => "orchestrator", + TicketWorkerRole::Coder => "coder", + TicketWorkerRole::Reviewer => "reviewer", + } +} + +fn embedded_intent_label(intent: &WorkerIntent) -> Option { + match intent { + WorkerIntent::Assistant { purpose } => { + purpose.clone().or_else(|| Some("assistant".to_string())) + } + WorkerIntent::Task { objective } => Some(safe_display_hint(objective)), + WorkerIntent::Role { role, .. } => Some(safe_display_hint(role)), + } +} + +fn embedded_profile_label(profile: &ProfileSelector) -> Option { + Some(match profile { + ProfileSelector::RuntimeDefault => "runtime_default".to_string(), + ProfileSelector::Builtin(name) | ProfileSelector::Named(name) => safe_display_hint(name), + }) +} + +fn embedded_input_rejected( + runtime_id: &str, + worker_id: &str, + diagnostic: RuntimeDiagnostic, +) -> WorkerInputResult { + WorkerInputResult { + state: WorkerOperationState::Rejected, + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + transcript_sequence: None, + event_id: None, + diagnostics: vec![diagnostic], + } +} + +fn embedded_transcript_projection( + runtime_id: &str, + worker_id: &str, + projection: EmbeddedTranscriptProjection, +) -> WorkerTranscriptProjection { + WorkerTranscriptProjection { + state: WorkerOperationState::Accepted, + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + start: projection.start, + limit: projection.limit, + total_items: projection.total_items, + next_start: projection.next_start, + items: projection + .items + .into_iter() + .map(|item| WorkerTranscriptItem { + sequence: item.sequence, + role: embedded_transcript_role_label(item.role).to_string(), + content: item.content, + event_id: item.event_id, + }) + .collect(), + diagnostics: Vec::new(), + } +} + +fn embedded_transcript_rejected( + runtime_id: &str, + worker_id: &str, + start: usize, + limit: usize, + diagnostic: RuntimeDiagnostic, +) -> WorkerTranscriptProjection { + WorkerTranscriptProjection { + state: WorkerOperationState::Rejected, + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + start, + limit, + total_items: 0, + next_start: None, + items: Vec::new(), + diagnostics: vec![diagnostic], + } +} + +fn embedded_transcript_role_label(role: TranscriptRole) -> &'static str { + match role { + TranscriptRole::User => "user", + TranscriptRole::System => "system", + } +} + +fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnostic { + match error { + EmbeddedRuntimeError::RuntimeStopped { .. } => diagnostic( + "embedded_runtime_stopped", + DiagnosticSeverity::Warning, + "Embedded Runtime is stopped".to_string(), + ), + EmbeddedRuntimeError::WrongRuntime { .. } + | EmbeddedRuntimeError::WrongRuntimeCursor { .. } => diagnostic( + "embedded_runtime_wrong_identity", + DiagnosticSeverity::Warning, + "Embedded Runtime rejected a worker/runtime identity mismatch".to_string(), + ), + EmbeddedRuntimeError::WorkerNotFound { .. } => diagnostic( + "embedded_worker_not_found", + DiagnosticSeverity::Warning, + "Embedded Runtime worker was not found".to_string(), + ), + EmbeddedRuntimeError::LimitTooLarge { requested, max } => diagnostic( + "embedded_runtime_limit_too_large", + DiagnosticSeverity::Warning, + format!("Requested limit {requested} exceeds embedded Runtime maximum {max}"), + ), + EmbeddedRuntimeError::InvalidRequest(_) => diagnostic( + "embedded_runtime_invalid_request", + DiagnosticSeverity::Warning, + "Embedded Runtime rejected the request".to_string(), + ), + EmbeddedRuntimeError::StoreIo { .. } + | EmbeddedRuntimeError::StoreMissing { .. } + | EmbeddedRuntimeError::StoreCorrupt { .. } => diagnostic( + "embedded_runtime_store_error", + DiagnosticSeverity::Error, + "Embedded Runtime storage operation failed; internal paths are not exposed".to_string(), + ), + EmbeddedRuntimeError::StatePoisoned => diagnostic( + "embedded_runtime_state_unavailable", + DiagnosticSeverity::Error, + "Embedded Runtime state is unavailable".to_string(), + ), + } +} + +fn host_id_for_embedded_workspace(workspace_id: &str) -> String { + bounded_backend_identifier("embedded-", workspace_id) +} + fn local_runtime_capabilities( limit: usize, inspection_available: bool, @@ -1384,6 +2141,140 @@ mod tests { ); } + #[test] + fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() { + let temp = TempDir::new().unwrap(); + let mut registry = RuntimeRegistry::for_local_pods(LocalWorkerRuntime::new( + "local:test", + "/workspace/project", + Some(temp.path().to_path_buf()), + )); + registry.register(EmbeddedWorkerRuntime::new_memory("local:test")); + + let runtimes = registry.list_runtimes(10); + let embedded_summary = runtimes + .items + .iter() + .find(|runtime| runtime.runtime_id == EMBEDDED_RUNTIME_ID) + .expect("embedded runtime summary"); + assert_eq!( + embedded_summary.source.kind, + RuntimeSourceKind::EmbeddedWorkerRuntime + ); + assert_eq!(embedded_summary.source.status, RuntimeSourceStatus::Active); + assert!(embedded_summary.capabilities.can_spawn_worker); + assert!(embedded_summary.capabilities.can_accept_input); + assert!(embedded_summary.capabilities.can_read_bounded_transcript); + + let spawned = registry + .spawn_worker( + EMBEDDED_RUNTIME_ID, + WorkerSpawnRequest { + intent: WorkerSpawnIntent::TicketRole { + ticket_id: "00001KVZSGT0Q".to_string(), + role: TicketWorkerRole::Coder, + }, + requested_worker_name: Some("friendly-name-is-not-authority".to_string()), + acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { + expected_segments: 0, + }, + }, + ) + .unwrap(); + assert_eq!(spawned.state, WorkerOperationState::Accepted); + assert!( + spawned + .acceptance_evidence + .iter() + .any(|evidence| evidence.kind == "embedded_runtime_backend_internal_projection") + ); + let worker = spawned.worker.expect("created embedded worker"); + assert_eq!(worker.runtime_id, EMBEDDED_RUNTIME_ID); + assert_eq!(worker.workspace.visibility, "backend_internal"); + assert_eq!(worker.workspace.identity, "runtime_registry_worker"); + assert_eq!(worker.implementation.kind, "embedded_worker_runtime"); + assert_eq!(worker.profile.as_deref(), Some("builtin:coder")); + assert!(worker.capabilities.can_accept_input); + assert!(worker.capabilities.can_read_bounded_transcript); + + let input = registry + .send_input( + EMBEDDED_RUNTIME_ID, + &worker.worker_id, + WorkerInputRequest { + kind: WorkerInputKind::User, + content: "hello embedded runtime".to_string(), + }, + ) + .unwrap(); + assert_eq!(input.state, WorkerOperationState::Accepted); + assert_eq!(input.runtime_id, EMBEDDED_RUNTIME_ID); + assert_eq!(input.worker_id, worker.worker_id); + assert_eq!(input.transcript_sequence, Some(1)); + + let transcript = registry + .transcript(EMBEDDED_RUNTIME_ID, &worker.worker_id, 0, 10) + .unwrap(); + assert_eq!(transcript.state, WorkerOperationState::Accepted); + assert_eq!(transcript.items.len(), 1); + assert_eq!(transcript.items[0].role, "user"); + assert_eq!(transcript.items[0].content, "hello embedded runtime"); + + assert!(matches!( + registry.send_input( + LOCAL_RUNTIME_ID, + &worker.worker_id, + WorkerInputRequest { + kind: WorkerInputKind::User, + content: "wrong runtime".to_string(), + }, + ), + Err(RuntimeRegistryError::UnknownWorker { runtime_id, worker_id }) + if runtime_id == LOCAL_RUNTIME_ID && worker_id == worker.worker_id + )); + + let json = serde_json::to_string(&(embedded_summary, worker, transcript)).unwrap(); + for forbidden in [ + "/workspace/project", + "metadata.json", + "session", + "socket", + "token", + "credential", + "provider", + ] { + assert!( + !json.contains(forbidden), + "embedded runtime projection leaked forbidden term: {forbidden}: {json}" + ); + } + } + + #[test] + fn embedded_runtime_rejects_socket_ready_acceptance_without_socket_identity() { + let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( + "local:test", + ))]); + let result = registry + .spawn_worker( + EMBEDDED_RUNTIME_ID, + WorkerSpawnRequest { + intent: WorkerSpawnIntent::WorkspaceCompanion, + requested_worker_name: None, + acceptance: WorkerSpawnAcceptanceRequirement::SocketReady, + }, + ) + .unwrap(); + assert_eq!(result.state, WorkerOperationState::Rejected); + assert!(result.worker.is_none()); + assert!( + result + .diagnostics + .iter() + .any(|diag| diag.code == "embedded_runtime_no_socket") + ); + } + #[test] fn generated_worker_ids_are_opaque_bounded_unique_and_resolvable() { let temp = TempDir::new().unwrap(); diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index 2bdf19cd..95c567db 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -6,15 +6,16 @@ use axum::extract::{Path as AxumPath, Query, State}; use axum::http::header::CONTENT_TYPE; use axum::http::{StatusCode, Uri}; use axum::response::{IntoResponse, Response}; -use axum::routing::get; +use axum::routing::{get, post}; use axum::{Json, Router}; use futures::StreamExt; use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; use crate::hosts::{ - DiagnosticSeverity, HostSummary, LocalWorkerRuntime, RuntimeDiagnostic, RuntimeRegistry, - RuntimeSummary, WorkerSummary, + DiagnosticSeverity, EmbeddedWorkerRuntime, HostSummary, LocalWorkerRuntime, RuntimeDiagnostic, + RuntimeRegistry, RuntimeSummary, WorkerInputRequest, WorkerInputResult, WorkerSpawnRequest, + WorkerSpawnResult, WorkerSummary, WorkerTranscriptProjection, }; use crate::identity::WorkspaceIdentity; use crate::observation::{ @@ -87,11 +88,14 @@ impl WorkspaceApi { updated_at: config.workspace_created_at.clone(), }) .await?; - let runtime = Arc::new(RuntimeRegistry::for_local_pods(LocalWorkerRuntime::new( - config.workspace_id.clone(), - config.workspace_root.clone(), - config.local_runtime_data_dir.clone(), - ))); + let runtime = Arc::new(RuntimeRegistry::for_workspace( + LocalWorkerRuntime::new( + config.workspace_id.clone(), + config.workspace_root.clone(), + config.local_runtime_data_dir.clone(), + ), + EmbeddedWorkerRuntime::new_memory(config.workspace_id.clone()), + )); let observation_proxy = BackendObservationProxy::new(config.runtime_event_sources.clone()); Ok(Self { records: LocalProjectRecordReader::new(config.workspace_root.clone()), @@ -139,6 +143,22 @@ pub fn build_router(api: WorkspaceApi) -> Router { .route("/api/hosts", get(list_hosts)) .route("/api/runtimes", get(list_runtimes)) .route("/api/workers", get(list_workers)) + .route( + "/api/runtimes/{runtime_id}/workers", + post(create_runtime_worker), + ) + .route( + "/api/runtimes/{runtime_id}/workers/{worker_id}", + get(get_runtime_worker), + ) + .route( + "/api/runtimes/{runtime_id}/workers/{worker_id}/input", + post(send_runtime_worker_input), + ) + .route( + "/api/runtimes/{runtime_id}/workers/{worker_id}/transcript", + get(get_runtime_worker_transcript), + ) .route( "/api/runtimes/{runtime_id}/workers/{worker_id}/events/ws", get(worker_observation_ws), @@ -252,6 +272,12 @@ struct TicketKanbanQuery { limit: Option, } +#[derive(Debug, Deserialize)] +struct TranscriptQuery { + start: Option, + limit: Option, +} + async fn get_workspace(State(api): State) -> ApiResult> { let schema_version = api.store.schema_version().await?; let stored = api.store.get_workspace(api.workspace_id()).await?; @@ -438,6 +464,55 @@ async fn list_workers( workers_response(api).map(Json) } +async fn get_runtime_worker( + State(api): State, + AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>, +) -> ApiResult> { + let worker = api + .runtime + .worker(&runtime_id, &worker_id) + .map_err(|err| err.into_error())?; + Ok(Json(worker)) +} + +async fn create_runtime_worker( + State(api): State, + AxumPath(runtime_id): AxumPath, + Json(request): Json, +) -> ApiResult> { + let result = api + .runtime + .spawn_worker(&runtime_id, request) + .map_err(|err| err.into_error())?; + Ok(Json(result)) +} + +async fn send_runtime_worker_input( + State(api): State, + AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>, + Json(request): Json, +) -> ApiResult> { + let result = api + .runtime + .send_input(&runtime_id, &worker_id, request) + .map_err(|err| err.into_error())?; + Ok(Json(result)) +} + +async fn get_runtime_worker_transcript( + State(api): State, + AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>, + Query(query): Query, +) -> ApiResult> { + let limit = query.limit.unwrap_or(api.config.max_records).min(200); + let start = query.start.unwrap_or(0); + let result = api + .runtime + .transcript(&runtime_id, &worker_id, start, limit) + .map_err(|err| err.into_error())?; + Ok(Json(result)) +} + async fn worker_observation_ws( State(api): State, AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>, @@ -790,7 +865,7 @@ mod tests { use axum::body::{Body, to_bytes}; use axum::http::Request; use futures::{SinkExt, StreamExt}; - use serde_json::Value; + use serde_json::{Value, json}; use tokio_tungstenite::connect_async; use tokio_tungstenite::tungstenite::Message; use tower::ServiceExt; @@ -1001,6 +1076,138 @@ mod tests { ); } + #[tokio::test] + async fn embedded_runtime_api_routes_by_runtime_and_worker_ids_without_leaking_internals() { + let dir = tempfile::tempdir().unwrap(); + let store = SqliteWorkspaceStore::in_memory().unwrap(); + let mut config = ServerConfig::local_dev(dir.path(), test_identity()); + config.local_runtime_data_dir = Some(dir.path().join("data")); + let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); + let app = build_router(api); + + let runtimes = get_json(app.clone(), "/api/runtimes").await; + let embedded_summary = runtimes["items"] + .as_array() + .unwrap() + .iter() + .find(|runtime| runtime["runtime_id"] == "embedded-worker-runtime") + .expect("embedded runtime summary"); + assert_eq!( + embedded_summary["source"]["kind"], + "embedded_worker_runtime" + ); + assert_eq!(embedded_summary["source"]["status"], "active"); + assert_eq!( + embedded_summary["capabilities"]["workspace_scope"], + "backend_internal" + ); + assert_eq!(embedded_summary["capabilities"]["has_workspace_fs"], false); + + let spawned = post_json( + app.clone(), + "/api/runtimes/embedded-worker-runtime/workers", + json!({ + "intent": { + "kind": "ticket_role", + "ticket_id": "00001KVZSGT0Q", + "role": "coder" + }, + "requested_worker_name": "api-friendly-name", + "acceptance": { + "kind": "run_accepted", + "expected_segments": 0 + } + }), + ) + .await; + assert_eq!(spawned["state"], "accepted"); + let worker_id = spawned["worker"]["worker_id"].as_str().unwrap().to_string(); + assert_eq!(spawned["worker"]["runtime_id"], "embedded-worker-runtime"); + assert_eq!( + spawned["worker"]["workspace"]["visibility"], + "backend_internal" + ); + assert_eq!( + spawned["worker"]["implementation"]["kind"], + "embedded_worker_runtime" + ); + + let worker = get_json( + app.clone(), + &format!("/api/runtimes/embedded-worker-runtime/workers/{worker_id}"), + ) + .await; + assert_eq!(worker["worker_id"], worker_id); + assert_eq!(worker["runtime_id"], "embedded-worker-runtime"); + + let accepted = post_json( + app.clone(), + &format!("/api/runtimes/embedded-worker-runtime/workers/{worker_id}/input"), + json!({ + "kind": "user", + "content": "hello from browser-facing api" + }), + ) + .await; + assert_eq!(accepted["state"], "accepted"); + assert_eq!(accepted["runtime_id"], "embedded-worker-runtime"); + assert_eq!(accepted["worker_id"], worker_id); + assert_eq!(accepted["transcript_sequence"], 1); + + let transcript = get_json( + app.clone(), + &format!("/api/runtimes/embedded-worker-runtime/workers/{worker_id}/transcript?start=0&limit=10"), + ) + .await; + assert_eq!(transcript["state"], "accepted"); + assert_eq!(transcript["items"][0]["role"], "user"); + assert_eq!( + transcript["items"][0]["content"], + "hello from browser-facing api" + ); + + let wrong_runtime = app + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri(format!( + "/api/runtimes/local-worker-runtime/workers/{worker_id}/input" + )) + .header(CONTENT_TYPE, "application/json") + .body(Body::from( + serde_json::to_vec(&json!({ + "kind": "user", + "content": "wrong runtime" + })) + .unwrap(), + )) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(wrong_runtime.status(), StatusCode::NOT_FOUND); + + let projected = format!( + "{}{}{}{}{}", + embedded_summary, spawned, worker, accepted, transcript + ); + for forbidden in [ + dir.path().to_string_lossy().as_ref(), + "metadata.json", + "socket", + "session", + "token", + "credential", + "provider", + ] { + assert!( + !projected.contains(forbidden), + "embedded api projection leaked forbidden term: {forbidden}: {projected}" + ); + } + } + #[tokio::test] async fn proxies_worker_observation_ws_with_backend_cursors_and_diagnostics() { let runtime = worker_runtime::Runtime::new_memory(); @@ -1268,6 +1475,23 @@ mod tests { serde_json::from_slice(&bytes).unwrap() } + async fn post_json(app: Router, uri: &str, body: Value) -> Value { + let response = app + .oneshot( + Request::builder() + .method("POST") + .uri(uri) + .header(CONTENT_TYPE, "application/json") + .body(Body::from(serde_json::to_vec(&body).unwrap())) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK, "{uri}"); + let bytes = to_bytes(response.into_body(), usize::MAX).await.unwrap(); + serde_json::from_slice(&bytes).unwrap() + } + fn write_ticket(root: &Path, id: &str, title: &str, state: &str) { let ticket_dir = root.join(".yoi/tickets").join(id); std::fs::create_dir_all(&ticket_dir).unwrap();