diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index fa40a558..32044460 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -8,9 +8,7 @@ use pod_store::PodMetadataStore; use session_store::Store; use tokio::sync::{broadcast, mpsc, oneshot}; -use crate::discovery::{ - PodDiscovery, attach_or_restore_pod_tool, inspect_pod_tool, list_visible_pods_tool, -}; +use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool}; use crate::ipc::alerter::Alerter; use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::server::SocketServer; @@ -18,9 +16,7 @@ use crate::pod::{Pod, PodError, PodRunResult, SystemItemCommitter}; use crate::runtime::dir::RuntimeDir; use crate::segment_log_sink::SegmentLogSink; use crate::shared_state::PodSharedState; -use crate::spawn::comm_tools::{ - list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, -}; +use crate::spawn::comm_tools::{read_pod_output_tool, send_to_pod_tool, stop_pod_tool}; use crate::spawn::registry::SpawnedPodRegistry; use crate::spawn::tool::spawn_pod_tool; use protocol::{ @@ -563,12 +559,9 @@ where worker.register_tool(send_to_pod_tool(spawned_registry.clone())); worker.register_tool(read_pod_output_tool(spawned_registry.clone())); worker.register_tool(stop_pod_tool(spawned_registry.clone())); - worker.register_tool(list_pods_tool(spawned_registry.clone())); - let discovery = PodDiscovery::new(pod_store, spawner_name, runtime_base, pwd, spawned_registry); - worker.register_tool(list_visible_pods_tool(discovery.clone())); - worker.register_tool(inspect_pod_tool(discovery.clone())); - worker.register_tool(attach_or_restore_pod_tool(discovery)); + worker.register_tool(list_pods_tool(discovery.clone())); + worker.register_tool(restore_pod_tool(discovery)); pod.attach_tracker(tracker); fs_for_view } @@ -835,10 +828,10 @@ async fn controller_loop( break; } - Method::ListVisiblePods => match discovery.list_visible().await { + Method::ListPods => match discovery.list_visible().await { Ok(pods) => match serde_json::to_value(pods) { Ok(pods) => { - let _ = event_tx.send(Event::VisiblePods { pods }); + let _ = event_tx.send(Event::PodsListed { pods }); } Err(error) => { let _ = event_tx.send(Event::Error { @@ -855,35 +848,15 @@ async fn controller_loop( } }, - Method::InspectPod { name } => match discovery.inspect(&name).await { - Ok(pod) => match serde_json::to_value(pod) { - Ok(pod) => { - let _ = event_tx.send(Event::PodInspection { pod }); - } - Err(error) => { - let _ = event_tx.send(Event::Error { - code: ErrorCode::Internal, - message: format!("serialize pod inspection: {error}"), - }); - } - }, - Err(error) => { - let _ = event_tx.send(Event::Error { - code: ErrorCode::InvalidRequest, - message: error.to_string(), - }); - } - }, - - Method::AttachOrRestorePod { name } => match discovery.attach_or_restore(&name).await { + Method::RestorePod { name } => match discovery.restore(&name).await { Ok(result) => match serde_json::to_value(result) { Ok(result) => { - let _ = event_tx.send(Event::PodAttachRestore { result }); + let _ = event_tx.send(Event::PodRestored { result }); } Err(error) => { let _ = event_tx.send(Event::Error { code: ErrorCode::Internal, - message: format!("serialize pod attach/restore result: {error}"), + message: format!("serialize pod restore result: {error}"), }); } }, @@ -1096,11 +1069,7 @@ where notify_buffer.push_notify(message); } Some(Method::ListCompletions { .. }) => {} - Some( - Method::ListVisiblePods - | Method::InspectPod { .. } - | Method::AttachOrRestorePod { .. }, - ) => { + Some(Method::ListPods | Method::RestorePod { .. }) => { let _ = event_tx.send(Event::Error { code: ErrorCode::AlreadyRunning, message: "Pod discovery requests are only handled while the Pod is idle or paused" diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs index acceaa33..e8b617dc 100644 --- a/crates/pod/src/discovery.rs +++ b/crates/pod/src/discovery.rs @@ -1,4 +1,4 @@ -//! Pod-state-backed discovery and restore/attach tools. +//! Pod-state-backed discovery and restore tools. //! //! This surface deliberately does not enumerate every Pod on the host. The //! listing path starts from the caller's visibility set (the caller itself and @@ -15,6 +15,7 @@ use std::time::Duration; use async_trait::async_trait; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use manifest::{Permission, ScopeRule}; use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore}; use protocol::stream::JsonLineReader; use protocol::{Event, PodStatus}; @@ -24,6 +25,7 @@ use session_store::{SegmentId, SessionId}; use tokio::net::UnixStream; use tokio::process::Command; +use crate::runtime::dir::SpawnedPodRecord; use crate::runtime::pod_registry; use crate::spawn::registry::SpawnedPodRegistry; @@ -97,26 +99,23 @@ where } } - pub async fn attach_or_restore( - &self, - pod_name: &str, - ) -> Result { - match self.plan_attach_or_restore(pod_name).await? { - AttachRestorePlan::Attach { + pub async fn restore(&self, pod_name: &str) -> Result { + match self.plan_restore(pod_name).await? { + RestorePlan::AlreadyLive { pod_name, socket_path, status, - } => Ok(AttachRestoreResult::Attached { + } => Ok(RestoreResult::AlreadyLive { pod_name, socket_path, status, }), - AttachRestorePlan::Restore { + RestorePlan::Restore { pod_name, socket_path, } => { self.spawn_restore_process(&pod_name, &socket_path).await?; - Ok(AttachRestoreResult::Restored { + Ok(RestoreResult::Restored { pod_name, socket_path, }) @@ -124,13 +123,10 @@ where } } - pub async fn plan_attach_or_restore( - &self, - pod_name: &str, - ) -> Result { + pub async fn plan_restore(&self, pod_name: &str) -> Result { let detail = self.inspect(pod_name).await?; if detail.live.reachable { - return Ok(AttachRestorePlan::Attach { + return Ok(RestorePlan::AlreadyLive { pod_name: pod_name.to_string(), socket_path: detail.live.socket_path, status: detail.live.status, @@ -153,7 +149,7 @@ where if let Some(lock) = lookup_segment_lock(segment_id)? { let lock_live = probe_socket(&lock.socket).await; return if lock_live.reachable { - Ok(AttachRestorePlan::Attach { + Ok(RestorePlan::AlreadyLive { pod_name: lock.pod_name, socket_path: lock.socket, status: lock_live.status, @@ -169,7 +165,7 @@ where }; } - Ok(AttachRestorePlan::Restore { + Ok(RestorePlan::Restore { pod_name: pod_name.to_string(), socket_path: self.default_socket_path(pod_name), }) @@ -178,6 +174,7 @@ where async fn visibility(&self) -> Result { let mut visible = BTreeMap::new(); let mut child_sockets = BTreeMap::new(); + let mut comm_registry = BTreeMap::new(); visible.insert(self.self_pod_name.clone(), VisibilityReason::SelfPod); // Durable parent -> child state is the primary visibility source. @@ -186,7 +183,8 @@ where visible .entry(child.pod_name.clone()) .or_insert(VisibilityReason::SpawnedChild); - child_sockets.insert(child.pod_name, child.socket_path); + child_sockets.insert(child.pod_name.clone(), child.socket_path.clone()); + comm_registry.insert(child.pod_name.clone(), comm_info_from_spawned_child(&child)); } } @@ -197,12 +195,17 @@ where visible .entry(record.pod_name.clone()) .or_insert(VisibilityReason::SpawnedChild); - child_sockets.insert(record.pod_name, record.socket_path); + child_sockets.insert(record.pod_name.clone(), record.socket_path.clone()); + comm_registry.insert( + record.pod_name.clone(), + CommRegistryInfo::from_record(&record), + ); } Ok(VisibilitySet { visible, child_sockets, + comm_registry, }) } @@ -222,6 +225,7 @@ where active: detail.active, live: detail.live, restore: detail.restore, + comm_registry: detail.comm_registry, spawned_children: detail.spawned_children, error: None, } @@ -233,6 +237,7 @@ where active: None, live: self.live_for_name(pod_name, None).await, restore: RestoreInfo::not_possible("pod state missing"), + comm_registry: visibility.comm_info_for(pod_name), spawned_children: SpawnedChildrenSummary::default(), error: None, }, @@ -243,6 +248,7 @@ where active: None, live: self.live_for_name(pod_name, None).await, restore: RestoreInfo::not_possible("pod state is unreadable"), + comm_registry: visibility.comm_info_for(pod_name), spawned_children: SpawnedChildrenSummary::default(), error: Some(error.to_string()), }, @@ -268,6 +274,7 @@ where active: metadata.active.map(ActivePointer::from), live, restore, + comm_registry: visibility.comm_info_for(&metadata.pod_name), spawned_children, } } @@ -424,6 +431,33 @@ pub struct SpawnedChildrenSummary { pub unreachable: usize, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct CommRegistryInfo { + pub registered: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub socket_path: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub scope_delegated: Vec, +} + +impl CommRegistryInfo { + fn missing() -> Self { + Self { + registered: false, + socket_path: None, + scope_delegated: Vec::new(), + } + } + + fn from_record(record: &SpawnedPodRecord) -> Self { + Self { + registered: true, + socket_path: Some(record.socket_path.clone()), + scope_delegated: record.scope_delegated.clone(), + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct VisiblePodItem { pub pod_name: String, @@ -433,6 +467,7 @@ pub struct VisiblePodItem { pub active: Option, pub live: LiveInfo, pub restore: RestoreInfo, + pub comm_registry: CommRegistryInfo, pub spawned_children: SpawnedChildrenSummary, #[serde(default, skip_serializing_if = "Option::is_none")] pub error: Option, @@ -446,13 +481,14 @@ pub struct PodDetail { pub active: Option, pub live: LiveInfo, pub restore: RestoreInfo, + pub comm_registry: CommRegistryInfo, pub spawned_children: SpawnedChildrenSummary, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "action", rename_all = "snake_case")] -pub enum AttachRestorePlan { - Attach { +pub enum RestorePlan { + AlreadyLive { pod_name: String, socket_path: PathBuf, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -466,8 +502,8 @@ pub enum AttachRestorePlan { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "action", rename_all = "snake_case")] -pub enum AttachRestoreResult { - Attached { +pub enum RestoreResult { + AlreadyLive { pod_name: String, socket_path: PathBuf, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -514,6 +550,7 @@ pub enum PodDiscoveryError { struct VisibilitySet { visible: BTreeMap, child_sockets: BTreeMap, + comm_registry: BTreeMap, } impl VisibilitySet { @@ -527,6 +564,37 @@ impl VisibilitySet { fn child_socket_for(&self, pod_name: &str) -> Option { self.child_sockets.get(pod_name).cloned() } + + fn comm_info_for(&self, pod_name: &str) -> CommRegistryInfo { + self.comm_registry + .get(pod_name) + .cloned() + .unwrap_or_else(CommRegistryInfo::missing) + } +} + +fn comm_info_from_spawned_child(child: &pod_store::PodSpawnedChild) -> CommRegistryInfo { + let scope_delegated = child + .scope_delegated + .iter() + .filter_map(|rule| { + let permission = match rule.permission.as_str() { + "read" => Permission::Read, + "write" => Permission::Write, + _ => return None, + }; + Some(ScopeRule { + target: rule.target.clone(), + permission, + recursive: rule.recursive, + }) + }) + .collect(); + CommRegistryInfo { + registered: true, + socket_path: Some(child.socket_path.clone()), + scope_delegated, + } } async fn summarize_spawned_children( @@ -604,16 +672,16 @@ fn resolve_pod_command() -> PathBuf { #[derive(Debug, Deserialize, JsonSchema)] struct PodNameInput { - /// Pod name to inspect, attach, or restore. + /// Pod name to restore. name: String, } -struct ListVisiblePodsTool { +struct ListPodsTool { discovery: PodDiscovery, } #[async_trait] -impl Tool for ListVisiblePodsTool +impl Tool for ListPodsTool where St: PodMetadataStore + Clone + Send + Sync + 'static, { @@ -631,53 +699,28 @@ where } } -struct InspectPodTool { +struct RestorePodTool { discovery: PodDiscovery, } #[async_trait] -impl Tool for InspectPodTool +impl Tool for RestorePodTool where St: PodMetadataStore + Clone + Send + Sync + 'static, { async fn execute(&self, input_json: &str) -> Result { let input: PodNameInput = serde_json::from_str(input_json) - .map_err(|e| ToolError::InvalidArgument(format!("invalid InspectPod input: {e}")))?; - let detail = self - .discovery - .inspect(&input.name) - .await - .map_err(discovery_error_to_tool_error)?; - Ok(ToolOutput { - summary: format!("pod `{}` inspected", detail.pod_name), - content: Some(json_content(&detail)?), - }) - } -} - -struct AttachOrRestorePodTool { - discovery: PodDiscovery, -} - -#[async_trait] -impl Tool for AttachOrRestorePodTool -where - St: PodMetadataStore + Clone + Send + Sync + 'static, -{ - async fn execute(&self, input_json: &str) -> Result { - let input: PodNameInput = serde_json::from_str(input_json).map_err(|e| { - ToolError::InvalidArgument(format!("invalid AttachOrRestorePod input: {e}")) - })?; + .map_err(|e| ToolError::InvalidArgument(format!("invalid RestorePod input: {e}")))?; let result = self .discovery - .attach_or_restore(&input.name) + .restore(&input.name) .await .map_err(discovery_error_to_tool_error)?; let summary = match &result { - AttachRestoreResult::Attached { pod_name, .. } => { - format!("pod `{pod_name}` is live; attached to existing socket") + RestoreResult::AlreadyLive { pod_name, .. } => { + format!("pod `{pod_name}` is already live") } - AttachRestoreResult::Restored { pod_name, .. } => { + RestoreResult::Restored { pod_name, .. } => { format!("pod `{pod_name}` restored from pod state") } }; @@ -688,55 +731,38 @@ where } } -pub fn list_visible_pods_tool(discovery: PodDiscovery) -> ToolDefinition +pub fn list_pods_tool(discovery: PodDiscovery) -> ToolDefinition where St: PodMetadataStore + Clone + Send + Sync + 'static, { Arc::new(move || { - let meta = ToolMeta::new("ListVisiblePods") + let meta = ToolMeta::new("ListPods") .description( - "List Pod state entries visible to this Pod. This is state-backed and does not expose the host-wide Pod universe.", + "List Pods visible to this Pod from durable Pod state and the spawned-child registry. This does not expose the host-wide Pod universe.", ) .input_schema(serde_json::json!({ "type": "object", "properties": {}, "additionalProperties": false, })); - let tool: Arc = Arc::new(ListVisiblePodsTool { + let tool: Arc = Arc::new(ListPodsTool { discovery: discovery.clone(), }); (meta, tool) }) } -pub fn inspect_pod_tool(discovery: PodDiscovery) -> ToolDefinition +pub fn restore_pod_tool(discovery: PodDiscovery) -> ToolDefinition where St: PodMetadataStore + Clone + Send + Sync + 'static, { Arc::new(move || { - let meta = ToolMeta::new("InspectPod") + let meta = ToolMeta::new("RestorePod") .description( - "Inspect one visible Pod by name from durable Pod state, distinguishing missing state from not-visible Pods.", + "Restore a visible stopped/restorable Pod, or report that a visible Pod is already live. Missing state is an error.", ) .input_schema(serde_json::to_value(schemars::schema_for!(PodNameInput)).unwrap()); - let tool: Arc = Arc::new(InspectPodTool { - discovery: discovery.clone(), - }); - (meta, tool) - }) -} - -pub fn attach_or_restore_pod_tool(discovery: PodDiscovery) -> ToolDefinition -where - St: PodMetadataStore + Clone + Send + Sync + 'static, -{ - Arc::new(move || { - let meta = ToolMeta::new("AttachOrRestorePod") - .description( - "Attach to a visible live Pod, or restore it from Pod state when no live socket is reachable. Missing state is an error.", - ) - .input_schema(serde_json::to_value(schemars::schema_for!(PodNameInput)).unwrap()); - let tool: Arc = Arc::new(AttachOrRestorePodTool { + let tool: Arc = Arc::new(RestorePodTool { discovery: discovery.clone(), }); (meta, tool) @@ -781,7 +807,7 @@ mod tests { static ENV_LOCK: Mutex<()> = Mutex::new(()); #[tokio::test(flavor = "current_thread")] - async fn state_backed_visibility_and_attach_restore_planning() { + async fn state_backed_visibility_and_restore_planning() { let _env = ENV_LOCK.lock().unwrap(); let root = TempDir::new().unwrap(); let store_dir = root.path().join("store"); @@ -873,6 +899,13 @@ mod tests { registry, ); + let list_tool_def = list_pods_tool(discovery.clone()); + let (list_meta, _) = list_tool_def(); + assert_eq!(list_meta.name, "ListPods"); + let restore_tool_def = restore_pod_tool(discovery.clone()); + let (restore_meta, _) = restore_tool_def(); + assert_eq!(restore_meta.name, "RestorePod"); + let list = discovery.list_visible().await.unwrap(); let names: Vec<_> = list.iter().map(|p| p.pod_name.as_str()).collect(); assert_eq!( @@ -912,25 +945,16 @@ mod tests { missing_err, PodDiscoveryError::StateMissing { .. } )); - let hidden_restore_err = discovery - .plan_attach_or_restore("hidden") - .await - .unwrap_err(); + let hidden_restore_err = discovery.plan_restore("hidden").await.unwrap_err(); assert!(matches!( hidden_restore_err, PodDiscoveryError::NotVisible { .. } )); - let attach_plan = discovery - .plan_attach_or_restore("child-live") - .await - .unwrap(); - assert!(matches!(attach_plan, AttachRestorePlan::Attach { .. })); - let restore_plan = discovery - .plan_attach_or_restore("child-stale") - .await - .unwrap(); - assert!(matches!(restore_plan, AttachRestorePlan::Restore { .. })); + let live_plan = discovery.plan_restore("child-live").await.unwrap(); + assert!(matches!(live_plan, RestorePlan::AlreadyLive { .. })); + let restore_plan = discovery.plan_restore("child-stale").await.unwrap(); + assert!(matches!(restore_plan, RestorePlan::Restore { .. })); let lock_socket = runtime_base.join("lock-owner.sock"); let _guard = pod_registry::install_top_level( @@ -945,10 +969,7 @@ mod tests { active_child_segment, ) .unwrap(); - let locked_err = discovery - .plan_attach_or_restore("child-stale") - .await - .unwrap_err(); + let locked_err = discovery.plan_restore("child-stale").await.unwrap_err(); assert!(matches!(locked_err, PodDiscoveryError::LockConflict { .. })); live_listener.abort(); diff --git a/crates/pod/src/spawn/comm_tools.rs b/crates/pod/src/spawn/comm_tools.rs index b3217980..44fadb5b 100644 --- a/crates/pod/src/spawn/comm_tools.rs +++ b/crates/pod/src/spawn/comm_tools.rs @@ -1,7 +1,7 @@ //! Pod-to-Pod communication tools. //! -//! Four tools in one module — `SendToPod`, `ReadPodOutput`, `StopPod`, -//! `ListPods` — all built on the same `SpawnedPodRegistry` handed in by +//! Three tools in one module: `SendToPod`, `ReadPodOutput`, `StopPod`, +//! all built on the same `SpawnedPodRegistry` handed in by //! the controller. Each operation is request-response: connect to the //! target's Unix socket, perform one method exchange, disconnect. //! @@ -23,7 +23,6 @@ use session_store::LogEntry; use tokio::net::UnixStream; use crate::runtime::dir::SpawnedPodRecord; -use crate::runtime::pod_registry::{self, LockFileGuard}; use crate::spawn::registry::SpawnedPodRegistry; /// Timeout applied to each socket-level operation — connect, write, @@ -244,76 +243,6 @@ pub fn stop_pod_tool(registry: Arc) -> ToolDefinition { }) } -// --------------------------------------------------------------------------- -// ListPods -// --------------------------------------------------------------------------- - -const LIST_PODS_DESCRIPTION: &str = "List all Pods spawned by this Pod along with their reachability \ -status (`alive` / `stopped`) and the scope each was granted."; - -#[derive(Debug, Deserialize, schemars::JsonSchema)] -struct EmptyInput {} - -struct ListPodsTool { - registry: Arc, -} - -#[async_trait] -impl Tool for ListPodsTool { - async fn execute(&self, _input_json: &str) -> Result { - let records = self.registry.list().await; - if records.is_empty() { - return Ok(ToolOutput { - summary: "no spawned pods".into(), - content: None, - }); - } - - let mut lines: Vec = Vec::with_capacity(records.len()); - let mut stale_names: Vec = Vec::new(); - for record in &records { - let alive = is_reachable(&record.socket_path).await; - let status = if alive { "alive" } else { "stopped" }; - let scope = summarize_scope(record); - lines.push(format!("{} [{status}] scope={scope}", record.pod_name)); - if !alive { - stale_names.push(record.pod_name.clone()); - } - } - - // Trigger stale reclaim on unreachable pods so the lock file's - // allocation table doesn't keep growing indefinitely when - // children crash without a clean exit path. - if !stale_names.is_empty() { - if let Ok(lock_path) = pod_registry::default_registry_path() - && let Ok(mut guard) = LockFileGuard::open(&lock_path) - { - pod_registry::reclaim_stale(&mut guard); - } - } - - let summary = format!("{} pod(s) known", records.len()); - Ok(ToolOutput { - summary, - content: Some(lines.join("\n")), - }) - } -} - -pub fn list_pods_tool(registry: Arc) -> ToolDefinition { - Arc::new(move || { - let schema = schemars::schema_for!(EmptyInput); - let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); - let meta = ToolMeta::new("ListPods") - .description(LIST_PODS_DESCRIPTION) - .input_schema(schema_value); - let tool: Arc = Arc::new(ListPodsTool { - registry: registry.clone(), - }); - (meta, tool) - }) -} - // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- @@ -322,6 +251,29 @@ fn unknown_pod_err(name: &str) -> ToolError { ToolError::InvalidArgument(format!("no spawned pod named `{name}`")) } +fn summarize_scope(record: &SpawnedPodRecord) -> String { + if record.scope_delegated.is_empty() { + return "(none)".into(); + } + let parts: Vec = record + .scope_delegated + .iter() + .map(|rule| { + let perm = match rule.permission { + manifest::Permission::Read => "read", + manifest::Permission::Write => "write", + }; + let recursive = if rule.recursive { + "" + } else { + " [non-recursive]" + }; + format!("{perm}:{}{recursive}", rule.target.display()) + }) + .collect(); + parts.join(", ") +} + /// Connect with a timeout, drain the server's connect-time snapshot, /// write one `Method` line, flush, and close. /// @@ -487,14 +439,6 @@ async fn fetch_history(socket: &Path) -> std::io::Result> } } -/// Probe-connect test. Connection accepted within timeout → alive. -async fn is_reachable(socket: &Path) -> bool { - tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) - .await - .map(|r| r.is_ok()) - .unwrap_or(false) -} - fn extract_assistant_text(entries: &[serde_json::Value]) -> String { let mut out = String::new(); for value in entries { @@ -536,25 +480,6 @@ fn push_assistant_text(out: &mut String, logged: session_store::LoggedItem) { } } -fn summarize_scope(record: &SpawnedPodRecord) -> String { - if record.scope_delegated.is_empty() { - return "(none)".into(); - } - let parts: Vec = record - .scope_delegated - .iter() - .map(|r| { - let perm = match r.permission { - manifest::Permission::Read => "read", - manifest::Permission::Write => "write", - }; - let tag = if r.recursive { "" } else { " [non-recursive]" }; - format!("{perm}:{}{tag}", r.target.display()) - }) - .collect(); - parts.join(", ") -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/pod/src/spawn/registry.rs b/crates/pod/src/spawn/registry.rs index e66a2602..307100ed 100644 --- a/crates/pod/src/spawn/registry.rs +++ b/crates/pod/src/spawn/registry.rs @@ -1,9 +1,10 @@ //! Shared registry of Pods spawned by this Pod. //! //! `SpawnPod` writes here; the pod-comm tools (`SendToPod`, -//! `ReadPodOutput`, `StopPod`, `ListPods`) read and mutate the same -//! instance. Runtime write-through still materialises `spawned_pods.json`, -//! but durable state lives in the spawner's Pod metadata. +//! `ReadPodOutput`, `StopPod`) read and mutate the same instance. Discovery +//! tools consult this registry together with durable Pod state. Runtime +//! write-through still materialises `spawned_pods.json`, but durable state lives +//! in the spawner's Pod metadata. //! //! `ReadPodOutput` additionally owns a per-spawned-pod cursor here so //! two consecutive reads yield only new assistant text. The cursor is diff --git a/crates/pod/src/spawn/tool.rs b/crates/pod/src/spawn/tool.rs index 925bce85..086bc1cd 100644 --- a/crates/pod/src/spawn/tool.rs +++ b/crates/pod/src/spawn/tool.rs @@ -220,8 +220,8 @@ pub struct SpawnPodTool { /// override it. Defaults to the spawner's pwd — see module docs. spawner_pwd: PathBuf, /// Shared registry of spawned children, also used by the - /// pod-comm tools (`SendToPod` / `ReadPodOutput` / `StopPod` / - /// `ListPods`). Writes the list to runtime and durable Pod state on + /// pod-comm tools (`SendToPod` / `ReadPodOutput` / `StopPod`) and by + /// Pod discovery. Writes the list to runtime and durable Pod state on /// each add. registry: Arc, /// THIS Pod's own parent-callback socket, if any. After a diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index 95984124..4daf0321 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -1,5 +1,5 @@ //! Integration tests for the pod-comm tools (`SendToPod`, -//! `ReadPodOutput`, `StopPod`, `ListPods`). +//! `ReadPodOutput`, `StopPod`). //! //! The real child Pod binary is not started. Instead each test stands //! up a mock `UnixListener` that speaks the socket protocol directly: @@ -16,9 +16,7 @@ use llm_worker::tool::ToolOutput; use manifest::{Permission, Scope, ScopeRule, SharedScope}; use pod::runtime::dir::{RuntimeDir, SpawnedPodRecord}; use pod::runtime::pod_registry::{self, LockFileGuard}; -use pod::spawn::comm_tools::{ - list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, -}; +use pod::spawn::comm_tools::{read_pod_output_tool, send_to_pod_tool, stop_pod_tool}; use pod::spawn::registry::SpawnedPodRegistry; use pod_store::{CombinedStore, FsPodStore, PodMetadataStore}; use protocol::stream::{JsonLineReader, JsonLineWriter}; @@ -544,13 +542,6 @@ async fn restored_registry_uses_pod_state_without_runtime_file() { .await .unwrap(); - let def = list_pods_tool(restored.clone()); - let (_meta, tool) = def(); - let output: ToolOutput = tool.execute("{}").await.unwrap(); - assert!(output.summary.contains("1 pod"), "{}", output.summary); - let body = output.content.expect("restored ListPods should list child"); - assert!(body.contains("child [alive]"), "body: {body}"); - let def = send_to_pod_tool(restored.clone()); let (_meta, tool) = def(); let input = json!({ "name": "child", "message": "after restart" }).to_string(); @@ -718,51 +709,3 @@ async fn load_from_pod_state_reclaims_missing_child_scope_and_records_history() let runtime_records: Vec = serde_json::from_str(&runtime_contents).unwrap(); assert!(runtime_records.is_empty()); } - -// --------------------------------------------------------------------------- -// ListPods -// --------------------------------------------------------------------------- - -#[tokio::test] -async fn list_pods_reports_alive_and_stopped() { - let (tmp, registry, _rd) = setup_registry().await; - - // One child is reachable… - let (live_socket, listener) = bind_mock_socket(tmp.path(), "alive").await; - // Keep the listener alive by moving it into a task that never exits. - let _accept = tokio::spawn(async move { - loop { - let Ok((stream, _)) = listener.accept().await else { - return; - }; - drop(stream); - } - }); - register_child(®istry, "alive", &live_socket, tmp.path()).await; - - // …the other is not. - let dead_socket = tmp.path().join("dead.sock"); - register_child(®istry, "dead", &dead_socket, tmp.path()).await; - - let def = list_pods_tool(registry); - let (_meta, tool) = def(); - let output: ToolOutput = tool.execute("{}").await.unwrap(); - assert!(output.summary.contains("2 pod"), "{}", output.summary); - let body = output.content.expect("list_pods should populate content"); - assert!(body.contains("alive [alive]"), "body: {body}"); - assert!(body.contains("dead [stopped]"), "body: {body}"); -} - -#[tokio::test] -async fn list_pods_empty_when_nothing_registered() { - let (_tmp, registry, _rd) = setup_registry().await; - let def = list_pods_tool(registry); - let (_meta, tool) = def(); - let output: ToolOutput = tool.execute("{}").await.unwrap(); - assert!( - output.summary.contains("no spawned pods"), - "{}", - output.summary - ); - assert!(output.content.is_none()); -} diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 3cbc1be9..cce3da53 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -56,16 +56,12 @@ pub enum Method { kind: CompletionKind, prefix: String, }, - /// List Pods visible to this Pod from durable Pod state. This is not a - /// host-wide Pod universe query. - ListVisiblePods, - /// Inspect one Pod by name if its state exists and it is visible to this Pod. - InspectPod { - name: String, - }, - /// Attach to a visible live Pod, or restore it from durable Pod state when - /// it is not live. Missing state and not-visible state are distinct errors. - AttachOrRestorePod { + /// List Pods visible to this Pod from durable Pod state and the spawned-child + /// registry. This is not a host-wide Pod universe query. + ListPods, + /// Restore a visible stopped/restorable Pod, or report that it is already + /// live. Missing state and not-visible state are distinct errors. + RestorePod { name: String, }, } @@ -474,18 +470,14 @@ pub enum Event { input: Vec, summary: RewindSummary, }, - /// Reply to `Method::ListVisiblePods`. Payload is a stable JSON value so - /// the Pod crate can evolve discovery fields without introducing a protocol + /// Reply to `Method::ListPods`. Payload is a stable JSON value so the Pod + /// crate can evolve discovery fields without introducing a protocol /// dependency on session-store. - VisiblePods { + PodsListed { pods: serde_json::Value, }, - /// Reply to `Method::InspectPod`. - PodInspection { - pod: serde_json::Value, - }, - /// Reply to `Method::AttachOrRestorePod`. - PodAttachRestore { + /// Reply to `Method::RestorePod`. + PodRestored { result: serde_json::Value, }, Alert(Alert), @@ -1469,11 +1461,8 @@ mod tests { #[test] fn pod_discovery_methods_roundtrip() { let methods = [ - Method::ListVisiblePods, - Method::InspectPod { - name: "child".into(), - }, - Method::AttachOrRestorePod { + Method::ListPods, + Method::RestorePod { name: "child".into(), }, ]; @@ -1481,9 +1470,8 @@ mod tests { let json = serde_json::to_string(&method).unwrap(); let decoded: Method = serde_json::from_str(&json).unwrap(); match (decoded, method) { - (Method::ListVisiblePods, Method::ListVisiblePods) - | (Method::InspectPod { .. }, Method::InspectPod { .. }) - | (Method::AttachOrRestorePod { .. }, Method::AttachOrRestorePod { .. }) => {} + (Method::ListPods, Method::ListPods) + | (Method::RestorePod { .. }, Method::RestorePod { .. }) => {} (decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"), } } @@ -1492,30 +1480,23 @@ mod tests { #[test] fn pod_discovery_events_roundtrip() { let events = [ - Event::VisiblePods { + Event::PodsListed { pods: serde_json::json!([{ "pod_name": "child" }]), }, - Event::PodInspection { - pod: serde_json::json!({ "pod_name": "child" }), - }, - Event::PodAttachRestore { - result: serde_json::json!({ "action": "attach" }), + Event::PodRestored { + result: serde_json::json!({ "action": "already_live" }), }, ]; for event in events { let json = serde_json::to_string(&event).unwrap(); let decoded: Event = serde_json::from_str(&json).unwrap(); match (decoded, event) { - (Event::VisiblePods { pods }, Event::VisiblePods { pods: expected }) => { + (Event::PodsListed { pods }, Event::PodsListed { pods: expected }) => { assert_eq!(pods, expected) } - (Event::PodInspection { pod }, Event::PodInspection { pod: expected }) => { - assert_eq!(pod, expected) + (Event::PodRestored { result }, Event::PodRestored { result: expected }) => { + assert_eq!(result, expected) } - ( - Event::PodAttachRestore { result }, - Event::PodAttachRestore { result: expected }, - ) => assert_eq!(result, expected), (decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"), } } diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index b2898fa1..1489953a 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -1204,9 +1204,7 @@ impl App { message, }); } - Event::VisiblePods { .. } - | Event::PodInspection { .. } - | Event::PodAttachRestore { .. } => {} + Event::PodsListed { .. } | Event::PodRestored { .. } => {} Event::Shutdown => { self.mark_orphan_compacts_incomplete(); self.quit = true; diff --git a/crates/tui/src/picker.rs b/crates/tui/src/picker.rs index 6f64fc04..7e60242c 100644 --- a/crates/tui/src/picker.rs +++ b/crates/tui/src/picker.rs @@ -262,7 +262,7 @@ fn draw(f: &mut Frame<'_>, list: &PodList) { Span::styled("[↑/↓]", Style::default().fg(Color::DarkGray)), Span::raw(" select "), Span::styled("[enter]", Style::default().fg(Color::Green)), - Span::raw(" attach/restore "), + Span::raw(" open/restore "), Span::styled("[esc]", Style::default().fg(Color::Yellow)), Span::raw(" cancel"), ])), diff --git a/docs/architecture.md b/docs/architecture.md index 61cb769c..d0dfde80 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -64,11 +64,11 @@ Pod の制御・監視に使う JSONL ベースのメッセージプロトコル - context/session 制御: `Compact`, `ListRewindTargets`, `RewindTo` - typed injection / child lifecycle: `Notify`, `PodEvent` - client 補助: `ListCompletions` - - Pod visibility / attach: `ListVisiblePods`, `InspectPod`, `AttachOrRestorePod` + - Pod visibility / restore: `ListPods`, `RestorePod` - **Pod → Client (`Event`)** - accepted input / history seed: `Snapshot`, `UserMessage`, `SystemItem`, `SegmentRotated` - generation stream: `TurnStart`, `TurnEnd`, `LlmCallStart`, `LlmCallEnd`, retry/continuation events, `Text*`, `Thinking*`, `ToolCall*`, `ToolResult`, `Usage`, `RunEnd` - - control replies: completions, rewind, visible Pod / inspect / attach results + - control replies: completions, rewind, visible Pod list / restore results - operational status: `Status`, `Alert`, `MemoryWorker`, `Compact*`, `Error`, `Shutdown` - リクエストとレスポンスの紐付けを一般化した RPC にはしない。多くの状態は broadcast event と Pod status で観測する - 一部の reply(例: completions)は要求 socket にだけ返る。broadcast event と request-local reply の違いは enum variant のコメントを正とする @@ -146,8 +146,7 @@ Pod が操作できるファイルパスの制御。 | File / shell | `Read`, `Write`, `Edit`, `Glob`, `Grep`, `Bash` | workspace ファイル操作と shell 実行。file tools は `ScopedFs` と read-before-edit tracker を通る。`Bash` は permission policy と出力退避で制御する | | Task | `TaskCreate`, `TaskUpdate`, `TaskList`, `TaskGet` | セッション内の短期 task 状態管理 | | Memory / Knowledge | `MemoryQuery`, `MemoryRead`, `MemoryWrite`, `MemoryEdit`, `MemoryDelete`, `KnowledgeQuery` | manifest の memory 設定が有効な時に登録される durable memory / knowledge 操作 | -| Pod orchestration | `SpawnPod`, `SendToPod`, `ReadPodOutput`, `StopPod`, `ListPods` | child Pod の起動・通信・停止・一覧 | -| Visible Pod state | `ListVisiblePods`, `InspectPod`, `AttachOrRestorePod` | durable Pod state と visibility に基づく Pod inspection / attach / restore | +| Pod orchestration | `SpawnPod`, `SendToPod`, `ReadPodOutput`, `StopPod`, `ListPods`, `RestorePod` | child / visible Pod の起動・通信・停止・一覧・復元 | | Web | `WebSearch`, `WebFetch` | manifest/env で明示設定された provider 経由の bounded web access | すべての tool call は manifest tool permission と scope/policy のチェックを通る。ファイル write scope、Pod delegation、memory layout、web provider 設定はそれぞれ別の authority を持ち、UI 表示だけで権限を広げない。