use crate::Error; use chrono::Utc; use reqwest::StatusCode; use reqwest::blocking::{Client as BlockingHttpClient, RequestBuilder}; use reqwest::header::{AUTHORIZATION, CONTENT_TYPE}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use std::{sync::Arc, time::Duration}; use worker_runtime::catalog::{ CapabilityRequest, ConfigBundleRef, CreateWorkerRequest, ProfileSelector, WorkerDetail as EmbeddedWorkerDetail, WorkerIntent, WorkerStatus as EmbeddedWorkerStatus, }; use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary}; use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; use worker_runtime::execution::WorkerExecutionRunState; use worker_runtime::http_server::{ RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest, RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse, RuntimeHttpWorkerInputResponse, RuntimeHttpWorkerLifecycleRequest, RuntimeHttpWorkerLifecycleResponse, RuntimeHttpWorkerResponse, RuntimeHttpWorkersResponse, }; use worker_runtime::identity::{ RuntimeId as EmbeddedRuntimeId, WorkerId as EmbeddedWorkerId, WorkerRef as EmbeddedWorkerRef, }; use worker_runtime::interaction::{ WorkerInput as EmbeddedWorkerInput, WorkerInputKind as EmbeddedWorkerInputKind, }; use worker_runtime::management::{RuntimeOptions as EmbeddedRuntimeOptions, RuntimeStatus}; use worker_runtime::observation::{ TranscriptProjection as EmbeddedTranscriptProjection, TranscriptQuery, TranscriptRole, }; const EMBEDDED_RUNTIME_ID: &str = "embedded-worker-runtime"; const EMBEDDED_HOST_KIND: &str = "embedded-worker-runtime-host"; const REMOTE_HOST_KIND: &str = "remote-worker-runtime-host"; const MAX_DIAGNOSTICS: usize = 16; const MAX_HOST_SCAN: usize = 256; const MAX_IDENTIFIER_LEN: usize = 120; const ID_DIGEST_HEX_LEN: usize = 16; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct RuntimeDiagnostic { pub code: String, pub severity: DiagnosticSeverity, pub message: String, } impl RuntimeDiagnostic { pub fn new(code: impl Into, severity: &str, message: impl Into) -> Self { let severity = match severity { "error" => DiagnosticSeverity::Error, "warning" => DiagnosticSeverity::Warning, _ => DiagnosticSeverity::Info, }; diagnostic(code, severity, message) } } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum DiagnosticSeverity { Info, Warning, Error, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum RuntimeSourceKind { EmbeddedWorkerRuntime, RemoteHttp, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum RuntimeSourceStatus { Active, Reserved, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum RuntimeIdentityAuthority { /// Public Runtime/Host/Worker ids are registry projections, never raw /// socket addresses, session ids, credentials, or paths. RuntimeRegistryProjection, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct RuntimeSourceSummary { pub kind: RuntimeSourceKind, pub status: RuntimeSourceStatus, pub identity_authority: RuntimeIdentityAuthority, pub note: String, } impl RuntimeSourceSummary { pub fn embedded_worker_runtime() -> Self { Self { kind: RuntimeSourceKind::EmbeddedWorkerRuntime, status: RuntimeSourceStatus::Active, identity_authority: RuntimeIdentityAuthority::RuntimeRegistryProjection, note: "backend-internal embedded worker-runtime Runtime exposed only through runtime_id plus worker_id projections".to_string(), } } pub fn embedded_worker_runtime_reserved() -> Self { Self { kind: RuntimeSourceKind::EmbeddedWorkerRuntime, status: RuntimeSourceStatus::Reserved, identity_authority: RuntimeIdentityAuthority::RuntimeRegistryProjection, note: "reserved boundary for an embedded worker-runtime adapter; not connected by this fixture source".to_string(), } } pub fn remote_http() -> Self { Self { kind: RuntimeSourceKind::RemoteHttp, status: RuntimeSourceStatus::Active, identity_authority: RuntimeIdentityAuthority::RuntimeRegistryProjection, note: "backend-owned remote worker-runtime REST/WS client; endpoints and credentials remain backend-private".to_string(), } } pub fn remote_http_reserved() -> Self { Self { kind: RuntimeSourceKind::RemoteHttp, status: RuntimeSourceStatus::Reserved, identity_authority: RuntimeIdentityAuthority::RuntimeRegistryProjection, note: "reserved boundary for a future remote Runtime adapter; no HTTP client or REST server is implemented here".to_string(), } } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct RuntimeCapabilitySummary { pub can_list_hosts: bool, pub can_list_workers: bool, pub can_get_worker: bool, pub can_spawn_worker: bool, pub can_stop_worker: bool, pub can_accept_input: bool, pub has_workspace_fs: bool, pub has_shell: bool, pub has_git: bool, pub supports_worktrees: bool, pub supports_backend_internal_tools: bool, pub workspace_scope: String, pub max_workers: usize, pub os: String, pub arch: String, } pub type HostCapabilitySummary = RuntimeCapabilitySummary; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct RuntimeSummary { pub runtime_id: String, pub label: String, pub kind: String, pub status: String, pub source: RuntimeSourceSummary, pub host_ids: Vec, pub capabilities: RuntimeCapabilitySummary, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct HostSummary { pub runtime_id: String, pub host_id: String, pub label: String, pub kind: String, pub status: String, pub observed_at: String, pub last_seen_at: Option, pub capabilities: HostCapabilitySummary, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerWorkspaceSummary { pub visibility: String, pub identity: String, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerImplementationSummary { pub kind: String, pub display_hint: String, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerCapabilitySummary { pub can_accept_input: bool, pub can_stop: bool, pub can_spawn_followup: bool, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerSummary { pub runtime_id: String, pub worker_id: String, pub host_id: String, pub label: String, pub role: Option, pub profile: Option, pub workspace: WorkerWorkspaceSummary, pub state: String, pub status: String, pub last_seen_at: Option, pub implementation: WorkerImplementationSummary, pub capabilities: WorkerCapabilitySummary, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct RuntimeList { pub items: Vec, pub diagnostics: Vec, } impl RuntimeList { fn new(items: Vec, diagnostics: Vec) -> Self { Self { items, diagnostics } } } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerLookupResult { #[serde(skip_serializing_if = "Option::is_none")] pub worker: Option, pub diagnostics: Vec, } /// Browser-safe worker spawn request shape. /// /// The request intentionally carries only workspace policy intents, stable /// worker identifiers, optional profile selectors, config bundle refs, and /// requested capability names. Raw workspace roots, child cwd, executable path, /// Runtime endpoints/credentials, raw bundle storage paths, and host-local /// resolved WorkerSpec content are resolved by the runtime service and never /// accepted from Workspace API callers. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerSpawnRequest { pub intent: WorkerSpawnIntent, #[serde(skip_serializing_if = "Option::is_none")] pub requested_worker_name: Option, pub acceptance: WorkerSpawnAcceptanceRequirement, #[serde(default, skip_serializing_if = "Option::is_none")] pub profile: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub config_bundle: Option, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub requested_capabilities: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum WorkerSpawnIntent { WorkspaceCompanion, WorkspaceOrchestrator, TicketRole { ticket_id: String, role: TicketWorkerRole, }, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum TicketWorkerRole { Intake, Orchestrator, Coder, Reviewer, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum WorkerSpawnAcceptanceRequirement { SocketReady, RunAccepted { expected_segments: usize }, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerSpawnResult { pub state: WorkerOperationState, #[serde(skip_serializing_if = "Option::is_none")] pub worker: Option, pub acceptance_evidence: Vec, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ConfigBundleSyncResult { pub state: WorkerOperationState, #[serde(skip_serializing_if = "Option::is_none")] pub availability: Option, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ConfigBundleCheckResult { pub state: WorkerOperationState, #[serde(skip_serializing_if = "Option::is_none")] pub availability: Option, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ConfigBundleListResult { pub bundles: Vec, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum WorkerOperationState { Accepted, Unsupported, Rejected, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerSpawnAcceptanceEvidence { pub kind: String, pub detail: String, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerStopRequest { pub worker_id: String, pub mode: WorkerStopMode, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum WorkerStopMode { Graceful, Force, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerStopResult { pub state: WorkerOperationState, pub diagnostics: Vec, } #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerLifecycleRequest { #[serde(default, skip_serializing_if = "Option::is_none")] pub reason: Option, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerLifecycleResult { pub state: WorkerOperationState, pub runtime_id: String, pub worker_id: String, #[serde(skip_serializing_if = "Option::is_none")] pub event_id: Option, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum WorkerInputKind { User, System, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerInputRequest { #[serde(default = "default_worker_input_kind")] pub kind: WorkerInputKind, pub content: String, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerInputResult { pub state: WorkerOperationState, pub runtime_id: String, pub worker_id: String, #[serde(skip_serializing_if = "Option::is_none")] pub transcript_sequence: Option, #[serde(skip_serializing_if = "Option::is_none")] pub event_id: Option, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerTranscriptItem { pub sequence: u64, pub role: String, pub content: String, pub event_id: u64, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerTranscriptProjection { pub state: WorkerOperationState, pub runtime_id: String, pub worker_id: String, pub start: usize, pub limit: usize, pub total_items: usize, #[serde(skip_serializing_if = "Option::is_none")] pub next_start: Option, pub items: Vec, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerProxyConnectPoint { pub kind: String, pub status: String, pub diagnostics: Vec, } #[derive(Debug, Clone, PartialEq, Eq)] pub enum RuntimeRegistryError { InvalidIdentifier { kind: &'static str, value: String, }, UnknownRuntime(String), UnknownHost(String), UnknownWorker { runtime_id: String, worker_id: String, }, RuntimeOperationFailed { runtime_id: String, code: String, message: String, }, } impl RuntimeRegistryError { pub fn into_error(self) -> Error { match self { Self::InvalidIdentifier { kind, value } => Error::InvalidRuntimeIdentifier { kind: kind.to_string(), value, }, Self::UnknownRuntime(runtime_id) => Error::UnknownRuntime(runtime_id), Self::UnknownHost(host_id) => Error::UnknownHost(host_id), Self::UnknownWorker { runtime_id, worker_id, } => Error::UnknownWorker { runtime_id, worker_id, }, Self::RuntimeOperationFailed { runtime_id, code, message, } => Error::RuntimeOperationFailed { runtime_id, code, message, }, } } } fn default_worker_input_kind() -> WorkerInputKind { WorkerInputKind::User } pub trait WorkspaceWorkerRuntime: Send + Sync { fn runtime_id(&self) -> &str; fn runtime_summary(&self, limit: usize) -> RuntimeSummary; fn list_hosts(&self, limit: usize) -> RuntimeList; fn list_workers(&self, limit: usize) -> RuntimeList; fn worker(&self, worker_id: &str) -> WorkerLookupResult; fn spawn_worker(&self, request: WorkerSpawnRequest) -> WorkerSpawnResult { WorkerSpawnResult { state: WorkerOperationState::Unsupported, worker: None, acceptance_evidence: Vec::new(), diagnostics: vec![diagnostic( "worker_spawn_resolver_pending", DiagnosticSeverity::Info, format!( "worker spawn intent '{}' was accepted as a typed request shape, but launch resolution is not implemented by this registry surface", worker_spawn_intent_label(&request.intent) ), )], } } fn sync_config_bundle(&self, _bundle: ConfigBundle) -> ConfigBundleSyncResult { ConfigBundleSyncResult { state: WorkerOperationState::Unsupported, availability: None, diagnostics: vec![diagnostic( "config_bundle_sync_unsupported", DiagnosticSeverity::Info, "runtime does not implement config bundle sync".to_string(), )], } } fn check_config_bundle(&self, _reference: ConfigBundleRef) -> ConfigBundleCheckResult { ConfigBundleCheckResult { state: WorkerOperationState::Unsupported, availability: None, diagnostics: vec![diagnostic( "config_bundle_check_unsupported", DiagnosticSeverity::Info, "runtime does not implement config bundle availability checks".to_string(), )], } } fn list_config_bundles(&self) -> ConfigBundleListResult { ConfigBundleListResult { bundles: Vec::new(), diagnostics: vec![diagnostic( "config_bundle_list_unsupported", DiagnosticSeverity::Info, "runtime does not implement config bundle listing".to_string(), )], } } fn stop_worker( &self, worker_id: &str, _request: WorkerLifecycleRequest, ) -> WorkerLifecycleResult { WorkerLifecycleResult { state: WorkerOperationState::Unsupported, runtime_id: self.runtime_id().to_string(), worker_id: worker_id.to_string(), event_id: None, diagnostics: vec![diagnostic( "worker_stop_pending", DiagnosticSeverity::Info, format!( "worker stop for '{worker_id}' is reserved for the runtime service boundary and is not implemented by this registry surface" ), )], } } fn cancel_worker( &self, worker_id: &str, _request: WorkerLifecycleRequest, ) -> WorkerLifecycleResult { WorkerLifecycleResult { state: WorkerOperationState::Unsupported, runtime_id: self.runtime_id().to_string(), worker_id: worker_id.to_string(), event_id: None, diagnostics: vec![diagnostic( "worker_cancel_pending", DiagnosticSeverity::Info, format!( "worker cancel for '{worker_id}' is reserved for the runtime service boundary and is not implemented by this registry surface" ), )], } } fn observation_source( &self, _worker_id: &str, ) -> Option { None } fn send_input(&self, worker_id: &str, _request: WorkerInputRequest) -> WorkerInputResult { WorkerInputResult { state: WorkerOperationState::Unsupported, runtime_id: self.runtime_id().to_string(), worker_id: worker_id.to_string(), transcript_sequence: None, event_id: None, diagnostics: vec![diagnostic( "worker_input_pending", DiagnosticSeverity::Info, format!( "worker input for '{worker_id}' is reserved for the runtime service boundary and is not implemented by this registry source" ), )], } } fn transcript( &self, worker_id: &str, start: usize, limit: usize, ) -> WorkerTranscriptProjection { WorkerTranscriptProjection { state: WorkerOperationState::Unsupported, runtime_id: self.runtime_id().to_string(), worker_id: worker_id.to_string(), start, limit, total_items: 0, next_start: None, items: Vec::new(), diagnostics: vec![diagnostic( "worker_transcript_pending", DiagnosticSeverity::Info, format!( "bounded transcript for '{worker_id}' is not implemented by this registry source" ), )], } } fn proxy_connect_points(&self, worker_id: &str) -> Vec { vec![WorkerProxyConnectPoint { kind: "stream_proxy".to_string(), status: "not_implemented".to_string(), diagnostics: vec![diagnostic( "worker_proxy_pending", DiagnosticSeverity::Info, format!( "worker proxy connect points for '{}' are not implemented by this overview-only registry surface", worker_id ), )], }] } } #[derive(Clone)] pub struct RuntimeRegistry { runtimes: Vec>, } impl RuntimeRegistry { pub fn new(runtimes: Vec>) -> Self { Self { runtimes } } pub fn for_workspace(embedded_runtime: EmbeddedWorkerRuntime) -> Self { Self::new(vec![Arc::new(embedded_runtime)]) } pub fn register(&mut self, runtime: R) where R: WorkspaceWorkerRuntime + 'static, { self.runtimes.push(Arc::new(runtime)); } pub fn list_runtimes(&self, limit: usize) -> RuntimeList { let mut diagnostics = Vec::new(); let mut items = Vec::new(); for runtime in self.runtimes.iter().take(limit) { let summary = runtime.runtime_summary(limit); diagnostics.extend(summary.diagnostics.iter().cloned()); items.push(summary); } diagnostics.truncate(MAX_DIAGNOSTICS); RuntimeList::new(items, diagnostics) } pub fn list_hosts(&self, limit: usize) -> RuntimeList { let mut items = Vec::new(); let mut diagnostics = Vec::new(); for runtime in &self.runtimes { if items.len() >= limit { break; } let mut list = runtime.list_hosts(limit.saturating_sub(items.len())); diagnostics.append(&mut list.diagnostics); items.append(&mut list.items); } diagnostics.truncate(MAX_DIAGNOSTICS); RuntimeList::new(items, diagnostics) } pub fn list_workers(&self, limit: usize) -> RuntimeList { let mut items = Vec::new(); let mut diagnostics = Vec::new(); for runtime in &self.runtimes { if items.len() >= limit { break; } let mut list = runtime.list_workers(limit.saturating_sub(items.len())); diagnostics.append(&mut list.diagnostics); items.append(&mut list.items); } diagnostics.truncate(MAX_DIAGNOSTICS); RuntimeList::new(items, diagnostics) } pub fn list_workers_for_host( &self, host_id: &str, limit: usize, ) -> Result, RuntimeRegistryError> { validate_backend_identifier("host_id", host_id)?; let mut host_found = false; let mut diagnostics = Vec::new(); let mut items = Vec::new(); for runtime in &self.runtimes { let host_list = runtime.list_hosts(MAX_HOST_SCAN); diagnostics.extend(host_list.diagnostics); if !host_list.items.iter().any(|host| host.host_id == host_id) { continue; } host_found = true; let worker_list = runtime.list_workers(limit); diagnostics.extend(worker_list.diagnostics); items.extend( worker_list .items .into_iter() .filter(|worker| worker.host_id == host_id) .take(limit.saturating_sub(items.len())), ); if items.len() >= limit { break; } } diagnostics.truncate(MAX_DIAGNOSTICS); if host_found { Ok(RuntimeList::new(items, diagnostics)) } else { Err(RuntimeRegistryError::UnknownHost(host_id.to_string())) } } pub fn worker( &self, runtime_id: &str, worker_id: &str, ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; validate_backend_identifier("worker_id", worker_id)?; let runtime = self.runtime(runtime_id)?; let lookup = runtime.worker(worker_id); lookup.worker.ok_or_else(|| { operation_failed_or_unknown_worker(runtime_id, worker_id, lookup.diagnostics) }) } pub fn spawn_worker( &self, runtime_id: &str, request: WorkerSpawnRequest, ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; let runtime = self.runtime(runtime_id)?; Ok(runtime.spawn_worker(request)) } pub fn sync_config_bundle( &self, runtime_id: &str, bundle: ConfigBundle, ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; let runtime = self.runtime(runtime_id)?; Ok(runtime.sync_config_bundle(bundle)) } pub fn check_config_bundle( &self, runtime_id: &str, reference: ConfigBundleRef, ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; let runtime = self.runtime(runtime_id)?; Ok(runtime.check_config_bundle(reference)) } pub fn list_config_bundles( &self, runtime_id: &str, ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; let runtime = self.runtime(runtime_id)?; Ok(runtime.list_config_bundles()) } pub fn send_input( &self, runtime_id: &str, worker_id: &str, request: WorkerInputRequest, ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; validate_backend_identifier("worker_id", worker_id)?; let runtime = self.runtime(runtime_id)?; let lookup = runtime.worker(worker_id); if lookup.worker.is_none() { return Err(operation_failed_or_unknown_worker( runtime_id, worker_id, lookup.diagnostics, )); } Ok(runtime.send_input(worker_id, request)) } pub fn transcript( &self, runtime_id: &str, worker_id: &str, start: usize, limit: usize, ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; validate_backend_identifier("worker_id", worker_id)?; let runtime = self.runtime(runtime_id)?; let lookup = runtime.worker(worker_id); if lookup.worker.is_none() { return Err(operation_failed_or_unknown_worker( runtime_id, worker_id, lookup.diagnostics, )); } Ok(runtime.transcript(worker_id, start, limit)) } pub fn stop_worker( &self, runtime_id: &str, worker_id: &str, request: WorkerLifecycleRequest, ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; validate_backend_identifier("worker_id", worker_id)?; let runtime = self.runtime(runtime_id)?; let lookup = runtime.worker(worker_id); if lookup.worker.is_none() { return Err(operation_failed_or_unknown_worker( runtime_id, worker_id, lookup.diagnostics, )); } Ok(runtime.stop_worker(worker_id, request)) } pub fn cancel_worker( &self, runtime_id: &str, worker_id: &str, request: WorkerLifecycleRequest, ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; validate_backend_identifier("worker_id", worker_id)?; let runtime = self.runtime(runtime_id)?; let lookup = runtime.worker(worker_id); if lookup.worker.is_none() { return Err(operation_failed_or_unknown_worker( runtime_id, worker_id, lookup.diagnostics, )); } Ok(runtime.cancel_worker(worker_id, request)) } pub fn observation_source( &self, runtime_id: &str, worker_id: &str, ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; validate_backend_identifier("worker_id", worker_id)?; let runtime = self.runtime(runtime_id)?; runtime .observation_source(worker_id) .ok_or_else(|| RuntimeRegistryError::UnknownWorker { runtime_id: runtime_id.to_string(), worker_id: worker_id.to_string(), }) } fn runtime( &self, runtime_id: &str, ) -> Result<&Arc, RuntimeRegistryError> { self.runtimes .iter() .find(|runtime| runtime.runtime_id() == runtime_id) .ok_or_else(|| RuntimeRegistryError::UnknownRuntime(runtime_id.to_string())) } } #[derive(Clone)] pub struct EmbeddedWorkerRuntime { runtime_id: String, host_id: String, runtime: worker_runtime::Runtime, execution_enabled: bool, } impl EmbeddedWorkerRuntime { pub fn new_memory(workspace_id: impl AsRef) -> Self { let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID) .expect("embedded runtime id is a non-empty literal"); let runtime = worker_runtime::Runtime::with_options(EmbeddedRuntimeOptions { runtime_id: Some(runtime_id), display_name: Some("Workspace backend embedded Runtime".to_string()), ..EmbeddedRuntimeOptions::default() }); Self::from_runtime(workspace_id, runtime) } pub fn new_memory_with_execution_backend( workspace_id: impl AsRef, backend: std::sync::Arc, ) -> Result { let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID) .expect("embedded runtime id is a non-empty literal"); let runtime = worker_runtime::Runtime::with_execution_backend( EmbeddedRuntimeOptions { runtime_id: Some(runtime_id), display_name: Some("Workspace backend embedded Runtime".to_string()), ..EmbeddedRuntimeOptions::default() }, backend, )?; let mut embedded = Self::from_runtime(workspace_id, runtime); embedded.execution_enabled = true; Ok(embedded) } pub fn from_runtime(workspace_id: impl AsRef, runtime: worker_runtime::Runtime) -> Self { let runtime_id = runtime .runtime_id() .ok() .map(|id| id.as_str().to_string()) .unwrap_or_else(|| EMBEDDED_RUNTIME_ID.to_string()); Self { runtime_id, host_id: host_id_for_embedded_workspace(workspace_id.as_ref()), runtime, execution_enabled: false, } } fn worker_ref(&self, worker_id: &str) -> Option { Some(EmbeddedWorkerRef::new( EmbeddedRuntimeId::new(self.runtime_id.clone())?, EmbeddedWorkerId::new(worker_id.to_string())?, )) } fn can_accept_embedded_input( &self, status: EmbeddedWorkerStatus, execution: &worker_runtime::execution::WorkerExecutionStatus, ) -> bool { self.execution_enabled && status == EmbeddedWorkerStatus::Running && execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected && execution.run_state == WorkerExecutionRunState::Idle && !execution_last_result_blocks_control(execution) } fn can_stop_embedded_worker( &self, status: EmbeddedWorkerStatus, execution: &worker_runtime::execution::WorkerExecutionStatus, ) -> bool { self.execution_enabled && status == EmbeddedWorkerStatus::Running && execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected && !matches!( execution.run_state, WorkerExecutionRunState::Rejected | WorkerExecutionRunState::Errored | WorkerExecutionRunState::Unconnected ) && !execution_last_result_blocks_control(execution) } fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary { WorkerSummary { runtime_id: self.runtime_id.clone(), worker_id: summary.worker_ref.worker_id.as_str().to_string(), host_id: self.host_id.clone(), label: safe_display_hint(summary.worker_ref.worker_id.as_str()), role: embedded_intent_label(&summary.intent), profile: embedded_profile_label(&summary.profile), workspace: WorkerWorkspaceSummary { visibility: "backend_internal".to_string(), identity: "runtime_registry_worker".to_string(), }, state: embedded_worker_status_label(summary.status).to_string(), status: embedded_worker_execution_status_label(summary.status, summary.execution.run_state) .to_string(), last_seen_at: None, implementation: WorkerImplementationSummary { kind: "embedded_worker_runtime".to_string(), display_hint: "backend-internal worker-runtime Worker".to_string(), }, capabilities: WorkerCapabilitySummary { can_accept_input: self.can_accept_embedded_input(summary.status, &summary.execution), can_stop: self.can_stop_embedded_worker(summary.status, &summary.execution), can_spawn_followup: false, }, diagnostics: vec![diagnostic( "embedded_runtime_projection", DiagnosticSeverity::Info, "Worker identity is projected only as runtime_id plus worker_id; embedded runtime internals remain backend-private".to_string(), )], } } fn map_worker_detail(&self, detail: EmbeddedWorkerDetail) -> WorkerSummary { WorkerSummary { runtime_id: self.runtime_id.clone(), worker_id: detail.worker_id.as_str().to_string(), host_id: self.host_id.clone(), label: safe_display_hint(detail.worker_id.as_str()), role: embedded_intent_label(&detail.intent), profile: embedded_profile_label(&detail.profile), workspace: WorkerWorkspaceSummary { visibility: "backend_internal".to_string(), identity: "runtime_registry_worker".to_string(), }, state: embedded_worker_status_label(detail.status).to_string(), status: embedded_worker_execution_status_label(detail.status, detail.execution.run_state) .to_string(), last_seen_at: None, implementation: WorkerImplementationSummary { kind: "embedded_worker_runtime".to_string(), display_hint: "backend-internal worker-runtime Worker".to_string(), }, capabilities: WorkerCapabilitySummary { can_accept_input: self.can_accept_embedded_input(detail.status, &detail.execution), can_stop: self.can_stop_embedded_worker(detail.status, &detail.execution), can_spawn_followup: false, }, diagnostics: vec![diagnostic( "embedded_runtime_projection", DiagnosticSeverity::Info, "Worker identity is projected only as runtime_id plus worker_id; embedded runtime internals remain backend-private".to_string(), )], } } } impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { fn runtime_id(&self) -> &str { &self.runtime_id } fn runtime_summary(&self, limit: usize) -> RuntimeSummary { let mut diagnostics = Vec::new(); let summary = match self.runtime.summary() { Ok(summary) => summary, Err(err) => { diagnostics.push(embedded_runtime_diagnostic(&err)); return RuntimeSummary { runtime_id: self.runtime_id.clone(), label: "Embedded backend Runtime".to_string(), kind: "embedded_worker_runtime".to_string(), status: "unavailable".to_string(), source: RuntimeSourceSummary::embedded_worker_runtime(), host_ids: Vec::new(), capabilities: embedded_runtime_capabilities(limit, false, false), diagnostics, }; } }; RuntimeSummary { runtime_id: self.runtime_id.clone(), label: summary .display_name .clone() .unwrap_or_else(|| "Embedded backend Runtime".to_string()), kind: "embedded_worker_runtime".to_string(), status: embedded_runtime_status_label(summary.status).to_string(), source: RuntimeSourceSummary::embedded_worker_runtime(), host_ids: if limit == 0 { Vec::new() } else { vec![self.host_id.clone()] }, capabilities: embedded_runtime_capabilities(limit, true, self.execution_enabled), diagnostics, } } fn list_hosts(&self, limit: usize) -> RuntimeList { if limit == 0 { return RuntimeList::new(Vec::new(), Vec::new()); } RuntimeList::new( vec![HostSummary { runtime_id: self.runtime_id.clone(), host_id: self.host_id.clone(), label: "Workspace backend embedded Runtime".to_string(), kind: EMBEDDED_HOST_KIND.to_string(), status: "available".to_string(), observed_at: Utc::now().to_rfc3339(), last_seen_at: None, capabilities: embedded_runtime_capabilities(limit, true, self.execution_enabled), diagnostics: vec![diagnostic( "embedded_runtime_host_boundary", DiagnosticSeverity::Info, "Backend-internal host exposes only bounded runtime and worker projections" .to_string(), )], }], Vec::new(), ) } fn list_workers(&self, limit: usize) -> RuntimeList { if limit == 0 { return RuntimeList::new(Vec::new(), Vec::new()); } match self.runtime.list_workers() { Ok(workers) => RuntimeList::new( workers .into_iter() .take(limit) .map(|worker| self.map_worker_summary(worker)) .collect(), Vec::new(), ), Err(err) => RuntimeList::new(Vec::new(), vec![embedded_runtime_diagnostic(&err)]), } } fn worker(&self, worker_id: &str) -> WorkerLookupResult { let Some(worker_ref) = self.worker_ref(worker_id) else { return WorkerLookupResult { worker: None, diagnostics: vec![diagnostic( "embedded_worker_id_invalid", DiagnosticSeverity::Warning, "Worker id was empty and cannot be resolved".to_string(), )], }; }; match self.runtime.worker_detail(&worker_ref) { Ok(detail) => WorkerLookupResult { worker: Some(self.map_worker_detail(detail)), diagnostics: Vec::new(), }, Err(EmbeddedRuntimeError::WorkerNotFound { .. }) => WorkerLookupResult { worker: None, diagnostics: Vec::new(), }, Err(err) => WorkerLookupResult { worker: None, diagnostics: vec![embedded_runtime_diagnostic(&err)], }, } } fn spawn_worker(&self, request: WorkerSpawnRequest) -> WorkerSpawnResult { let mut diagnostics = Vec::new(); if matches!( request.acceptance, WorkerSpawnAcceptanceRequirement::SocketReady ) { diagnostics.push(diagnostic( "embedded_runtime_no_socket", DiagnosticSeverity::Warning, "Embedded backend Runtime is transportless; use run_accepted/create acceptance for backend-internal Workers".to_string(), )); return WorkerSpawnResult { state: WorkerOperationState::Rejected, worker: None, acceptance_evidence: Vec::new(), diagnostics, }; } if request.requested_worker_name.is_some() { diagnostics.push(diagnostic( "embedded_worker_name_ignored", DiagnosticSeverity::Info, "Embedded Runtime v0 allocates opaque runtime-local worker ids; requested display names are not authority".to_string(), )); } if matches!(request.acceptance, WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments } if expected_segments > 0) { diagnostics.push(diagnostic( "embedded_runtime_tools_less", DiagnosticSeverity::Info, "Embedded Runtime v0 creates a tools-less catalog Worker and does not spawn provider segments".to_string(), )); } let create_request = CreateWorkerRequest { intent: embedded_create_intent(&request.intent), profile: request .profile .clone() .unwrap_or_else(|| embedded_profile_selector(&request.intent)), config_bundle: request.config_bundle.clone(), requested_capabilities: if request.requested_capabilities.is_empty() { vec![CapabilityRequest::named("read")] } else { request.requested_capabilities.clone() }, workspace_refs: Vec::new(), mount_refs: Vec::new(), }; match self.runtime.create_worker(create_request) { Ok(detail) => { let execution_failure = embedded_spawn_execution_failure_diagnostic(&detail.execution); if let Some(diagnostic) = execution_failure { diagnostics.push(diagnostic); WorkerSpawnResult { state: WorkerOperationState::Rejected, worker: Some(self.map_worker_detail(detail)), acceptance_evidence: Vec::new(), diagnostics, } } else { WorkerSpawnResult { state: WorkerOperationState::Accepted, worker: Some(self.map_worker_detail(detail)), acceptance_evidence: vec![ WorkerSpawnAcceptanceEvidence { kind: "embedded_runtime_worker_created".to_string(), detail: "worker-runtime catalog accepted a backend-internal tools-less Worker" .to_string(), }, WorkerSpawnAcceptanceEvidence { kind: "embedded_runtime_backend_internal_projection".to_string(), detail: "only runtime_id plus worker_id backend projections were exposed" .to_string(), }, ], diagnostics, } } } Err(err) => { diagnostics.push(embedded_runtime_diagnostic(&err)); WorkerSpawnResult { state: WorkerOperationState::Rejected, worker: None, acceptance_evidence: Vec::new(), diagnostics, } } } } fn sync_config_bundle(&self, bundle: ConfigBundle) -> ConfigBundleSyncResult { match self.runtime.store_config_bundle(bundle) { Ok(availability) => ConfigBundleSyncResult { state: WorkerOperationState::Accepted, availability: Some(availability), diagnostics: Vec::new(), }, Err(error) => ConfigBundleSyncResult { state: WorkerOperationState::Rejected, availability: None, diagnostics: vec![embedded_runtime_diagnostic(&error)], }, } } fn check_config_bundle(&self, reference: ConfigBundleRef) -> ConfigBundleCheckResult { match self.runtime.check_config_bundle(&reference) { Ok(availability) => ConfigBundleCheckResult { state: WorkerOperationState::Accepted, availability: Some(availability), diagnostics: Vec::new(), }, Err(error) => ConfigBundleCheckResult { state: WorkerOperationState::Rejected, availability: None, diagnostics: vec![embedded_runtime_diagnostic(&error)], }, } } fn list_config_bundles(&self) -> ConfigBundleListResult { match self.runtime.list_config_bundles() { Ok(bundles) => ConfigBundleListResult { bundles, diagnostics: Vec::new(), }, Err(error) => ConfigBundleListResult { bundles: Vec::new(), diagnostics: vec![embedded_runtime_diagnostic(&error)], }, } } fn stop_worker( &self, worker_id: &str, request: WorkerLifecycleRequest, ) -> WorkerLifecycleResult { if !self.execution_enabled { return embedded_lifecycle_rejected( &self.runtime_id, worker_id, diagnostic( "embedded_worker_execution_unavailable", DiagnosticSeverity::Info, format!("worker stop for '{worker_id}' requires an embedded execution backend"), ), ); } let Some(worker_ref) = self.worker_ref(worker_id) else { return embedded_lifecycle_rejected( &self.runtime_id, worker_id, diagnostic( "embedded_worker_id_invalid", DiagnosticSeverity::Warning, "Worker id was empty and cannot be resolved".to_string(), ), ); }; match self.runtime.stop_worker(&worker_ref, request.reason) { Ok(ack) => WorkerLifecycleResult { state: WorkerOperationState::Accepted, runtime_id: self.runtime_id.clone(), worker_id: worker_id.to_string(), event_id: Some(ack.event_id), diagnostics: Vec::new(), }, Err(error) => embedded_lifecycle_rejected( &self.runtime_id, worker_id, embedded_runtime_diagnostic(&error), ), } } fn cancel_worker( &self, worker_id: &str, request: WorkerLifecycleRequest, ) -> WorkerLifecycleResult { if !self.execution_enabled { return embedded_lifecycle_rejected( &self.runtime_id, worker_id, diagnostic( "embedded_worker_execution_unavailable", DiagnosticSeverity::Info, format!( "worker cancel for '{worker_id}' requires an embedded execution backend" ), ), ); } let Some(worker_ref) = self.worker_ref(worker_id) else { return embedded_lifecycle_rejected( &self.runtime_id, worker_id, diagnostic( "embedded_worker_id_invalid", DiagnosticSeverity::Warning, "Worker id was empty and cannot be resolved".to_string(), ), ); }; match self.runtime.cancel_worker(&worker_ref, request.reason) { Ok(ack) => WorkerLifecycleResult { state: WorkerOperationState::Accepted, runtime_id: self.runtime_id.clone(), worker_id: worker_id.to_string(), event_id: Some(ack.event_id), diagnostics: Vec::new(), }, Err(error) => embedded_lifecycle_rejected( &self.runtime_id, worker_id, embedded_runtime_diagnostic(&error), ), } } fn observation_source( &self, worker_id: &str, ) -> Option { let worker_ref = self.worker_ref(worker_id)?; if self.runtime.worker_detail(&worker_ref).is_err() { return None; } Some(crate::observation::RuntimeObservationSource::embedded( crate::observation::EmbeddedRuntimeObservationSource { runtime_id: self.runtime_id.clone(), worker_id: worker_id.to_string(), runtime: self.runtime.clone(), worker_ref, }, )) } fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult { if !self.execution_enabled { return embedded_input_rejected( &self.runtime_id, worker_id, diagnostic( "embedded_worker_execution_unavailable", DiagnosticSeverity::Info, format!( "worker input for '{worker_id}' requires an embedded execution backend" ), ), ); } let Some(worker_ref) = self.worker_ref(worker_id) else { return embedded_input_rejected( &self.runtime_id, worker_id, diagnostic( "embedded_worker_id_invalid", DiagnosticSeverity::Warning, "Worker id was empty and cannot be resolved".to_string(), ), ); }; let input = EmbeddedWorkerInput { kind: match request.kind { WorkerInputKind::User => EmbeddedWorkerInputKind::User, WorkerInputKind::System => EmbeddedWorkerInputKind::System, }, content: request.content, }; match self.runtime.send_input(&worker_ref, input) { Ok(ack) => WorkerInputResult { state: WorkerOperationState::Accepted, runtime_id: self.runtime_id.clone(), worker_id: worker_id.to_string(), transcript_sequence: Some(ack.transcript_sequence), event_id: Some(ack.event_id), diagnostics: Vec::new(), }, Err(error) => embedded_input_rejected( &self.runtime_id, worker_id, embedded_runtime_diagnostic(&error), ), } } fn transcript( &self, worker_id: &str, start: usize, limit: usize, ) -> WorkerTranscriptProjection { let Some(worker_ref) = self.worker_ref(worker_id) else { return embedded_transcript_rejected( &self.runtime_id, worker_id, start, limit, diagnostic( "embedded_worker_id_invalid", DiagnosticSeverity::Warning, "Worker id was empty and cannot be resolved".to_string(), ), ); }; match self .runtime .transcript_projection(&worker_ref, TranscriptQuery::new(start, limit)) { Ok(projection) => { embedded_transcript_projection(&self.runtime_id, worker_id, projection) } Err(err) => embedded_transcript_rejected( &self.runtime_id, worker_id, start, limit, embedded_runtime_diagnostic(&err), ), } } } #[derive(Clone)] pub struct RemoteRuntimeConfig { pub runtime_id: String, pub display_name: String, pub base_url: String, pub bearer_token: Option, pub cached_capabilities: RuntimeCapabilitySummary, pub cached_status: String, pub timeout: Duration, } impl std::fmt::Debug for RemoteRuntimeConfig { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("RemoteRuntimeConfig") .field("runtime_id", &self.runtime_id) .field("display_name", &self.display_name) .field("base_url", &"") .field( "bearer_token", &self.bearer_token.as_ref().map(|_| ""), ) .field("cached_capabilities", &self.cached_capabilities) .field("cached_status", &self.cached_status) .field("timeout", &self.timeout) .finish() } } impl RemoteRuntimeConfig { pub fn new( runtime_id: impl Into, display_name: impl Into, base_url: impl Into, bearer_token: Option, ) -> Self { Self { runtime_id: runtime_id.into(), display_name: display_name.into(), base_url: base_url.into(), bearer_token, cached_capabilities: remote_runtime_capabilities(200, false), cached_status: "configured".to_string(), timeout: Duration::from_secs(10), } } pub fn with_cached_capabilities(mut self, capabilities: RuntimeCapabilitySummary) -> Self { self.cached_capabilities = capabilities; self } pub fn with_cached_status(mut self, status: impl Into) -> Self { self.cached_status = status.into(); self } pub fn with_timeout(mut self, timeout: Duration) -> Self { self.timeout = timeout; self } } #[derive(Clone)] pub struct RemoteWorkerRuntime { runtime_id: String, display_name: String, base_url: String, bearer_token: Option, cached_capabilities: RuntimeCapabilitySummary, cached_status: String, host_id: String, http: BlockingHttpClient, } impl RemoteWorkerRuntime { pub fn new(config: RemoteRuntimeConfig) -> Result { validate_backend_identifier("runtime_id", &config.runtime_id)?; let base_url = config.base_url.trim_end_matches('/').to_string(); let http = BlockingHttpClient::builder() .timeout(config.timeout) .build() .map_err(|err| RuntimeRegistryError::RuntimeOperationFailed { runtime_id: config.runtime_id.clone(), code: "remote_runtime_client_build_failed".to_string(), message: err.to_string(), })?; Ok(Self { host_id: host_id_for_remote_runtime(&config.runtime_id), runtime_id: config.runtime_id, display_name: config.display_name, base_url, bearer_token: config.bearer_token, cached_capabilities: config.cached_capabilities, cached_status: config.cached_status, http, }) } fn endpoint(&self, path: &str) -> String { format!("{}{}", self.base_url, path) } fn bundle_availability_path(reference: &ConfigBundleRef) -> String { format!( "/v1/config-bundles/{}/availability?digest={}", url_path_segment_encode(&reference.id), url_query_value_encode(&reference.digest) ) } fn ws_endpoint(&self, worker_id: &str) -> String { let mut base = self.base_url.clone(); if let Some(rest) = base.strip_prefix("https://") { base = format!("wss://{rest}"); } else if let Some(rest) = base.strip_prefix("http://") { base = format!("ws://{rest}"); } format!("{base}/v1/workers/{worker_id}/events/ws") } fn authorize(&self, request: RequestBuilder) -> RequestBuilder { let request = request.header(CONTENT_TYPE, "application/json"); if let Some(token) = self.bearer_token.as_deref() { request.header(AUTHORIZATION, format!("Bearer {token}")) } else { request } } fn get_json(&self, path: &str) -> Result where T: DeserializeOwned, { self.send_json(self.http.get(self.endpoint(path))) } fn post_json(&self, path: &str, body: &B) -> Result where B: Serialize + ?Sized, T: DeserializeOwned, { self.send_json(self.http.post(self.endpoint(path)).json(body)) } fn send_json(&self, request: RequestBuilder) -> Result where T: DeserializeOwned, { let response = self .authorize(request) .send() .map_err(|err| remote_reqwest_diagnostic(&self.runtime_id, err))?; let status = response.status(); if status.is_success() { response.json::().map_err(|err| { diagnostic( "remote_runtime_malformed_response", DiagnosticSeverity::Error, format!( "Remote Runtime returned malformed JSON for '{}': {err}", self.runtime_id ), ) }) } else { Err(remote_http_status_diagnostic( &self.runtime_id, status, response, )) } } fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary { WorkerSummary { runtime_id: self.runtime_id.clone(), worker_id: summary.worker_ref.worker_id.as_str().to_string(), host_id: self.host_id.clone(), label: safe_display_hint(summary.worker_ref.worker_id.as_str()), role: embedded_intent_label(&summary.intent), profile: embedded_profile_label(&summary.profile), workspace: WorkerWorkspaceSummary { visibility: "remote_runtime".to_string(), identity: "runtime_registry_worker".to_string(), }, state: embedded_worker_status_label(summary.status).to_string(), status: embedded_worker_status_label(summary.status).to_string(), last_seen_at: None, implementation: WorkerImplementationSummary { kind: "remote_worker_runtime".to_string(), display_hint: "Backend-proxied remote worker-runtime Worker".to_string(), }, capabilities: WorkerCapabilitySummary { can_accept_input: true, can_stop: true, can_spawn_followup: false, }, diagnostics: vec![diagnostic( "remote_runtime_projection", DiagnosticSeverity::Info, "Remote Worker identity is projected only as runtime_id plus worker_id; endpoint and credentials remain backend-private".to_string(), )], } } fn map_worker_detail(&self, detail: EmbeddedWorkerDetail) -> WorkerSummary { WorkerSummary { runtime_id: self.runtime_id.clone(), worker_id: detail.worker_id.as_str().to_string(), host_id: self.host_id.clone(), label: safe_display_hint(detail.worker_id.as_str()), role: embedded_intent_label(&detail.intent), profile: embedded_profile_label(&detail.profile), workspace: WorkerWorkspaceSummary { visibility: "remote_runtime".to_string(), identity: "runtime_registry_worker".to_string(), }, state: embedded_worker_status_label(detail.status).to_string(), status: embedded_worker_status_label(detail.status).to_string(), last_seen_at: None, implementation: WorkerImplementationSummary { kind: "remote_worker_runtime".to_string(), display_hint: "Backend-proxied remote worker-runtime Worker".to_string(), }, capabilities: WorkerCapabilitySummary { can_accept_input: true, can_stop: true, can_spawn_followup: false, }, diagnostics: vec![diagnostic( "remote_runtime_projection", DiagnosticSeverity::Info, "Remote Worker identity is projected only as runtime_id plus worker_id; endpoint and credentials remain backend-private".to_string(), )], } } fn lifecycle_result_from_response( &self, worker_id: &str, response: RuntimeHttpWorkerLifecycleResponse, ) -> WorkerLifecycleResult { WorkerLifecycleResult { state: WorkerOperationState::Accepted, runtime_id: self.runtime_id.clone(), worker_id: worker_id.to_string(), event_id: Some(response.ack.event_id), diagnostics: vec![diagnostic( "remote_runtime_lifecycle_accepted", DiagnosticSeverity::Info, format!( "Remote Runtime acknowledged lifecycle operation for '{worker_id}' with status {}", embedded_worker_status_label(response.ack.status) ), )], } } } impl WorkspaceWorkerRuntime for RemoteWorkerRuntime { fn runtime_id(&self) -> &str { &self.runtime_id } fn runtime_summary(&self, limit: usize) -> RuntimeSummary { match self.get_json::("/v1/runtime") { Ok(response) => RuntimeSummary { runtime_id: self.runtime_id.clone(), label: response .runtime .display_name .unwrap_or_else(|| self.display_name.clone()), kind: "remote_worker_runtime".to_string(), status: embedded_runtime_status_label(response.runtime.status).to_string(), source: RuntimeSourceSummary::remote_http(), host_ids: if limit == 0 { Vec::new() } else { vec![self.host_id.clone()] }, capabilities: remote_runtime_capabilities(limit, true), diagnostics: vec![diagnostic( "remote_runtime_backend_proxy", DiagnosticSeverity::Info, "Remote Runtime is accessed only by backend-owned REST/WS clients".to_string(), )], }, Err(diagnostic) => RuntimeSummary { runtime_id: self.runtime_id.clone(), label: self.display_name.clone(), kind: "remote_worker_runtime".to_string(), status: self.cached_status.clone(), source: RuntimeSourceSummary::remote_http(), host_ids: if limit == 0 { Vec::new() } else { vec![self.host_id.clone()] }, capabilities: self.cached_capabilities.clone(), diagnostics: vec![diagnostic], }, } } fn list_hosts(&self, limit: usize) -> RuntimeList { if limit == 0 { return RuntimeList::new(Vec::new(), Vec::new()); } RuntimeList::new( vec![HostSummary { runtime_id: self.runtime_id.clone(), host_id: self.host_id.clone(), label: self.display_name.clone(), kind: REMOTE_HOST_KIND.to_string(), status: "configured".to_string(), observed_at: Utc::now().to_rfc3339(), last_seen_at: None, capabilities: remote_runtime_capabilities(limit, true), diagnostics: vec![diagnostic( "remote_runtime_backend_proxy", DiagnosticSeverity::Info, "Remote host endpoint and credentials are backend-private".to_string(), )], }], Vec::new(), ) } fn list_workers(&self, limit: usize) -> RuntimeList { if limit == 0 { return RuntimeList::new(Vec::new(), Vec::new()); } match self.get_json::("/v1/workers") { Ok(response) => RuntimeList::new( response .workers .into_iter() .take(limit) .map(|worker| self.map_worker_summary(worker)) .collect(), Vec::new(), ), Err(diagnostic) => RuntimeList::new(Vec::new(), vec![diagnostic]), } } fn worker(&self, worker_id: &str) -> WorkerLookupResult { match self.get_json::(&format!("/v1/workers/{worker_id}")) { Ok(response) => WorkerLookupResult { worker: Some(self.map_worker_detail(response.worker)), diagnostics: Vec::new(), }, Err(diagnostic) if diagnostic.code == "remote_worker_not_found" => WorkerLookupResult { worker: None, diagnostics: Vec::new(), }, Err(diagnostic) => WorkerLookupResult { worker: None, diagnostics: vec![diagnostic], }, } } fn spawn_worker(&self, request: WorkerSpawnRequest) -> WorkerSpawnResult { if matches!( request.acceptance, WorkerSpawnAcceptanceRequirement::SocketReady ) { return WorkerSpawnResult { state: WorkerOperationState::Rejected, worker: None, acceptance_evidence: Vec::new(), diagnostics: vec![diagnostic( "remote_runtime_no_socket_ready_acceptance", DiagnosticSeverity::Warning, "Remote Runtime v0 exposes backend-proxied REST/WS control, not direct socket readiness".to_string(), )], }; } let create = CreateWorkerRequest { intent: embedded_create_intent(&request.intent), profile: request .profile .clone() .unwrap_or_else(|| embedded_profile_selector(&request.intent)), config_bundle: request.config_bundle.clone(), requested_capabilities: if request.requested_capabilities.is_empty() { vec![CapabilityRequest::named("read")] } else { request.requested_capabilities.clone() }, workspace_refs: Vec::new(), mount_refs: Vec::new(), }; match self.post_json::<_, RuntimeHttpWorkerResponse>("/v1/workers", &create) { Ok(response) => WorkerSpawnResult { state: WorkerOperationState::Accepted, worker: Some(self.map_worker_detail(response.worker)), acceptance_evidence: vec![WorkerSpawnAcceptanceEvidence { kind: "remote_runtime_worker_created".to_string(), detail: "worker-runtime REST create endpoint accepted the Worker".to_string(), }], diagnostics: vec![diagnostic( "remote_runtime_backend_proxy", DiagnosticSeverity::Info, "Remote create used a backend-owned REST client; browser-facing payload exposes only runtime_id plus worker_id".to_string(), )], }, Err(diagnostic) => WorkerSpawnResult { state: WorkerOperationState::Rejected, worker: None, acceptance_evidence: Vec::new(), diagnostics: vec![diagnostic], }, } } fn sync_config_bundle(&self, bundle: ConfigBundle) -> ConfigBundleSyncResult { let request = RuntimeHttpConfigBundleSyncRequest { bundle }; match self.post_json::<_, RuntimeHttpConfigBundleAvailabilityResponse>( "/v1/config-bundles", &request, ) { Ok(response) => ConfigBundleSyncResult { state: WorkerOperationState::Accepted, availability: Some(response.availability), diagnostics: Vec::new(), }, Err(diagnostic) => ConfigBundleSyncResult { state: WorkerOperationState::Rejected, availability: None, diagnostics: vec![diagnostic], }, } } fn check_config_bundle(&self, reference: ConfigBundleRef) -> ConfigBundleCheckResult { let path = Self::bundle_availability_path(&reference); match self.get_json::(&path) { Ok(response) => ConfigBundleCheckResult { state: WorkerOperationState::Accepted, availability: Some(response.availability), diagnostics: Vec::new(), }, Err(diagnostic) => ConfigBundleCheckResult { state: WorkerOperationState::Rejected, availability: None, diagnostics: vec![diagnostic], }, } } fn stop_worker( &self, worker_id: &str, request: WorkerLifecycleRequest, ) -> WorkerLifecycleResult { let body = RuntimeHttpWorkerLifecycleRequest { reason: request.reason, }; match self.post_json::<_, RuntimeHttpWorkerLifecycleResponse>( &format!("/v1/workers/{worker_id}/stop"), &body, ) { Ok(response) => self.lifecycle_result_from_response(worker_id, response), Err(diagnostic) => remote_lifecycle_rejected(&self.runtime_id, worker_id, diagnostic), } } fn cancel_worker( &self, worker_id: &str, request: WorkerLifecycleRequest, ) -> WorkerLifecycleResult { let body = RuntimeHttpWorkerLifecycleRequest { reason: request.reason, }; match self.post_json::<_, RuntimeHttpWorkerLifecycleResponse>( &format!("/v1/workers/{worker_id}/cancel"), &body, ) { Ok(response) => self.lifecycle_result_from_response(worker_id, response), Err(diagnostic) => remote_lifecycle_rejected(&self.runtime_id, worker_id, diagnostic), } } fn observation_source( &self, worker_id: &str, ) -> Option { Some(crate::observation::RuntimeObservationSource::remote_ws( crate::observation::RuntimeObservationSourceConfig { runtime_id: self.runtime_id.clone(), worker_id: worker_id.to_string(), endpoint: self.ws_endpoint(worker_id), bearer_token: self.bearer_token.clone(), }, )) } fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult { let input = EmbeddedWorkerInput { kind: match request.kind { WorkerInputKind::User => EmbeddedWorkerInputKind::User, WorkerInputKind::System => EmbeddedWorkerInputKind::System, }, content: request.content, }; match self.post_json::<_, RuntimeHttpWorkerInputResponse>( &format!("/v1/workers/{worker_id}/input"), &input, ) { Ok(response) => WorkerInputResult { state: WorkerOperationState::Accepted, runtime_id: self.runtime_id.clone(), worker_id: worker_id.to_string(), transcript_sequence: Some(response.ack.transcript_sequence), event_id: Some(response.ack.event_id), diagnostics: Vec::new(), }, Err(diagnostic) => remote_input_rejected(&self.runtime_id, worker_id, diagnostic), } } fn transcript( &self, worker_id: &str, start: usize, limit: usize, ) -> WorkerTranscriptProjection { match self.get_json::(&format!( "/v1/workers/{worker_id}/transcript?start={start}&limit={limit}" )) { Ok(response) => { embedded_transcript_projection(&self.runtime_id, worker_id, response.transcript) } Err(diagnostic) => { embedded_transcript_rejected(&self.runtime_id, worker_id, start, limit, diagnostic) } } } } fn embedded_runtime_capabilities( limit: usize, available: bool, execution_enabled: bool, ) -> RuntimeCapabilitySummary { RuntimeCapabilitySummary { can_list_hosts: true, can_list_workers: available, can_get_worker: available, can_spawn_worker: available, can_stop_worker: available && execution_enabled, can_accept_input: available && execution_enabled, has_workspace_fs: false, has_shell: false, has_git: false, supports_worktrees: false, supports_backend_internal_tools: true, workspace_scope: "backend_internal".to_string(), max_workers: limit, os: std::env::consts::OS.to_string(), arch: std::env::consts::ARCH.to_string(), } } fn embedded_runtime_status_label(status: RuntimeStatus) -> &'static str { match status { RuntimeStatus::Running => "running", RuntimeStatus::Stopped => "stopped", } } fn embedded_spawn_execution_failure_diagnostic( execution: &worker_runtime::execution::WorkerExecutionStatus, ) -> Option { let result = execution.last_result.as_ref()?; let severity = match result.outcome { worker_runtime::execution::WorkerExecutionOutcome::Accepted => return None, worker_runtime::execution::WorkerExecutionOutcome::Rejected | worker_runtime::execution::WorkerExecutionOutcome::Busy | worker_runtime::execution::WorkerExecutionOutcome::Unsupported => { DiagnosticSeverity::Warning } worker_runtime::execution::WorkerExecutionOutcome::Errored => DiagnosticSeverity::Error, }; let status = match result.outcome { worker_runtime::execution::WorkerExecutionOutcome::Accepted => "accepted", worker_runtime::execution::WorkerExecutionOutcome::Rejected => "rejected", worker_runtime::execution::WorkerExecutionOutcome::Busy => "busy", worker_runtime::execution::WorkerExecutionOutcome::Unsupported => "unsupported", worker_runtime::execution::WorkerExecutionOutcome::Errored => "errored", }; Some(diagnostic( format!("embedded_worker_execution_spawn_{status}"), severity, format!( "Embedded Worker execution spawn was {status} during setup; check runtime configuration" ), )) } fn execution_last_result_blocks_control( execution: &worker_runtime::execution::WorkerExecutionStatus, ) -> bool { execution.last_result.as_ref().is_some_and(|result| { matches!( result.outcome, worker_runtime::execution::WorkerExecutionOutcome::Rejected | worker_runtime::execution::WorkerExecutionOutcome::Errored | worker_runtime::execution::WorkerExecutionOutcome::Unsupported ) }) } fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str { match status { EmbeddedWorkerStatus::Running => "running", EmbeddedWorkerStatus::Stopped => "stopped", EmbeddedWorkerStatus::Cancelled => "cancelled", } } fn embedded_worker_execution_status_label( status: EmbeddedWorkerStatus, run_state: WorkerExecutionRunState, ) -> &'static str { match status { EmbeddedWorkerStatus::Stopped => "stopped", EmbeddedWorkerStatus::Cancelled => "cancelled", EmbeddedWorkerStatus::Running => match run_state { WorkerExecutionRunState::Idle => "idle", WorkerExecutionRunState::Busy => "running", WorkerExecutionRunState::Stopped => "stopped", WorkerExecutionRunState::Rejected => "rejected", WorkerExecutionRunState::Errored => "errored", WorkerExecutionRunState::Unconnected => "unconnected", }, } } fn embedded_create_intent(intent: &WorkerSpawnIntent) -> WorkerIntent { match intent { WorkerSpawnIntent::WorkspaceCompanion => WorkerIntent::Role { role: "workspace_companion".to_string(), purpose: Some("workspace backend internal companion".to_string()), }, WorkerSpawnIntent::WorkspaceOrchestrator => WorkerIntent::Role { role: "workspace_orchestrator".to_string(), purpose: Some("workspace backend internal orchestration".to_string()), }, WorkerSpawnIntent::TicketRole { ticket_id, role } => WorkerIntent::Role { role: ticket_role_profile_slug(role).to_string(), purpose: Some(format!("ticket {ticket_id}")), }, } } fn embedded_profile_selector(intent: &WorkerSpawnIntent) -> ProfileSelector { match intent { WorkerSpawnIntent::TicketRole { role, .. } => { ProfileSelector::Builtin(format!("builtin:{}", ticket_role_profile_slug(role))) } WorkerSpawnIntent::WorkspaceCompanion | WorkerSpawnIntent::WorkspaceOrchestrator => { ProfileSelector::RuntimeDefault } } } fn ticket_role_profile_slug(role: &TicketWorkerRole) -> &'static str { match role { TicketWorkerRole::Intake => "intake", TicketWorkerRole::Orchestrator => "orchestrator", TicketWorkerRole::Coder => "coder", TicketWorkerRole::Reviewer => "reviewer", } } fn embedded_intent_label(intent: &WorkerIntent) -> Option { match intent { WorkerIntent::Assistant { purpose } => { purpose.clone().or_else(|| Some("assistant".to_string())) } WorkerIntent::Task { objective } => Some(safe_display_hint(objective)), WorkerIntent::Role { role, .. } => Some(safe_display_hint(role)), } } fn embedded_profile_label(profile: &ProfileSelector) -> Option { Some(match profile { ProfileSelector::RuntimeDefault => "runtime_default".to_string(), ProfileSelector::Builtin(name) | ProfileSelector::Named(name) => safe_display_hint(name), }) } fn embedded_input_rejected( runtime_id: &str, worker_id: &str, diagnostic: RuntimeDiagnostic, ) -> WorkerInputResult { WorkerInputResult { state: WorkerOperationState::Rejected, runtime_id: runtime_id.to_string(), worker_id: worker_id.to_string(), transcript_sequence: None, event_id: None, diagnostics: vec![diagnostic], } } fn remote_input_rejected( runtime_id: &str, worker_id: &str, diagnostic: RuntimeDiagnostic, ) -> WorkerInputResult { WorkerInputResult { state: WorkerOperationState::Rejected, runtime_id: runtime_id.to_string(), worker_id: worker_id.to_string(), transcript_sequence: None, event_id: None, diagnostics: vec![diagnostic], } } fn embedded_lifecycle_rejected( runtime_id: &str, worker_id: &str, diagnostic: RuntimeDiagnostic, ) -> WorkerLifecycleResult { WorkerLifecycleResult { state: WorkerOperationState::Rejected, runtime_id: runtime_id.to_string(), worker_id: worker_id.to_string(), event_id: None, diagnostics: vec![diagnostic], } } fn remote_lifecycle_rejected( runtime_id: &str, worker_id: &str, diagnostic: RuntimeDiagnostic, ) -> WorkerLifecycleResult { WorkerLifecycleResult { state: WorkerOperationState::Rejected, runtime_id: runtime_id.to_string(), worker_id: worker_id.to_string(), event_id: None, diagnostics: vec![diagnostic], } } fn embedded_transcript_projection( runtime_id: &str, worker_id: &str, projection: EmbeddedTranscriptProjection, ) -> WorkerTranscriptProjection { WorkerTranscriptProjection { state: WorkerOperationState::Accepted, runtime_id: runtime_id.to_string(), worker_id: worker_id.to_string(), start: projection.start, limit: projection.limit, total_items: projection.total_items, next_start: projection.next_start, items: projection .items .into_iter() .map(|item| WorkerTranscriptItem { sequence: item.sequence, role: embedded_transcript_role_label(item.role).to_string(), content: item.content, event_id: item.event_id, }) .collect(), diagnostics: Vec::new(), } } fn embedded_transcript_rejected( runtime_id: &str, worker_id: &str, start: usize, limit: usize, diagnostic: RuntimeDiagnostic, ) -> WorkerTranscriptProjection { WorkerTranscriptProjection { state: WorkerOperationState::Rejected, runtime_id: runtime_id.to_string(), worker_id: worker_id.to_string(), start, limit, total_items: 0, next_start: None, items: Vec::new(), diagnostics: vec![diagnostic], } } fn embedded_transcript_role_label(role: TranscriptRole) -> &'static str { match role { TranscriptRole::User => "user", TranscriptRole::Assistant => "assistant", TranscriptRole::System => "system", } } fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnostic { match error { EmbeddedRuntimeError::RuntimeStopped { .. } => diagnostic( "embedded_runtime_stopped", DiagnosticSeverity::Warning, "Embedded Runtime is stopped".to_string(), ), EmbeddedRuntimeError::WrongRuntime { .. } | EmbeddedRuntimeError::WrongRuntimeCursor { .. } => diagnostic( "embedded_runtime_wrong_identity", DiagnosticSeverity::Warning, "Embedded Runtime rejected a worker/runtime identity mismatch".to_string(), ), EmbeddedRuntimeError::WorkerNotFound { .. } => diagnostic( "embedded_worker_not_found", DiagnosticSeverity::Warning, "Embedded Runtime worker was not found".to_string(), ), EmbeddedRuntimeError::WorkerExecutionUnavailable { .. } => diagnostic( "embedded_worker_execution_unavailable", DiagnosticSeverity::Warning, "Embedded Worker has no execution backend attached".to_string(), ), EmbeddedRuntimeError::WorkerExecutionRejected { .. } => diagnostic( "embedded_worker_execution_rejected", DiagnosticSeverity::Warning, "Embedded Worker execution backend rejected the operation".to_string(), ), EmbeddedRuntimeError::LimitTooLarge { requested, max } => diagnostic( "embedded_runtime_limit_too_large", DiagnosticSeverity::Warning, format!("Requested limit {requested} exceeds embedded Runtime maximum {max}"), ), EmbeddedRuntimeError::InvalidRequest(_) | EmbeddedRuntimeError::ConfigBundleMissing { .. } | EmbeddedRuntimeError::ConfigBundleDigestMismatch { .. } | EmbeddedRuntimeError::InvalidProfileSelector { .. } | EmbeddedRuntimeError::UnsupportedConfigDeclaration { .. } => diagnostic( "embedded_runtime_invalid_request", DiagnosticSeverity::Warning, "Embedded Runtime rejected the request".to_string(), ), EmbeddedRuntimeError::StoreIo { .. } | EmbeddedRuntimeError::StoreMissing { .. } | EmbeddedRuntimeError::StoreCorrupt { .. } => diagnostic( "embedded_runtime_store_error", DiagnosticSeverity::Error, "Embedded Runtime storage operation failed; internal paths are not exposed".to_string(), ), EmbeddedRuntimeError::StatePoisoned => diagnostic( "embedded_runtime_state_unavailable", DiagnosticSeverity::Error, "Embedded Runtime state is unavailable".to_string(), ), } } fn host_id_for_embedded_workspace(workspace_id: &str) -> String { bounded_backend_identifier("embedded-", workspace_id) } fn host_id_for_remote_runtime(runtime_id: &str) -> String { bounded_backend_identifier("remote-", runtime_id) } fn url_path_segment_encode(input: &str) -> String { percent_encode(input, |byte| { byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~' | b':') }) } fn url_query_value_encode(input: &str) -> String { percent_encode(input, |byte| { byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~') }) } fn percent_encode(input: &str, keep: impl Fn(u8) -> bool) -> String { let mut encoded = String::with_capacity(input.len()); for byte in input.bytes() { if keep(byte) { encoded.push(byte as char); } else { encoded.push_str(&format!("%{byte:02X}")); } } encoded } fn remote_runtime_capabilities(limit: usize, available: bool) -> RuntimeCapabilitySummary { RuntimeCapabilitySummary { can_list_hosts: true, can_list_workers: available, can_get_worker: available, can_spawn_worker: available, can_stop_worker: available, can_accept_input: available, has_workspace_fs: false, has_shell: false, has_git: false, supports_worktrees: false, supports_backend_internal_tools: false, workspace_scope: "remote_runtime_backend_private".to_string(), max_workers: limit, os: "remote".to_string(), arch: "remote".to_string(), } } fn remote_reqwest_diagnostic(runtime_id: &str, err: reqwest::Error) -> RuntimeDiagnostic { if err.is_timeout() { diagnostic( "remote_runtime_timeout", DiagnosticSeverity::Error, format!("Timed out while contacting remote Runtime '{runtime_id}'"), ) } else if err.is_connect() || err.is_request() { diagnostic( "remote_runtime_network_error", DiagnosticSeverity::Error, format!("Failed to contact remote Runtime '{runtime_id}'"), ) } else { diagnostic( "remote_runtime_client_error", DiagnosticSeverity::Error, format!("Remote Runtime client error for '{runtime_id}'"), ) } } fn remote_http_status_diagnostic( runtime_id: &str, status: StatusCode, response: reqwest::blocking::Response, ) -> RuntimeDiagnostic { let error = response.json::().ok(); let remote_code = error .as_ref() .map(|error| error.error.code.as_str()) .unwrap_or("remote_http_error"); let (code, severity) = match status { StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { ("remote_runtime_auth_failed", DiagnosticSeverity::Error) } StatusCode::NOT_FOUND => ("remote_worker_not_found", DiagnosticSeverity::Warning), StatusCode::METHOD_NOT_ALLOWED | StatusCode::NOT_IMPLEMENTED => { ("remote_runtime_unsupported", DiagnosticSeverity::Warning) } _ if status.is_server_error() => ("remote_runtime_http_error", DiagnosticSeverity::Error), _ => ("remote_runtime_http_rejected", DiagnosticSeverity::Warning), }; diagnostic( code, severity, format!( "Remote Runtime '{runtime_id}' rejected request ({remote_code}, HTTP {status}); internal details were sanitized" ), ) } fn diagnostic( code: impl Into, severity: DiagnosticSeverity, message: impl Into, ) -> RuntimeDiagnostic { RuntimeDiagnostic { code: code.into(), severity, message: message.into(), } } fn operation_failed_or_unknown_worker( runtime_id: &str, worker_id: &str, diagnostics: Vec, ) -> RuntimeRegistryError { diagnostics .into_iter() .find(|diagnostic| matches!(diagnostic.severity, DiagnosticSeverity::Error)) .map(|diagnostic| RuntimeRegistryError::RuntimeOperationFailed { runtime_id: runtime_id.to_string(), code: diagnostic.code, message: diagnostic.message, }) .unwrap_or_else(|| RuntimeRegistryError::UnknownWorker { runtime_id: runtime_id.to_string(), worker_id: worker_id.to_string(), }) } fn bounded_backend_identifier(prefix: &str, value: &str) -> String { let digest = digest_hex(value.as_bytes(), ID_DIGEST_HEX_LEN); let mut body = sanitize_identifier_body(value); if body.is_empty() { body = "id".to_string(); } let suffix_len = 1 + ID_DIGEST_HEX_LEN; let body_budget = MAX_IDENTIFIER_LEN .saturating_sub(prefix.len()) .saturating_sub(suffix_len) .max(1); if body.len() > body_budget { body.truncate(body_budget); body = body.trim_matches('-').to_string(); if body.is_empty() { body = "id".to_string(); } } let mut id = format!("{prefix}{body}-{digest}"); if id.len() > MAX_IDENTIFIER_LEN { let digest_suffix = format!("-{digest}"); let prefix_budget = MAX_IDENTIFIER_LEN.saturating_sub(digest_suffix.len()); id = format!( "{}{}", prefix.chars().take(prefix_budget).collect::(), digest_suffix ); } id } fn sanitize_identifier_body(value: &str) -> String { let mut out = String::with_capacity(value.len()); for ch in value.chars() { if ch.is_ascii_alphanumeric() { out.push(ch.to_ascii_lowercase()); } else if ch == '-' || ch == '_' { out.push(ch); } else { out.push('-'); } } out.trim_matches('-').to_string() } fn digest_hex(bytes: &[u8], hex_len: usize) -> String { let digest = Sha256::digest(bytes); let mut out = String::with_capacity(hex_len); for byte in digest { if out.len() >= hex_len { break; } out.push_str(&format!("{byte:02x}")); } out.truncate(hex_len); out } fn validate_backend_identifier( kind: &'static str, value: &str, ) -> Result<(), RuntimeRegistryError> { if value.is_empty() || value.len() > MAX_IDENTIFIER_LEN || value .chars() .any(|ch| !(ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == ':')) { return Err(RuntimeRegistryError::InvalidIdentifier { kind, value: value.chars().take(MAX_IDENTIFIER_LEN).collect(), }); } Ok(()) } fn safe_display_hint(value: &str) -> String { value .chars() .filter(|ch| !ch.is_control() && *ch != '/' && *ch != '\\') .take(80) .collect() } fn worker_spawn_intent_label(intent: &WorkerSpawnIntent) -> &'static str { match intent { WorkerSpawnIntent::WorkspaceCompanion => "workspace_companion", WorkerSpawnIntent::WorkspaceOrchestrator => "workspace_orchestrator", WorkerSpawnIntent::TicketRole { role, .. } => match role { TicketWorkerRole::Intake => "ticket_intake", TicketWorkerRole::Orchestrator => "ticket_orchestrator", TicketWorkerRole::Coder => "ticket_coder", TicketWorkerRole::Reviewer => "ticket_reviewer", }, } } pub fn placeholder_worker(host_id: impl Into) -> WorkerSummary { let host_id = host_id.into(); WorkerSummary { runtime_id: "placeholder".to_string(), worker_id: "worker-placeholder".to_string(), host_id, label: "Worker runtime actions are not implemented".to_string(), role: None, profile: None, workspace: WorkerWorkspaceSummary { visibility: "none".to_string(), identity: "unsupported".to_string(), }, state: "unsupported".to_string(), status: "Worker runtime control is not wired yet".to_string(), last_seen_at: None, implementation: WorkerImplementationSummary { kind: "placeholder".to_string(), display_hint: "unsupported".to_string(), }, capabilities: WorkerCapabilitySummary { can_accept_input: false, can_stop: false, can_spawn_followup: false, }, diagnostics: vec![diagnostic( "runtime_capability_unsupported", DiagnosticSeverity::Info, "worker control is outside this overview-only registry surface".to_string(), )], } } pub fn placeholder_spawn_response(host_id: impl Into) -> WorkerSpawnResult { WorkerSpawnResult { state: WorkerOperationState::Unsupported, worker: Some(placeholder_worker(host_id)), acceptance_evidence: Vec::new(), diagnostics: vec![diagnostic( "worker_spawn_unsupported", DiagnosticSeverity::Info, "Workspace worker runtime control is not implemented yet".to_string(), )], } } #[cfg(test)] mod tests { use super::*; use serde_json::json; use std::collections::HashMap; use std::io::{Read as _, Write as _}; use std::net::TcpListener; use std::sync::{Arc, Mutex}; use std::thread; fn test_config_bundle() -> ConfigBundle { ConfigBundle { metadata: worker_runtime::config_bundle::ConfigBundleMetadata { id: "bundle-1".to_string(), digest: String::new(), revision: "rev-1".to_string(), workspace_id: "local:test".to_string(), created_at: "2026-06-26T00:00:00Z".to_string(), provenance: worker_runtime::config_bundle::ConfigBundleProvenance { source: "workspace-server-test".to_string(), detail: None, }, }, profiles: vec![worker_runtime::config_bundle::ConfigProfileDescriptor { selector: ProfileSelector::Builtin("builtin:coder".to_string()), label: Some("Coder".to_string()), }], declarations: vec![worker_runtime::config_bundle::ConfigDeclaration { kind: worker_runtime::config_bundle::ConfigDeclarationKind::CapabilityGrant, name: "read".to_string(), reference: "capability:read".to_string(), }], } .with_computed_digest() } struct FailingSpawnBackend; impl worker_runtime::execution::WorkerExecutionBackend for FailingSpawnBackend { fn backend_id(&self) -> &str { "workspace-server-failing-spawn-backend" } fn spawn_worker( &self, _request: worker_runtime::execution::WorkerExecutionSpawnRequest, ) -> worker_runtime::execution::WorkerExecutionSpawnResult { worker_runtime::execution::WorkerExecutionSpawnResult::Errored( worker_runtime::execution::WorkerExecutionResult::errored( worker_runtime::execution::WorkerExecutionOperation::Spawn, "provider setup failed at /tmp/secret-provider-config", ), ) } fn dispatch_input( &self, _handle: &worker_runtime::execution::WorkerExecutionHandle, _input: EmbeddedWorkerInput, ) -> worker_runtime::execution::WorkerExecutionResult { worker_runtime::execution::WorkerExecutionResult::rejected( worker_runtime::execution::WorkerExecutionOperation::Input, "spawn failed before input could be dispatched", ) } } #[derive(Default)] struct AcceptingExecutionBackend { contexts: Mutex>, } impl worker_runtime::execution::WorkerExecutionBackend for AcceptingExecutionBackend { fn backend_id(&self) -> &str { "workspace-server-test-backend" } fn spawn_worker( &self, request: worker_runtime::execution::WorkerExecutionSpawnRequest, ) -> worker_runtime::execution::WorkerExecutionSpawnResult { self.contexts .lock() .unwrap() .insert(request.worker_ref.clone(), request.context); worker_runtime::execution::WorkerExecutionSpawnResult::Connected { handle: worker_runtime::execution::WorkerExecutionHandle::new( request.worker_ref, self.backend_id(), ), run_state: WorkerExecutionRunState::Idle, } } fn dispatch_input( &self, handle: &worker_runtime::execution::WorkerExecutionHandle, input: EmbeddedWorkerInput, ) -> worker_runtime::execution::WorkerExecutionResult { let context = self .contexts .lock() .unwrap() .get(handle.worker_ref()) .cloned(); let Some(context) = context else { return worker_runtime::execution::WorkerExecutionResult::rejected( worker_runtime::execution::WorkerExecutionOperation::Input, "missing test context", ); }; let content = input.content; std::thread::spawn(move || { std::thread::sleep(std::time::Duration::from_millis(10)); let _ = context.publish_protocol_event(protocol::Event::Status { status: protocol::WorkerStatus::Running, }); let _ = context.publish_protocol_event(protocol::Event::TextDone { text: format!("echo: {content}"), }); let _ = context.publish_protocol_event(protocol::Event::RunEnd { result: protocol::RunResult::Finished, }); let _ = context.publish_protocol_event(protocol::Event::Status { status: protocol::WorkerStatus::Idle, }); }); worker_runtime::execution::WorkerExecutionResult::accepted( worker_runtime::execution::WorkerExecutionOperation::Input, WorkerExecutionRunState::Busy, ) } } #[derive(Clone)] struct FixtureRuntime { runtime_id: String, host_id: String, workers: Vec, } impl FixtureRuntime { fn with_worker(runtime_id: &str, host_id: &str, worker_id: &str, label: &str) -> Self { Self { runtime_id: runtime_id.to_string(), host_id: host_id.to_string(), workers: vec![WorkerSummary { runtime_id: runtime_id.to_string(), worker_id: worker_id.to_string(), host_id: host_id.to_string(), label: label.to_string(), role: None, profile: None, workspace: WorkerWorkspaceSummary { visibility: "opaque".to_string(), identity: host_id.to_string(), }, state: "running".to_string(), status: "available".to_string(), last_seen_at: None, implementation: WorkerImplementationSummary { kind: "fixture".to_string(), display_hint: "test fixture".to_string(), }, capabilities: WorkerCapabilitySummary { can_accept_input: false, can_stop: false, can_spawn_followup: false, }, diagnostics: Vec::new(), }], } } } impl WorkspaceWorkerRuntime for FixtureRuntime { fn runtime_id(&self) -> &str { &self.runtime_id } fn runtime_summary(&self, _limit: usize) -> RuntimeSummary { RuntimeSummary { runtime_id: self.runtime_id.clone(), label: self.runtime_id.clone(), kind: "fixture".to_string(), status: "available".to_string(), source: RuntimeSourceSummary::embedded_worker_runtime_reserved(), host_ids: vec![self.host_id.clone()], capabilities: RuntimeCapabilitySummary { can_list_hosts: true, can_list_workers: true, can_get_worker: true, can_spawn_worker: false, can_stop_worker: false, can_accept_input: false, has_workspace_fs: false, has_shell: false, has_git: false, supports_worktrees: false, supports_backend_internal_tools: false, workspace_scope: "none".to_string(), max_workers: self.workers.len(), os: "test".to_string(), arch: "test".to_string(), }, diagnostics: Vec::new(), } } fn list_hosts(&self, _limit: usize) -> RuntimeList { RuntimeList::new( vec![HostSummary { runtime_id: self.runtime_id.clone(), host_id: self.host_id.clone(), label: "fixture host".to_string(), kind: "fixture".to_string(), status: "available".to_string(), observed_at: "unknown".to_string(), last_seen_at: None, capabilities: self.runtime_summary(1).capabilities, diagnostics: Vec::new(), }], Vec::new(), ) } fn list_workers(&self, limit: usize) -> RuntimeList { RuntimeList::new( self.workers.iter().take(limit).cloned().collect(), Vec::new(), ) } fn worker(&self, worker_id: &str) -> WorkerLookupResult { WorkerLookupResult { worker: self .workers .iter() .find(|worker| worker.worker_id == worker_id) .cloned(), diagnostics: Vec::new(), } } } #[test] fn registry_worker_lookup_is_scoped_by_runtime_id() { let registry = RuntimeRegistry::new(vec![ Arc::new(FixtureRuntime::with_worker( "runtime-a", "host-a", "shared-worker", "worker from runtime a", )), Arc::new(FixtureRuntime::with_worker( "runtime-b", "host-b", "shared-worker", "worker from runtime b", )), ]); let from_runtime_b = registry.worker("runtime-b", "shared-worker").unwrap(); assert_eq!(from_runtime_b.runtime_id, "runtime-b"); assert_eq!(from_runtime_b.host_id, "host-b"); assert_eq!(from_runtime_b.label, "worker from runtime b"); let from_runtime_a = registry.worker("runtime-a", "shared-worker").unwrap(); assert_eq!(from_runtime_a.runtime_id, "runtime-a"); assert_eq!(from_runtime_a.host_id, "host-a"); assert_eq!(from_runtime_a.label, "worker from runtime a"); } #[test] fn registry_worker_lookup_reports_unknown_runtime_and_worker_separately() { let registry = RuntimeRegistry::new(vec![Arc::new(FixtureRuntime::with_worker( "runtime-a", "host-a", "worker-a", "worker from runtime a", ))]); let unknown_runtime = registry.worker("runtime-missing", "worker-a").unwrap_err(); assert_eq!( unknown_runtime, RuntimeRegistryError::UnknownRuntime("runtime-missing".to_string()) ); assert!(matches!( unknown_runtime.into_error(), Error::UnknownRuntime(runtime_id) if runtime_id == "runtime-missing" )); let unknown_worker = registry.worker("runtime-a", "worker-missing").unwrap_err(); assert_eq!( unknown_worker, RuntimeRegistryError::UnknownWorker { runtime_id: "runtime-a".to_string(), worker_id: "worker-missing".to_string(), } ); assert!(matches!( unknown_worker.into_error(), Error::UnknownWorker { runtime_id, worker_id } if runtime_id == "runtime-a" && worker_id == "worker-missing" )); } fn embedded_spawn_request() -> WorkerSpawnRequest { WorkerSpawnRequest { intent: WorkerSpawnIntent::TicketRole { ticket_id: "00001KVZSGT0Q".to_string(), role: TicketWorkerRole::Coder, }, requested_worker_name: None, acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments: 0, }, profile: None, config_bundle: None, requested_capabilities: Vec::new(), } } #[test] fn embedded_runtime_spawn_execution_failure_is_rejected_and_not_input_capable() { let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend( "local:test", Arc::new(FailingSpawnBackend), ) .expect("test backend should connect"); let spawned = runtime.spawn_worker(embedded_spawn_request()); assert_eq!(spawned.state, WorkerOperationState::Rejected); assert!(spawned.acceptance_evidence.is_empty()); assert!(spawned.diagnostics.iter().any(|diagnostic| { diagnostic.code == "embedded_worker_execution_spawn_errored" && !diagnostic.message.contains("/tmp/secret-provider-config") })); let worker = spawned.worker.expect("failed execution is still projected"); assert_eq!(worker.status, "errored"); assert!(!worker.capabilities.can_accept_input); assert!(!worker.capabilities.can_stop); } #[test] fn embedded_runtime_with_execution_backend_routes_input_and_projects_transcript() { let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend( "local:test", Arc::new(AcceptingExecutionBackend::default()), ) .expect("test backend should connect"); let spawned = runtime.spawn_worker(embedded_spawn_request()); assert_eq!(spawned.state, WorkerOperationState::Accepted); let worker = spawned.worker.expect("created embedded worker"); assert!(worker.capabilities.can_accept_input); assert!(worker.capabilities.can_stop); let input = runtime.send_input( &worker.worker_id, WorkerInputRequest { kind: WorkerInputKind::User, content: "hello".to_string(), }, ); assert_eq!(input.state, WorkerOperationState::Accepted); let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); loop { let detail = runtime .worker(&worker.worker_id) .worker .expect("worker detail"); let transcript = runtime.transcript(&worker.worker_id, 0, 10); if detail.status == "idle" && transcript .items .iter() .any(|entry| entry.role == "assistant" && entry.content == "echo: hello") { assert!(detail.capabilities.can_accept_input); break; } assert!( std::time::Instant::now() < deadline, "timed out waiting for embedded execution projection" ); std::thread::sleep(std::time::Duration::from_millis(10)); } } #[test] fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() { let registry = RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory("local:test")); let runtimes = registry.list_runtimes(10); let embedded_summary = runtimes .items .iter() .find(|runtime| runtime.runtime_id == EMBEDDED_RUNTIME_ID) .expect("embedded runtime summary"); assert_eq!( embedded_summary.source.kind, RuntimeSourceKind::EmbeddedWorkerRuntime ); assert_eq!(embedded_summary.source.status, RuntimeSourceStatus::Active); assert!(embedded_summary.capabilities.can_spawn_worker); assert!(!embedded_summary.capabilities.can_accept_input); let spawned = registry .spawn_worker( EMBEDDED_RUNTIME_ID, WorkerSpawnRequest { intent: WorkerSpawnIntent::TicketRole { ticket_id: "00001KVZSGT0Q".to_string(), role: TicketWorkerRole::Coder, }, requested_worker_name: Some("friendly-name-is-not-authority".to_string()), acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments: 0, }, profile: None, config_bundle: None, requested_capabilities: Vec::new(), }, ) .unwrap(); assert_eq!(spawned.state, WorkerOperationState::Accepted); assert!( spawned .acceptance_evidence .iter() .any(|evidence| evidence.kind == "embedded_runtime_backend_internal_projection") ); let worker = spawned.worker.expect("created embedded worker"); assert_eq!(worker.runtime_id, EMBEDDED_RUNTIME_ID); assert_eq!(worker.workspace.visibility, "backend_internal"); assert_eq!(worker.workspace.identity, "runtime_registry_worker"); assert_eq!(worker.implementation.kind, "embedded_worker_runtime"); assert_eq!(worker.profile.as_deref(), Some("builtin:coder")); assert!(!worker.capabilities.can_accept_input); let input = registry .send_input( EMBEDDED_RUNTIME_ID, &worker.worker_id, WorkerInputRequest { kind: WorkerInputKind::User, content: "hello embedded runtime".to_string(), }, ) .unwrap(); assert_eq!(input.state, WorkerOperationState::Rejected); assert_eq!(input.runtime_id, EMBEDDED_RUNTIME_ID); assert_eq!(input.worker_id, worker.worker_id); assert!( input .diagnostics .iter() .any(|diagnostic| diagnostic.code == "embedded_worker_execution_unavailable") ); let transcript = registry .transcript(EMBEDDED_RUNTIME_ID, &worker.worker_id, 0, 10) .unwrap(); assert_eq!(transcript.state, WorkerOperationState::Accepted); assert!(transcript.items.is_empty()); let json = serde_json::to_string(&(embedded_summary, worker, transcript)).unwrap(); for forbidden in [ "/workspace/project", "metadata.json", "session", "socket", "token", "credential", "provider", "can_stream_events", "can_read_bounded_transcript", ] { assert!( !json.contains(forbidden), "embedded runtime projection leaked forbidden term: {forbidden}: {json}" ); } } #[test] fn embedded_backend_syncs_config_bundle_and_spawns_with_bundle_ref() { let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( "local:test", ))]); let bundle = test_config_bundle(); let sync = registry .sync_config_bundle(EMBEDDED_RUNTIME_ID, bundle.clone()) .unwrap(); assert_eq!(sync.state, WorkerOperationState::Accepted); let reference = sync.availability.expect("bundle availability").reference; assert_eq!(reference.id, bundle.metadata.id); assert_eq!(reference.digest, bundle.metadata.digest); let check = registry .check_config_bundle(EMBEDDED_RUNTIME_ID, reference.clone()) .unwrap(); assert_eq!(check.state, WorkerOperationState::Accepted); let spawned = registry .spawn_worker( EMBEDDED_RUNTIME_ID, WorkerSpawnRequest { intent: WorkerSpawnIntent::TicketRole { ticket_id: "00001KVZSGT0Q".to_string(), role: TicketWorkerRole::Coder, }, requested_worker_name: None, acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments: 0, }, profile: Some(ProfileSelector::Builtin("builtin:coder".to_string())), config_bundle: Some(reference), requested_capabilities: vec![CapabilityRequest::named("read")], }, ) .unwrap(); assert_eq!(spawned.state, WorkerOperationState::Accepted); assert_eq!( spawned.worker.unwrap().profile.as_deref(), Some("builtin:coder") ); } #[test] fn embedded_runtime_rejects_socket_ready_acceptance_without_socket_identity() { let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( "local:test", ))]); let result = registry .spawn_worker( EMBEDDED_RUNTIME_ID, WorkerSpawnRequest { intent: WorkerSpawnIntent::WorkspaceCompanion, requested_worker_name: None, acceptance: WorkerSpawnAcceptanceRequirement::SocketReady, profile: None, config_bundle: None, requested_capabilities: Vec::new(), }, ) .unwrap(); assert_eq!(result.state, WorkerOperationState::Rejected); assert!(result.worker.is_none()); assert!( result .diagnostics .iter() .any(|diag| diag.code == "embedded_runtime_no_socket") ); } #[test] fn remote_runtime_registry_routes_commands_without_browser_secret_leaks() { let worker_json = worker_json("remote:primary", "worker-remote-1"); let (base_url, server) = serve_mock_http(vec![ mock_response( "GET", "/v1/workers", true, 200, json!({ "workers": [worker_json.clone()] }).to_string(), ), mock_response( "GET", "/v1/workers/worker-remote-1", true, 200, json!({ "worker": worker_json.clone() }).to_string(), ), mock_response( "POST", "/v1/workers/worker-remote-1/input", true, 200, json!({ "ack": { "worker_ref": { "runtime_id": "remote:primary", "worker_id": "worker-remote-1" }, "status": "running", "transcript_sequence": 7, "event_id": 8 } }) .to_string(), ), ]); let secret = "secret-token-do-not-leak".to_string(); let mut registry = RuntimeRegistry::new(Vec::new()); registry.register( RemoteWorkerRuntime::new(RemoteRuntimeConfig::new( "remote:primary", "Remote Primary", base_url.clone(), Some(secret.clone()), )) .unwrap(), ); let observation = registry .observation_source("remote:primary", "worker-remote-1") .expect("remote runtime exposes backend-owned WS observation source"); let crate::observation::RuntimeObservationSource::RemoteWs(observation) = observation else { panic!("remote runtime should expose a remote WS observation source"); }; assert!(observation.endpoint.starts_with("ws://127.0.0.1:")); assert!( observation .endpoint .ends_with("/v1/workers/worker-remote-1/events/ws") ); assert_eq!(observation.bearer_token.as_deref(), Some(secret.as_str())); let workers = registry.list_workers(10); assert_eq!(workers.items.len(), 1); assert_eq!(workers.items[0].runtime_id, "remote:primary"); assert_eq!(workers.items[0].worker_id, "worker-remote-1"); assert_eq!( workers.items[0].implementation.kind, "remote_worker_runtime" ); assert_eq!( workers.items[0].workspace.identity, "runtime_registry_worker" ); let input = registry .send_input( "remote:primary", "worker-remote-1", WorkerInputRequest { kind: WorkerInputKind::User, content: "hello remote".to_string(), }, ) .unwrap(); assert_eq!(input.state, WorkerOperationState::Accepted); assert_eq!(input.transcript_sequence, Some(7)); assert_eq!(input.event_id, Some(8)); server.join().expect("mock remote server finished"); let browser_payload = serde_json::to_string(&(workers, input)).unwrap(); assert!( !browser_payload.contains(&base_url), "leaked base URL: {browser_payload}" ); assert!( !browser_payload.contains(&secret), "leaked token: {browser_payload}" ); assert!(browser_payload.contains("runtime_id")); assert!(browser_payload.contains("worker_id")); } #[test] fn remote_config_bundle_sync_and_check_diagnostics_are_sanitized_and_path_safe() { let leaked_store_path = "/var/lib/yoi/runtime/bundles/bundle-1.json"; let leaked_session_path = ".yoi/sessions/session.jsonl"; let digest = "0".repeat(64); let (base_url, server) = serve_mock_http(vec![ mock_response( "POST", "/v1/config-bundles", true, 500, json!({ "error": { "code": "store_io", "message": format!("failed to write {leaked_store_path}") } }) .to_string(), ), mock_response( "GET", "/v1/config-bundles/bundle%2F1%3Fx/availability?digest=0000000000000000000000000000000000000000000000000000000000000000", true, 400, json!({ "error": { "code": "invalid_request", "message": format!("invalid path {leaked_session_path}") } }) .to_string(), ), ]); let mut registry = RuntimeRegistry::new(Vec::new()); registry.register( RemoteWorkerRuntime::new(RemoteRuntimeConfig::new( "remote:primary", "Remote Primary", base_url, Some("secret-token".to_string()), )) .unwrap(), ); let sync = registry .sync_config_bundle("remote:primary", test_config_bundle()) .unwrap(); assert_eq!(sync.state, WorkerOperationState::Rejected); let sync_payload = serde_json::to_string(&sync).unwrap(); assert!(!sync_payload.contains(leaked_store_path), "{sync_payload}"); let check = registry .check_config_bundle( "remote:primary", ConfigBundleRef { id: "bundle/1?x".to_string(), digest, }, ) .unwrap(); assert_eq!(check.state, WorkerOperationState::Rejected); let check_payload = serde_json::to_string(&check).unwrap(); assert!( !check_payload.contains(leaked_session_path), "{check_payload}" ); assert!(!check_payload.contains(".yoi/sessions"), "{check_payload}"); server.join().expect("mock remote server finished"); } #[test] fn remote_runtime_auth_errors_map_to_typed_backend_error() { let (base_url, server) = serve_mock_http(vec![mock_response( "GET", "/v1/workers/worker-missing", true, 401, json!({ "error": { "code": "unauthorized", "message": "bad token" } }).to_string(), )]); let mut registry = RuntimeRegistry::new(Vec::new()); registry.register( RemoteWorkerRuntime::new(RemoteRuntimeConfig::new( "remote:primary", "Remote Primary", base_url, Some("secret-token".to_string()), )) .unwrap(), ); let error = registry .worker("remote:primary", "worker-missing") .expect_err("auth failure is a backend operation error"); assert!(matches!( error, RuntimeRegistryError::RuntimeOperationFailed { runtime_id, code, .. } if runtime_id == "remote:primary" && code == "remote_runtime_auth_failed" )); server.join().expect("mock remote server finished"); } #[derive(Clone)] struct MockResponse { method: &'static str, path: &'static str, require_auth: bool, status: u16, body: String, } fn mock_response( method: &'static str, path: &'static str, require_auth: bool, status: u16, body: String, ) -> MockResponse { MockResponse { method, path, require_auth, status, body, } } fn serve_mock_http(responses: Vec) -> (String, thread::JoinHandle<()>) { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let base_url = format!("http://{}", listener.local_addr().unwrap()); let handle = thread::spawn(move || { for expected in responses { let (mut stream, _) = listener.accept().unwrap(); let mut buffer = [0_u8; 8192]; let read = stream.read(&mut buffer).unwrap(); let request = String::from_utf8_lossy(&buffer[..read]); let first_line = request.lines().next().unwrap_or_default(); let expected_line = format!("{} {} ", expected.method, expected.path); assert!( first_line.starts_with(&expected_line), "unexpected request line: {first_line}, expected prefix {expected_line}" ); if expected.require_auth { assert!( request .to_ascii_lowercase() .contains("authorization: bearer secret-token"), "authorization header missing from request: {request}" ); } let status_text = match expected.status { 200 => "OK", 401 => "Unauthorized", 404 => "Not Found", _ => "Mock", }; let response = format!( "HTTP/1.1 {} {status_text}\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", expected.status, expected.body.len(), expected.body ); stream.write_all(response.as_bytes()).unwrap(); } }); (base_url, handle) } fn worker_json(runtime_id: &str, worker_id: &str) -> serde_json::Value { json!({ "worker_ref": { "runtime_id": runtime_id, "worker_id": worker_id }, "runtime_id": runtime_id, "worker_id": worker_id, "status": "running", "execution": { "backend": "connected", "run_state": "idle" }, "intent": { "kind": "role", "role": "coder", "purpose": "remote test" }, "profile": { "kind": "builtin", "value": "coder" }, "config_bundle": null, "requested_capabilities": [], "workspace_refs": [], "mount_refs": [], "requested_capability_count": 0, "has_config_bundle": false, "transcript_len": 0, "last_event_id": 0 }) } }