Pod操作ツールの実装

This commit is contained in:
Keisuke Hirata 2026-04-19 06:32:44 +09:00
parent 73acfcb7f2
commit 5d63d0f6e2
9 changed files with 1035 additions and 34 deletions

View File

@ -9,10 +9,14 @@ use tokio::sync::{broadcast, mpsc, oneshot};
use crate::notification_buffer::NotificationBuffer;
use crate::notifier::Notifier;
use crate::pod::{Pod, PodError, PodRunResult};
use crate::pod_comm_tools::{
list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool,
};
use crate::runtime_dir::RuntimeDir;
use crate::shared_state::{PodSharedState, PodStatus};
use crate::socket_server::SocketServer;
use crate::spawn_pod::spawn_pod_tool;
use crate::spawned_pod_registry::SpawnedPodRegistry;
use protocol::{ErrorCode, Event, Method, NotificationLevel, NotificationSource, RunResult, TurnResult};
// ---------------------------------------------------------------------------
@ -204,18 +208,25 @@ impl PodController {
let tracker = tools::Tracker::new();
worker.register_tools(tools::builtin_tools(fs, tracker.clone()));
// SpawnPod is wired here rather than in `tools::builtin_tools`
// because it needs Pod-scoped handles (this Pod's own socket
// path, runtime_dir, spawner name) that the generic tools
// crate has no access to.
// Pod-orchestration tools (SpawnPod + the four comm tools)
// share a single `SpawnedPodRegistry`: `SpawnPod` writes to
// it, the others read/mutate. Wired here rather than in
// `tools::builtin_tools` because these need Pod-scoped
// handles (this Pod's own socket path, runtime_dir, spawner
// name) that the generic tools crate has no access to.
let spawner_socket = runtime_dir.socket_path();
let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone());
worker.register_tool(spawn_pod_tool(
spawner_name,
spawner_socket,
runtime_base.to_path_buf(),
pwd_for_tools,
runtime_dir.clone(),
spawned_registry.clone(),
));
worker.register_tool(send_to_pod_tool(spawned_registry.clone()));
worker.register_tool(read_pod_output_tool(spawned_registry.clone()));
worker.register_tool(stop_pod_tool(spawned_registry.clone()));
worker.register_tool(list_pods_tool(spawned_registry));
pod.attach_tracker(tracker);
}
@ -482,16 +493,21 @@ where
manifest::ProviderKind::Ollama => "ollama",
};
// The tool list mirrors what `spawn()` registers on the Worker:
// builtin filesystem tools plus `SpawnPod`. `SpawnPod` is appended
// by name because constructing its factory here would require
// Pod-lifetime handles we haven't built yet (runtime_dir, socket).
// builtin filesystem tools plus the pod-orchestration tools.
// Orchestration tools are appended by name because constructing
// their factories here would require Pod-lifetime handles we
// haven't built yet (runtime_dir, socket).
let fs = tools::ScopedFs::new(pod.scope().clone(), pod.pwd().to_path_buf());
let tracker = tools::Tracker::new();
let mut tool_names: Vec<String> = tools::builtin_tools(fs, tracker)
.iter()
.map(|def| def().0.name)
.collect();
tool_names.push("SpawnPod".into());
tool_names.extend(
["SpawnPod", "SendToPod", "ReadPodOutput", "StopPod", "ListPods"]
.iter()
.map(|s| (*s).into()),
);
protocol::Greeting {
pod_name: manifest.pod.name.clone(),
cwd: pod.pwd().display().to_string(),

View File

@ -4,8 +4,10 @@ pub mod notifier;
pub mod runtime_dir;
pub mod scope_lock;
pub mod shared_state;
pub mod pod_comm_tools;
pub mod socket_server;
pub mod spawn_pod;
pub mod spawned_pod_registry;
mod agents_md;
mod compact_state;

View File

@ -0,0 +1,428 @@
//! Pod-to-Pod communication tools.
//!
//! Four tools in one module — `SendToPod`, `ReadPodOutput`, `StopPod`,
//! `ListPods` — all built on the same `SpawnedPodRegistry` handed in by
//! the controller. Each operation is request-response: connect to the
//! target's Unix socket, perform one method exchange, disconnect.
//!
//! These tools only touch Pods listed in the spawner's
//! `spawned_pods.json`; there is no machine-wide directory lookup, so
//! the spawner can only reach its own descendants.
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use llm_worker::llm_client::types::{ContentPart, Item, Role};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{Event, Method};
use serde::Deserialize;
use tokio::net::UnixStream;
use crate::runtime_dir::SpawnedPodRecord;
use crate::scope_lock::{self, LockFileGuard};
use crate::spawned_pod_registry::SpawnedPodRegistry;
/// Timeout applied to each socket-level operation — connect, write,
/// read. Kept short so a stuck child doesn't block the spawner's turn.
const SOCKET_OP_TIMEOUT: Duration = Duration::from_secs(5);
// ---------------------------------------------------------------------------
// Shared input types
// ---------------------------------------------------------------------------
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct NameInput {
/// Name of a previously spawned Pod.
name: String,
}
// ---------------------------------------------------------------------------
// SendToPod
// ---------------------------------------------------------------------------
const SEND_TO_POD_DESCRIPTION: &str =
"Send a text message to a previously spawned Pod. The spawned Pod \
processes it as a user turn. Does not wait for the Pod's response \
use `ReadPodOutput` to fetch results afterwards.";
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct SendToPodInput {
/// Target Pod name.
name: String,
/// Text delivered to the Pod as the next user message.
message: String,
}
struct SendToPodTool {
registry: Arc<SpawnedPodRegistry>,
}
#[async_trait]
impl Tool for SendToPodTool {
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let input: SendToPodInput = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid SendToPod input: {e}")))?;
let record = self
.registry
.get(&input.name)
.await
.ok_or_else(|| unknown_pod_err(&input.name))?;
connect_and_send(&record.socket_path, &Method::Run { input: input.message })
.await
.map_err(|e| ToolError::ExecutionFailed(format!("send to `{}`: {e}", input.name)))?;
Ok(ToolOutput {
summary: format!("sent message to `{}`", input.name),
content: None,
})
}
}
pub fn send_to_pod_tool(registry: Arc<SpawnedPodRegistry>) -> ToolDefinition {
Arc::new(move || {
let schema = schemars::schema_for!(SendToPodInput);
let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({}));
let meta = ToolMeta::new("SendToPod")
.description(SEND_TO_POD_DESCRIPTION)
.input_schema(schema_value);
let tool: Arc<dyn Tool> = Arc::new(SendToPodTool {
registry: registry.clone(),
});
(meta, tool)
})
}
// ---------------------------------------------------------------------------
// ReadPodOutput
// ---------------------------------------------------------------------------
const READ_POD_OUTPUT_DESCRIPTION: &str =
"Fetch new assistant text from a spawned Pod since the last read. \
Uses an internal cursor per-Pod so consecutive calls return only \
newly-produced output. Returns the Pod's current status and the new \
text, or reports `stopped` if the Pod can no longer be reached.";
struct ReadPodOutputTool {
registry: Arc<SpawnedPodRegistry>,
}
#[async_trait]
impl Tool for ReadPodOutputTool {
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let input: NameInput = serde_json::from_str(input_json).map_err(|e| {
ToolError::InvalidArgument(format!("invalid ReadPodOutput input: {e}"))
})?;
let record = self
.registry
.get(&input.name)
.await
.ok_or_else(|| unknown_pod_err(&input.name))?;
let items = match fetch_history(&record.socket_path).await {
Ok(items) => items,
Err(_) => {
return Ok(ToolOutput {
summary: format!("pod `{}` is stopped (unreachable)", input.name),
content: None,
});
}
};
let cursor = self.registry.cursor(&input.name).await;
let new_items = if cursor >= items.len() {
&[] as &[serde_json::Value]
} else {
&items[cursor..]
};
let new_text = extract_assistant_text(new_items);
self.registry.set_cursor(&input.name, items.len()).await;
let summary = if new_text.is_empty() {
format!("pod `{}` running; no new assistant text", input.name)
} else {
let lines = new_text.lines().count();
format!("pod `{}`: {lines} new line(s) of assistant text", input.name)
};
let content = if new_text.is_empty() {
None
} else {
Some(new_text)
};
Ok(ToolOutput { summary, content })
}
}
pub fn read_pod_output_tool(registry: Arc<SpawnedPodRegistry>) -> ToolDefinition {
Arc::new(move || {
let schema = schemars::schema_for!(NameInput);
let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({}));
let meta = ToolMeta::new("ReadPodOutput")
.description(READ_POD_OUTPUT_DESCRIPTION)
.input_schema(schema_value);
let tool: Arc<dyn Tool> = Arc::new(ReadPodOutputTool {
registry: registry.clone(),
});
(meta, tool)
})
}
// ---------------------------------------------------------------------------
// StopPod
// ---------------------------------------------------------------------------
const STOP_POD_DESCRIPTION: &str =
"Terminate a spawned Pod and reclaim the delegated scope. The Pod \
receives `Shutdown`; its scope entry is released in the machine-wide \
registry so the spawner can spawn a new Pod over the same paths.";
struct StopPodTool {
registry: Arc<SpawnedPodRegistry>,
}
#[async_trait]
impl Tool for StopPodTool {
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let input: NameInput = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid StopPod input: {e}")))?;
let record = self
.registry
.get(&input.name)
.await
.ok_or_else(|| unknown_pod_err(&input.name))?;
// Best-effort Shutdown. The child's own `ScopeAllocationGuard`
// releases the entry on clean exit; we also release explicitly
// below so callers can't observe a window where the scope is
// still registered but StopPod has returned. Duplicate release
// is harmless — `ScopeAllocationGuard`'s drop path swallows
// `UnknownPod` errors.
let _ = connect_and_send(&record.socket_path, &Method::Shutdown).await;
let scope_summary = summarize_scope(&record);
release_scope(&record.pod_name);
self.registry
.remove(&record.pod_name)
.await
.map_err(|e| ToolError::ExecutionFailed(format!("update spawned_pods.json: {e}")))?;
Ok(ToolOutput {
summary: format!(
"stopped pod `{}`; reclaimed scope: {scope_summary}",
record.pod_name
),
content: None,
})
}
}
pub fn stop_pod_tool(registry: Arc<SpawnedPodRegistry>) -> ToolDefinition {
Arc::new(move || {
let schema = schemars::schema_for!(NameInput);
let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({}));
let meta = ToolMeta::new("StopPod")
.description(STOP_POD_DESCRIPTION)
.input_schema(schema_value);
let tool: Arc<dyn Tool> = Arc::new(StopPodTool {
registry: registry.clone(),
});
(meta, tool)
})
}
// ---------------------------------------------------------------------------
// ListPods
// ---------------------------------------------------------------------------
const LIST_PODS_DESCRIPTION: &str =
"List all Pods spawned by this Pod along with their reachability \
status (`alive` / `stopped`) and the scope each was granted.";
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct EmptyInput {}
struct ListPodsTool {
registry: Arc<SpawnedPodRegistry>,
}
#[async_trait]
impl Tool for ListPodsTool {
async fn execute(&self, _input_json: &str) -> Result<ToolOutput, ToolError> {
let records = self.registry.list().await;
if records.is_empty() {
return Ok(ToolOutput {
summary: "no spawned pods".into(),
content: None,
});
}
let mut lines: Vec<String> = Vec::with_capacity(records.len());
let mut stale_names: Vec<String> = Vec::new();
for record in &records {
let alive = is_reachable(&record.socket_path).await;
let status = if alive { "alive" } else { "stopped" };
let scope = summarize_scope(record);
lines.push(format!("{} [{status}] scope={scope}", record.pod_name));
if !alive {
stale_names.push(record.pod_name.clone());
}
}
// Trigger stale reclaim on unreachable pods so the lock file's
// allocation table doesn't keep growing indefinitely when
// children crash without a clean exit path.
if !stale_names.is_empty() {
if let Ok(lock_path) = scope_lock::default_lock_path()
&& let Ok(mut guard) = LockFileGuard::open(&lock_path)
{
scope_lock::reclaim_stale(&mut guard);
}
}
let summary = format!("{} pod(s) known", records.len());
Ok(ToolOutput {
summary,
content: Some(lines.join("\n")),
})
}
}
pub fn list_pods_tool(registry: Arc<SpawnedPodRegistry>) -> ToolDefinition {
Arc::new(move || {
let schema = schemars::schema_for!(EmptyInput);
let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({}));
let meta = ToolMeta::new("ListPods")
.description(LIST_PODS_DESCRIPTION)
.input_schema(schema_value);
let tool: Arc<dyn Tool> = Arc::new(ListPodsTool {
registry: registry.clone(),
});
(meta, tool)
})
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
fn unknown_pod_err(name: &str) -> ToolError {
ToolError::InvalidArgument(format!("no spawned pod named `{name}`"))
}
/// Connect with a timeout, write one `Method` line, flush, and close.
/// Any socket error maps to an `io::Error`; the caller decides whether
/// to surface it to the LLM or treat it as "pod stopped".
async fn connect_and_send(socket: &Path, method: &Method) -> std::io::Result<()> {
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??;
let (_r, w) = stream.into_split();
let mut writer = JsonLineWriter::new(w);
tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(method))
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "write timed out"))??;
Ok(())
}
/// Connect and ask the Pod for its conversation history. Skips
/// pre-History events (such as buffered notifications replayed to new
/// clients). Returns the raw JSON items as `serde_json::Value` since
/// the pod crate already round-trips via `Value` on the wire.
async fn fetch_history(socket: &Path) -> std::io::Result<Vec<serde_json::Value>> {
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??;
let (r, w) = stream.into_split();
let mut writer = JsonLineWriter::new(w);
let mut reader = JsonLineReader::new(r);
tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(&Method::GetHistory))
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "write timed out"))??;
loop {
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "read timed out"))??;
match event {
Some(Event::History { items, .. }) => return Ok(items),
Some(_) => continue,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"pod closed connection before History event",
));
}
}
}
}
/// Probe-connect test. Connection accepted within timeout → alive.
async fn is_reachable(socket: &Path) -> bool {
tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
.await
.map(|r| r.is_ok())
.unwrap_or(false)
}
fn extract_assistant_text(items: &[serde_json::Value]) -> String {
let mut out = String::new();
for value in items {
let Ok(item) = serde_json::from_value::<Item>(value.clone()) else {
continue;
};
if let Item::Message {
role: Role::Assistant,
content,
..
} = item
{
for part in content {
if let ContentPart::Text { text } = part {
if !out.is_empty() {
out.push('\n');
}
out.push_str(&text);
}
}
}
}
out
}
fn summarize_scope(record: &SpawnedPodRecord) -> String {
if record.scope_delegated.is_empty() {
return "(none)".into();
}
let parts: Vec<String> = record
.scope_delegated
.iter()
.map(|r| {
let perm = match r.permission {
manifest::Permission::Read => "read",
manifest::Permission::Write => "write",
};
let tag = if r.recursive { "" } else { " [non-recursive]" };
format!("{perm}:{}{tag}", r.target.display())
})
.collect();
parts.join(", ")
}
/// Best-effort release of the pod's scope allocation. Swallows every
/// error: the caller has already completed its user-visible side
/// effects (Method::Shutdown was sent), and stale-reclaim will clean
/// up whatever we couldn't.
fn release_scope(pod_name: &str) {
let Ok(lock_path) = scope_lock::default_lock_path() else {
return;
};
let Ok(mut guard) = LockFileGuard::open(&lock_path) else {
return;
};
let _ = scope_lock::release_pod(&mut guard, pod_name);
}

View File

@ -21,11 +21,11 @@ use protocol::stream::JsonLineWriter;
use serde::Deserialize;
use tokio::net::UnixStream;
use tokio::process::Command;
use tokio::sync::Mutex;
use tokio::time::sleep;
use crate::runtime_dir::{RuntimeDir, SpawnedPodRecord};
use crate::runtime_dir::SpawnedPodRecord;
use crate::scope_lock::{self, LockFileGuard, ScopeLockError};
use crate::spawned_pod_registry::SpawnedPodRegistry;
const DESCRIPTION: &str = "Spawn a new Pod process to work on a delegated task. \
The spawner's write scope is reduced by the scope passed here; the spawned \
@ -101,11 +101,10 @@ pub struct SpawnPodTool {
/// Directory the spawned Pod should run in when the LLM did not
/// override it. Defaults to the spawner's pwd — see module docs.
spawner_pwd: PathBuf,
/// Spawner's own runtime directory — target for `spawned_pods.json`.
runtime_dir: Arc<RuntimeDir>,
/// Running list of successful spawns, replayed into
/// `spawned_pods.json` on every successful `execute`.
records: Arc<Mutex<Vec<SpawnedPodRecord>>>,
/// Shared registry of spawned children, also used by the
/// pod-comm tools (`SendToPod` / `ReadPodOutput` / `StopPod` /
/// `ListPods`). Writes the list to `spawned_pods.json` on each add.
registry: Arc<SpawnedPodRegistry>,
}
impl SpawnPodTool {
@ -114,15 +113,14 @@ impl SpawnPodTool {
callback_socket: PathBuf,
runtime_base: PathBuf,
spawner_pwd: PathBuf,
runtime_dir: Arc<RuntimeDir>,
registry: Arc<SpawnedPodRegistry>,
) -> Self {
Self {
spawner_name,
callback_socket,
runtime_base,
spawner_pwd,
runtime_dir,
records: Arc::new(Mutex::new(Vec::new())),
registry,
}
}
}
@ -203,16 +201,10 @@ impl Tool for SpawnPodTool {
scope_delegated: scope_allow,
callback_address: self.callback_socket.clone(),
};
{
let mut records = self.records.lock().await;
records.push(record);
self.runtime_dir
.write_spawned_pods(records.as_slice())
.await
.map_err(|e| {
ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}"))
})?;
}
self.registry
.add(record)
.await
.map_err(|e| ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}")))?;
Ok(ToolOutput {
summary: format!(
@ -367,7 +359,7 @@ pub fn spawn_pod_tool(
callback_socket: PathBuf,
runtime_base: PathBuf,
spawner_pwd: PathBuf,
runtime_dir: Arc<RuntimeDir>,
registry: Arc<SpawnedPodRegistry>,
) -> ToolDefinition {
Arc::new(move || {
let schema = schemars::schema_for!(SpawnPodInput);
@ -380,7 +372,7 @@ pub fn spawn_pod_tool(
callback_socket.clone(),
runtime_base.clone(),
spawner_pwd.clone(),
runtime_dir.clone(),
registry.clone(),
));
(meta, tool)
})

View File

@ -0,0 +1,93 @@
//! Shared registry of Pods spawned by this Pod.
//!
//! `SpawnPod` writes here; the pod-comm tools (`SendToPod`,
//! `ReadPodOutput`, `StopPod`, `ListPods`) read and mutate the same
//! instance. Persisted to `spawned_pods.json` in the spawner's runtime
//! dir so a restarted spawner rebuilds its view from disk (future work
//! — today only write-through is implemented).
//!
//! `ReadPodOutput` additionally owns a per-spawned-pod cursor here so
//! two consecutive reads yield only new assistant text. The cursor is
//! an item-index into the child's history; push-only history makes
//! index stable across reads.
//!
//! The registry stays in-memory only for this Pod's lifetime — cursors
//! intentionally do not persist.
use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::runtime_dir::{RuntimeDir, SpawnedPodRecord};
pub struct SpawnedPodRegistry {
records: Mutex<Vec<SpawnedPodRecord>>,
cursors: Mutex<HashMap<String, usize>>,
runtime_dir: Arc<RuntimeDir>,
}
impl SpawnedPodRegistry {
pub fn new(runtime_dir: Arc<RuntimeDir>) -> Arc<Self> {
Arc::new(Self {
records: Mutex::new(Vec::new()),
cursors: Mutex::new(HashMap::new()),
runtime_dir,
})
}
/// Append a new record and persist the full list. Returns an I/O
/// error if the persisted write fails; the in-memory state is still
/// updated in that case — the next successful write will reconcile.
pub async fn add(&self, record: SpawnedPodRecord) -> io::Result<()> {
let mut records = self.records.lock().await;
records.push(record);
self.runtime_dir.write_spawned_pods(records.as_slice()).await
}
/// Look up a record by pod name. Cloned so callers can drop the lock.
pub async fn get(&self, pod_name: &str) -> Option<SpawnedPodRecord> {
self.records
.lock()
.await
.iter()
.find(|r| r.pod_name == pod_name)
.cloned()
}
pub async fn list(&self) -> Vec<SpawnedPodRecord> {
self.records.lock().await.clone()
}
/// Remove the record for `pod_name`, persist, and clear its cursor.
/// Returns the removed record (if any).
pub async fn remove(&self, pod_name: &str) -> io::Result<Option<SpawnedPodRecord>> {
let removed = {
let mut records = self.records.lock().await;
let idx = records.iter().position(|r| r.pod_name == pod_name);
let removed = idx.map(|i| records.remove(i));
self.runtime_dir.write_spawned_pods(records.as_slice()).await?;
removed
};
self.cursors.lock().await.remove(pod_name);
Ok(removed)
}
/// Read-only cursor lookup. Returns 0 when no cursor has been set.
pub async fn cursor(&self, pod_name: &str) -> usize {
self.cursors
.lock()
.await
.get(pod_name)
.copied()
.unwrap_or(0)
}
pub async fn set_cursor(&self, pod_name: &str, cursor: usize) {
self.cursors
.lock()
.await
.insert(pod_name.to_string(), cursor);
}
}

View File

@ -0,0 +1,379 @@
//! Integration tests for the pod-comm tools (`SendToPod`,
//! `ReadPodOutput`, `StopPod`, `ListPods`).
//!
//! The real child Pod binary is not started. Instead each test stands
//! up a mock `UnixListener` that speaks the socket protocol directly
//! (accepting `Method::Run` / `Method::GetHistory` / `Method::Shutdown`
//! and responding with `Event::History` when asked). This keeps the
//! tests fast and independent of the LLM layer — the tools are exercised
//! for their wire behaviour alone.
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock, Mutex};
use llm_worker::llm_client::types::{ContentPart, Item, Role};
use llm_worker::tool::ToolOutput;
use manifest::{Permission, ScopeRule};
use pod::pod_comm_tools::{
list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool,
};
use pod::runtime_dir::{RuntimeDir, SpawnedPodRecord};
use pod::scope_lock::{self, LockFileGuard};
use pod::spawned_pod_registry::SpawnedPodRegistry;
use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{Event, Greeting, Method};
use serde_json::json;
use tempfile::TempDir;
use tokio::net::UnixListener;
use tokio::task::JoinHandle;
/// Serialises env-mutating tests. The test harness runs tasks across
/// threads, and `INSOMNIA_SCOPE_LOCK` is a process-wide resource.
static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
struct EnvGuard {
_lock: std::sync::MutexGuard<'static, ()>,
}
impl EnvGuard {
fn acquire() -> Self {
Self {
_lock: ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()),
}
}
}
/// Create a spawner-owned `RuntimeDir` + `SpawnedPodRegistry` scoped to
/// a fresh tempdir. The returned `TempDir` must be kept alive by the
/// caller for the duration of the test.
async fn setup_registry() -> (TempDir, Arc<SpawnedPodRegistry>, Arc<RuntimeDir>) {
let tmp = TempDir::new().unwrap();
let rd = RuntimeDir::create(tmp.path(), "spawner").await.unwrap();
let rd = Arc::new(rd);
let registry = SpawnedPodRegistry::new(rd.clone());
(tmp, registry, rd)
}
/// Register a fake spawned-child record pointing at a given socket
/// path, with a trivial write-scope for `scope_path`. Does not touch
/// scope.lock.
async fn register_child(
registry: &SpawnedPodRegistry,
name: &str,
socket: &Path,
scope_path: &Path,
) {
let record = SpawnedPodRecord {
pod_name: name.into(),
socket_path: socket.to_path_buf(),
scope_delegated: vec![ScopeRule {
target: scope_path.to_path_buf(),
permission: Permission::Write,
recursive: true,
}],
callback_address: "/dev/null".into(),
};
registry.add(record).await.unwrap();
}
/// Bind a Unix listener at a socket path inside the given directory.
async fn bind_mock_socket(dir: &Path, name: &str) -> (PathBuf, UnixListener) {
let socket = dir.join(format!("{name}.sock"));
let listener = UnixListener::bind(&socket).unwrap();
(socket, listener)
}
/// Accept one connection and read exactly one `Method` line from it.
/// The reader half is kept open; caller awaits the returned handle.
fn accept_one_method(listener: UnixListener) -> JoinHandle<Option<Method>> {
tokio::spawn(async move {
let (stream, _) = listener.accept().await.ok()?;
let (r, _w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
reader.next::<Method>().await.ok().flatten()
})
}
/// Pretend to be a spawned Pod that responds to `GetHistory` with a
/// fixed set of items. Accepts connections until the first one that
/// delivers a `GetHistory` method; earlier probes (empty accepts) and
/// non-history methods are ignored. Returns nothing — tests await the
/// handle only to keep the listener alive until shutdown.
fn serve_history(listener: UnixListener, items: Vec<Item>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
let Ok((stream, _)) = listener.accept().await else {
return;
};
let (r, w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
let mut writer = JsonLineWriter::new(w);
match reader.next::<Method>().await {
Ok(Some(Method::GetHistory)) => {
let values: Vec<serde_json::Value> = items
.iter()
.map(|i| serde_json::to_value(i).unwrap())
.collect();
let event = Event::History {
items: values,
greeting: Greeting {
pod_name: "child".into(),
cwd: "/tmp".into(),
provider: "anthropic".into(),
model: "x".into(),
scope_summary: String::new(),
tools: Vec::new(),
},
};
let _ = writer.write(&event).await;
}
Ok(Some(_)) | Ok(None) | Err(_) => {
// Ignore: loop accepts another connection.
}
}
}
})
}
fn assistant(text: &str) -> Item {
Item::Message {
id: None,
role: Role::Assistant,
content: vec![ContentPart::Text { text: text.into() }],
status: None,
}
}
// ---------------------------------------------------------------------------
// SendToPod
// ---------------------------------------------------------------------------
#[tokio::test]
async fn send_to_pod_delivers_run_method() {
let (tmp, registry, _rd) = setup_registry().await;
let (socket, listener) = bind_mock_socket(tmp.path(), "child").await;
let received = accept_one_method(listener);
register_child(&registry, "child", &socket, tmp.path()).await;
let def = send_to_pod_tool(registry);
let (_meta, tool) = def();
let input = json!({ "name": "child", "message": "hello there" }).to_string();
let output: ToolOutput = tool.execute(&input).await.unwrap();
assert!(output.summary.contains("child"), "summary: {}", output.summary);
let method = received.await.unwrap().expect("expected a method");
match method {
Method::Run { input } => assert_eq!(input, "hello there"),
other => panic!("expected Run, got {other:?}"),
}
}
#[tokio::test]
async fn send_to_pod_errors_on_unknown_pod() {
let (_tmp, registry, _rd) = setup_registry().await;
let def = send_to_pod_tool(registry);
let (_meta, tool) = def();
let input = json!({ "name": "nope", "message": "hi" }).to_string();
let err = tool.execute(&input).await.unwrap_err();
assert!(err.to_string().contains("no spawned pod"), "{err}");
}
// ---------------------------------------------------------------------------
// ReadPodOutput
// ---------------------------------------------------------------------------
#[tokio::test]
async fn read_pod_output_returns_new_assistant_text_then_empty_on_second_call() {
let (tmp, registry, _rd) = setup_registry().await;
let (socket, listener) = bind_mock_socket(tmp.path(), "child").await;
register_child(&registry, "child", &socket, tmp.path()).await;
let items = vec![
Item::user_message("hello"),
assistant("hi back"),
assistant("still working"),
];
let _server = serve_history(listener, items);
let def = read_pod_output_tool(registry);
let (_meta, tool) = def();
let input = json!({ "name": "child" }).to_string();
let first: ToolOutput = tool.execute(&input).await.unwrap();
let body = first.content.expect("first read should have content");
assert!(body.contains("hi back"), "body: {body}");
assert!(body.contains("still working"), "body: {body}");
// Cursor now points past all items — second call returns no new text.
let second: ToolOutput = tool.execute(&input).await.unwrap();
assert!(second.content.is_none(), "unexpected content: {:?}", second.content);
assert!(
second.summary.contains("no new assistant text"),
"summary: {}",
second.summary
);
}
#[tokio::test]
async fn read_pod_output_reports_stopped_on_dead_socket() {
let (tmp, registry, _rd) = setup_registry().await;
// Register a record pointing at a socket that nobody is listening
// on. Connect must fail → tool reports "stopped".
let dead_socket = tmp.path().join("dead.sock");
register_child(&registry, "child", &dead_socket, tmp.path()).await;
let def = read_pod_output_tool(registry);
let (_meta, tool) = def();
let input = json!({ "name": "child" }).to_string();
let output: ToolOutput = tool.execute(&input).await.unwrap();
assert!(output.summary.contains("stopped"), "{}", output.summary);
}
// ---------------------------------------------------------------------------
// StopPod
// ---------------------------------------------------------------------------
#[tokio::test]
async fn stop_pod_sends_shutdown_and_releases_scope() {
let _env = EnvGuard::acquire();
let (tmp, registry, rd) = setup_registry().await;
let lock_path = tmp.path().join("scope.lock");
unsafe {
std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path);
}
// Seed scope.lock with a top-level `spawner` allocation plus a
// delegated `child` allocation — mimics what SpawnPod would have
// done so StopPod has something to release.
{
let mut g = LockFileGuard::open(&lock_path).unwrap();
scope_lock::register_pod(
&mut g,
"spawner".into(),
std::process::id(),
"/tmp/spawner.sock".into(),
vec![ScopeRule {
target: tmp.path().to_path_buf(),
permission: Permission::Write,
recursive: true,
}],
)
.unwrap();
scope_lock::delegate_scope(
&mut g,
"spawner",
"child".into(),
std::process::id(),
"/tmp/child.sock".into(),
vec![ScopeRule {
target: tmp.path().to_path_buf(),
permission: Permission::Write,
recursive: true,
}],
)
.unwrap();
}
let (socket, listener) = bind_mock_socket(tmp.path(), "child").await;
let received = accept_one_method(listener);
register_child(&registry, "child", &socket, tmp.path()).await;
let def = stop_pod_tool(registry.clone());
let (_meta, tool) = def();
let input = json!({ "name": "child" }).to_string();
let output: ToolOutput = tool.execute(&input).await.unwrap();
assert!(output.summary.contains("stopped"), "{}", output.summary);
// The child got a Shutdown.
let method = received.await.unwrap().expect("expected shutdown");
assert!(matches!(method, Method::Shutdown));
// Allocation for `child` is gone; `spawner` remains.
{
let g = LockFileGuard::open(&lock_path).unwrap();
assert!(g.data().find("child").is_none(), "child still allocated");
assert!(g.data().find("spawner").is_some(), "spawner missing");
}
// spawned_pods.json now lists zero children.
let spawned = rd.path().join("spawned_pods.json");
let contents = std::fs::read_to_string(&spawned).unwrap();
let records: Vec<SpawnedPodRecord> = serde_json::from_str(&contents).unwrap();
assert!(records.is_empty());
unsafe {
std::env::remove_var("INSOMNIA_SCOPE_LOCK");
}
}
#[tokio::test]
async fn stop_pod_succeeds_even_when_child_unreachable() {
let _env = EnvGuard::acquire();
let (tmp, registry, _rd) = setup_registry().await;
let lock_path = tmp.path().join("scope.lock");
unsafe {
std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path);
}
// No live listener — socket never bound. Registered record points
// at a dead path. StopPod should still clean up local bookkeeping.
let dead_socket = tmp.path().join("dead.sock");
register_child(&registry, "child", &dead_socket, tmp.path()).await;
let def = stop_pod_tool(registry.clone());
let (_meta, tool) = def();
let input = json!({ "name": "child" }).to_string();
let output: ToolOutput = tool.execute(&input).await.unwrap();
assert!(output.summary.contains("stopped"), "{}", output.summary);
// Registry no longer knows about the child.
assert!(registry.get("child").await.is_none());
unsafe {
std::env::remove_var("INSOMNIA_SCOPE_LOCK");
}
}
// ---------------------------------------------------------------------------
// ListPods
// ---------------------------------------------------------------------------
#[tokio::test]
async fn list_pods_reports_alive_and_stopped() {
let (tmp, registry, _rd) = setup_registry().await;
// One child is reachable…
let (live_socket, listener) = bind_mock_socket(tmp.path(), "alive").await;
// Keep the listener alive by moving it into a task that never exits.
let _accept = tokio::spawn(async move {
loop {
let Ok((stream, _)) = listener.accept().await else {
return;
};
drop(stream);
}
});
register_child(&registry, "alive", &live_socket, tmp.path()).await;
// …the other is not.
let dead_socket = tmp.path().join("dead.sock");
register_child(&registry, "dead", &dead_socket, tmp.path()).await;
let def = list_pods_tool(registry);
let (_meta, tool) = def();
let output: ToolOutput = tool.execute("{}").await.unwrap();
assert!(output.summary.contains("2 pod"), "{}", output.summary);
let body = output.content.expect("list_pods should populate content");
assert!(body.contains("alive [alive]"), "body: {body}");
assert!(body.contains("dead [stopped]"), "body: {body}");
}
#[tokio::test]
async fn list_pods_empty_when_nothing_registered() {
let (_tmp, registry, _rd) = setup_registry().await;
let def = list_pods_tool(registry);
let (_meta, tool) = def();
let output: ToolOutput = tool.execute("{}").await.unwrap();
assert!(output.summary.contains("no spawned pods"), "{}", output.summary);
assert!(output.content.is_none());
}

View File

@ -15,6 +15,7 @@ use manifest::{Permission, ScopeRule};
use pod::runtime_dir::{RuntimeDir, SpawnedPodRecord};
use pod::scope_lock::{self, LockFileGuard};
use pod::spawn_pod::spawn_pod_tool;
use pod::spawned_pod_registry::SpawnedPodRegistry;
use protocol::Method;
use protocol::stream::JsonLineReader;
use serde_json::json;
@ -150,12 +151,13 @@ async fn spawn_pod_delegates_scope_and_sends_run() {
let (_predicted_socket, listener) = bind_mock_pod_socket(&runtime_base, "child").await;
let received = accept_one_method(listener);
let registry = SpawnedPodRegistry::new(spawner_rd.clone());
let def = spawn_pod_tool(
"root".into(),
spawner_socket.clone(),
runtime_base.clone(),
allow_root.path().to_path_buf(),
spawner_rd.clone(),
registry,
);
let (_meta, tool) = def();
@ -210,12 +212,13 @@ async fn spawn_pod_rejects_scope_outside_spawner() {
setup_spawner("root", allow_root.path()).await;
point_pod_command_at_true();
let registry = SpawnedPodRegistry::new(spawner_rd);
let def = spawn_pod_tool(
"root".into(),
spawner_socket,
runtime_base,
allow_root.path().to_path_buf(),
spawner_rd,
registry,
);
let (_meta, tool) = def();
@ -266,12 +269,13 @@ async fn spawn_pod_rolls_back_reservation_when_socket_never_appears() {
// marked with `// slow_test`. Keep the rest of the test suite fast
// by running this test alone when iterating.
let registry = SpawnedPodRegistry::new(spawner_rd);
let def = spawn_pod_tool(
"root".into(),
spawner_socket,
runtime_base,
allow_root.path().to_path_buf(),
spawner_rd,
registry,
);
let (_meta, tool) = def();

View File

@ -1,5 +1,7 @@
# Pod 間通信ツール: SendToPod / ReadPodOutput / StopPod / ListPods
レビュー中: [pod-comm-tools.review.md](pod-comm-tools.review.md)
## 背景
`SpawnPod` で Pod を生成した後、spawner の LLM が spawned Pod に指示を送り、結果を読み、完了したら停止させる手段が必要。

View File

@ -0,0 +1,85 @@
# Review: pod-comm-tools
実装コミット前レビュー。`cargo build -p pod` clean、`cargo test -p pod --test pod_comm_tools_test` 8/8 pass。
## 総評
チケット要件の中核4 ツールの wire 動作、`SpawnedPodRegistry` の共有、scope 回収、stale reclaim トリガー)は達成。テスト構成もモックソケットで実 `pod` バイナリ非依存で速い。ただし以下のうち修正必須項目を解消してから完了にしたい。
## 指摘と判断
### 修正必須
#### 1. `SendToPod` が RUNNING 状態を検知しない(設計合意違反)
**状況**: `connect_and_send``Method::Run` を書いて即切断し、応答 event を読まない。Controller は RUNNING 中の `Run` に対し `Event::Error { code: AlreadyRunning }` を返すが、tool 側はそれを見ないので LLM に「送信成功」と返る。
**設計合意**: pod-comm-tools 議論の結論は「A. SendToPod は `Method::Run` を使い、Pod が IDLE でなければエラーを返す」。
**判断**: 修正必要。書き込み後に short timeout で 1 event 読み、`Error { AlreadyRunning }` を検知したら `ToolError::ExecutionFailed` に変換する。正常時は `TurnStart` など先頭 event を見て「受け付けられた」ことを確認して戻る。
### チケット文言と実装の整合
#### 2. Pod status (`running / idle / stopped`) の取得手段がない
**状況**: チケットは `ReadPodOutput`/`ListPods` 出力に `running / idle / stopped` を含めることを求めているが、`Event::History` にはステータスが乗っておらず、実装は「接続可=alive / 接続不可=stopped」の 2 値のみ。
**判断**: 今回のスコープからは外す。`Greeting` または別 event に `PodStatus` を載せる改修は `Method::GetHistory` のレスポンス設計変更を伴うので、別チケット化する。本チケット文言を `alive / stopped` に揃えて実装と一致させる。
#### 3. `StopPod` は終了確認を待たない
**状況**: チケット本文は「`Method::Shutdown` 送信 → 終了確認受信 → 切断」だが、実装は fire-and-forget。scope の明示 release と `scope_lock` 上の stale reclaim で整合性は担保されているので実害なし。
**判断**: チケット文言を「`Method::Shutdown` 送信(応答は待たない)」に修正。
### 範囲外として明文化
#### 4. `spawned_pods.json` からの再起動復旧は未実装
**状況**: `spawned_pod_registry.rs` のモジュールコメントに「today only write-through is implemented」と明記されている。spawner プロセス再起動後、`ListPods` は空リストを返す。
**判断**: 今回のスコープ外。チケットの範囲外に「spawner 再起動時の `spawned_pods.json` からの復旧」を明記し、必要になったタイミングで別チケットを切る。
#### 5. `ReadPodOutput` cursor の非永続性
**状況**: `SpawnedPodRegistry::cursors``HashMap<String, usize>` でインメモリのみ。spawner 再起動後、カーソルは 0 にリセットされ、既読だった assistant text が再送される。モジュールコメントに「cursors intentionally do not persist」と明記されている。
**判断**: 現状のままで OK。設計判断としてレビューで合意し、チケットの「設計で決めること」の回答として本 review に残す。チケット側には残さないgitで追える
### 軽微
#### 6. `extract_assistant_text` のメッセージ境界
**状況**: 複数の assistant message を `\n` で連結するのみ。メッセージ境界が曖昧になる。
**判断**: `\n\n` に変更して境界を明確にする。修正コスト小。
#### 7. blocking `flock` を async から呼んでいる
**状況**: `release_scope` / stale reclaim 経路で `LockFileGuard::open`(内部で `flock(2)`)を async fn から直接呼ぶ。通常は瞬時だが厳密には tokio runtime を一瞬止める。
**判断**: 既存コード全体が同じ慣習なので今回は踏襲。気になったら別タイミングで `spawn_blocking` 化を検討。本チケットでは対応しない。
#### 8. `SpawnedPodRegistry::new``Arc<Self>` を返す
**状況**: コンストラクタが smart pointer を返す API は珍しい。
**判断**: consumer が全員 `Arc` で持つ設計なので許容。リファクタは不要。
## 完了条件との対応
| 完了条件 | 状態 |
|---|---|
| SendToPod で spawned Pod にメッセージ送信・Pod が処理 | ✅(指摘 #1 の修正後) |
| ReadPodOutput でカーソルベース差分取得 | ✅ |
| StopPod で graceful 停止 + scope 返却 | ✅ |
| ListPods で既知 Pod の状態一覧 + health check | ⚠️ `alive/stopped` のみ(指摘 #2 |
| 接続不可 Pod は `stopped` 扱い + stale 回収トリガー | ✅ |
| 各ツール正常系・異常系の単体テスト | ✅8/8 pass |
## 完了に向けた作業
1. 指摘 #1: `SendToPod` に応答確認を実装
2. 指摘 #2, #3: チケット本文の文言修正(`running/idle/stopped` → `alive/stopped`、Shutdown 応答待ちの記述削除)
3. 指摘 #4: チケットの「範囲外」に `spawned_pods.json` 復旧を追記
4. 指摘 #6: `extract_assistant_text` の区切りを `\n\n`