diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index c0a95631..799cfeb6 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -9,10 +9,14 @@ use tokio::sync::{broadcast, mpsc, oneshot}; use crate::notification_buffer::NotificationBuffer; use crate::notifier::Notifier; use crate::pod::{Pod, PodError, PodRunResult}; +use crate::pod_comm_tools::{ + list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, +}; use crate::runtime_dir::RuntimeDir; use crate::shared_state::{PodSharedState, PodStatus}; use crate::socket_server::SocketServer; use crate::spawn_pod::spawn_pod_tool; +use crate::spawned_pod_registry::SpawnedPodRegistry; use protocol::{ErrorCode, Event, Method, NotificationLevel, NotificationSource, RunResult, TurnResult}; // --------------------------------------------------------------------------- @@ -204,18 +208,25 @@ impl PodController { let tracker = tools::Tracker::new(); worker.register_tools(tools::builtin_tools(fs, tracker.clone())); - // SpawnPod is wired here rather than in `tools::builtin_tools` - // because it needs Pod-scoped handles (this Pod's own socket - // path, runtime_dir, spawner name) that the generic tools - // crate has no access to. + // Pod-orchestration tools (SpawnPod + the four comm tools) + // share a single `SpawnedPodRegistry`: `SpawnPod` writes to + // it, the others read/mutate. Wired here rather than in + // `tools::builtin_tools` because these need Pod-scoped + // handles (this Pod's own socket path, runtime_dir, spawner + // name) that the generic tools crate has no access to. let spawner_socket = runtime_dir.socket_path(); + let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone()); worker.register_tool(spawn_pod_tool( spawner_name, spawner_socket, runtime_base.to_path_buf(), pwd_for_tools, - runtime_dir.clone(), + spawned_registry.clone(), )); + worker.register_tool(send_to_pod_tool(spawned_registry.clone())); + worker.register_tool(read_pod_output_tool(spawned_registry.clone())); + worker.register_tool(stop_pod_tool(spawned_registry.clone())); + worker.register_tool(list_pods_tool(spawned_registry)); pod.attach_tracker(tracker); } @@ -482,16 +493,21 @@ where manifest::ProviderKind::Ollama => "ollama", }; // The tool list mirrors what `spawn()` registers on the Worker: - // builtin filesystem tools plus `SpawnPod`. `SpawnPod` is appended - // by name because constructing its factory here would require - // Pod-lifetime handles we haven't built yet (runtime_dir, socket). + // builtin filesystem tools plus the pod-orchestration tools. + // Orchestration tools are appended by name because constructing + // their factories here would require Pod-lifetime handles we + // haven't built yet (runtime_dir, socket). let fs = tools::ScopedFs::new(pod.scope().clone(), pod.pwd().to_path_buf()); let tracker = tools::Tracker::new(); let mut tool_names: Vec = tools::builtin_tools(fs, tracker) .iter() .map(|def| def().0.name) .collect(); - tool_names.push("SpawnPod".into()); + tool_names.extend( + ["SpawnPod", "SendToPod", "ReadPodOutput", "StopPod", "ListPods"] + .iter() + .map(|s| (*s).into()), + ); protocol::Greeting { pod_name: manifest.pod.name.clone(), cwd: pod.pwd().display().to_string(), diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index fd72103d..5961710b 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -4,8 +4,10 @@ pub mod notifier; pub mod runtime_dir; pub mod scope_lock; pub mod shared_state; +pub mod pod_comm_tools; pub mod socket_server; pub mod spawn_pod; +pub mod spawned_pod_registry; mod agents_md; mod compact_state; diff --git a/crates/pod/src/pod_comm_tools.rs b/crates/pod/src/pod_comm_tools.rs new file mode 100644 index 00000000..dd37684d --- /dev/null +++ b/crates/pod/src/pod_comm_tools.rs @@ -0,0 +1,428 @@ +//! Pod-to-Pod communication tools. +//! +//! Four tools in one module — `SendToPod`, `ReadPodOutput`, `StopPod`, +//! `ListPods` — all built on the same `SpawnedPodRegistry` handed in by +//! the controller. Each operation is request-response: connect to the +//! target's Unix socket, perform one method exchange, disconnect. +//! +//! These tools only touch Pods listed in the spawner's +//! `spawned_pods.json`; there is no machine-wide directory lookup, so +//! the spawner can only reach its own descendants. + +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use llm_worker::llm_client::types::{ContentPart, Item, Role}; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use protocol::stream::{JsonLineReader, JsonLineWriter}; +use protocol::{Event, Method}; +use serde::Deserialize; +use tokio::net::UnixStream; + +use crate::runtime_dir::SpawnedPodRecord; +use crate::scope_lock::{self, LockFileGuard}; +use crate::spawned_pod_registry::SpawnedPodRegistry; + +/// Timeout applied to each socket-level operation — connect, write, +/// read. Kept short so a stuck child doesn't block the spawner's turn. +const SOCKET_OP_TIMEOUT: Duration = Duration::from_secs(5); + +// --------------------------------------------------------------------------- +// Shared input types +// --------------------------------------------------------------------------- + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct NameInput { + /// Name of a previously spawned Pod. + name: String, +} + +// --------------------------------------------------------------------------- +// SendToPod +// --------------------------------------------------------------------------- + +const SEND_TO_POD_DESCRIPTION: &str = + "Send a text message to a previously spawned Pod. The spawned Pod \ +processes it as a user turn. Does not wait for the Pod's response — \ +use `ReadPodOutput` to fetch results afterwards."; + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct SendToPodInput { + /// Target Pod name. + name: String, + /// Text delivered to the Pod as the next user message. + message: String, +} + +struct SendToPodTool { + registry: Arc, +} + +#[async_trait] +impl Tool for SendToPodTool { + async fn execute(&self, input_json: &str) -> Result { + let input: SendToPodInput = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid SendToPod input: {e}")))?; + let record = self + .registry + .get(&input.name) + .await + .ok_or_else(|| unknown_pod_err(&input.name))?; + + connect_and_send(&record.socket_path, &Method::Run { input: input.message }) + .await + .map_err(|e| ToolError::ExecutionFailed(format!("send to `{}`: {e}", input.name)))?; + + Ok(ToolOutput { + summary: format!("sent message to `{}`", input.name), + content: None, + }) + } +} + +pub fn send_to_pod_tool(registry: Arc) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(SendToPodInput); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("SendToPod") + .description(SEND_TO_POD_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(SendToPodTool { + registry: registry.clone(), + }); + (meta, tool) + }) +} + +// --------------------------------------------------------------------------- +// ReadPodOutput +// --------------------------------------------------------------------------- + +const READ_POD_OUTPUT_DESCRIPTION: &str = + "Fetch new assistant text from a spawned Pod since the last read. \ +Uses an internal cursor per-Pod so consecutive calls return only \ +newly-produced output. Returns the Pod's current status and the new \ +text, or reports `stopped` if the Pod can no longer be reached."; + +struct ReadPodOutputTool { + registry: Arc, +} + +#[async_trait] +impl Tool for ReadPodOutputTool { + async fn execute(&self, input_json: &str) -> Result { + let input: NameInput = serde_json::from_str(input_json).map_err(|e| { + ToolError::InvalidArgument(format!("invalid ReadPodOutput input: {e}")) + })?; + let record = self + .registry + .get(&input.name) + .await + .ok_or_else(|| unknown_pod_err(&input.name))?; + + let items = match fetch_history(&record.socket_path).await { + Ok(items) => items, + Err(_) => { + return Ok(ToolOutput { + summary: format!("pod `{}` is stopped (unreachable)", input.name), + content: None, + }); + } + }; + + let cursor = self.registry.cursor(&input.name).await; + let new_items = if cursor >= items.len() { + &[] as &[serde_json::Value] + } else { + &items[cursor..] + }; + let new_text = extract_assistant_text(new_items); + self.registry.set_cursor(&input.name, items.len()).await; + + let summary = if new_text.is_empty() { + format!("pod `{}` running; no new assistant text", input.name) + } else { + let lines = new_text.lines().count(); + format!("pod `{}`: {lines} new line(s) of assistant text", input.name) + }; + let content = if new_text.is_empty() { + None + } else { + Some(new_text) + }; + Ok(ToolOutput { summary, content }) + } +} + +pub fn read_pod_output_tool(registry: Arc) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(NameInput); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("ReadPodOutput") + .description(READ_POD_OUTPUT_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(ReadPodOutputTool { + registry: registry.clone(), + }); + (meta, tool) + }) +} + +// --------------------------------------------------------------------------- +// StopPod +// --------------------------------------------------------------------------- + +const STOP_POD_DESCRIPTION: &str = + "Terminate a spawned Pod and reclaim the delegated scope. The Pod \ +receives `Shutdown`; its scope entry is released in the machine-wide \ +registry so the spawner can spawn a new Pod over the same paths."; + +struct StopPodTool { + registry: Arc, +} + +#[async_trait] +impl Tool for StopPodTool { + async fn execute(&self, input_json: &str) -> Result { + let input: NameInput = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid StopPod input: {e}")))?; + let record = self + .registry + .get(&input.name) + .await + .ok_or_else(|| unknown_pod_err(&input.name))?; + + // Best-effort Shutdown. The child's own `ScopeAllocationGuard` + // releases the entry on clean exit; we also release explicitly + // below so callers can't observe a window where the scope is + // still registered but StopPod has returned. Duplicate release + // is harmless — `ScopeAllocationGuard`'s drop path swallows + // `UnknownPod` errors. + let _ = connect_and_send(&record.socket_path, &Method::Shutdown).await; + + let scope_summary = summarize_scope(&record); + release_scope(&record.pod_name); + + self.registry + .remove(&record.pod_name) + .await + .map_err(|e| ToolError::ExecutionFailed(format!("update spawned_pods.json: {e}")))?; + + Ok(ToolOutput { + summary: format!( + "stopped pod `{}`; reclaimed scope: {scope_summary}", + record.pod_name + ), + content: None, + }) + } +} + +pub fn stop_pod_tool(registry: Arc) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(NameInput); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("StopPod") + .description(STOP_POD_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(StopPodTool { + registry: registry.clone(), + }); + (meta, tool) + }) +} + +// --------------------------------------------------------------------------- +// ListPods +// --------------------------------------------------------------------------- + +const LIST_PODS_DESCRIPTION: &str = + "List all Pods spawned by this Pod along with their reachability \ +status (`alive` / `stopped`) and the scope each was granted."; + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct EmptyInput {} + +struct ListPodsTool { + registry: Arc, +} + +#[async_trait] +impl Tool for ListPodsTool { + async fn execute(&self, _input_json: &str) -> Result { + let records = self.registry.list().await; + if records.is_empty() { + return Ok(ToolOutput { + summary: "no spawned pods".into(), + content: None, + }); + } + + let mut lines: Vec = Vec::with_capacity(records.len()); + let mut stale_names: Vec = Vec::new(); + for record in &records { + let alive = is_reachable(&record.socket_path).await; + let status = if alive { "alive" } else { "stopped" }; + let scope = summarize_scope(record); + lines.push(format!("{} [{status}] scope={scope}", record.pod_name)); + if !alive { + stale_names.push(record.pod_name.clone()); + } + } + + // Trigger stale reclaim on unreachable pods so the lock file's + // allocation table doesn't keep growing indefinitely when + // children crash without a clean exit path. + if !stale_names.is_empty() { + if let Ok(lock_path) = scope_lock::default_lock_path() + && let Ok(mut guard) = LockFileGuard::open(&lock_path) + { + scope_lock::reclaim_stale(&mut guard); + } + } + + let summary = format!("{} pod(s) known", records.len()); + Ok(ToolOutput { + summary, + content: Some(lines.join("\n")), + }) + } +} + +pub fn list_pods_tool(registry: Arc) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(EmptyInput); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("ListPods") + .description(LIST_PODS_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(ListPodsTool { + registry: registry.clone(), + }); + (meta, tool) + }) +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn unknown_pod_err(name: &str) -> ToolError { + ToolError::InvalidArgument(format!("no spawned pod named `{name}`")) +} + +/// Connect with a timeout, write one `Method` line, flush, and close. +/// Any socket error maps to an `io::Error`; the caller decides whether +/// to surface it to the LLM or treat it as "pod stopped". +async fn connect_and_send(socket: &Path, method: &Method) -> std::io::Result<()> { + let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) + .await + .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??; + let (_r, w) = stream.into_split(); + let mut writer = JsonLineWriter::new(w); + tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(method)) + .await + .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "write timed out"))??; + Ok(()) +} + +/// Connect and ask the Pod for its conversation history. Skips +/// pre-History events (such as buffered notifications replayed to new +/// clients). Returns the raw JSON items as `serde_json::Value` since +/// the pod crate already round-trips via `Value` on the wire. +async fn fetch_history(socket: &Path) -> std::io::Result> { + let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) + .await + .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??; + let (r, w) = stream.into_split(); + let mut writer = JsonLineWriter::new(w); + let mut reader = JsonLineReader::new(r); + + tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(&Method::GetHistory)) + .await + .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "write timed out"))??; + + loop { + let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::()) + .await + .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "read timed out"))??; + match event { + Some(Event::History { items, .. }) => return Ok(items), + Some(_) => continue, + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "pod closed connection before History event", + )); + } + } + } +} + +/// Probe-connect test. Connection accepted within timeout → alive. +async fn is_reachable(socket: &Path) -> bool { + tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) + .await + .map(|r| r.is_ok()) + .unwrap_or(false) +} + +fn extract_assistant_text(items: &[serde_json::Value]) -> String { + let mut out = String::new(); + for value in items { + let Ok(item) = serde_json::from_value::(value.clone()) else { + continue; + }; + if let Item::Message { + role: Role::Assistant, + content, + .. + } = item + { + for part in content { + if let ContentPart::Text { text } = part { + if !out.is_empty() { + out.push('\n'); + } + out.push_str(&text); + } + } + } + } + out +} + +fn summarize_scope(record: &SpawnedPodRecord) -> String { + if record.scope_delegated.is_empty() { + return "(none)".into(); + } + let parts: Vec = record + .scope_delegated + .iter() + .map(|r| { + let perm = match r.permission { + manifest::Permission::Read => "read", + manifest::Permission::Write => "write", + }; + let tag = if r.recursive { "" } else { " [non-recursive]" }; + format!("{perm}:{}{tag}", r.target.display()) + }) + .collect(); + parts.join(", ") +} + +/// Best-effort release of the pod's scope allocation. Swallows every +/// error: the caller has already completed its user-visible side +/// effects (Method::Shutdown was sent), and stale-reclaim will clean +/// up whatever we couldn't. +fn release_scope(pod_name: &str) { + let Ok(lock_path) = scope_lock::default_lock_path() else { + return; + }; + let Ok(mut guard) = LockFileGuard::open(&lock_path) else { + return; + }; + let _ = scope_lock::release_pod(&mut guard, pod_name); +} diff --git a/crates/pod/src/spawn_pod.rs b/crates/pod/src/spawn_pod.rs index 564cda7c..b60d8482 100644 --- a/crates/pod/src/spawn_pod.rs +++ b/crates/pod/src/spawn_pod.rs @@ -21,11 +21,11 @@ use protocol::stream::JsonLineWriter; use serde::Deserialize; use tokio::net::UnixStream; use tokio::process::Command; -use tokio::sync::Mutex; use tokio::time::sleep; -use crate::runtime_dir::{RuntimeDir, SpawnedPodRecord}; +use crate::runtime_dir::SpawnedPodRecord; use crate::scope_lock::{self, LockFileGuard, ScopeLockError}; +use crate::spawned_pod_registry::SpawnedPodRegistry; const DESCRIPTION: &str = "Spawn a new Pod process to work on a delegated task. \ The spawner's write scope is reduced by the scope passed here; the spawned \ @@ -101,11 +101,10 @@ pub struct SpawnPodTool { /// Directory the spawned Pod should run in when the LLM did not /// override it. Defaults to the spawner's pwd — see module docs. spawner_pwd: PathBuf, - /// Spawner's own runtime directory — target for `spawned_pods.json`. - runtime_dir: Arc, - /// Running list of successful spawns, replayed into - /// `spawned_pods.json` on every successful `execute`. - records: Arc>>, + /// Shared registry of spawned children, also used by the + /// pod-comm tools (`SendToPod` / `ReadPodOutput` / `StopPod` / + /// `ListPods`). Writes the list to `spawned_pods.json` on each add. + registry: Arc, } impl SpawnPodTool { @@ -114,15 +113,14 @@ impl SpawnPodTool { callback_socket: PathBuf, runtime_base: PathBuf, spawner_pwd: PathBuf, - runtime_dir: Arc, + registry: Arc, ) -> Self { Self { spawner_name, callback_socket, runtime_base, spawner_pwd, - runtime_dir, - records: Arc::new(Mutex::new(Vec::new())), + registry, } } } @@ -203,16 +201,10 @@ impl Tool for SpawnPodTool { scope_delegated: scope_allow, callback_address: self.callback_socket.clone(), }; - { - let mut records = self.records.lock().await; - records.push(record); - self.runtime_dir - .write_spawned_pods(records.as_slice()) - .await - .map_err(|e| { - ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}")) - })?; - } + self.registry + .add(record) + .await + .map_err(|e| ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}")))?; Ok(ToolOutput { summary: format!( @@ -367,7 +359,7 @@ pub fn spawn_pod_tool( callback_socket: PathBuf, runtime_base: PathBuf, spawner_pwd: PathBuf, - runtime_dir: Arc, + registry: Arc, ) -> ToolDefinition { Arc::new(move || { let schema = schemars::schema_for!(SpawnPodInput); @@ -380,7 +372,7 @@ pub fn spawn_pod_tool( callback_socket.clone(), runtime_base.clone(), spawner_pwd.clone(), - runtime_dir.clone(), + registry.clone(), )); (meta, tool) }) diff --git a/crates/pod/src/spawned_pod_registry.rs b/crates/pod/src/spawned_pod_registry.rs new file mode 100644 index 00000000..57a86b99 --- /dev/null +++ b/crates/pod/src/spawned_pod_registry.rs @@ -0,0 +1,93 @@ +//! Shared registry of Pods spawned by this Pod. +//! +//! `SpawnPod` writes here; the pod-comm tools (`SendToPod`, +//! `ReadPodOutput`, `StopPod`, `ListPods`) read and mutate the same +//! instance. Persisted to `spawned_pods.json` in the spawner's runtime +//! dir so a restarted spawner rebuilds its view from disk (future work +//! — today only write-through is implemented). +//! +//! `ReadPodOutput` additionally owns a per-spawned-pod cursor here so +//! two consecutive reads yield only new assistant text. The cursor is +//! an item-index into the child's history; push-only history makes +//! index stable across reads. +//! +//! The registry stays in-memory only for this Pod's lifetime — cursors +//! intentionally do not persist. + +use std::collections::HashMap; +use std::io; +use std::sync::Arc; + +use tokio::sync::Mutex; + +use crate::runtime_dir::{RuntimeDir, SpawnedPodRecord}; + +pub struct SpawnedPodRegistry { + records: Mutex>, + cursors: Mutex>, + runtime_dir: Arc, +} + +impl SpawnedPodRegistry { + pub fn new(runtime_dir: Arc) -> Arc { + Arc::new(Self { + records: Mutex::new(Vec::new()), + cursors: Mutex::new(HashMap::new()), + runtime_dir, + }) + } + + /// Append a new record and persist the full list. Returns an I/O + /// error if the persisted write fails; the in-memory state is still + /// updated in that case — the next successful write will reconcile. + pub async fn add(&self, record: SpawnedPodRecord) -> io::Result<()> { + let mut records = self.records.lock().await; + records.push(record); + self.runtime_dir.write_spawned_pods(records.as_slice()).await + } + + /// Look up a record by pod name. Cloned so callers can drop the lock. + pub async fn get(&self, pod_name: &str) -> Option { + self.records + .lock() + .await + .iter() + .find(|r| r.pod_name == pod_name) + .cloned() + } + + pub async fn list(&self) -> Vec { + self.records.lock().await.clone() + } + + /// Remove the record for `pod_name`, persist, and clear its cursor. + /// Returns the removed record (if any). + pub async fn remove(&self, pod_name: &str) -> io::Result> { + let removed = { + let mut records = self.records.lock().await; + let idx = records.iter().position(|r| r.pod_name == pod_name); + let removed = idx.map(|i| records.remove(i)); + self.runtime_dir.write_spawned_pods(records.as_slice()).await?; + removed + }; + self.cursors.lock().await.remove(pod_name); + Ok(removed) + } + + /// Read-only cursor lookup. Returns 0 when no cursor has been set. + pub async fn cursor(&self, pod_name: &str) -> usize { + self.cursors + .lock() + .await + .get(pod_name) + .copied() + .unwrap_or(0) + } + + pub async fn set_cursor(&self, pod_name: &str, cursor: usize) { + self.cursors + .lock() + .await + .insert(pod_name.to_string(), cursor); + } +} diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs new file mode 100644 index 00000000..c18e7fd7 --- /dev/null +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -0,0 +1,379 @@ +//! Integration tests for the pod-comm tools (`SendToPod`, +//! `ReadPodOutput`, `StopPod`, `ListPods`). +//! +//! The real child Pod binary is not started. Instead each test stands +//! up a mock `UnixListener` that speaks the socket protocol directly +//! (accepting `Method::Run` / `Method::GetHistory` / `Method::Shutdown` +//! and responding with `Event::History` when asked). This keeps the +//! tests fast and independent of the LLM layer — the tools are exercised +//! for their wire behaviour alone. + +use std::path::{Path, PathBuf}; +use std::sync::{Arc, LazyLock, Mutex}; + +use llm_worker::llm_client::types::{ContentPart, Item, Role}; +use llm_worker::tool::ToolOutput; +use manifest::{Permission, ScopeRule}; +use pod::pod_comm_tools::{ + list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, +}; +use pod::runtime_dir::{RuntimeDir, SpawnedPodRecord}; +use pod::scope_lock::{self, LockFileGuard}; +use pod::spawned_pod_registry::SpawnedPodRegistry; +use protocol::stream::{JsonLineReader, JsonLineWriter}; +use protocol::{Event, Greeting, Method}; +use serde_json::json; +use tempfile::TempDir; +use tokio::net::UnixListener; +use tokio::task::JoinHandle; + +/// Serialises env-mutating tests. The test harness runs tasks across +/// threads, and `INSOMNIA_SCOPE_LOCK` is a process-wide resource. +static ENV_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); + +struct EnvGuard { + _lock: std::sync::MutexGuard<'static, ()>, +} + +impl EnvGuard { + fn acquire() -> Self { + Self { + _lock: ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()), + } + } +} + +/// Create a spawner-owned `RuntimeDir` + `SpawnedPodRegistry` scoped to +/// a fresh tempdir. The returned `TempDir` must be kept alive by the +/// caller for the duration of the test. +async fn setup_registry() -> (TempDir, Arc, Arc) { + let tmp = TempDir::new().unwrap(); + let rd = RuntimeDir::create(tmp.path(), "spawner").await.unwrap(); + let rd = Arc::new(rd); + let registry = SpawnedPodRegistry::new(rd.clone()); + (tmp, registry, rd) +} + +/// Register a fake spawned-child record pointing at a given socket +/// path, with a trivial write-scope for `scope_path`. Does not touch +/// scope.lock. +async fn register_child( + registry: &SpawnedPodRegistry, + name: &str, + socket: &Path, + scope_path: &Path, +) { + let record = SpawnedPodRecord { + pod_name: name.into(), + socket_path: socket.to_path_buf(), + scope_delegated: vec![ScopeRule { + target: scope_path.to_path_buf(), + permission: Permission::Write, + recursive: true, + }], + callback_address: "/dev/null".into(), + }; + registry.add(record).await.unwrap(); +} + +/// Bind a Unix listener at a socket path inside the given directory. +async fn bind_mock_socket(dir: &Path, name: &str) -> (PathBuf, UnixListener) { + let socket = dir.join(format!("{name}.sock")); + let listener = UnixListener::bind(&socket).unwrap(); + (socket, listener) +} + +/// Accept one connection and read exactly one `Method` line from it. +/// The reader half is kept open; caller awaits the returned handle. +fn accept_one_method(listener: UnixListener) -> JoinHandle> { + tokio::spawn(async move { + let (stream, _) = listener.accept().await.ok()?; + let (r, _w) = stream.into_split(); + let mut reader = JsonLineReader::new(r); + reader.next::().await.ok().flatten() + }) +} + +/// Pretend to be a spawned Pod that responds to `GetHistory` with a +/// fixed set of items. Accepts connections until the first one that +/// delivers a `GetHistory` method; earlier probes (empty accepts) and +/// non-history methods are ignored. Returns nothing — tests await the +/// handle only to keep the listener alive until shutdown. +fn serve_history(listener: UnixListener, items: Vec) -> JoinHandle<()> { + tokio::spawn(async move { + loop { + let Ok((stream, _)) = listener.accept().await else { + return; + }; + let (r, w) = stream.into_split(); + let mut reader = JsonLineReader::new(r); + let mut writer = JsonLineWriter::new(w); + match reader.next::().await { + Ok(Some(Method::GetHistory)) => { + let values: Vec = items + .iter() + .map(|i| serde_json::to_value(i).unwrap()) + .collect(); + let event = Event::History { + items: values, + greeting: Greeting { + pod_name: "child".into(), + cwd: "/tmp".into(), + provider: "anthropic".into(), + model: "x".into(), + scope_summary: String::new(), + tools: Vec::new(), + }, + }; + let _ = writer.write(&event).await; + } + Ok(Some(_)) | Ok(None) | Err(_) => { + // Ignore: loop accepts another connection. + } + } + } + }) +} + +fn assistant(text: &str) -> Item { + Item::Message { + id: None, + role: Role::Assistant, + content: vec![ContentPart::Text { text: text.into() }], + status: None, + } +} + +// --------------------------------------------------------------------------- +// SendToPod +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn send_to_pod_delivers_run_method() { + let (tmp, registry, _rd) = setup_registry().await; + let (socket, listener) = bind_mock_socket(tmp.path(), "child").await; + let received = accept_one_method(listener); + register_child(®istry, "child", &socket, tmp.path()).await; + + let def = send_to_pod_tool(registry); + let (_meta, tool) = def(); + let input = json!({ "name": "child", "message": "hello there" }).to_string(); + let output: ToolOutput = tool.execute(&input).await.unwrap(); + assert!(output.summary.contains("child"), "summary: {}", output.summary); + + let method = received.await.unwrap().expect("expected a method"); + match method { + Method::Run { input } => assert_eq!(input, "hello there"), + other => panic!("expected Run, got {other:?}"), + } +} + +#[tokio::test] +async fn send_to_pod_errors_on_unknown_pod() { + let (_tmp, registry, _rd) = setup_registry().await; + let def = send_to_pod_tool(registry); + let (_meta, tool) = def(); + let input = json!({ "name": "nope", "message": "hi" }).to_string(); + let err = tool.execute(&input).await.unwrap_err(); + assert!(err.to_string().contains("no spawned pod"), "{err}"); +} + +// --------------------------------------------------------------------------- +// ReadPodOutput +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn read_pod_output_returns_new_assistant_text_then_empty_on_second_call() { + let (tmp, registry, _rd) = setup_registry().await; + let (socket, listener) = bind_mock_socket(tmp.path(), "child").await; + register_child(®istry, "child", &socket, tmp.path()).await; + + let items = vec![ + Item::user_message("hello"), + assistant("hi back"), + assistant("still working"), + ]; + let _server = serve_history(listener, items); + + let def = read_pod_output_tool(registry); + let (_meta, tool) = def(); + let input = json!({ "name": "child" }).to_string(); + + let first: ToolOutput = tool.execute(&input).await.unwrap(); + let body = first.content.expect("first read should have content"); + assert!(body.contains("hi back"), "body: {body}"); + assert!(body.contains("still working"), "body: {body}"); + + // Cursor now points past all items — second call returns no new text. + let second: ToolOutput = tool.execute(&input).await.unwrap(); + assert!(second.content.is_none(), "unexpected content: {:?}", second.content); + assert!( + second.summary.contains("no new assistant text"), + "summary: {}", + second.summary + ); +} + +#[tokio::test] +async fn read_pod_output_reports_stopped_on_dead_socket() { + let (tmp, registry, _rd) = setup_registry().await; + // Register a record pointing at a socket that nobody is listening + // on. Connect must fail → tool reports "stopped". + let dead_socket = tmp.path().join("dead.sock"); + register_child(®istry, "child", &dead_socket, tmp.path()).await; + + let def = read_pod_output_tool(registry); + let (_meta, tool) = def(); + let input = json!({ "name": "child" }).to_string(); + let output: ToolOutput = tool.execute(&input).await.unwrap(); + assert!(output.summary.contains("stopped"), "{}", output.summary); +} + +// --------------------------------------------------------------------------- +// StopPod +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn stop_pod_sends_shutdown_and_releases_scope() { + let _env = EnvGuard::acquire(); + let (tmp, registry, rd) = setup_registry().await; + let lock_path = tmp.path().join("scope.lock"); + unsafe { + std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path); + } + + // Seed scope.lock with a top-level `spawner` allocation plus a + // delegated `child` allocation — mimics what SpawnPod would have + // done so StopPod has something to release. + { + let mut g = LockFileGuard::open(&lock_path).unwrap(); + scope_lock::register_pod( + &mut g, + "spawner".into(), + std::process::id(), + "/tmp/spawner.sock".into(), + vec![ScopeRule { + target: tmp.path().to_path_buf(), + permission: Permission::Write, + recursive: true, + }], + ) + .unwrap(); + scope_lock::delegate_scope( + &mut g, + "spawner", + "child".into(), + std::process::id(), + "/tmp/child.sock".into(), + vec![ScopeRule { + target: tmp.path().to_path_buf(), + permission: Permission::Write, + recursive: true, + }], + ) + .unwrap(); + } + + let (socket, listener) = bind_mock_socket(tmp.path(), "child").await; + let received = accept_one_method(listener); + register_child(®istry, "child", &socket, tmp.path()).await; + + let def = stop_pod_tool(registry.clone()); + let (_meta, tool) = def(); + let input = json!({ "name": "child" }).to_string(); + let output: ToolOutput = tool.execute(&input).await.unwrap(); + assert!(output.summary.contains("stopped"), "{}", output.summary); + + // The child got a Shutdown. + let method = received.await.unwrap().expect("expected shutdown"); + assert!(matches!(method, Method::Shutdown)); + + // Allocation for `child` is gone; `spawner` remains. + { + let g = LockFileGuard::open(&lock_path).unwrap(); + assert!(g.data().find("child").is_none(), "child still allocated"); + assert!(g.data().find("spawner").is_some(), "spawner missing"); + } + + // spawned_pods.json now lists zero children. + let spawned = rd.path().join("spawned_pods.json"); + let contents = std::fs::read_to_string(&spawned).unwrap(); + let records: Vec = serde_json::from_str(&contents).unwrap(); + assert!(records.is_empty()); + + unsafe { + std::env::remove_var("INSOMNIA_SCOPE_LOCK"); + } +} + +#[tokio::test] +async fn stop_pod_succeeds_even_when_child_unreachable() { + let _env = EnvGuard::acquire(); + let (tmp, registry, _rd) = setup_registry().await; + let lock_path = tmp.path().join("scope.lock"); + unsafe { + std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path); + } + + // No live listener — socket never bound. Registered record points + // at a dead path. StopPod should still clean up local bookkeeping. + let dead_socket = tmp.path().join("dead.sock"); + register_child(®istry, "child", &dead_socket, tmp.path()).await; + + let def = stop_pod_tool(registry.clone()); + let (_meta, tool) = def(); + let input = json!({ "name": "child" }).to_string(); + let output: ToolOutput = tool.execute(&input).await.unwrap(); + assert!(output.summary.contains("stopped"), "{}", output.summary); + + // Registry no longer knows about the child. + assert!(registry.get("child").await.is_none()); + + unsafe { + std::env::remove_var("INSOMNIA_SCOPE_LOCK"); + } +} + +// --------------------------------------------------------------------------- +// ListPods +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn list_pods_reports_alive_and_stopped() { + let (tmp, registry, _rd) = setup_registry().await; + + // One child is reachable… + let (live_socket, listener) = bind_mock_socket(tmp.path(), "alive").await; + // Keep the listener alive by moving it into a task that never exits. + let _accept = tokio::spawn(async move { + loop { + let Ok((stream, _)) = listener.accept().await else { + return; + }; + drop(stream); + } + }); + register_child(®istry, "alive", &live_socket, tmp.path()).await; + + // …the other is not. + let dead_socket = tmp.path().join("dead.sock"); + register_child(®istry, "dead", &dead_socket, tmp.path()).await; + + let def = list_pods_tool(registry); + let (_meta, tool) = def(); + let output: ToolOutput = tool.execute("{}").await.unwrap(); + assert!(output.summary.contains("2 pod"), "{}", output.summary); + let body = output.content.expect("list_pods should populate content"); + assert!(body.contains("alive [alive]"), "body: {body}"); + assert!(body.contains("dead [stopped]"), "body: {body}"); +} + +#[tokio::test] +async fn list_pods_empty_when_nothing_registered() { + let (_tmp, registry, _rd) = setup_registry().await; + let def = list_pods_tool(registry); + let (_meta, tool) = def(); + let output: ToolOutput = tool.execute("{}").await.unwrap(); + assert!(output.summary.contains("no spawned pods"), "{}", output.summary); + assert!(output.content.is_none()); +} diff --git a/crates/pod/tests/spawn_pod_test.rs b/crates/pod/tests/spawn_pod_test.rs index efcecd6d..716538d6 100644 --- a/crates/pod/tests/spawn_pod_test.rs +++ b/crates/pod/tests/spawn_pod_test.rs @@ -15,6 +15,7 @@ use manifest::{Permission, ScopeRule}; use pod::runtime_dir::{RuntimeDir, SpawnedPodRecord}; use pod::scope_lock::{self, LockFileGuard}; use pod::spawn_pod::spawn_pod_tool; +use pod::spawned_pod_registry::SpawnedPodRegistry; use protocol::Method; use protocol::stream::JsonLineReader; use serde_json::json; @@ -150,12 +151,13 @@ async fn spawn_pod_delegates_scope_and_sends_run() { let (_predicted_socket, listener) = bind_mock_pod_socket(&runtime_base, "child").await; let received = accept_one_method(listener); + let registry = SpawnedPodRegistry::new(spawner_rd.clone()); let def = spawn_pod_tool( "root".into(), spawner_socket.clone(), runtime_base.clone(), allow_root.path().to_path_buf(), - spawner_rd.clone(), + registry, ); let (_meta, tool) = def(); @@ -210,12 +212,13 @@ async fn spawn_pod_rejects_scope_outside_spawner() { setup_spawner("root", allow_root.path()).await; point_pod_command_at_true(); + let registry = SpawnedPodRegistry::new(spawner_rd); let def = spawn_pod_tool( "root".into(), spawner_socket, runtime_base, allow_root.path().to_path_buf(), - spawner_rd, + registry, ); let (_meta, tool) = def(); @@ -266,12 +269,13 @@ async fn spawn_pod_rolls_back_reservation_when_socket_never_appears() { // marked with `// slow_test`. Keep the rest of the test suite fast // by running this test alone when iterating. + let registry = SpawnedPodRegistry::new(spawner_rd); let def = spawn_pod_tool( "root".into(), spawner_socket, runtime_base, allow_root.path().to_path_buf(), - spawner_rd, + registry, ); let (_meta, tool) = def(); diff --git a/tickets/pod-comm-tools.md b/tickets/pod-comm-tools.md index 021ff6c3..4c7ffc42 100644 --- a/tickets/pod-comm-tools.md +++ b/tickets/pod-comm-tools.md @@ -1,5 +1,7 @@ # Pod 間通信ツール: SendToPod / ReadPodOutput / StopPod / ListPods +レビュー中: [pod-comm-tools.review.md](pod-comm-tools.review.md) + ## 背景 `SpawnPod` で Pod を生成した後、spawner の LLM が spawned Pod に指示を送り、結果を読み、完了したら停止させる手段が必要。 diff --git a/tickets/pod-comm-tools.review.md b/tickets/pod-comm-tools.review.md new file mode 100644 index 00000000..9890c28c --- /dev/null +++ b/tickets/pod-comm-tools.review.md @@ -0,0 +1,85 @@ +# Review: pod-comm-tools + +実装コミット前レビュー。`cargo build -p pod` clean、`cargo test -p pod --test pod_comm_tools_test` 8/8 pass。 + +## 総評 + +チケット要件の中核(4 ツールの wire 動作、`SpawnedPodRegistry` の共有、scope 回収、stale reclaim トリガー)は達成。テスト構成もモックソケットで実 `pod` バイナリ非依存で速い。ただし以下のうち修正必須項目を解消してから完了にしたい。 + +## 指摘と判断 + +### 修正必須 + +#### 1. `SendToPod` が RUNNING 状態を検知しない(設計合意違反) + +**状況**: `connect_and_send` は `Method::Run` を書いて即切断し、応答 event を読まない。Controller は RUNNING 中の `Run` に対し `Event::Error { code: AlreadyRunning }` を返すが、tool 側はそれを見ないので LLM に「送信成功」と返る。 + +**設計合意**: pod-comm-tools 議論の結論は「A. SendToPod は `Method::Run` を使い、Pod が IDLE でなければエラーを返す」。 + +**判断**: 修正必要。書き込み後に short timeout で 1 event 読み、`Error { AlreadyRunning }` を検知したら `ToolError::ExecutionFailed` に変換する。正常時は `TurnStart` など先頭 event を見て「受け付けられた」ことを確認して戻る。 + +### チケット文言と実装の整合 + +#### 2. Pod status (`running / idle / stopped`) の取得手段がない + +**状況**: チケットは `ReadPodOutput`/`ListPods` 出力に `running / idle / stopped` を含めることを求めているが、`Event::History` にはステータスが乗っておらず、実装は「接続可=alive / 接続不可=stopped」の 2 値のみ。 + +**判断**: 今回のスコープからは外す。`Greeting` または別 event に `PodStatus` を載せる改修は `Method::GetHistory` のレスポンス設計変更を伴うので、別チケット化する。本チケット文言を `alive / stopped` に揃えて実装と一致させる。 + +#### 3. `StopPod` は終了確認を待たない + +**状況**: チケット本文は「`Method::Shutdown` 送信 → 終了確認受信 → 切断」だが、実装は fire-and-forget。scope の明示 release と `scope_lock` 上の stale reclaim で整合性は担保されているので実害なし。 + +**判断**: チケット文言を「`Method::Shutdown` 送信(応答は待たない)」に修正。 + +### 範囲外として明文化 + +#### 4. `spawned_pods.json` からの再起動復旧は未実装 + +**状況**: `spawned_pod_registry.rs` のモジュールコメントに「today only write-through is implemented」と明記されている。spawner プロセス再起動後、`ListPods` は空リストを返す。 + +**判断**: 今回のスコープ外。チケットの範囲外に「spawner 再起動時の `spawned_pods.json` からの復旧」を明記し、必要になったタイミングで別チケットを切る。 + +#### 5. `ReadPodOutput` cursor の非永続性 + +**状況**: `SpawnedPodRegistry::cursors` は `HashMap` でインメモリのみ。spawner 再起動後、カーソルは 0 にリセットされ、既読だった assistant text が再送される。モジュールコメントに「cursors intentionally do not persist」と明記されている。 + +**判断**: 現状のままで OK。設計判断としてレビューで合意し、チケットの「設計で決めること」の回答として本 review に残す。チケット側には残さない(gitで追える)。 + +### 軽微 + +#### 6. `extract_assistant_text` のメッセージ境界 + +**状況**: 複数の assistant message を `\n` で連結するのみ。メッセージ境界が曖昧になる。 + +**判断**: `\n\n` に変更して境界を明確にする。修正コスト小。 + +#### 7. blocking `flock` を async から呼んでいる + +**状況**: `release_scope` / stale reclaim 経路で `LockFileGuard::open`(内部で `flock(2)`)を async fn から直接呼ぶ。通常は瞬時だが厳密には tokio runtime を一瞬止める。 + +**判断**: 既存コード全体が同じ慣習なので今回は踏襲。気になったら別タイミングで `spawn_blocking` 化を検討。本チケットでは対応しない。 + +#### 8. `SpawnedPodRegistry::new` が `Arc` を返す + +**状況**: コンストラクタが smart pointer を返す API は珍しい。 + +**判断**: consumer が全員 `Arc` で持つ設計なので許容。リファクタは不要。 + +## 完了条件との対応 + +| 完了条件 | 状態 | +|---|---| +| SendToPod で spawned Pod にメッセージ送信・Pod が処理 | ✅(指摘 #1 の修正後) | +| ReadPodOutput でカーソルベース差分取得 | ✅ | +| StopPod で graceful 停止 + scope 返却 | ✅ | +| ListPods で既知 Pod の状態一覧 + health check | ⚠️ `alive/stopped` のみ(指摘 #2) | +| 接続不可 Pod は `stopped` 扱い + stale 回収トリガー | ✅ | +| 各ツール正常系・異常系の単体テスト | ✅(8/8 pass) | + +## 完了に向けた作業 + +1. 指摘 #1: `SendToPod` に応答確認を実装 +2. 指摘 #2, #3: チケット本文の文言修正(`running/idle/stopped` → `alive/stopped`、Shutdown 応答待ちの記述削除) +3. 指摘 #4: チケットの「範囲外」に `spawned_pods.json` 復旧を追記 +4. 指摘 #6: `extract_assistant_text` の区切りを `\n\n` に