diff --git a/Cargo.lock b/Cargo.lock index 9a55d81f..983d5cb9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6036,6 +6036,7 @@ dependencies = [ "pod-store", "project-record", "protocol", + "reqwest", "rusqlite", "serde", "serde_json", diff --git a/crates/workspace-server/Cargo.toml b/crates/workspace-server/Cargo.toml index 4fa7b487..99558ec2 100644 --- a/crates/workspace-server/Cargo.toml +++ b/crates/workspace-server/Cargo.toml @@ -14,6 +14,7 @@ futures.workspace = true pod-store = { workspace = true } protocol = { workspace = true } project-record.workspace = true +reqwest = { version = "0.13", default-features = false, features = ["blocking", "json", "native-tls"] } rusqlite.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index 7a184794..f5e6615f 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -1,6 +1,10 @@ use crate::Error; use chrono::Utc; use pod_store::WorkerMetadata; +use reqwest::StatusCode; +use reqwest::blocking::{Client as BlockingHttpClient, RequestBuilder}; +use reqwest::header::{AUTHORIZATION, CONTENT_TYPE}; +use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use serde_json::Value; use sha2::{Digest, Sha256}; @@ -9,12 +13,18 @@ use std::{ fs, path::{Path, PathBuf}, sync::Arc, + time::Duration, }; use worker_runtime::catalog::{ CreateWorkerRequest, ProfileSelector, WorkerDetail as EmbeddedWorkerDetail, WorkerIntent, WorkerStatus as EmbeddedWorkerStatus, }; use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; +use worker_runtime::http_server::{ + RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse, + RuntimeHttpWorkerInputResponse, RuntimeHttpWorkerLifecycleRequest, + RuntimeHttpWorkerLifecycleResponse, RuntimeHttpWorkerResponse, RuntimeHttpWorkersResponse, +}; use worker_runtime::identity::{ RuntimeId as EmbeddedRuntimeId, WorkerId as EmbeddedWorkerId, WorkerRef as EmbeddedWorkerRef, }; @@ -30,6 +40,7 @@ const LOCAL_RUNTIME_ID: &str = "local-worker-runtime"; const EMBEDDED_RUNTIME_ID: &str = "embedded-worker-runtime"; const LOCAL_HOST_KIND: &str = "local-worker-host"; const EMBEDDED_HOST_KIND: &str = "embedded-worker-runtime-host"; +const REMOTE_HOST_KIND: &str = "remote-worker-runtime-host"; const MAX_DIAGNOSTICS: usize = 16; const MAX_HOST_SCAN: usize = 256; const MAX_IDENTIFIER_LEN: usize = 120; @@ -123,6 +134,15 @@ impl RuntimeSourceSummary { } } + pub fn remote_http() -> Self { + Self { + kind: RuntimeSourceKind::RemoteHttp, + status: RuntimeSourceStatus::Active, + identity_authority: RuntimeIdentityAuthority::RuntimeRegistryProjection, + note: "backend-owned remote worker-runtime REST/WS client; endpoints and credentials remain backend-private".to_string(), + } + } + pub fn remote_http_reserved() -> Self { Self { kind: RuntimeSourceKind::RemoteHttp, @@ -322,6 +342,22 @@ pub struct WorkerStopResult { pub diagnostics: Vec, } +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerLifecycleRequest { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerLifecycleResult { + pub state: WorkerOperationState, + pub runtime_id: String, + pub worker_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub event_id: Option, + pub diagnostics: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] pub enum WorkerInputKind { @@ -389,6 +425,11 @@ pub enum RuntimeRegistryError { runtime_id: String, worker_id: String, }, + RuntimeOperationFailed { + runtime_id: String, + code: String, + message: String, + }, } impl RuntimeRegistryError { @@ -407,6 +448,15 @@ impl RuntimeRegistryError { runtime_id, worker_id, }, + Self::RuntimeOperationFailed { + runtime_id, + code, + message, + } => Error::RuntimeOperationFailed { + runtime_id, + code, + message, + }, } } } @@ -442,20 +492,53 @@ pub trait WorkspaceWorkerRuntime: Send + Sync { } } - fn stop_worker(&self, request: WorkerStopRequest) -> WorkerStopResult { - WorkerStopResult { + fn stop_worker( + &self, + worker_id: &str, + _request: WorkerLifecycleRequest, + ) -> WorkerLifecycleResult { + WorkerLifecycleResult { state: WorkerOperationState::Unsupported, + runtime_id: self.runtime_id().to_string(), + worker_id: worker_id.to_string(), + event_id: None, diagnostics: vec![diagnostic( "worker_stop_pending", DiagnosticSeverity::Info, format!( - "worker stop for '{}' is reserved for the runtime service boundary and is not implemented by this registry surface", - request.worker_id + "worker stop for '{worker_id}' is reserved for the runtime service boundary and is not implemented by this registry surface" ), )], } } + fn cancel_worker( + &self, + worker_id: &str, + _request: WorkerLifecycleRequest, + ) -> WorkerLifecycleResult { + WorkerLifecycleResult { + state: WorkerOperationState::Unsupported, + runtime_id: self.runtime_id().to_string(), + worker_id: worker_id.to_string(), + event_id: None, + diagnostics: vec![diagnostic( + "worker_cancel_pending", + DiagnosticSeverity::Info, + format!( + "worker cancel for '{worker_id}' is reserved for the runtime service boundary and is not implemented by this registry surface" + ), + )], + } + } + + fn observation_source( + &self, + _worker_id: &str, + ) -> Option { + None + } + fn send_input(&self, worker_id: &str, _request: WorkerInputRequest) -> WorkerInputResult { WorkerInputResult { state: WorkerOperationState::Unsupported, @@ -631,12 +714,9 @@ impl RuntimeRegistry { validate_backend_identifier("worker_id", worker_id)?; let runtime = self.runtime(runtime_id)?; let lookup = runtime.worker(worker_id); - lookup - .worker - .ok_or_else(|| RuntimeRegistryError::UnknownWorker { - runtime_id: runtime_id.to_string(), - worker_id: worker_id.to_string(), - }) + lookup.worker.ok_or_else(|| { + operation_failed_or_unknown_worker(runtime_id, worker_id, lookup.diagnostics) + }) } pub fn spawn_worker( @@ -658,11 +738,13 @@ impl RuntimeRegistry { validate_backend_identifier("runtime_id", runtime_id)?; validate_backend_identifier("worker_id", worker_id)?; let runtime = self.runtime(runtime_id)?; - if runtime.worker(worker_id).worker.is_none() { - return Err(RuntimeRegistryError::UnknownWorker { - runtime_id: runtime_id.to_string(), - worker_id: worker_id.to_string(), - }); + let lookup = runtime.worker(worker_id); + if lookup.worker.is_none() { + return Err(operation_failed_or_unknown_worker( + runtime_id, + worker_id, + lookup.diagnostics, + )); } Ok(runtime.send_input(worker_id, request)) } @@ -677,15 +759,73 @@ impl RuntimeRegistry { validate_backend_identifier("runtime_id", runtime_id)?; validate_backend_identifier("worker_id", worker_id)?; let runtime = self.runtime(runtime_id)?; - if runtime.worker(worker_id).worker.is_none() { - return Err(RuntimeRegistryError::UnknownWorker { - runtime_id: runtime_id.to_string(), - worker_id: worker_id.to_string(), - }); + let lookup = runtime.worker(worker_id); + if lookup.worker.is_none() { + return Err(operation_failed_or_unknown_worker( + runtime_id, + worker_id, + lookup.diagnostics, + )); } Ok(runtime.transcript(worker_id, start, limit)) } + pub fn stop_worker( + &self, + runtime_id: &str, + worker_id: &str, + request: WorkerLifecycleRequest, + ) -> Result { + validate_backend_identifier("runtime_id", runtime_id)?; + validate_backend_identifier("worker_id", worker_id)?; + let runtime = self.runtime(runtime_id)?; + let lookup = runtime.worker(worker_id); + if lookup.worker.is_none() { + return Err(operation_failed_or_unknown_worker( + runtime_id, + worker_id, + lookup.diagnostics, + )); + } + Ok(runtime.stop_worker(worker_id, request)) + } + + pub fn cancel_worker( + &self, + runtime_id: &str, + worker_id: &str, + request: WorkerLifecycleRequest, + ) -> Result { + validate_backend_identifier("runtime_id", runtime_id)?; + validate_backend_identifier("worker_id", worker_id)?; + let runtime = self.runtime(runtime_id)?; + let lookup = runtime.worker(worker_id); + if lookup.worker.is_none() { + return Err(operation_failed_or_unknown_worker( + runtime_id, + worker_id, + lookup.diagnostics, + )); + } + Ok(runtime.cancel_worker(worker_id, request)) + } + + pub fn observation_source( + &self, + runtime_id: &str, + worker_id: &str, + ) -> Result { + validate_backend_identifier("runtime_id", runtime_id)?; + validate_backend_identifier("worker_id", worker_id)?; + let runtime = self.runtime(runtime_id)?; + runtime + .observation_source(worker_id) + .ok_or_else(|| RuntimeRegistryError::UnknownWorker { + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + }) + } + fn runtime( &self, runtime_id: &str, @@ -1059,6 +1199,497 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { } } +#[derive(Clone)] +pub struct RemoteRuntimeConfig { + pub runtime_id: String, + pub display_name: String, + pub base_url: String, + pub bearer_token: Option, + pub cached_capabilities: RuntimeCapabilitySummary, + pub cached_status: String, + pub timeout: Duration, +} + +impl std::fmt::Debug for RemoteRuntimeConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RemoteRuntimeConfig") + .field("runtime_id", &self.runtime_id) + .field("display_name", &self.display_name) + .field("base_url", &"") + .field( + "bearer_token", + &self.bearer_token.as_ref().map(|_| ""), + ) + .field("cached_capabilities", &self.cached_capabilities) + .field("cached_status", &self.cached_status) + .field("timeout", &self.timeout) + .finish() + } +} + +impl RemoteRuntimeConfig { + pub fn new( + runtime_id: impl Into, + display_name: impl Into, + base_url: impl Into, + bearer_token: Option, + ) -> Self { + Self { + runtime_id: runtime_id.into(), + display_name: display_name.into(), + base_url: base_url.into(), + bearer_token, + cached_capabilities: remote_runtime_capabilities(200, false), + cached_status: "configured".to_string(), + timeout: Duration::from_secs(10), + } + } + + pub fn with_cached_capabilities(mut self, capabilities: RuntimeCapabilitySummary) -> Self { + self.cached_capabilities = capabilities; + self + } + + pub fn with_cached_status(mut self, status: impl Into) -> Self { + self.cached_status = status.into(); + self + } + + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +#[derive(Clone)] +pub struct RemoteWorkerRuntime { + runtime_id: String, + display_name: String, + base_url: String, + bearer_token: Option, + cached_capabilities: RuntimeCapabilitySummary, + cached_status: String, + host_id: String, + http: BlockingHttpClient, +} + +impl RemoteWorkerRuntime { + pub fn new(config: RemoteRuntimeConfig) -> Result { + validate_backend_identifier("runtime_id", &config.runtime_id)?; + let base_url = config.base_url.trim_end_matches('/').to_string(); + let http = BlockingHttpClient::builder() + .timeout(config.timeout) + .build() + .map_err(|err| RuntimeRegistryError::RuntimeOperationFailed { + runtime_id: config.runtime_id.clone(), + code: "remote_runtime_client_build_failed".to_string(), + message: err.to_string(), + })?; + Ok(Self { + host_id: host_id_for_remote_runtime(&config.runtime_id), + runtime_id: config.runtime_id, + display_name: config.display_name, + base_url, + bearer_token: config.bearer_token, + cached_capabilities: config.cached_capabilities, + cached_status: config.cached_status, + http, + }) + } + + fn endpoint(&self, path: &str) -> String { + format!("{}{}", self.base_url, path) + } + + fn ws_endpoint(&self, worker_id: &str) -> String { + let mut base = self.base_url.clone(); + if let Some(rest) = base.strip_prefix("https://") { + base = format!("wss://{rest}"); + } else if let Some(rest) = base.strip_prefix("http://") { + base = format!("ws://{rest}"); + } + format!("{base}/v1/workers/{worker_id}/events/ws") + } + + fn authorize(&self, request: RequestBuilder) -> RequestBuilder { + let request = request.header(CONTENT_TYPE, "application/json"); + if let Some(token) = self.bearer_token.as_deref() { + request.header(AUTHORIZATION, format!("Bearer {token}")) + } else { + request + } + } + + fn get_json(&self, path: &str) -> Result + where + T: DeserializeOwned, + { + self.send_json(self.http.get(self.endpoint(path))) + } + + fn post_json(&self, path: &str, body: &B) -> Result + where + B: Serialize + ?Sized, + T: DeserializeOwned, + { + self.send_json(self.http.post(self.endpoint(path)).json(body)) + } + + fn send_json(&self, request: RequestBuilder) -> Result + where + T: DeserializeOwned, + { + let response = self + .authorize(request) + .send() + .map_err(|err| remote_reqwest_diagnostic(&self.runtime_id, err))?; + let status = response.status(); + if status.is_success() { + response.json::().map_err(|err| { + diagnostic( + "remote_runtime_malformed_response", + DiagnosticSeverity::Error, + format!( + "Remote Runtime returned malformed JSON for '{}': {err}", + self.runtime_id + ), + ) + }) + } else { + Err(remote_http_status_diagnostic( + &self.runtime_id, + status, + response, + )) + } + } + + fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary { + WorkerSummary { + runtime_id: self.runtime_id.clone(), + worker_id: summary.worker_ref.worker_id.as_str().to_string(), + host_id: self.host_id.clone(), + label: safe_display_hint(summary.worker_ref.worker_id.as_str()), + role: embedded_intent_label(&summary.intent), + profile: embedded_profile_label(&summary.profile), + workspace: WorkerWorkspaceSummary { + visibility: "remote_runtime".to_string(), + identity: "runtime_registry_worker".to_string(), + }, + state: embedded_worker_status_label(summary.status).to_string(), + status: embedded_worker_status_label(summary.status).to_string(), + last_seen_at: None, + implementation: WorkerImplementationSummary { + kind: "remote_worker_runtime".to_string(), + display_hint: "Backend-proxied remote worker-runtime Worker".to_string(), + }, + capabilities: WorkerCapabilitySummary { + can_accept_input: true, + can_stream_events: true, + can_stop: true, + can_spawn_followup: false, + can_read_bounded_transcript: true, + }, + diagnostics: vec![diagnostic( + "remote_runtime_projection", + DiagnosticSeverity::Info, + "Remote Worker identity is projected only as runtime_id plus worker_id; endpoint and credentials remain backend-private".to_string(), + )], + } + } + + fn map_worker_detail(&self, detail: EmbeddedWorkerDetail) -> WorkerSummary { + WorkerSummary { + runtime_id: self.runtime_id.clone(), + worker_id: detail.worker_id.as_str().to_string(), + host_id: self.host_id.clone(), + label: safe_display_hint(detail.worker_id.as_str()), + role: embedded_intent_label(&detail.intent), + profile: embedded_profile_label(&detail.profile), + workspace: WorkerWorkspaceSummary { + visibility: "remote_runtime".to_string(), + identity: "runtime_registry_worker".to_string(), + }, + state: embedded_worker_status_label(detail.status).to_string(), + status: embedded_worker_status_label(detail.status).to_string(), + last_seen_at: None, + implementation: WorkerImplementationSummary { + kind: "remote_worker_runtime".to_string(), + display_hint: "Backend-proxied remote worker-runtime Worker".to_string(), + }, + capabilities: WorkerCapabilitySummary { + can_accept_input: true, + can_stream_events: true, + can_stop: true, + can_spawn_followup: false, + can_read_bounded_transcript: true, + }, + diagnostics: vec![diagnostic( + "remote_runtime_projection", + DiagnosticSeverity::Info, + "Remote Worker identity is projected only as runtime_id plus worker_id; endpoint and credentials remain backend-private".to_string(), + )], + } + } + + fn lifecycle_result_from_response( + &self, + worker_id: &str, + response: RuntimeHttpWorkerLifecycleResponse, + ) -> WorkerLifecycleResult { + WorkerLifecycleResult { + state: WorkerOperationState::Accepted, + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + event_id: Some(response.ack.event_id), + diagnostics: vec![diagnostic( + "remote_runtime_lifecycle_accepted", + DiagnosticSeverity::Info, + format!( + "Remote Runtime acknowledged lifecycle operation for '{worker_id}' with status {}", + embedded_worker_status_label(response.ack.status) + ), + )], + } + } +} + +impl WorkspaceWorkerRuntime for RemoteWorkerRuntime { + fn runtime_id(&self) -> &str { + &self.runtime_id + } + + fn runtime_summary(&self, limit: usize) -> RuntimeSummary { + match self.get_json::("/v1/runtime") { + Ok(response) => RuntimeSummary { + runtime_id: self.runtime_id.clone(), + label: response + .runtime + .display_name + .unwrap_or_else(|| self.display_name.clone()), + kind: "remote_worker_runtime".to_string(), + status: embedded_runtime_status_label(response.runtime.status).to_string(), + source: RuntimeSourceSummary::remote_http(), + host_ids: if limit == 0 { + Vec::new() + } else { + vec![self.host_id.clone()] + }, + capabilities: remote_runtime_capabilities(limit, true), + diagnostics: vec![diagnostic( + "remote_runtime_backend_proxy", + DiagnosticSeverity::Info, + "Remote Runtime is accessed only by backend-owned REST/WS clients".to_string(), + )], + }, + Err(diagnostic) => RuntimeSummary { + runtime_id: self.runtime_id.clone(), + label: self.display_name.clone(), + kind: "remote_worker_runtime".to_string(), + status: self.cached_status.clone(), + source: RuntimeSourceSummary::remote_http(), + host_ids: if limit == 0 { + Vec::new() + } else { + vec![self.host_id.clone()] + }, + capabilities: self.cached_capabilities.clone(), + diagnostics: vec![diagnostic], + }, + } + } + + fn list_hosts(&self, limit: usize) -> RuntimeList { + if limit == 0 { + return RuntimeList::new(Vec::new(), Vec::new()); + } + RuntimeList::new( + vec![HostSummary { + runtime_id: self.runtime_id.clone(), + host_id: self.host_id.clone(), + label: self.display_name.clone(), + kind: REMOTE_HOST_KIND.to_string(), + status: "configured".to_string(), + observed_at: Utc::now().to_rfc3339(), + last_seen_at: None, + capabilities: remote_runtime_capabilities(limit, true), + diagnostics: vec![diagnostic( + "remote_runtime_backend_proxy", + DiagnosticSeverity::Info, + "Remote host endpoint and credentials are backend-private".to_string(), + )], + }], + Vec::new(), + ) + } + + fn list_workers(&self, limit: usize) -> RuntimeList { + if limit == 0 { + return RuntimeList::new(Vec::new(), Vec::new()); + } + match self.get_json::("/v1/workers") { + Ok(response) => RuntimeList::new( + response + .workers + .into_iter() + .take(limit) + .map(|worker| self.map_worker_summary(worker)) + .collect(), + Vec::new(), + ), + Err(diagnostic) => RuntimeList::new(Vec::new(), vec![diagnostic]), + } + } + + fn worker(&self, worker_id: &str) -> WorkerLookupResult { + match self.get_json::(&format!("/v1/workers/{worker_id}")) { + Ok(response) => WorkerLookupResult { + worker: Some(self.map_worker_detail(response.worker)), + diagnostics: Vec::new(), + }, + Err(diagnostic) if diagnostic.code == "remote_worker_not_found" => WorkerLookupResult { + worker: None, + diagnostics: Vec::new(), + }, + Err(diagnostic) => WorkerLookupResult { + worker: None, + diagnostics: vec![diagnostic], + }, + } + } + + fn spawn_worker(&self, request: WorkerSpawnRequest) -> WorkerSpawnResult { + if matches!( + request.acceptance, + WorkerSpawnAcceptanceRequirement::SocketReady + ) { + return WorkerSpawnResult { + state: WorkerOperationState::Rejected, + worker: None, + acceptance_evidence: Vec::new(), + diagnostics: vec![diagnostic( + "remote_runtime_no_socket_ready_acceptance", + DiagnosticSeverity::Warning, + "Remote Runtime v0 exposes backend-proxied REST/WS control, not direct socket readiness".to_string(), + )], + }; + } + let create = CreateWorkerRequest::tools_less( + embedded_create_intent(&request.intent), + embedded_profile_selector(&request.intent), + ); + match self.post_json::<_, RuntimeHttpWorkerResponse>("/v1/workers", &create) { + Ok(response) => WorkerSpawnResult { + state: WorkerOperationState::Accepted, + worker: Some(self.map_worker_detail(response.worker)), + acceptance_evidence: vec![WorkerSpawnAcceptanceEvidence { + kind: "remote_runtime_worker_created".to_string(), + detail: "worker-runtime REST create endpoint accepted the Worker".to_string(), + }], + diagnostics: vec![diagnostic( + "remote_runtime_backend_proxy", + DiagnosticSeverity::Info, + "Remote create used a backend-owned REST client; browser-facing payload exposes only runtime_id plus worker_id".to_string(), + )], + }, + Err(diagnostic) => WorkerSpawnResult { + state: WorkerOperationState::Rejected, + worker: None, + acceptance_evidence: Vec::new(), + diagnostics: vec![diagnostic], + }, + } + } + + fn stop_worker( + &self, + worker_id: &str, + request: WorkerLifecycleRequest, + ) -> WorkerLifecycleResult { + let body = RuntimeHttpWorkerLifecycleRequest { + reason: request.reason, + }; + match self.post_json::<_, RuntimeHttpWorkerLifecycleResponse>( + &format!("/v1/workers/{worker_id}/stop"), + &body, + ) { + Ok(response) => self.lifecycle_result_from_response(worker_id, response), + Err(diagnostic) => remote_lifecycle_rejected(&self.runtime_id, worker_id, diagnostic), + } + } + + fn cancel_worker( + &self, + worker_id: &str, + request: WorkerLifecycleRequest, + ) -> WorkerLifecycleResult { + let body = RuntimeHttpWorkerLifecycleRequest { + reason: request.reason, + }; + match self.post_json::<_, RuntimeHttpWorkerLifecycleResponse>( + &format!("/v1/workers/{worker_id}/cancel"), + &body, + ) { + Ok(response) => self.lifecycle_result_from_response(worker_id, response), + Err(diagnostic) => remote_lifecycle_rejected(&self.runtime_id, worker_id, diagnostic), + } + } + + fn observation_source( + &self, + worker_id: &str, + ) -> Option { + Some(crate::observation::RuntimeObservationSourceConfig { + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + endpoint: self.ws_endpoint(worker_id), + bearer_token: self.bearer_token.clone(), + }) + } + + fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult { + let input = EmbeddedWorkerInput { + kind: match request.kind { + WorkerInputKind::User => EmbeddedWorkerInputKind::User, + WorkerInputKind::System => EmbeddedWorkerInputKind::System, + }, + content: request.content, + }; + match self.post_json::<_, RuntimeHttpWorkerInputResponse>( + &format!("/v1/workers/{worker_id}/input"), + &input, + ) { + Ok(response) => WorkerInputResult { + state: WorkerOperationState::Accepted, + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + transcript_sequence: Some(response.ack.transcript_sequence), + event_id: Some(response.ack.event_id), + diagnostics: Vec::new(), + }, + Err(diagnostic) => remote_input_rejected(&self.runtime_id, worker_id, diagnostic), + } + } + + fn transcript( + &self, + worker_id: &str, + start: usize, + limit: usize, + ) -> WorkerTranscriptProjection { + match self.get_json::(&format!( + "/v1/workers/{worker_id}/transcript?start={start}&limit={limit}" + )) { + Ok(response) => { + embedded_transcript_projection(&self.runtime_id, worker_id, response.transcript) + } + Err(diagnostic) => { + embedded_transcript_rejected(&self.runtime_id, worker_id, start, limit, diagnostic) + } + } + } +} + #[derive(Clone)] pub struct LocalWorkerRuntime { runtime_id: String, @@ -1460,6 +2091,35 @@ fn embedded_input_rejected( } } +fn remote_input_rejected( + runtime_id: &str, + worker_id: &str, + diagnostic: RuntimeDiagnostic, +) -> WorkerInputResult { + WorkerInputResult { + state: WorkerOperationState::Rejected, + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + transcript_sequence: None, + event_id: None, + diagnostics: vec![diagnostic], + } +} + +fn remote_lifecycle_rejected( + runtime_id: &str, + worker_id: &str, + diagnostic: RuntimeDiagnostic, +) -> WorkerLifecycleResult { + WorkerLifecycleResult { + state: WorkerOperationState::Rejected, + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + event_id: None, + diagnostics: vec![diagnostic], + } +} + fn embedded_transcript_projection( runtime_id: &str, worker_id: &str, @@ -1561,6 +2221,87 @@ fn host_id_for_embedded_workspace(workspace_id: &str) -> String { bounded_backend_identifier("embedded-", workspace_id) } +fn host_id_for_remote_runtime(runtime_id: &str) -> String { + bounded_backend_identifier("remote-", runtime_id) +} + +fn remote_runtime_capabilities(limit: usize, available: bool) -> RuntimeCapabilitySummary { + RuntimeCapabilitySummary { + can_list_hosts: true, + can_list_workers: available, + can_get_worker: available, + can_spawn_worker: available, + can_stop_worker: available, + can_accept_input: available, + can_stream_events: available, + can_read_bounded_transcript: available, + has_workspace_fs: false, + has_shell: false, + has_git: false, + supports_worktrees: false, + supports_backend_internal_tools: false, + local_pod_inspection: "unavailable".to_string(), + workspace_scope: "remote_runtime_backend_private".to_string(), + max_workers: limit, + os: "remote".to_string(), + arch: "remote".to_string(), + } +} + +fn remote_reqwest_diagnostic(runtime_id: &str, err: reqwest::Error) -> RuntimeDiagnostic { + if err.is_timeout() { + diagnostic( + "remote_runtime_timeout", + DiagnosticSeverity::Error, + format!("Timed out while contacting remote Runtime '{runtime_id}'"), + ) + } else if err.is_connect() || err.is_request() { + diagnostic( + "remote_runtime_network_error", + DiagnosticSeverity::Error, + format!("Failed to contact remote Runtime '{runtime_id}'"), + ) + } else { + diagnostic( + "remote_runtime_client_error", + DiagnosticSeverity::Error, + format!("Remote Runtime client error for '{runtime_id}'"), + ) + } +} + +fn remote_http_status_diagnostic( + runtime_id: &str, + status: StatusCode, + response: reqwest::blocking::Response, +) -> RuntimeDiagnostic { + let error = response.json::().ok(); + let remote_code = error + .as_ref() + .map(|error| error.error.code.as_str()) + .unwrap_or("remote_http_error"); + let remote_message = error + .as_ref() + .map(|error| error.error.message.clone()) + .unwrap_or_else(|| format!("remote Runtime returned HTTP {status}")); + let (code, severity) = match status { + StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => { + ("remote_runtime_auth_failed", DiagnosticSeverity::Error) + } + StatusCode::NOT_FOUND => ("remote_worker_not_found", DiagnosticSeverity::Warning), + StatusCode::METHOD_NOT_ALLOWED | StatusCode::NOT_IMPLEMENTED => { + ("remote_runtime_unsupported", DiagnosticSeverity::Warning) + } + _ if status.is_server_error() => ("remote_runtime_http_error", DiagnosticSeverity::Error), + _ => ("remote_runtime_http_rejected", DiagnosticSeverity::Warning), + }; + diagnostic( + code, + severity, + format!("Remote Runtime '{runtime_id}' rejected request ({remote_code}): {remote_message}"), + ) +} + fn local_runtime_capabilities( limit: usize, inspection_available: bool, @@ -1604,6 +2345,25 @@ fn diagnostic( } } +fn operation_failed_or_unknown_worker( + runtime_id: &str, + worker_id: &str, + diagnostics: Vec, +) -> RuntimeRegistryError { + diagnostics + .into_iter() + .find(|diagnostic| matches!(diagnostic.severity, DiagnosticSeverity::Error)) + .map(|diagnostic| RuntimeRegistryError::RuntimeOperationFailed { + runtime_id: runtime_id.to_string(), + code: diagnostic.code, + message: diagnostic.message, + }) + .unwrap_or_else(|| RuntimeRegistryError::UnknownWorker { + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + }) +} + fn host_id_for_workspace(workspace_id: &str) -> String { bounded_backend_identifier("local-", workspace_id) } @@ -1806,7 +2566,10 @@ mod tests { use super::*; use serde_json::json; use std::fs; + use std::io::{Read as _, Write as _}; + use std::net::TcpListener; use std::sync::Arc; + use std::thread; use tempfile::TempDir; fn write_metadata(dir: &Path, worker_name: &str, metadata: &WorkerMetadata) { @@ -2275,6 +3038,220 @@ mod tests { ); } + #[test] + fn remote_runtime_registry_routes_commands_without_browser_secret_leaks() { + let worker_json = worker_json("remote:primary", "worker-remote-1"); + let (base_url, server) = serve_mock_http(vec![ + mock_response( + "GET", + "/v1/workers", + true, + 200, + json!({ "workers": [worker_json.clone()] }).to_string(), + ), + mock_response( + "GET", + "/v1/workers/worker-remote-1", + true, + 200, + json!({ "worker": worker_json.clone() }).to_string(), + ), + mock_response( + "POST", + "/v1/workers/worker-remote-1/input", + true, + 200, + json!({ + "ack": { + "worker_ref": { "runtime_id": "remote:primary", "worker_id": "worker-remote-1" }, + "status": "running", + "transcript_sequence": 7, + "event_id": 8 + } + }) + .to_string(), + ), + ]); + let secret = "secret-token-do-not-leak".to_string(); + let mut registry = RuntimeRegistry::new(Vec::new()); + registry.register( + RemoteWorkerRuntime::new(RemoteRuntimeConfig::new( + "remote:primary", + "Remote Primary", + base_url.clone(), + Some(secret.clone()), + )) + .unwrap(), + ); + + let observation = registry + .observation_source("remote:primary", "worker-remote-1") + .expect("remote runtime exposes backend-owned WS observation source"); + assert!(observation.endpoint.starts_with("ws://127.0.0.1:")); + assert!( + observation + .endpoint + .ends_with("/v1/workers/worker-remote-1/events/ws") + ); + assert_eq!(observation.bearer_token.as_deref(), Some(secret.as_str())); + + let workers = registry.list_workers(10); + assert_eq!(workers.items.len(), 1); + assert_eq!(workers.items[0].runtime_id, "remote:primary"); + assert_eq!(workers.items[0].worker_id, "worker-remote-1"); + assert_eq!( + workers.items[0].implementation.kind, + "remote_worker_runtime" + ); + assert_eq!( + workers.items[0].workspace.identity, + "runtime_registry_worker" + ); + + let input = registry + .send_input( + "remote:primary", + "worker-remote-1", + WorkerInputRequest { + kind: WorkerInputKind::User, + content: "hello remote".to_string(), + }, + ) + .unwrap(); + assert_eq!(input.state, WorkerOperationState::Accepted); + assert_eq!(input.transcript_sequence, Some(7)); + assert_eq!(input.event_id, Some(8)); + + server.join().expect("mock remote server finished"); + let browser_payload = serde_json::to_string(&(workers, input)).unwrap(); + assert!( + !browser_payload.contains(&base_url), + "leaked base URL: {browser_payload}" + ); + assert!( + !browser_payload.contains(&secret), + "leaked token: {browser_payload}" + ); + assert!(browser_payload.contains("runtime_id")); + assert!(browser_payload.contains("worker_id")); + } + + #[test] + fn remote_runtime_auth_errors_map_to_typed_backend_error() { + let (base_url, server) = serve_mock_http(vec![mock_response( + "GET", + "/v1/workers/worker-missing", + true, + 401, + json!({ "error": { "code": "unauthorized", "message": "bad token" } }).to_string(), + )]); + let mut registry = RuntimeRegistry::new(Vec::new()); + registry.register( + RemoteWorkerRuntime::new(RemoteRuntimeConfig::new( + "remote:primary", + "Remote Primary", + base_url, + Some("secret-token".to_string()), + )) + .unwrap(), + ); + + let error = registry + .worker("remote:primary", "worker-missing") + .expect_err("auth failure is a backend operation error"); + assert!(matches!( + error, + RuntimeRegistryError::RuntimeOperationFailed { runtime_id, code, .. } + if runtime_id == "remote:primary" && code == "remote_runtime_auth_failed" + )); + server.join().expect("mock remote server finished"); + } + + #[derive(Clone)] + struct MockResponse { + method: &'static str, + path: &'static str, + require_auth: bool, + status: u16, + body: String, + } + + fn mock_response( + method: &'static str, + path: &'static str, + require_auth: bool, + status: u16, + body: String, + ) -> MockResponse { + MockResponse { + method, + path, + require_auth, + status, + body, + } + } + + fn serve_mock_http(responses: Vec) -> (String, thread::JoinHandle<()>) { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let base_url = format!("http://{}", listener.local_addr().unwrap()); + let handle = thread::spawn(move || { + for expected in responses { + let (mut stream, _) = listener.accept().unwrap(); + let mut buffer = [0_u8; 8192]; + let read = stream.read(&mut buffer).unwrap(); + let request = String::from_utf8_lossy(&buffer[..read]); + let first_line = request.lines().next().unwrap_or_default(); + let expected_line = format!("{} {} ", expected.method, expected.path); + assert!( + first_line.starts_with(&expected_line), + "unexpected request line: {first_line}, expected prefix {expected_line}" + ); + if expected.require_auth { + assert!( + request + .to_ascii_lowercase() + .contains("authorization: bearer secret-token"), + "authorization header missing from request: {request}" + ); + } + let status_text = match expected.status { + 200 => "OK", + 401 => "Unauthorized", + 404 => "Not Found", + _ => "Mock", + }; + let response = format!( + "HTTP/1.1 {} {status_text}\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}", + expected.status, + expected.body.len(), + expected.body + ); + stream.write_all(response.as_bytes()).unwrap(); + } + }); + (base_url, handle) + } + + fn worker_json(runtime_id: &str, worker_id: &str) -> serde_json::Value { + json!({ + "worker_ref": { "runtime_id": runtime_id, "worker_id": worker_id }, + "runtime_id": runtime_id, + "worker_id": worker_id, + "status": "running", + "intent": { "kind": "role", "role": "coder", "purpose": "remote test" }, + "profile": { "kind": "builtin", "value": "coder" }, + "config_bundle": null, + "requested_capabilities": [], + "workspace_refs": [], + "mount_refs": [], + "requested_capability_count": 0, + "has_config_bundle": false, + "transcript_len": 0, + "last_event_id": 0 + }) + } + #[test] fn generated_worker_ids_are_opaque_bounded_unique_and_resolvable() { let temp = TempDir::new().unwrap(); diff --git a/crates/workspace-server/src/lib.rs b/crates/workspace-server/src/lib.rs index 5928c3b0..ef7305e9 100644 --- a/crates/workspace-server/src/lib.rs +++ b/crates/workspace-server/src/lib.rs @@ -50,6 +50,12 @@ pub enum Error { }, #[error("invalid runtime {kind} `{value}`")] InvalidRuntimeIdentifier { kind: String, value: String }, + #[error("runtime `{runtime_id}` operation failed ({code}): {message}")] + RuntimeOperationFailed { + runtime_id: String, + code: String, + message: String, + }, #[error("runtime `{runtime_id}` does not support `{capability}`")] RuntimeCapabilityUnsupported { runtime_id: String, diff --git a/crates/workspace-server/src/observation.rs b/crates/workspace-server/src/observation.rs index a750877f..fce73f30 100644 --- a/crates/workspace-server/src/observation.rs +++ b/crates/workspace-server/src/observation.rs @@ -10,7 +10,7 @@ use tokio_tungstenite::tungstenite::{Error as TungsteniteError, Message as Tungs use worker_runtime::http_server::{RuntimeWorkerEventWsEnvelope, RuntimeWorkerEventWsFrame}; /// Backend-private source for a runtime worker observation stream. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq)] pub struct RuntimeObservationSourceConfig { pub runtime_id: String, pub worker_id: String, @@ -18,6 +18,20 @@ pub struct RuntimeObservationSourceConfig { pub bearer_token: Option, } +impl std::fmt::Debug for RuntimeObservationSourceConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RuntimeObservationSourceConfig") + .field("runtime_id", &self.runtime_id) + .field("worker_id", &self.worker_id) + .field("endpoint", &"") + .field( + "bearer_token", + &self.bearer_token.as_ref().map(|_| ""), + ) + .finish() + } +} + /// Event consumed from a Runtime-owned worker observation WebSocket. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct RuntimeObservationUpstreamEvent { @@ -181,12 +195,21 @@ pub struct BackendObservationOpen { } /// Backend-owned in-memory v0 observation proxy state. -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct BackendObservationProxy { sources: Arc>, state: Arc>, } +impl std::fmt::Debug for BackendObservationProxy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BackendObservationProxy") + .field("source_count", &self.sources.len()) + .field("state", &"") + .finish() + } +} + impl BackendObservationProxy { pub fn new(sources: Vec) -> Self { let sources = sources @@ -475,3 +498,56 @@ impl RuntimeWsObservationClient { } } } + +#[cfg(test)] +mod tests { + use super::*; + + fn sensitive_source() -> RuntimeObservationSourceConfig { + RuntimeObservationSourceConfig { + runtime_id: "remote-runtime".to_string(), + worker_id: "worker-1".to_string(), + endpoint: "wss://remote.example.invalid/private/workers/worker-1/events/ws".to_string(), + bearer_token: Some("top-secret-bearer-token".to_string()), + } + } + + #[test] + fn runtime_observation_source_debug_redacts_endpoint_and_token() { + let debug = format!("{:?}", sensitive_source()); + + assert!(debug.contains("remote-runtime")); + assert!(debug.contains("worker-1")); + assert!(debug.contains("")); + assert!(debug.contains("")); + for forbidden in [ + "remote.example.invalid", + "/private/workers/worker-1/events/ws", + "top-secret-bearer-token", + ] { + assert!( + !debug.contains(forbidden), + "debug leaked {forbidden}: {debug}" + ); + } + } + + #[test] + fn backend_observation_proxy_debug_redacts_contained_sources() { + let proxy = BackendObservationProxy::new(vec![sensitive_source()]); + let debug = format!("{proxy:?}"); + + assert!(debug.contains("BackendObservationProxy")); + assert!(debug.contains("source_count")); + for forbidden in [ + "remote.example.invalid", + "/private/workers/worker-1/events/ws", + "top-secret-bearer-token", + ] { + assert!( + !debug.contains(forbidden), + "debug leaked {forbidden}: {debug}" + ); + } + } +} diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index 95c567db..e3a98dc0 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -13,9 +13,10 @@ use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; use crate::hosts::{ - DiagnosticSeverity, EmbeddedWorkerRuntime, HostSummary, LocalWorkerRuntime, RuntimeDiagnostic, - RuntimeRegistry, RuntimeSummary, WorkerInputRequest, WorkerInputResult, WorkerSpawnRequest, - WorkerSpawnResult, WorkerSummary, WorkerTranscriptProjection, + DiagnosticSeverity, EmbeddedWorkerRuntime, HostSummary, LocalWorkerRuntime, + RemoteRuntimeConfig, RemoteWorkerRuntime, RuntimeDiagnostic, RuntimeRegistry, RuntimeSummary, + WorkerInputRequest, WorkerInputResult, WorkerLifecycleRequest, WorkerLifecycleResult, + WorkerSpawnRequest, WorkerSpawnResult, WorkerSummary, WorkerTranscriptProjection, }; use crate::identity::WorkspaceIdentity; use crate::observation::{ @@ -47,6 +48,7 @@ pub struct ServerConfig { pub max_records: usize, pub local_runtime_data_dir: Option, pub runtime_event_sources: Vec, + pub remote_runtime_sources: Vec, } impl ServerConfig { @@ -64,6 +66,7 @@ impl ServerConfig { max_records: 200, local_runtime_data_dir: manifest::paths::data_dir(), runtime_event_sources: Vec::new(), + remote_runtime_sources: Vec::new(), } } } @@ -88,14 +91,19 @@ impl WorkspaceApi { updated_at: config.workspace_created_at.clone(), }) .await?; - let runtime = Arc::new(RuntimeRegistry::for_workspace( + let mut runtime = RuntimeRegistry::for_workspace( LocalWorkerRuntime::new( config.workspace_id.clone(), config.workspace_root.clone(), config.local_runtime_data_dir.clone(), ), EmbeddedWorkerRuntime::new_memory(config.workspace_id.clone()), - )); + ); + for remote_config in config.remote_runtime_sources.iter().cloned() { + runtime + .register(RemoteWorkerRuntime::new(remote_config).map_err(|err| err.into_error())?); + } + let runtime = Arc::new(runtime); let observation_proxy = BackendObservationProxy::new(config.runtime_event_sources.clone()); Ok(Self { records: LocalProjectRecordReader::new(config.workspace_root.clone()), @@ -155,6 +163,14 @@ pub fn build_router(api: WorkspaceApi) -> Router { "/api/runtimes/{runtime_id}/workers/{worker_id}/input", post(send_runtime_worker_input), ) + .route( + "/api/runtimes/{runtime_id}/workers/{worker_id}/stop", + post(stop_runtime_worker), + ) + .route( + "/api/runtimes/{runtime_id}/workers/{worker_id}/cancel", + post(cancel_runtime_worker), + ) .route( "/api/runtimes/{runtime_id}/workers/{worker_id}/transcript", get(get_runtime_worker_transcript), @@ -499,6 +515,30 @@ async fn send_runtime_worker_input( Ok(Json(result)) } +async fn stop_runtime_worker( + State(api): State, + AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>, + Json(request): Json, +) -> ApiResult> { + let result = api + .runtime + .stop_worker(&runtime_id, &worker_id, request) + .map_err(|err| err.into_error())?; + Ok(Json(result)) +} + +async fn cancel_runtime_worker( + State(api): State, + AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>, + Json(request): Json, +) -> ApiResult> { + let result = api + .runtime + .cancel_worker(&runtime_id, &worker_id, request) + .map_err(|err| err.into_error())?; + Ok(Json(result)) +} + async fn get_runtime_worker_transcript( State(api): State, AxumPath((runtime_id, worker_id)): AxumPath<(String, String)>, @@ -523,11 +563,16 @@ async fn worker_observation_ws( Ok(source) => ws.on_upgrade(move |socket| { worker_observation_ws_session(api.observation_proxy, source, query, socket) }), + Err(ObservationProxyError::WorkerNotFound(_)) => { + match api.runtime.observation_source(&runtime_id, &worker_id) { + Ok(source) => ws.on_upgrade(move |socket| { + worker_observation_ws_session(api.observation_proxy, source, query, socket) + }), + Err(error) => ApiError(error.into_error()).into_response(), + } + } Err(error) => { - let status = match error { - ObservationProxyError::WorkerNotFound(_) => StatusCode::NOT_FOUND, - _ => StatusCode::BAD_REQUEST, - }; + let status = StatusCode::BAD_REQUEST; ( status, Json(serde_json::json!({ @@ -844,6 +889,16 @@ impl IntoResponse for ApiError { | Error::UnknownRepository(_) => StatusCode::NOT_FOUND, Error::Ticket(_) => StatusCode::NOT_FOUND, Error::RuntimeCapabilityUnsupported { .. } => StatusCode::NOT_IMPLEMENTED, + Error::RuntimeOperationFailed { code, .. } if code == "remote_runtime_auth_failed" => { + StatusCode::UNAUTHORIZED + } + Error::RuntimeOperationFailed { code, .. } if code == "remote_runtime_timeout" => { + StatusCode::GATEWAY_TIMEOUT + } + Error::RuntimeOperationFailed { code, .. } if code == "remote_runtime_unsupported" => { + StatusCode::NOT_IMPLEMENTED + } + Error::RuntimeOperationFailed { .. } => StatusCode::BAD_GATEWAY, _ => StatusCode::INTERNAL_SERVER_ERROR, }; ( diff --git a/package.nix b/package.nix index 662a3d63..5d2cb9f1 100644 --- a/package.nix +++ b/package.nix @@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec { filter = sourceFilter; }; - cargoHash = "sha256-5vmZTzO5PSRPHvQfiK0rNiBkHNyc0y3BCeDJNFJaAqA="; + cargoHash = "sha256-kZ9TAb1lNpslAhzcyC2RyIZg5Yh5hrAGCTZIhhYl/e4="; depsExtraArgs = { # Older fetchCargoVendor utilities used crates.io's API download endpoint,