diff --git a/Cargo.lock b/Cargo.lock index 1909b286..775ed1c0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6058,6 +6058,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "sha2 0.11.0", "tempfile", "thiserror 2.0.18", "ticket", diff --git a/crates/workspace-server/Cargo.toml b/crates/workspace-server/Cargo.toml index cd147fe7..cab80896 100644 --- a/crates/workspace-server/Cargo.toml +++ b/crates/workspace-server/Cargo.toml @@ -16,6 +16,7 @@ rusqlite.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true serde_yaml.workspace = true +sha2.workspace = true thiserror.workspace = true ticket.workspace = true tokio = { workspace = true, features = ["fs", "macros", "net", "rt-multi-thread", "sync"] } diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index ac7027e7..aba40ab9 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -1,74 +1,147 @@ -use std::fs; -use std::path::{Path, PathBuf}; -use std::time::{SystemTime, UNIX_EPOCH}; - -use pod_store::{PodMetadata, validate_pod_name}; +use crate::Error; +use chrono::Utc; +use pod_store::PodMetadata; use serde::{Deserialize, Serialize}; +use serde_json::Value; +use sha2::{Digest, Sha256}; +use std::{ + collections::BTreeSet, + fs, + path::{Path, PathBuf}, + sync::Arc, +}; -const MAX_DIAGNOSTICS: usize = 20; -const MAX_LABEL_LEN: usize = 120; -const MAX_PATH_LEN: usize = 512; +const LOCAL_RUNTIME_ID: &str = "local-pod-runtime"; +const LOCAL_HOST_KIND: &str = "local-pod-host"; +const MAX_DIAGNOSTICS: usize = 16; +const MAX_HOST_SCAN: usize = 256; +const MAX_IDENTIFIER_LEN: usize = 120; +const ID_DIGEST_HEX_LEN: usize = 16; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct RuntimeDiagnostic { pub code: String, - pub severity: String, + pub severity: DiagnosticSeverity, pub message: String, } +impl RuntimeDiagnostic { + pub fn new(code: impl Into, severity: &str, message: impl Into) -> Self { + let severity = match severity { + "error" => DiagnosticSeverity::Error, + "warning" => DiagnosticSeverity::Warning, + _ => DiagnosticSeverity::Info, + }; + diagnostic(code, severity, message) + } +} + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum DiagnosticSeverity { + Info, + Warning, + Error, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RuntimeCapabilitySummary { + pub can_list_hosts: bool, + pub can_list_workers: bool, + pub can_get_worker: bool, + pub can_spawn_worker: bool, + pub can_stop_worker: bool, + pub can_accept_input: bool, + pub can_stream_events: bool, + pub can_read_bounded_transcript: bool, + pub has_workspace_fs: bool, + pub has_shell: bool, + pub has_git: bool, + pub supports_worktrees: bool, + pub supports_backend_internal_tools: bool, + pub local_pod_inspection: String, + pub workspace_scope: String, + pub max_workers: usize, + pub os: String, + pub arch: String, +} + +pub type HostCapabilitySummary = RuntimeCapabilitySummary; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RuntimeSummary { + pub runtime_id: String, + pub label: String, + pub kind: String, + pub status: String, + pub host_ids: Vec, + pub capabilities: RuntimeCapabilitySummary, + pub diagnostics: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct HostSummary { + pub runtime_id: String, pub host_id: String, pub label: String, pub kind: String, pub status: String, pub observed_at: String, - pub last_seen_at: String, + pub last_seen_at: Option, pub capabilities: HostCapabilitySummary, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct HostCapabilitySummary { - pub local_pod_inspection: String, - pub workspace_root: String, - pub os: String, - pub arch: String, - pub max_workers: usize, +pub struct WorkerWorkspaceSummary { + pub visibility: String, + pub identity: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerImplementationSummary { + pub kind: String, + pub display_hint: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WorkerCapabilitySummary { + pub can_accept_input: bool, + pub can_stream_events: bool, + pub can_stop: bool, + pub can_spawn_followup: bool, + pub can_read_bounded_transcript: bool, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerSummary { + pub runtime_id: String, pub worker_id: String, pub host_id: String, pub label: String, - pub pod_name: String, - #[serde(skip_serializing_if = "Option::is_none")] pub role: Option, - #[serde(skip_serializing_if = "Option::is_none")] pub profile: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub workspace_root: Option, + pub workspace: WorkerWorkspaceSummary, pub state: String, pub status: String, - #[serde(skip_serializing_if = "Option::is_none")] pub last_seen_at: Option, - pub implementation: WorkerImplementation, + pub implementation: WorkerImplementationSummary, + pub capabilities: WorkerCapabilitySummary, pub diagnostics: Vec, } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct WorkerImplementation { - pub kind: String, - pub pod_name: String, -} - #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct RuntimeList { pub items: Vec, pub diagnostics: Vec, } +impl RuntimeList { + fn new(items: Vec, diagnostics: Vec) -> Self { + Self { items, diagnostics } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerLookupResult { #[serde(skip_serializing_if = "Option::is_none")] @@ -80,8 +153,8 @@ pub struct WorkerLookupResult { /// /// The request intentionally carries only workspace policy intents and stable /// worker identifiers. Raw workspace roots, child cwd, executable path, and raw -/// profile selectors are resolved by the host/runtime service and never accepted -/// from Workspace API callers. +/// profile selectors are resolved by the runtime service and never accepted from +/// Workspace API callers. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerSpawnRequest { pub intent: WorkerSpawnIntent, @@ -166,257 +239,61 @@ pub struct WorkerProxyConnectPoint { pub diagnostics: Vec, } -pub trait WorkspaceWorkerRuntime: Send + Sync { - fn list_hosts(&self, limit: usize) -> RuntimeList; - fn list_workers(&self, limit: usize) -> RuntimeList; - fn worker(&self, worker_id: &str) -> WorkerLookupResult; - fn spawn_worker(&self, request: WorkerSpawnRequest) -> WorkerSpawnResult; - fn stop_worker(&self, request: WorkerStopRequest) -> WorkerStopResult; - fn proxy_connect_points(&self, worker_id: &str) -> Vec; -} - #[derive(Debug, Clone, PartialEq, Eq)] -pub struct LocalRuntimeBridge { - workspace_id: String, - workspace_root: PathBuf, - data_dir: Option, +pub enum RuntimeRegistryError { + InvalidIdentifier { kind: &'static str, value: String }, + UnknownHost(String), + UnknownWorker(String), } -impl LocalRuntimeBridge { - pub fn new( - workspace_id: impl Into, - workspace_root: impl Into, - data_dir: Option, - ) -> Self { - Self { - workspace_id: workspace_id.into(), - workspace_root: workspace_root.into(), - data_dir, - } - } - - pub fn host_id(&self) -> String { - stable_local_host_id(&self.workspace_id) - } - - pub fn list_hosts(&self, limit: usize) -> (Vec, Vec) { - if limit == 0 { - return (Vec::new(), Vec::new()); - } - - let observed_at = unix_timestamp(SystemTime::now()); - let mut diagnostics = pod_root_diagnostics(self.pod_root().as_deref()); - let local_pod_inspection = if diagnostics.is_empty() { - "available" - } else { - "unavailable" - } - .to_string(); - let status = if local_pod_inspection == "available" { - "available" - } else { - "degraded" - } - .to_string(); - truncate_diagnostics(&mut diagnostics); - - let host = HostSummary { - host_id: self.host_id(), - label: format!( - "Local host ({})", - self.workspace_root - .file_name() - .and_then(|name| name.to_str()) - .expect("workspace root must have a final path component") - ), - kind: "local_host".to_string(), - status, - observed_at: observed_at.clone(), - last_seen_at: observed_at, - capabilities: HostCapabilitySummary { - local_pod_inspection, - workspace_root: bounded_path(&self.workspace_root), - os: std::env::consts::OS.to_string(), - arch: std::env::consts::ARCH.to_string(), - max_workers: limit.min(200), +impl RuntimeRegistryError { + pub fn into_error(self) -> Error { + match self { + Self::InvalidIdentifier { kind, value } => Error::InvalidRuntimeIdentifier { + kind: kind.to_string(), + value, }, - diagnostics: diagnostics.clone(), - }; - - (vec![host], diagnostics) - } - - pub fn list_workers(&self, limit: usize) -> (Vec, Vec) { - let limit = limit.min(200); - let Some(pod_root) = self.pod_root() else { - return ( - Vec::new(), - vec![RuntimeDiagnostic::new( - "local_yoi_data_dir_unavailable", - "warning", - "local Yoi data directory is not configured; local Pod workers cannot be inspected", - )], - ); - }; - - let mut diagnostics = Vec::new(); - if !pod_root.exists() { - diagnostics.push(RuntimeDiagnostic::new( - "local_pod_metadata_root_missing", - "info", - "local Pod metadata directory is absent; no local workers were discovered", - )); - return (Vec::new(), diagnostics); + Self::UnknownHost(host_id) => Error::UnknownHost(host_id), + Self::UnknownWorker(worker_id) => Error::UnknownWorker(worker_id), } - - let entries = match fs::read_dir(&pod_root) { - Ok(entries) => entries, - Err(error) => { - diagnostics.push(RuntimeDiagnostic::new( - "local_pod_metadata_root_unreadable", - "warning", - format!("local Pod metadata directory cannot be read: {error}"), - )); - return (Vec::new(), diagnostics); - } - }; - - let mut workers = Vec::new(); - let mut candidate_names = Vec::new(); - for entry in entries { - let entry = match entry { - Ok(entry) => entry, - Err(error) => { - push_diagnostic( - &mut diagnostics, - RuntimeDiagnostic::new( - "local_pod_metadata_entry_unreadable", - "warning", - format!("one local Pod metadata entry cannot be read: {error}"), - ), - ); - continue; - } - }; - let file_type = match entry.file_type() { - Ok(file_type) => file_type, - Err(error) => { - push_diagnostic( - &mut diagnostics, - RuntimeDiagnostic::new( - "local_pod_metadata_entry_type_unreadable", - "warning", - format!("one local Pod metadata entry type cannot be read: {error}"), - ), - ); - continue; - } - }; - if !file_type.is_dir() { - continue; - } - let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else { - push_diagnostic( - &mut diagnostics, - RuntimeDiagnostic::new( - "local_pod_name_non_utf8", - "warning", - "one local Pod metadata directory has a non-UTF-8 name and was skipped", - ), - ); - continue; - }; - if validate_pod_name(&name).is_err() || name.len() > MAX_LABEL_LEN { - push_diagnostic( - &mut diagnostics, - RuntimeDiagnostic::new( - "local_pod_name_invalid", - "warning", - "one local Pod metadata directory has an invalid or oversized name and was skipped", - ), - ); - continue; - } - if entry.path().join("metadata.json").exists() { - candidate_names.push(name); - } - } - - candidate_names.sort(); - candidate_names.truncate(limit); - for pod_name in candidate_names { - match read_worker(&pod_root, &pod_name, &self.host_id()) { - Ok(worker) => workers.push(worker), - Err(diagnostic) => push_diagnostic(&mut diagnostics, diagnostic), - } - } - truncate_diagnostics(&mut diagnostics); - (workers, diagnostics) - } - - fn pod_root(&self) -> Option { - self.data_dir.as_ref().map(|data_dir| data_dir.join("pods")) } } -impl WorkspaceWorkerRuntime for LocalRuntimeBridge { - fn list_hosts(&self, limit: usize) -> RuntimeList { - let (items, diagnostics) = LocalRuntimeBridge::list_hosts(self, limit); - RuntimeList { items, diagnostics } - } +pub trait WorkspaceWorkerRuntime: Send + Sync { + fn runtime_id(&self) -> &str; - fn list_workers(&self, limit: usize) -> RuntimeList { - let (items, diagnostics) = LocalRuntimeBridge::list_workers(self, limit); - RuntimeList { items, diagnostics } - } + fn runtime_summary(&self, limit: usize) -> RuntimeSummary; - fn worker(&self, worker_id: &str) -> WorkerLookupResult { - let RuntimeList { - items, - mut diagnostics, - } = WorkspaceWorkerRuntime::list_workers(self, 200); - let worker = items - .into_iter() - .find(|worker| worker.worker_id == worker_id); - if worker.is_none() { - diagnostics.push(RuntimeDiagnostic::new( - "worker_not_found", - "info", - format!("worker '{worker_id}' was not found on the local runtime"), - )); - } - truncate_diagnostics(&mut diagnostics); - WorkerLookupResult { - worker, - diagnostics, - } - } + fn list_hosts(&self, limit: usize) -> RuntimeList; + + fn list_workers(&self, limit: usize) -> RuntimeList; + + fn worker(&self, worker_id: &str) -> WorkerLookupResult; fn spawn_worker(&self, request: WorkerSpawnRequest) -> WorkerSpawnResult { - let diagnostic = RuntimeDiagnostic::new( - "worker_spawn_resolver_pending", - "info", - format!( - "worker spawn intent '{}' was accepted as a typed request shape, but local launch resolution is not implemented by this ticket", - worker_spawn_intent_label(&request.intent) - ), - ); WorkerSpawnResult { state: WorkerOperationState::Unsupported, worker: None, acceptance_evidence: Vec::new(), - diagnostics: vec![diagnostic], + diagnostics: vec![diagnostic( + "worker_spawn_resolver_pending", + DiagnosticSeverity::Info, + format!( + "worker spawn intent '{}' was accepted as a typed request shape, but launch resolution is not implemented by this registry surface", + worker_spawn_intent_label(&request.intent) + ), + )], } } fn stop_worker(&self, request: WorkerStopRequest) -> WorkerStopResult { WorkerStopResult { state: WorkerOperationState::Unsupported, - diagnostics: vec![RuntimeDiagnostic::new( + diagnostics: vec![diagnostic( "worker_stop_pending", - "info", + DiagnosticSeverity::Info, format!( - "worker stop for '{}' is reserved for the runtime service boundary and is not implemented by this ticket", + "worker stop for '{}' is reserved for the runtime service boundary and is not implemented by this registry surface", request.worker_id ), )], @@ -427,224 +304,653 @@ impl WorkspaceWorkerRuntime for LocalRuntimeBridge { vec![WorkerProxyConnectPoint { kind: "stream_proxy".to_string(), status: "not_implemented".to_string(), - diagnostics: vec![RuntimeDiagnostic::new( - "worker_stream_proxy_pending", - "info", + diagnostics: vec![diagnostic( + "worker_proxy_pending", + DiagnosticSeverity::Info, format!( - "future stream/proxy connection point for '{worker_id}' is reserved without opening a protocol surface" + "worker proxy connect points for '{}' are not implemented by this overview-only registry surface", + worker_id ), )], }] } } -impl RuntimeDiagnostic { +#[derive(Clone)] +pub struct WorkerRuntimeRegistry { + runtimes: Vec>, +} + +impl WorkerRuntimeRegistry { + pub fn new(runtimes: Vec>) -> Self { + Self { runtimes } + } + + pub fn for_local_pods(runtime: LocalPodRuntime) -> Self { + Self::new(vec![Arc::new(runtime)]) + } + + pub fn list_runtimes(&self, limit: usize) -> RuntimeList { + let mut diagnostics = Vec::new(); + let mut items = Vec::new(); + for runtime in self.runtimes.iter().take(limit) { + let summary = runtime.runtime_summary(limit); + diagnostics.extend(summary.diagnostics.iter().cloned()); + items.push(summary); + } + diagnostics.truncate(MAX_DIAGNOSTICS); + RuntimeList::new(items, diagnostics) + } + + pub fn list_hosts(&self, limit: usize) -> RuntimeList { + let mut items = Vec::new(); + let mut diagnostics = Vec::new(); + for runtime in &self.runtimes { + if items.len() >= limit { + break; + } + let mut list = runtime.list_hosts(limit.saturating_sub(items.len())); + diagnostics.append(&mut list.diagnostics); + items.append(&mut list.items); + } + diagnostics.truncate(MAX_DIAGNOSTICS); + RuntimeList::new(items, diagnostics) + } + + pub fn list_workers(&self, limit: usize) -> RuntimeList { + let mut items = Vec::new(); + let mut diagnostics = Vec::new(); + for runtime in &self.runtimes { + if items.len() >= limit { + break; + } + let mut list = runtime.list_workers(limit.saturating_sub(items.len())); + diagnostics.append(&mut list.diagnostics); + items.append(&mut list.items); + } + diagnostics.truncate(MAX_DIAGNOSTICS); + RuntimeList::new(items, diagnostics) + } + + pub fn list_workers_for_host( + &self, + host_id: &str, + limit: usize, + ) -> Result, RuntimeRegistryError> { + validate_backend_identifier("host_id", host_id)?; + + let mut host_found = false; + let mut diagnostics = Vec::new(); + let mut items = Vec::new(); + for runtime in &self.runtimes { + let host_list = runtime.list_hosts(MAX_HOST_SCAN); + diagnostics.extend(host_list.diagnostics); + if !host_list.items.iter().any(|host| host.host_id == host_id) { + continue; + } + host_found = true; + let worker_list = runtime.list_workers(limit); + diagnostics.extend(worker_list.diagnostics); + items.extend( + worker_list + .items + .into_iter() + .filter(|worker| worker.host_id == host_id) + .take(limit.saturating_sub(items.len())), + ); + if items.len() >= limit { + break; + } + } + diagnostics.truncate(MAX_DIAGNOSTICS); + if host_found { + Ok(RuntimeList::new(items, diagnostics)) + } else { + Err(RuntimeRegistryError::UnknownHost(host_id.to_string())) + } + } + + pub fn worker(&self, worker_id: &str) -> Result { + validate_backend_identifier("worker_id", worker_id)?; + for runtime in &self.runtimes { + let lookup = runtime.worker(worker_id); + if let Some(worker) = lookup.worker { + return Ok(worker); + } + } + Err(RuntimeRegistryError::UnknownWorker(worker_id.to_string())) + } +} + +#[derive(Clone)] +pub struct LocalPodRuntime { + runtime_id: String, + host_id: String, + workspace_root: PathBuf, + data_dir: Option, +} + +pub type LocalRuntimeBridge = LocalPodRuntime; + +impl LocalPodRuntime { pub fn new( - code: impl Into, - severity: impl Into, - message: impl Into, + workspace_id: impl AsRef, + workspace_root: impl Into, + data_dir: Option, ) -> Self { Self { - code: truncate_string(&code.into(), MAX_LABEL_LEN), - severity: truncate_string(&severity.into(), MAX_LABEL_LEN), - message: truncate_string(&message.into(), 240), + runtime_id: LOCAL_RUNTIME_ID.to_string(), + host_id: host_id_for_workspace(workspace_id.as_ref()), + workspace_root: workspace_root.into(), + data_dir, + } + } + + fn pod_root(&self) -> Option { + self.data_dir.as_ref().map(|dir| dir.join("pods")) + } + + fn pod_names(&self, pod_root: &Path) -> Result, RuntimeDiagnostic> { + let entries = fs::read_dir(pod_root).map_err(|err| { + diagnostic( + "local_pod_registry_unreadable", + DiagnosticSeverity::Warning, + format!("local Pod registry could not be read: {err}"), + ) + })?; + + let mut pod_names = BTreeSet::new(); + for entry in entries.flatten() { + let Ok(file_type) = entry.file_type() else { + continue; + }; + if !file_type.is_dir() { + continue; + } + let Some(name) = entry.file_name().to_str().map(|name| name.to_string()) else { + continue; + }; + pod_names.insert(name); + } + Ok(pod_names) + } + + fn read_worker(&self, pod_root: &Path, pod_name: &str) -> WorkerReadOutcome { + let metadata_path = pod_root.join(pod_name).join("metadata.json"); + let data = match fs::read_to_string(metadata_path) { + Ok(data) => data, + Err(err) => { + return WorkerReadOutcome::Diagnostic(diagnostic( + "local_pod_metadata_unreadable", + DiagnosticSeverity::Warning, + format!("local Pod metadata could not be read: {err}"), + )); + } + }; + let metadata: PodMetadata = match serde_json::from_str(&data) { + Ok(metadata) => metadata, + Err(err) => { + return WorkerReadOutcome::Diagnostic(diagnostic( + "local_pod_metadata_invalid", + DiagnosticSeverity::Warning, + format!("local Pod metadata could not be parsed: {err}"), + )); + } + }; + + let Some(metadata_workspace_root) = metadata.workspace_root.as_ref() else { + return WorkerReadOutcome::Diagnostic(diagnostic( + "local_pod_workspace_root_missing", + DiagnosticSeverity::Warning, + "local Pod metadata did not include a workspace identity and was not included" + .to_string(), + )); + }; + if !same_workspace_root(metadata_workspace_root, &self.workspace_root) { + return WorkerReadOutcome::Diagnostic(diagnostic( + "local_pod_workspace_not_visible", + DiagnosticSeverity::Info, + "local Pod metadata belongs to another workspace and was not included".to_string(), + )); + } + + let label = safe_display_hint(&metadata.pod_name); + let role = manifest_hint_string(&metadata.resolved_manifest_snapshot, "role"); + let profile = + manifest_hint_string(&metadata.resolved_manifest_snapshot, "profile_selector"); + let worker_id = worker_id_for_pod(pod_name); + let status = match metadata + .active + .as_ref() + .and_then(|active| active.segment_id.as_ref()) + { + Some(_) => "active:segment".to_string(), + None if metadata.active.is_some() => "active:pending_segment".to_string(), + None => "metadata_only".to_string(), + }; + let state = if metadata.active.is_some() { + "active".to_string() + } else { + "metadata_only".to_string() + }; + let last_seen_at = None; + WorkerReadOutcome::Worker(WorkerSummary { + runtime_id: self.runtime_id.clone(), + worker_id, + host_id: self.host_id.clone(), + label, + role, + profile, + workspace: WorkerWorkspaceSummary { + visibility: "current_workspace".to_string(), + identity: "runtime_workspace".to_string(), + }, + state, + status, + last_seen_at, + implementation: WorkerImplementationSummary { + kind: "local_pod".to_string(), + display_hint: safe_display_hint(pod_name), + }, + capabilities: WorkerCapabilitySummary { + can_accept_input: true, + can_stream_events: false, + can_stop: false, + can_spawn_followup: false, + can_read_bounded_transcript: false, + }, + diagnostics: vec![diagnostic( + "worker_actions_not_implemented", + DiagnosticSeverity::Info, + "runtime overview is available; worker control and stream/proxy operations are not yet implemented" + .to_string(), + )], + }) + } +} + +impl WorkspaceWorkerRuntime for LocalPodRuntime { + fn runtime_id(&self) -> &str { + &self.runtime_id + } + + fn runtime_summary(&self, limit: usize) -> RuntimeSummary { + let host_list = self.list_hosts(1); + RuntimeSummary { + runtime_id: self.runtime_id.clone(), + label: "Local Pod runtime".to_string(), + kind: "local_pod".to_string(), + status: "available".to_string(), + host_ids: host_list + .items + .iter() + .take(limit) + .map(|host| host.host_id.clone()) + .collect(), + capabilities: local_runtime_capabilities(limit, self.pod_root().is_some()), + diagnostics: host_list.diagnostics, + } + } + + fn list_hosts(&self, limit: usize) -> RuntimeList { + if limit == 0 { + return RuntimeList::new(Vec::new(), Vec::new()); + } + + let mut diagnostics = Vec::new(); + let inspection_available = self.pod_root().is_some(); + if !inspection_available { + diagnostics.push(diagnostic( + "local_pod_registry_unavailable", + DiagnosticSeverity::Warning, + "local Pod data directory is not configured; worker discovery is unavailable" + .to_string(), + )); + } + + let item = HostSummary { + runtime_id: self.runtime_id.clone(), + host_id: self.host_id.clone(), + label: label_for_workspace(&self.workspace_root), + kind: LOCAL_HOST_KIND.to_string(), + status: if inspection_available { + "available" + } else { + "degraded" + } + .to_string(), + observed_at: Utc::now().to_rfc3339(), + last_seen_at: None, + capabilities: local_runtime_capabilities(limit, inspection_available), + diagnostics: diagnostics.clone(), + }; + RuntimeList::new(vec![item], diagnostics) + } + + fn list_workers(&self, limit: usize) -> RuntimeList { + let Some(pod_root) = self.pod_root() else { + return RuntimeList::new( + Vec::new(), + vec![diagnostic( + "local_pod_registry_unavailable", + DiagnosticSeverity::Warning, + "local Pod data directory is not configured; worker discovery is unavailable" + .to_string(), + )], + ); + }; + if limit == 0 { + return RuntimeList::new(Vec::new(), Vec::new()); + } + + let mut workers = Vec::new(); + let mut diagnostics = Vec::new(); + let pod_names = match self.pod_names(&pod_root) { + Ok(pod_names) => pod_names, + Err(diag) => return RuntimeList::new(Vec::new(), vec![diag]), + }; + + for pod_name in pod_names { + if workers.len() >= limit { + break; + } + match self.read_worker(&pod_root, &pod_name) { + WorkerReadOutcome::Worker(worker) => workers.push(worker), + WorkerReadOutcome::Diagnostic(diag) => { + if diagnostics.len() < MAX_DIAGNOSTICS { + diagnostics.push(diag); + } + } + } + } + RuntimeList::new(workers, diagnostics) + } + + fn worker(&self, worker_id: &str) -> WorkerLookupResult { + let Some(pod_root) = self.pod_root() else { + return WorkerLookupResult { + worker: None, + diagnostics: vec![diagnostic( + "local_pod_registry_unavailable", + DiagnosticSeverity::Warning, + "local Pod data directory is not configured; worker discovery is unavailable" + .to_string(), + )], + }; + }; + let pod_names = match self.pod_names(&pod_root) { + Ok(pod_names) => pod_names, + Err(diag) => { + return WorkerLookupResult { + worker: None, + diagnostics: vec![diag], + }; + } + }; + for pod_name in pod_names { + if worker_id_for_pod(&pod_name) != worker_id { + continue; + } + return match self.read_worker(&pod_root, &pod_name) { + WorkerReadOutcome::Worker(worker) => WorkerLookupResult { + worker: Some(worker), + diagnostics: Vec::new(), + }, + WorkerReadOutcome::Diagnostic(diag) => WorkerLookupResult { + worker: None, + diagnostics: vec![diag], + }, + }; + } + WorkerLookupResult { + worker: None, + diagnostics: Vec::new(), } } } -fn read_worker( - pod_root: &Path, - pod_name: &str, - host_id: &str, -) -> Result { - let metadata_path = pod_root.join(pod_name).join("metadata.json"); - let last_seen_at = metadata_path - .metadata() - .ok() - .and_then(|metadata| metadata.modified().ok()) - .map(unix_timestamp); - let raw = fs::read_to_string(&metadata_path).map_err(|error| { - RuntimeDiagnostic::new( - "local_pod_metadata_unreadable", - "warning", - format!("local Pod metadata for `{pod_name}` cannot be read: {error}"), - ) - })?; - let metadata: PodMetadata = serde_json::from_str(&raw).map_err(|error| { - RuntimeDiagnostic::new( - "local_pod_metadata_invalid", - "warning", - format!("local Pod metadata for `{pod_name}` is invalid: {error}"), - ) - })?; - - let mut worker_diagnostics = Vec::new(); - if metadata.pod_name != pod_name { - worker_diagnostics.push(RuntimeDiagnostic::new( - "local_pod_metadata_name_mismatch", - "warning", - "metadata pod_name differed from its directory name; the directory name was used", - )); - } - - let state = if metadata.active.is_some() { - "active" - } else { - "inactive" - } - .to_string(); - let status = match metadata.active.as_ref() { - Some(active) if active.segment_id.is_some() => "active_segment_known", - Some(_) => "active_session_pending_segment", - None => "metadata_only", - } - .to_string(); - let (role, profile) = extract_safe_role_profile(metadata.resolved_manifest_snapshot.as_ref()); - - Ok(WorkerSummary { - worker_id: format!("local-pod-{}", sanitize_identifier(pod_name, MAX_LABEL_LEN)), - host_id: host_id.to_string(), - label: truncate_string(pod_name, MAX_LABEL_LEN), - pod_name: truncate_string(pod_name, MAX_LABEL_LEN), - role, - profile, - workspace_root: metadata - .workspace_root - .as_ref() - .map(|path| bounded_path(path)), - state, - status, - last_seen_at, - implementation: WorkerImplementation { - kind: "local_pod".to_string(), - pod_name: truncate_string(pod_name, MAX_LABEL_LEN), - }, - diagnostics: worker_diagnostics, - }) +enum WorkerReadOutcome { + Worker(WorkerSummary), + Diagnostic(RuntimeDiagnostic), } -fn pod_root_diagnostics(pod_root: Option<&Path>) -> Vec { - let Some(pod_root) = pod_root else { - return vec![RuntimeDiagnostic::new( - "local_yoi_data_dir_unavailable", - "warning", - "local Yoi data directory is not configured; local Pod inspection is unavailable", - )]; - }; - if !pod_root.exists() { - return vec![RuntimeDiagnostic::new( - "local_pod_metadata_root_missing", - "info", - "local Pod metadata directory is absent; local Pod inspection found no workers", - )]; - } - match fs::read_dir(pod_root) { - Ok(_) => Vec::new(), - Err(error) => vec![RuntimeDiagnostic::new( - "local_pod_metadata_root_unreadable", - "warning", - format!("local Pod metadata directory cannot be read: {error}"), - )], +fn local_runtime_capabilities( + limit: usize, + inspection_available: bool, +) -> RuntimeCapabilitySummary { + RuntimeCapabilitySummary { + can_list_hosts: true, + can_list_workers: inspection_available, + can_get_worker: inspection_available, + can_spawn_worker: false, + can_stop_worker: false, + can_accept_input: false, + can_stream_events: false, + can_read_bounded_transcript: false, + has_workspace_fs: false, + has_shell: false, + has_git: false, + supports_worktrees: false, + supports_backend_internal_tools: false, + local_pod_inspection: if inspection_available { + "available" + } else { + "unavailable" + } + .to_string(), + workspace_scope: "current_workspace".to_string(), + max_workers: limit, + os: std::env::consts::OS.to_string(), + arch: std::env::consts::ARCH.to_string(), } } -fn extract_safe_role_profile( - snapshot: Option<&serde_json::Value>, -) -> (Option, Option) { - let Some(snapshot) = snapshot else { - return (None, None); - }; - let role = snapshot - .get("role") - .and_then(|value| value.as_str()) - .and_then(safe_metadata_label); - let profile = snapshot - .get("profile") - .and_then(|profile| { - profile - .get("name") - .or_else(|| profile.get("selector")) - .or_else(|| profile.get("id")) - }) - .and_then(|value| value.as_str()) - .and_then(safe_metadata_label); - (role, profile) +fn diagnostic( + code: impl Into, + severity: DiagnosticSeverity, + message: impl Into, +) -> RuntimeDiagnostic { + RuntimeDiagnostic { + code: code.into(), + severity, + message: message.into(), + } } -fn safe_metadata_label(value: &str) -> Option { +fn host_id_for_workspace(workspace_id: &str) -> String { + bounded_backend_identifier("local-", workspace_id) +} + +fn worker_id_for_pod(pod_name: &str) -> String { + bounded_backend_identifier("local-pod-", pod_name) +} + +fn bounded_backend_identifier(prefix: &str, value: &str) -> String { + let digest = digest_hex(value.as_bytes(), ID_DIGEST_HEX_LEN); + let mut body = sanitize_identifier_body(value); + if body.is_empty() { + body = "id".to_string(); + } + + let suffix_len = 1 + ID_DIGEST_HEX_LEN; + let body_budget = MAX_IDENTIFIER_LEN + .saturating_sub(prefix.len()) + .saturating_sub(suffix_len) + .max(1); + if body.len() > body_budget { + body.truncate(body_budget); + body = body.trim_matches('-').to_string(); + if body.is_empty() { + body = "id".to_string(); + } + } + + let mut id = format!("{prefix}{body}-{digest}"); + if id.len() > MAX_IDENTIFIER_LEN { + let digest_suffix = format!("-{digest}"); + let prefix_budget = MAX_IDENTIFIER_LEN.saturating_sub(digest_suffix.len()); + id = format!( + "{}{}", + prefix.chars().take(prefix_budget).collect::(), + digest_suffix + ); + } + id +} + +fn sanitize_identifier_body(value: &str) -> String { + let mut out = String::with_capacity(value.len()); + for ch in value.chars() { + if ch.is_ascii_alphanumeric() { + out.push(ch.to_ascii_lowercase()); + } else if ch == '-' || ch == '_' { + out.push(ch); + } else { + out.push('-'); + } + } + out.trim_matches('-').to_string() +} + +fn digest_hex(bytes: &[u8], hex_len: usize) -> String { + let digest = Sha256::digest(bytes); + let mut out = String::with_capacity(hex_len); + for byte in digest { + if out.len() >= hex_len { + break; + } + out.push_str(&format!("{byte:02x}")); + } + out.truncate(hex_len); + out +} + +fn validate_backend_identifier( + kind: &'static str, + value: &str, +) -> Result<(), RuntimeRegistryError> { if value.is_empty() - || value.len() > MAX_LABEL_LEN - || value.contains('/') - || value.contains('\\') - || value.contains('\0') - || value.chars().any(|ch| ch.is_control()) + || value.len() > MAX_IDENTIFIER_LEN + || value + .chars() + .any(|ch| !(ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == ':')) { - return None; + return Err(RuntimeRegistryError::InvalidIdentifier { + kind, + value: value.chars().take(MAX_IDENTIFIER_LEN).collect(), + }); } - Some(value.to_string()) + Ok(()) +} + +fn label_for_workspace(workspace_root: &Path) -> String { + workspace_root + .file_name() + .and_then(|name| name.to_str()) + .filter(|name| !name.is_empty()) + .unwrap_or("workspace") + .to_string() +} + +fn same_workspace_root(left: &Path, right: &Path) -> bool { + lexical_path_string(left) == lexical_path_string(right) +} + +fn lexical_path_string(path: &Path) -> String { + let mut value = path.to_string_lossy().replace('\\', "/"); + while value.len() > 1 && value.ends_with('/') { + value.pop(); + } + value +} + +fn safe_display_hint(value: &str) -> String { + value + .chars() + .filter(|ch| !ch.is_control() && *ch != '/' && *ch != '\\') + .take(80) + .collect() } fn worker_spawn_intent_label(intent: &WorkerSpawnIntent) -> &'static str { match intent { WorkerSpawnIntent::WorkspaceCompanion => "workspace_companion", WorkerSpawnIntent::WorkspaceOrchestrator => "workspace_orchestrator", - WorkerSpawnIntent::TicketRole { .. } => "ticket_role", + WorkerSpawnIntent::TicketRole { role, .. } => match role { + TicketWorkerRole::Intake => "ticket_intake", + TicketWorkerRole::Orchestrator => "ticket_orchestrator", + TicketWorkerRole::Coder => "ticket_coder", + TicketWorkerRole::Reviewer => "ticket_reviewer", + }, } } -fn stable_local_host_id(workspace_id: &str) -> String { - format!("local-{}", sanitize_identifier(workspace_id, 96)) +fn manifest_hint_string(snapshot: &Option, key: &str) -> Option { + let value = snapshot.as_ref()?; + let candidate = match key { + "role" => value + .get("role") + .or_else(|| value.get("role_claim").and_then(|role| role.get("role"))), + "profile_selector" => value + .get("profile_selector") + .or_else(|| value.get("profile")), + _ => value.get(key), + }?; + let text = candidate.as_str().map(ToOwned::to_owned).or_else(|| { + candidate + .get("name") + .and_then(|name| name.as_str()) + .map(ToOwned::to_owned) + })?; + let safe = safe_display_hint(&text); + (!safe.is_empty()).then_some(safe) } -fn sanitize_identifier(value: &str, max_len: usize) -> String { - let mut output = String::new(); - for ch in value.chars() { - if output.len() >= max_len { - break; - } - if ch.is_ascii_alphanumeric() { - output.push(ch.to_ascii_lowercase()); - } else if !output.ends_with('-') { - output.push('-'); - } - } - let output = output.trim_matches('-'); - if output.is_empty() { - "local".to_string() - } else { - output.to_string() +pub fn placeholder_worker(host_id: impl Into) -> WorkerSummary { + let host_id = host_id.into(); + WorkerSummary { + runtime_id: "placeholder".to_string(), + worker_id: "worker-placeholder".to_string(), + host_id, + label: "Worker runtime actions are not implemented".to_string(), + role: None, + profile: None, + workspace: WorkerWorkspaceSummary { + visibility: "none".to_string(), + identity: "unsupported".to_string(), + }, + state: "unsupported".to_string(), + status: "Worker runtime control is not wired yet".to_string(), + last_seen_at: None, + implementation: WorkerImplementationSummary { + kind: "placeholder".to_string(), + display_hint: "unsupported".to_string(), + }, + capabilities: WorkerCapabilitySummary { + can_accept_input: false, + can_stream_events: false, + can_stop: false, + can_spawn_followup: false, + can_read_bounded_transcript: false, + }, + diagnostics: vec![diagnostic( + "runtime_capability_unsupported", + DiagnosticSeverity::Info, + "worker control is outside this overview-only registry surface".to_string(), + )], } } -fn bounded_path(path: &Path) -> String { - truncate_string(&path.to_string_lossy(), MAX_PATH_LEN) -} - -fn truncate_string(value: &str, max_len: usize) -> String { - if value.len() <= max_len { - return value.to_string(); - } - let mut end = max_len; - while !value.is_char_boundary(end) { - end -= 1; - } - value[..end].to_string() -} - -fn push_diagnostic(diagnostics: &mut Vec, diagnostic: RuntimeDiagnostic) { - if diagnostics.len() < MAX_DIAGNOSTICS { - diagnostics.push(diagnostic); - } -} - -fn truncate_diagnostics(diagnostics: &mut Vec) { - diagnostics.truncate(MAX_DIAGNOSTICS); -} - -fn unix_timestamp(time: SystemTime) -> String { - match time.duration_since(UNIX_EPOCH) { - Ok(duration) => format!("unix:{}", duration.as_secs()), - Err(_) => "unix:0".to_string(), +pub fn placeholder_spawn_response(host_id: impl Into) -> WorkerSpawnResult { + WorkerSpawnResult { + state: WorkerOperationState::Unsupported, + worker: Some(placeholder_worker(host_id)), + acceptance_evidence: Vec::new(), + diagnostics: vec![diagnostic( + "worker_spawn_unsupported", + DiagnosticSeverity::Info, + "Workspace worker runtime control is not implemented yet".to_string(), + )], } } @@ -652,100 +958,203 @@ fn unix_timestamp(time: SystemTime) -> String { mod tests { use super::*; use serde_json::json; + use std::fs; + use tempfile::TempDir; - #[test] - fn lists_workers_from_local_pod_metadata_without_exposing_snapshot_contents() { - let temp = tempfile::tempdir().unwrap(); - let data_dir = temp.path().join("data"); - let worker_dir = data_dir.join("pods/coder"); - fs::create_dir_all(&worker_dir).unwrap(); + fn write_metadata(dir: &Path, pod_name: &str, metadata: &PodMetadata) { + let pod_dir = dir.join("pods").join(pod_name); + fs::create_dir_all(&pod_dir).unwrap(); fs::write( - worker_dir.join("metadata.json"), - serde_json::to_vec_pretty(&json!({ - "pod_name": "coder", - "active": { - "session_id": "018f4b8e-7c8a-7b41-8d66-111111111111", - "segment_id": "018f4b8e-7c8a-7b41-8d66-222222222222" - }, - "workspace_root": "/workspace/project", - "resolved_manifest_snapshot": { - "role": "coder", - "profile": { "name": "builtin-coder" }, - "secret_token": "do-not-return", - "system_prompt": "do-not-return" - } - })) - .unwrap(), + pod_dir.join("metadata.json"), + serde_json::to_vec(metadata).unwrap(), ) .unwrap(); + } - let bridge = LocalRuntimeBridge::new("local:test", "/workspace/project", Some(data_dir)); - let (workers, diagnostics) = bridge.list_workers(20); - assert!(diagnostics.is_empty()); - assert_eq!(workers.len(), 1); - let worker = &workers[0]; - assert_eq!(worker.worker_id, "local-pod-coder"); - assert_eq!(worker.host_id, "local-local-test"); - assert_eq!(worker.pod_name, "coder"); - assert_eq!(worker.role.as_deref(), Some("coder")); - assert_eq!(worker.profile.as_deref(), Some("builtin-coder")); - assert_eq!(worker.workspace_root.as_deref(), Some("/workspace/project")); - assert_eq!(worker.state, "active"); - assert_eq!(worker.status, "active_segment_known"); - assert_eq!(worker.implementation.kind, "local_pod"); - assert_eq!(worker.implementation.pod_name, "coder"); + fn metadata(workspace_root: Option<&str>) -> PodMetadata { + let mut metadata = PodMetadata::new("coder", None); + metadata.workspace_root = workspace_root.map(PathBuf::from); + metadata.resolved_manifest_snapshot = Some(json!({ + "role": "coder", + "profile_selector": "builtin:coder" + })); + metadata + } - let response_json = serde_json::to_string(&workers).unwrap(); - assert!(!response_json.contains("do-not-return")); - assert!(!response_json.contains("system_prompt")); - assert!(!response_json.contains("session_id")); - assert!(!response_json.contains("segment_id")); + fn assert_valid_generated_id(id: &str) { + assert!(id.len() <= MAX_IDENTIFIER_LEN, "id too long: {id}"); + validate_backend_identifier("test_id", id).unwrap(); + } + + fn host_id() -> String { + host_id_for_workspace("local:test") } #[test] - fn missing_local_pod_data_dir_degrades_to_empty_workers_and_diagnostic() { - let temp = tempfile::tempdir().unwrap(); - let bridge = LocalRuntimeBridge::new( + fn local_runtime_reports_host_without_private_paths() { + let bridge = LocalPodRuntime::new("local:test", "/workspace/project", None); + let hosts = bridge.list_hosts(10); + assert_eq!(hosts.items.len(), 1); + let host = &hosts.items[0]; + assert_eq!(host.runtime_id, LOCAL_RUNTIME_ID); + assert_eq!(host.host_id, host_id()); + assert_valid_generated_id(&host.host_id); + assert_eq!(host.capabilities.local_pod_inspection, "unavailable"); + assert_eq!(host.capabilities.workspace_scope, "current_workspace"); + let json = serde_json::to_string(host).unwrap(); + assert!(!json.contains("/workspace/project")); + assert!(!json.contains("metadata.json")); + } + + #[test] + fn registry_lists_runtimes_hosts_and_workers() { + let temp = TempDir::new().unwrap(); + write_metadata(temp.path(), "coder", &metadata(Some("/workspace/project"))); + let registry = WorkerRuntimeRegistry::for_local_pods(LocalPodRuntime::new( "local:test", - temp.path(), - Some(temp.path().join("missing-data")), - ); - let (workers, diagnostics) = bridge.list_workers(20); - assert!(workers.is_empty()); - assert_eq!(diagnostics[0].code, "local_pod_metadata_root_missing"); + "/workspace/project", + Some(temp.path().to_path_buf()), + )); - let (hosts, host_diagnostics) = bridge.list_hosts(20); - assert_eq!(hosts.len(), 1); - assert_eq!(hosts[0].status, "degraded"); - assert_eq!(hosts[0].capabilities.local_pod_inspection, "unavailable"); - assert_eq!(host_diagnostics[0].code, "local_pod_metadata_root_missing"); + let runtimes = registry.list_runtimes(10); + assert_eq!(runtimes.items[0].runtime_id, LOCAL_RUNTIME_ID); + assert_eq!(runtimes.items[0].host_ids, vec![host_id()]); + + let hosts = registry.list_hosts(10); + assert_eq!(hosts.items[0].host_id, host_id()); + assert_valid_generated_id(&hosts.items[0].host_id); + + let workers = registry.list_workers(10); + assert_eq!(workers.items.len(), 1); + let worker = &workers.items[0]; + assert_eq!(worker.runtime_id, LOCAL_RUNTIME_ID); + assert_eq!(worker.worker_id, worker_id_for_pod("coder")); + assert_valid_generated_id(&worker.worker_id); + assert_eq!(worker.host_id, host_id()); + assert_eq!(worker.workspace.visibility, "current_workspace"); + assert_eq!(worker.implementation.display_hint, "coder"); + let json = serde_json::to_string(worker).unwrap(); + assert!(!json.contains("/workspace/project")); + assert!(!json.contains("metadata.json")); } #[test] - fn worker_list_and_diagnostics_are_bounded() { - let temp = tempfile::tempdir().unwrap(); - let data_dir = temp.path().join("data"); - let pod_root = data_dir.join("pods"); - fs::create_dir_all(&pod_root).unwrap(); - for index in 0..250 { - let worker_dir = pod_root.join(format!("worker-{index:03}")); - fs::create_dir_all(&worker_dir).unwrap(); - fs::write( - worker_dir.join("metadata.json"), - serde_json::to_vec_pretty(&json!({ - "pod_name": format!("worker-{index:03}"), - "workspace_root": "/workspace/project" - })) - .unwrap(), - ) - .unwrap(); + fn registry_resolves_backend_validated_host_ids() { + let temp = TempDir::new().unwrap(); + write_metadata(temp.path(), "coder", &metadata(Some("/workspace/project"))); + let registry = WorkerRuntimeRegistry::for_local_pods(LocalPodRuntime::new( + "local:test", + "/workspace/project", + Some(temp.path().to_path_buf()), + )); + + let workers = registry.list_workers_for_host(&host_id(), 10).unwrap(); + assert_eq!(workers.items.len(), 1); + assert!(matches!( + registry.list_workers_for_host("../secret", 10), + Err(RuntimeRegistryError::InvalidIdentifier { .. }) + )); + assert!(matches!( + registry.list_workers_for_host("local-missing", 10), + Err(RuntimeRegistryError::UnknownHost(_)) + )); + } + + #[test] + fn local_runtime_excludes_other_workspace_metadata() { + let temp = TempDir::new().unwrap(); + write_metadata(temp.path(), "other", &metadata(Some("/workspace/other"))); + write_metadata(temp.path(), "missing", &metadata(None)); + let bridge = LocalPodRuntime::new( + "local:test", + "/workspace/project", + Some(temp.path().to_path_buf()), + ); + let workers = bridge.list_workers(10); + assert!(workers.items.is_empty()); + assert!( + workers + .diagnostics + .iter() + .any(|diag| diag.code == "local_pod_workspace_not_visible") + ); + assert!( + workers + .diagnostics + .iter() + .any(|diag| diag.code == "local_pod_workspace_root_missing") + ); + } + + #[test] + fn local_runtime_worker_detail_is_safe_and_bounded() { + let temp = TempDir::new().unwrap(); + write_metadata(temp.path(), "coder", &metadata(Some("/workspace/project"))); + let bridge = LocalPodRuntime::new( + "local:test", + "/workspace/project", + Some(temp.path().to_path_buf()), + ); + let worker_id = worker_id_for_pod("coder"); + let worker = bridge.worker(&worker_id).worker.unwrap(); + assert_eq!(worker.label, "coder"); + assert_eq!(worker.workspace.identity, "runtime_workspace"); + assert!( + bridge + .worker(&worker_id_for_pod("missing")) + .worker + .is_none() + ); + } + + #[test] + fn generated_worker_ids_are_opaque_bounded_unique_and_resolvable() { + let temp = TempDir::new().unwrap(); + let long_a = format!("{}-A", "TicketWorkerWithVeryLongName".repeat(8)); + let long_b = format!("{}-B", "TicketWorkerWithVeryLongName".repeat(8)); + let pod_names = vec![ + "00001KVWECEQG-Coder.Pod".to_string(), + "foo.bar".to_string(), + "foo-bar".to_string(), + "Ticket#Worker@Reviewer".to_string(), + long_a, + long_b, + ]; + for pod_name in &pod_names { + write_metadata(temp.path(), pod_name, &metadata(Some("/workspace/project"))); + } + let bridge = LocalPodRuntime::new( + "local:test", + "/workspace/project", + Some(temp.path().to_path_buf()), + ); + let registry = WorkerRuntimeRegistry::for_local_pods(bridge.clone()); + + let listed = registry.list_workers(100); + assert_eq!(listed.items.len(), pod_names.len()); + let mut ids = BTreeSet::new(); + for worker in listed.items { + assert_valid_generated_id(&worker.worker_id); + assert!( + ids.insert(worker.worker_id.clone()), + "duplicate id: {}", + worker.worker_id + ); + assert!(!worker.worker_id.contains('.')); + assert!(!worker.worker_id.contains('@')); + assert!(!worker.worker_id.contains('#')); + + let from_registry = registry.worker(&worker.worker_id).unwrap(); + assert_eq!(from_registry.worker_id, worker.worker_id); + let from_runtime = bridge.worker(&worker.worker_id).worker.unwrap(); + assert_eq!(from_runtime.worker_id, worker.worker_id); } - let bridge = LocalRuntimeBridge::new("local:test", "/workspace/project", Some(data_dir)); - let (workers, diagnostics) = bridge.list_workers(5); - assert_eq!(workers.len(), 5); - assert!(diagnostics.len() <= MAX_DIAGNOSTICS); - assert_eq!(workers[0].pod_name, "worker-000"); - assert_eq!(workers[4].pod_name, "worker-004"); + let dotted = worker_id_for_pod("foo.bar"); + let dashed = worker_id_for_pod("foo-bar"); + assert_ne!(dotted, dashed); + assert!(ids.contains(&dotted)); + assert!(ids.contains(&dashed)); + assert!(ids.iter().any(|id| id.len() == MAX_IDENTIFIER_LEN)); } } diff --git a/crates/workspace-server/src/lib.rs b/crates/workspace-server/src/lib.rs index 8d3b9f69..7a804518 100644 --- a/crates/workspace-server/src/lib.rs +++ b/crates/workspace-server/src/lib.rs @@ -40,6 +40,15 @@ pub enum Error { MissingFrontmatter(String), #[error("unknown local host `{0}`")] UnknownHost(String), + #[error("unknown local worker `{0}`")] + UnknownWorker(String), + #[error("invalid runtime {kind} `{value}`")] + InvalidRuntimeIdentifier { kind: String, value: String }, + #[error("runtime `{runtime_id}` does not support `{capability}`")] + RuntimeCapabilityUnsupported { + runtime_id: String, + capability: String, + }, #[error("unknown local repository `{0}`")] UnknownRepository(String), #[error("workspace identity error: {0}")] diff --git a/crates/workspace-server/src/repositories.rs b/crates/workspace-server/src/repositories.rs index aac20730..840149d3 100644 --- a/crates/workspace-server/src/repositories.rs +++ b/crates/workspace-server/src/repositories.rs @@ -3,7 +3,7 @@ use std::process::{Command, Output}; use serde::{Deserialize, Serialize}; -use crate::hosts::RuntimeDiagnostic; +use crate::hosts::{DiagnosticSeverity, RuntimeDiagnostic}; const LEGACY_LOCAL_REPOSITORY_ID: &str = "local"; const LOCAL_REPOSITORY_PREFIX: &str = "local-"; @@ -340,7 +340,11 @@ fn truncate_field(value: &str, limit: usize) -> String { fn diagnostic(code: &str, severity: &str, message: String) -> RuntimeDiagnostic { RuntimeDiagnostic { code: code.to_string(), - severity: severity.to_string(), + severity: match severity { + "error" => DiagnosticSeverity::Error, + "warning" => DiagnosticSeverity::Warning, + _ => DiagnosticSeverity::Info, + }, message, } } diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index 12fc7814..f908da94 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -11,7 +11,8 @@ use serde::{Deserialize, Serialize}; use tokio::net::TcpListener; use crate::hosts::{ - HostSummary, LocalRuntimeBridge, RuntimeDiagnostic, WorkerSummary, WorkspaceWorkerRuntime, + DiagnosticSeverity, HostSummary, LocalPodRuntime, RuntimeDiagnostic, RuntimeSummary, + WorkerRuntimeRegistry, WorkerSummary, }; use crate::identity::WorkspaceIdentity; use crate::records::{ @@ -63,7 +64,7 @@ pub struct WorkspaceApi { config: ServerConfig, store: Arc, records: LocalProjectRecordReader, - runtime: Arc, + runtime: Arc, } impl WorkspaceApi { @@ -77,11 +78,11 @@ impl WorkspaceApi { updated_at: config.workspace_created_at.clone(), }) .await?; - let runtime = Arc::new(LocalRuntimeBridge::new( + let runtime = Arc::new(WorkerRuntimeRegistry::for_local_pods(LocalPodRuntime::new( config.workspace_id.clone(), config.workspace_root.clone(), config.local_runtime_data_dir.clone(), - )); + ))); Ok(Self { records: LocalProjectRecordReader::new(config.workspace_root.clone()), config, @@ -125,6 +126,7 @@ pub fn build_router(api: WorkspaceApi) -> Router { get(repository_tickets), ) .route("/api/hosts", get(list_hosts)) + .route("/api/runtimes", get(list_runtimes)) .route("/api/workers", get(list_workers)) .route("/api/hosts/{host_id}/workers", get(list_host_workers)) .fallback(get(static_or_spa_fallback)) @@ -381,7 +383,7 @@ async fn repository_tickets( source: "workspace_local_ticket_fallback".to_string(), diagnostics: vec![RuntimeDiagnostic { code: "repository_ticket_target_metadata_absent".to_string(), - severity: "info".to_string(), + severity: DiagnosticSeverity::Info, message: "Ticket target Repository metadata is not available yet; Kanban groups all workspace-local Tickets by state as a read-only fallback.".to_string(), }], })) @@ -396,11 +398,25 @@ async fn list_hosts( workspace_id: api.config.workspace_id, limit, items: runtime_hosts.items, - source: "worker_runtime".to_string(), + source: "worker_runtime_registry".to_string(), diagnostics: runtime_hosts.diagnostics, })) } +async fn list_runtimes( + State(api): State, +) -> ApiResult>> { + let limit = api.config.max_records.min(200); + let runtimes = api.runtime.list_runtimes(limit); + Ok(Json(RuntimeListResponse { + workspace_id: api.config.workspace_id, + limit, + items: runtimes.items, + source: "worker_runtime_registry".to_string(), + diagnostics: runtimes.diagnostics, + })) +} + async fn list_workers( State(api): State, ) -> ApiResult>> { @@ -411,16 +427,18 @@ async fn list_host_workers( State(api): State, AxumPath(host_id): AxumPath, ) -> ApiResult>> { - let runtime_hosts = api.runtime.list_hosts(1); - let expected_host_id = runtime_hosts - .items - .first() - .map(|host| host.host_id.as_str()) - .ok_or_else(|| Error::UnknownHost(host_id.clone()))?; - if host_id != expected_host_id { - return Err(Error::UnknownHost(host_id).into()); - } - workers_response(api).map(Json) + let limit = api.config.max_records.min(200); + let runtime_workers = api + .runtime + .list_workers_for_host(&host_id, limit) + .map_err(|err| err.into_error())?; + Ok(Json(RuntimeListResponse { + workspace_id: api.config.workspace_id, + limit, + items: runtime_workers.items, + source: "worker_runtime_registry".to_string(), + diagnostics: runtime_workers.diagnostics, + })) } fn workers_response(api: WorkspaceApi) -> ApiResult> { @@ -430,7 +448,7 @@ fn workers_response(api: WorkspaceApi) -> ApiResult for ApiError { impl IntoResponse for ApiError { fn into_response(self) -> Response { let status = match &self.0 { + Error::InvalidRuntimeIdentifier { .. } => StatusCode::BAD_REQUEST, Error::InvalidRecordId(_) | Error::MissingFrontmatter(_) | Error::UnknownHost(_) + | Error::UnknownWorker(_) | Error::UnknownRepository(_) => StatusCode::NOT_FOUND, Error::Ticket(_) => StatusCode::NOT_FOUND, + Error::RuntimeCapabilityUnsupported { .. } => StatusCode::NOT_IMPLEMENTED, _ => StatusCode::INTERNAL_SERVER_ERROR, }; ( @@ -703,25 +724,36 @@ mod tests { assert_eq!(unknown_repository_response.status(), StatusCode::NOT_FOUND); let hosts = get_json(app.clone(), "/api/hosts").await; - assert_eq!(hosts["items"][0]["host_id"], TEST_REPOSITORY_ID); - assert_eq!(hosts["items"][0]["kind"], "local_host"); + assert_eq!(hosts["source"], "worker_runtime_registry"); + assert_eq!(hosts["items"][0]["runtime_id"], "local-pod-runtime"); + let host_id = hosts["items"][0]["host_id"].as_str().unwrap().to_string(); + assert!(host_id.starts_with("local-")); + assert!(host_id.len() <= 120); + assert_ne!(host_id, TEST_REPOSITORY_ID); + assert_eq!(hosts["items"][0]["kind"], "local-pod-host"); assert_eq!( hosts["items"][0]["capabilities"]["local_pod_inspection"], - "unavailable" + "available" ); + assert_eq!( + hosts["items"][0]["capabilities"]["workspace_scope"], + "current_workspace" + ); + assert!(!hosts.to_string().contains("metadata.json")); + + let runtimes = get_json(app.clone(), "/api/runtimes").await; + assert_eq!(runtimes["source"], "worker_runtime_registry"); + assert_eq!(runtimes["items"][0]["runtime_id"], "local-pod-runtime"); + assert_eq!(runtimes["items"][0]["host_ids"][0], host_id); let workers = get_json(app.clone(), "/api/workers").await; assert!(workers["items"].as_array().unwrap().is_empty()); assert_eq!( workers["diagnostics"][0]["code"], - "local_pod_metadata_root_missing" + "local_pod_registry_unreadable" ); - let host_workers = get_json( - app.clone(), - &format!("/api/hosts/{TEST_REPOSITORY_ID}/workers"), - ) - .await; + let host_workers = get_json(app.clone(), &format!("/api/hosts/{host_id}/workers")).await; assert!(host_workers["items"].as_array().unwrap().is_empty()); let runs_response = app diff --git a/web/workspace/src/lib/workspace-pages/WorkspacePage.svelte b/web/workspace/src/lib/workspace-pages/WorkspacePage.svelte index 82b91414..b628195e 100644 --- a/web/workspace/src/lib/workspace-pages/WorkspacePage.svelte +++ b/web/workspace/src/lib/workspace-pages/WorkspacePage.svelte @@ -546,6 +546,14 @@
Local inspection
{host.capabilities.local_pod_inspection}
+
+
Runtime
+
{host.runtime_id}
+
+
+
Scope
+
{host.capabilities.workspace_scope}
+
Platform
{host.capabilities.os} / {host.capabilities.arch}
@@ -590,7 +598,7 @@ {worker.host_id} {worker.state} · {worker.status} - {worker.workspace_root ?? 'unknown'} + {worker.workspace.visibility} · {worker.workspace.identity} {worker.implementation.kind} {/each} diff --git a/web/workspace/src/lib/workspace-sidebar/types.ts b/web/workspace/src/lib/workspace-sidebar/types.ts index 8096d9c1..982bd5b1 100644 --- a/web/workspace/src/lib/workspace-sidebar/types.ts +++ b/web/workspace/src/lib/workspace-sidebar/types.ts @@ -25,35 +25,70 @@ export type Diagnostic = { message: string; }; +export type RuntimeCapabilities = { + can_list_hosts: boolean; + can_list_workers: boolean; + can_get_worker: boolean; + can_spawn_worker: boolean; + can_stop_worker: boolean; + can_accept_input: boolean; + can_stream_events: boolean; + can_read_bounded_transcript: boolean; + has_workspace_fs: boolean; + has_shell: boolean; + has_git: boolean; + supports_worktrees: boolean; + supports_backend_internal_tools: boolean; + local_pod_inspection: string; + workspace_scope: string; + os: string; + arch: string; + max_workers: number; +}; + +export type Runtime = { + runtime_id: string; + label: string; + kind: string; + status: string; + host_ids: string[]; + capabilities: RuntimeCapabilities; + diagnostics: Diagnostic[]; +}; + export type Host = { + runtime_id: string; host_id: string; label: string; kind: string; status: string; observed_at: string; - last_seen_at: string; - capabilities: { - local_pod_inspection: string; - workspace_root: string; - os: string; - arch: string; - max_workers: number; - }; + last_seen_at: string | null; + capabilities: RuntimeCapabilities; diagnostics: Diagnostic[]; }; +export type WorkerCapabilities = { + can_accept_input: boolean; + can_stream_events: boolean; + can_stop: boolean; + can_spawn_followup: boolean; + can_read_bounded_transcript: boolean; +}; + export type Worker = { + runtime_id: string; worker_id: string; host_id: string; label: string; - pod_name: string; - role?: string; - profile?: string; - workspace_root?: string; + role?: string | null; + profile?: string | null; + workspace: { visibility: string; identity: string }; state: string; status: string; - last_seen_at?: string; - implementation: { kind: string; pod_name: string }; + last_seen_at?: string | null; + implementation: { kind: string; display_hint: string }; + capabilities: WorkerCapabilities; diagnostics: Diagnostic[]; };