use std::fs; use std::path::{Path, PathBuf}; use std::time::{SystemTime, UNIX_EPOCH}; use pod_store::{PodMetadata, validate_pod_name}; use serde::{Deserialize, Serialize}; const MAX_DIAGNOSTICS: usize = 20; const MAX_LABEL_LEN: usize = 120; const MAX_PATH_LEN: usize = 512; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct RuntimeDiagnostic { pub code: String, pub severity: String, pub message: String, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct HostSummary { pub host_id: String, pub label: String, pub kind: String, pub status: String, pub observed_at: String, pub last_seen_at: String, pub capabilities: HostCapabilitySummary, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct HostCapabilitySummary { pub local_pod_inspection: String, pub workspace_root: String, pub os: String, pub arch: String, pub max_workers: usize, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerSummary { pub worker_id: String, pub host_id: String, pub label: String, pub pod_name: String, #[serde(skip_serializing_if = "Option::is_none")] pub role: Option, #[serde(skip_serializing_if = "Option::is_none")] pub profile: Option, #[serde(skip_serializing_if = "Option::is_none")] pub workspace_root: Option, pub state: String, pub status: String, #[serde(skip_serializing_if = "Option::is_none")] pub last_seen_at: Option, pub implementation: WorkerImplementation, pub diagnostics: Vec, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerImplementation { pub kind: String, pub pod_name: String, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct LocalRuntimeBridge { workspace_id: String, workspace_root: PathBuf, data_dir: Option, } impl LocalRuntimeBridge { pub fn new( workspace_id: impl Into, workspace_root: impl Into, data_dir: Option, ) -> Self { Self { workspace_id: workspace_id.into(), workspace_root: workspace_root.into(), data_dir, } } pub fn host_id(&self) -> String { stable_local_host_id(&self.workspace_id) } pub fn list_hosts(&self, limit: usize) -> (Vec, Vec) { if limit == 0 { return (Vec::new(), Vec::new()); } let observed_at = unix_timestamp(SystemTime::now()); let mut diagnostics = pod_root_diagnostics(self.pod_root().as_deref()); let local_pod_inspection = if diagnostics.is_empty() { "available" } else { "unavailable" } .to_string(); let status = if local_pod_inspection == "available" { "available" } else { "degraded" } .to_string(); truncate_diagnostics(&mut diagnostics); let host = HostSummary { host_id: self.host_id(), label: format!( "Local host ({})", self.workspace_root .file_name() .and_then(|name| name.to_str()) .unwrap_or("workspace") ), kind: "local_host".to_string(), status, observed_at: observed_at.clone(), last_seen_at: observed_at, capabilities: HostCapabilitySummary { local_pod_inspection, workspace_root: bounded_path(&self.workspace_root), os: std::env::consts::OS.to_string(), arch: std::env::consts::ARCH.to_string(), max_workers: limit.min(200), }, diagnostics: diagnostics.clone(), }; (vec![host], diagnostics) } pub fn list_workers(&self, limit: usize) -> (Vec, Vec) { let limit = limit.min(200); let Some(pod_root) = self.pod_root() else { return ( Vec::new(), vec![RuntimeDiagnostic::new( "local_yoi_data_dir_unavailable", "warning", "local Yoi data directory is not configured; local Pod workers cannot be inspected", )], ); }; let mut diagnostics = Vec::new(); if !pod_root.exists() { diagnostics.push(RuntimeDiagnostic::new( "local_pod_metadata_root_missing", "info", "local Pod metadata directory is absent; no local workers were discovered", )); return (Vec::new(), diagnostics); } let entries = match fs::read_dir(&pod_root) { Ok(entries) => entries, Err(error) => { diagnostics.push(RuntimeDiagnostic::new( "local_pod_metadata_root_unreadable", "warning", format!("local Pod metadata directory cannot be read: {error}"), )); return (Vec::new(), diagnostics); } }; let mut workers = Vec::new(); let mut candidate_names = Vec::new(); for entry in entries { let entry = match entry { Ok(entry) => entry, Err(error) => { push_diagnostic( &mut diagnostics, RuntimeDiagnostic::new( "local_pod_metadata_entry_unreadable", "warning", format!("one local Pod metadata entry cannot be read: {error}"), ), ); continue; } }; let file_type = match entry.file_type() { Ok(file_type) => file_type, Err(error) => { push_diagnostic( &mut diagnostics, RuntimeDiagnostic::new( "local_pod_metadata_entry_type_unreadable", "warning", format!("one local Pod metadata entry type cannot be read: {error}"), ), ); continue; } }; if !file_type.is_dir() { continue; } let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else { push_diagnostic( &mut diagnostics, RuntimeDiagnostic::new( "local_pod_name_non_utf8", "warning", "one local Pod metadata directory has a non-UTF-8 name and was skipped", ), ); continue; }; if validate_pod_name(&name).is_err() || name.len() > MAX_LABEL_LEN { push_diagnostic( &mut diagnostics, RuntimeDiagnostic::new( "local_pod_name_invalid", "warning", "one local Pod metadata directory has an invalid or oversized name and was skipped", ), ); continue; } if entry.path().join("metadata.json").exists() { candidate_names.push(name); } } candidate_names.sort(); candidate_names.truncate(limit); for pod_name in candidate_names { match read_worker(&pod_root, &pod_name, &self.host_id()) { Ok(worker) => workers.push(worker), Err(diagnostic) => push_diagnostic(&mut diagnostics, diagnostic), } } truncate_diagnostics(&mut diagnostics); (workers, diagnostics) } fn pod_root(&self) -> Option { self.data_dir.as_ref().map(|data_dir| data_dir.join("pods")) } } impl RuntimeDiagnostic { pub fn new( code: impl Into, severity: impl Into, message: impl Into, ) -> Self { Self { code: truncate_string(&code.into(), MAX_LABEL_LEN), severity: truncate_string(&severity.into(), MAX_LABEL_LEN), message: truncate_string(&message.into(), 240), } } } fn read_worker( pod_root: &Path, pod_name: &str, host_id: &str, ) -> Result { let metadata_path = pod_root.join(pod_name).join("metadata.json"); let last_seen_at = metadata_path .metadata() .ok() .and_then(|metadata| metadata.modified().ok()) .map(unix_timestamp); let raw = fs::read_to_string(&metadata_path).map_err(|error| { RuntimeDiagnostic::new( "local_pod_metadata_unreadable", "warning", format!("local Pod metadata for `{pod_name}` cannot be read: {error}"), ) })?; let metadata: PodMetadata = serde_json::from_str(&raw).map_err(|error| { RuntimeDiagnostic::new( "local_pod_metadata_invalid", "warning", format!("local Pod metadata for `{pod_name}` is invalid: {error}"), ) })?; let mut worker_diagnostics = Vec::new(); if metadata.pod_name != pod_name { worker_diagnostics.push(RuntimeDiagnostic::new( "local_pod_metadata_name_mismatch", "warning", "metadata pod_name differed from its directory name; the directory name was used", )); } let state = if metadata.active.is_some() { "active" } else { "inactive" } .to_string(); let status = match metadata.active.as_ref() { Some(active) if active.segment_id.is_some() => "active_segment_known", Some(_) => "active_session_pending_segment", None => "metadata_only", } .to_string(); let (role, profile) = extract_safe_role_profile(metadata.resolved_manifest_snapshot.as_ref()); Ok(WorkerSummary { worker_id: format!("local-pod-{}", sanitize_identifier(pod_name, MAX_LABEL_LEN)), host_id: host_id.to_string(), label: truncate_string(pod_name, MAX_LABEL_LEN), pod_name: truncate_string(pod_name, MAX_LABEL_LEN), role, profile, workspace_root: metadata .workspace_root .as_ref() .map(|path| bounded_path(path)), state, status, last_seen_at, implementation: WorkerImplementation { kind: "local_pod".to_string(), pod_name: truncate_string(pod_name, MAX_LABEL_LEN), }, diagnostics: worker_diagnostics, }) } fn pod_root_diagnostics(pod_root: Option<&Path>) -> Vec { let Some(pod_root) = pod_root else { return vec![RuntimeDiagnostic::new( "local_yoi_data_dir_unavailable", "warning", "local Yoi data directory is not configured; local Pod inspection is unavailable", )]; }; if !pod_root.exists() { return vec![RuntimeDiagnostic::new( "local_pod_metadata_root_missing", "info", "local Pod metadata directory is absent; local Pod inspection found no workers", )]; } match fs::read_dir(pod_root) { Ok(_) => Vec::new(), Err(error) => vec![RuntimeDiagnostic::new( "local_pod_metadata_root_unreadable", "warning", format!("local Pod metadata directory cannot be read: {error}"), )], } } fn extract_safe_role_profile( snapshot: Option<&serde_json::Value>, ) -> (Option, Option) { let Some(snapshot) = snapshot else { return (None, None); }; let role = snapshot .get("role") .and_then(|value| value.as_str()) .and_then(safe_metadata_label); let profile = snapshot .get("profile") .and_then(|profile| { profile .get("name") .or_else(|| profile.get("selector")) .or_else(|| profile.get("id")) }) .and_then(|value| value.as_str()) .and_then(safe_metadata_label); (role, profile) } fn safe_metadata_label(value: &str) -> Option { if value.is_empty() || value.len() > MAX_LABEL_LEN || value.contains('/') || value.contains('\\') || value.contains('\0') || value.chars().any(|ch| ch.is_control()) { return None; } Some(value.to_string()) } fn stable_local_host_id(workspace_id: &str) -> String { format!("local-{}", sanitize_identifier(workspace_id, 96)) } fn sanitize_identifier(value: &str, max_len: usize) -> String { let mut output = String::new(); for ch in value.chars() { if output.len() >= max_len { break; } if ch.is_ascii_alphanumeric() { output.push(ch.to_ascii_lowercase()); } else if !output.ends_with('-') { output.push('-'); } } let output = output.trim_matches('-'); if output.is_empty() { "local".to_string() } else { output.to_string() } } fn bounded_path(path: &Path) -> String { truncate_string(&path.to_string_lossy(), MAX_PATH_LEN) } fn truncate_string(value: &str, max_len: usize) -> String { if value.len() <= max_len { return value.to_string(); } let mut end = max_len; while !value.is_char_boundary(end) { end -= 1; } value[..end].to_string() } fn push_diagnostic(diagnostics: &mut Vec, diagnostic: RuntimeDiagnostic) { if diagnostics.len() < MAX_DIAGNOSTICS { diagnostics.push(diagnostic); } } fn truncate_diagnostics(diagnostics: &mut Vec) { diagnostics.truncate(MAX_DIAGNOSTICS); } fn unix_timestamp(time: SystemTime) -> String { match time.duration_since(UNIX_EPOCH) { Ok(duration) => format!("unix:{}", duration.as_secs()), Err(_) => "unix:0".to_string(), } } #[cfg(test)] mod tests { use super::*; use serde_json::json; #[test] fn lists_workers_from_local_pod_metadata_without_exposing_snapshot_contents() { let temp = tempfile::tempdir().unwrap(); let data_dir = temp.path().join("data"); let worker_dir = data_dir.join("pods/coder"); fs::create_dir_all(&worker_dir).unwrap(); fs::write( worker_dir.join("metadata.json"), serde_json::to_vec_pretty(&json!({ "pod_name": "coder", "active": { "session_id": "018f4b8e-7c8a-7b41-8d66-111111111111", "segment_id": "018f4b8e-7c8a-7b41-8d66-222222222222" }, "workspace_root": "/workspace/project", "resolved_manifest_snapshot": { "role": "coder", "profile": { "name": "builtin-coder" }, "secret_token": "do-not-return", "system_prompt": "do-not-return" } })) .unwrap(), ) .unwrap(); let bridge = LocalRuntimeBridge::new("local:test", "/workspace/project", Some(data_dir)); let (workers, diagnostics) = bridge.list_workers(20); assert!(diagnostics.is_empty()); assert_eq!(workers.len(), 1); let worker = &workers[0]; assert_eq!(worker.worker_id, "local-pod-coder"); assert_eq!(worker.host_id, "local-local-test"); assert_eq!(worker.pod_name, "coder"); assert_eq!(worker.role.as_deref(), Some("coder")); assert_eq!(worker.profile.as_deref(), Some("builtin-coder")); assert_eq!(worker.workspace_root.as_deref(), Some("/workspace/project")); assert_eq!(worker.state, "active"); assert_eq!(worker.status, "active_segment_known"); assert_eq!(worker.implementation.kind, "local_pod"); assert_eq!(worker.implementation.pod_name, "coder"); let response_json = serde_json::to_string(&workers).unwrap(); assert!(!response_json.contains("do-not-return")); assert!(!response_json.contains("system_prompt")); assert!(!response_json.contains("session_id")); assert!(!response_json.contains("segment_id")); } #[test] fn missing_local_pod_data_dir_degrades_to_empty_workers_and_diagnostic() { let temp = tempfile::tempdir().unwrap(); let bridge = LocalRuntimeBridge::new( "local:test", temp.path(), Some(temp.path().join("missing-data")), ); let (workers, diagnostics) = bridge.list_workers(20); assert!(workers.is_empty()); assert_eq!(diagnostics[0].code, "local_pod_metadata_root_missing"); let (hosts, host_diagnostics) = bridge.list_hosts(20); assert_eq!(hosts.len(), 1); assert_eq!(hosts[0].status, "degraded"); assert_eq!(hosts[0].capabilities.local_pod_inspection, "unavailable"); assert_eq!(host_diagnostics[0].code, "local_pod_metadata_root_missing"); } #[test] fn worker_list_and_diagnostics_are_bounded() { let temp = tempfile::tempdir().unwrap(); let data_dir = temp.path().join("data"); let pod_root = data_dir.join("pods"); fs::create_dir_all(&pod_root).unwrap(); for index in 0..250 { let worker_dir = pod_root.join(format!("worker-{index:03}")); fs::create_dir_all(&worker_dir).unwrap(); fs::write( worker_dir.join("metadata.json"), serde_json::to_vec_pretty(&json!({ "pod_name": format!("worker-{index:03}"), "workspace_root": "/workspace/project" })) .unwrap(), ) .unwrap(); } let bridge = LocalRuntimeBridge::new("local:test", "/workspace/project", Some(data_dir)); let (workers, diagnostics) = bridge.list_workers(5); assert_eq!(workers.len(), 5); assert!(diagnostics.len() <= MAX_DIAGNOSTICS); assert_eq!(workers[0].pod_name, "worker-000"); assert_eq!(workers[4].pod_name, "worker-004"); } }