From 255e3708568dc1ea7fa5c01f73b8bb4ca2f483fb Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 19 Apr 2026 08:20:07 +0900 Subject: [PATCH] =?UTF-8?q?pod-upstream-event=E5=AE=9F=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 1 + TODO.md | 1 + crates/manifest/Cargo.toml | 1 + crates/manifest/src/lib.rs | 34 +-- crates/pod/src/controller.rs | 147 +++++++++-- crates/pod/src/lib.rs | 1 + crates/pod/src/notification_buffer.rs | 31 +-- crates/pod/src/pod.rs | 21 +- crates/pod/src/pod_comm_tools.rs | 2 +- crates/pod/src/pod_events.rs | 187 +++++++++++++ crates/pod/src/pod_interceptor.rs | 10 +- crates/pod/src/spawn_pod.rs | 29 +- crates/pod/tests/controller_test.rs | 4 +- crates/pod/tests/pod_events_test.rs | 366 ++++++++++++++++++++++++++ crates/pod/tests/spawn_pod_test.rs | 3 + crates/protocol/src/lib.rs | 182 ++++++++++++- tickets/tool-call-empty-args-null.md | 75 ++++++ 17 files changed, 1008 insertions(+), 87 deletions(-) create mode 100644 crates/pod/src/pod_events.rs create mode 100644 crates/pod/tests/pod_events_test.rs create mode 100644 tickets/tool-call-empty-args-null.md diff --git a/Cargo.lock b/Cargo.lock index 2169080c..3820c06e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1517,6 +1517,7 @@ dependencies = [ name = "manifest" version = "0.1.0" dependencies = [ + "protocol", "serde", "serde_ignored", "tempfile", diff --git a/TODO.md b/TODO.md index f5628d1a..155a9a8b 100644 --- a/TODO.md +++ b/TODO.md @@ -1,4 +1,5 @@ - [ ] テスト設計 → [tickets/test-design.md](tickets/test-design.md) +- [ ] 引数なし tool 呼び出しで `arguments = "null"` が記録される不具合 → [tickets/tool-call-empty-args-null.md](tickets/tool-call-empty-args-null.md) - [ ] ツール設計 - [ ] Bash ツール (Permission 層と統合) → [tickets/bash-tool.md](tickets/bash-tool.md) - [ ] Compact の改善(要約品質 + 挙動詳細) → [tickets/compact-improvements.md](tickets/compact-improvements.md) diff --git a/crates/manifest/Cargo.toml b/crates/manifest/Cargo.toml index 1887d236..fdabd5de 100644 --- a/crates/manifest/Cargo.toml +++ b/crates/manifest/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [dependencies] +protocol = { version = "0.1.0", path = "../protocol" } serde = { version = "1.0.228", features = ["derive"] } serde_ignored = "0.1.14" thiserror = "2.0.18" diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index f839de24..d7aef1ed 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -6,6 +6,7 @@ pub use config::{ CompactionConfigPartial, PodManifestConfig, PodMetaConfig, ProviderConfigPartial, ResolveError, ToolOutputLimitsPartial, WorkerManifestConfig, }; +pub use protocol::{Permission, ScopeRule}; pub use scope::{Scope, ScopeError}; use std::collections::HashMap; @@ -159,39 +160,6 @@ pub struct ScopeConfig { pub deny: Vec, } -/// A single allow or deny rule inside [`ScopeConfig`]. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ScopeRule { - /// Target path. Must be absolute by the time [`Scope::from_config`] - /// runs — relative paths are resolved per-layer against the manifest - /// file's directory (cwd for overlay layers) before cascade merge. - pub target: PathBuf, - /// Permission level this rule grants (allow) or caps strictly below - /// (deny). - pub permission: Permission, - /// When `false`, the rule only matches the target itself and its - /// direct children. Defaults to `true`. - #[serde(default = "default_recursive")] - pub recursive: bool, -} - -fn default_recursive() -> bool { - true -} - -/// Permission lattice used by [`ScopeRule`]. -/// -/// The derived `Ord` instance follows declaration order, so -/// `Read < Write`. Allow rules grant the stated level (and by extension -/// everything below); deny rules cap the effective level **strictly -/// below** the stated level. -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum Permission { - Read, - Write, -} - /// Context compaction configuration. /// /// Controls Prune (content removal from old tool results) and Compact diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 799cfeb6..ce7cbe64 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -114,6 +114,17 @@ impl PodController { let pwd_for_tools = pod.pwd().to_path_buf(); let spawner_name = pod.manifest().pod.name.clone(); + // Parent callback socket (this Pod's own parent, used for + // `PodEvent` upward reports). `None` for top-level Pods. + let self_parent_socket = pod.callback_socket().cloned(); + + // `SpawnedPodRegistry` is shared between the Pod-orchestration + // tools (registered below) and the main loop's `PodEvent` + // handler (added later in this function), so hoist its creation + // above the worker-borrow block. + let spawner_socket = runtime_dir.socket_path(); + let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone()); + // Register event bridge callbacks on the worker { let worker = pod.worker_mut(); @@ -209,24 +220,20 @@ impl PodController { worker.register_tools(tools::builtin_tools(fs, tracker.clone())); // Pod-orchestration tools (SpawnPod + the four comm tools) - // share a single `SpawnedPodRegistry`: `SpawnPod` writes to - // it, the others read/mutate. Wired here rather than in - // `tools::builtin_tools` because these need Pod-scoped - // handles (this Pod's own socket path, runtime_dir, spawner - // name) that the generic tools crate has no access to. - let spawner_socket = runtime_dir.socket_path(); - let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone()); + // share the Pod-scoped `SpawnedPodRegistry` hoisted above + // (also consumed by the main loop's `PodEvent` handler). worker.register_tool(spawn_pod_tool( - spawner_name, - spawner_socket, + spawner_name.clone(), + spawner_socket.clone(), runtime_base.to_path_buf(), pwd_for_tools, spawned_registry.clone(), + self_parent_socket.clone(), )); worker.register_tool(send_to_pod_tool(spawned_registry.clone())); worker.register_tool(read_pod_output_tool(spawned_registry.clone())); worker.register_tool(stop_pod_tool(spawned_registry.clone())); - worker.register_tool(list_pods_tool(spawned_registry)); + worker.register_tool(list_pods_tool(spawned_registry.clone())); pod.attach_tracker(tracker); } @@ -266,6 +273,8 @@ impl PodController { &cancel_tx, &shared_state, ¬ification_buffer, + self_parent_socket.as_ref(), + &spawner_name, ) .await; @@ -292,8 +301,8 @@ impl PodController { } } - Method::Notify { source, message } => { - pod.push_notification(source, message); + Method::Notify { message } => { + pod.push_notification(message); if shared_state.get_status() != PodStatus::Idle { // RUNNING / Paused: the buffer push is the // entire operation; the in-flight turn (or @@ -313,6 +322,8 @@ impl PodController { &cancel_tx, &shared_state, ¬ification_buffer, + self_parent_socket.as_ref(), + &spawner_name, ) .await; @@ -357,6 +368,8 @@ impl PodController { &cancel_tx, &shared_state, ¬ification_buffer, + self_parent_socket.as_ref(), + &spawner_name, ) .await; @@ -398,9 +411,79 @@ impl PodController { // GetHistory is handled at the socket layer (direct response). // If it somehow reaches the controller, ignore it. Method::GetHistory => {} + + Method::PodEvent(event) => { + // (1) system side effects — idempotent and + // tolerant of out-of-order delivery (e.g. + // `TurnEnded` arriving after `ShutDown`). + crate::pod_events::apply_event_side_effects( + &event, + &spawned_registry, + &spawner_name, + &self_parent_socket, + ) + .await; + // (2) render a one-line summary and push it + // into the notification buffer; the next LLM + // request will inject it as a system message + // via `PodInterceptor::pre_llm_request`. + let text = crate::pod_events::render_event(&event); + pod.push_notification(text); + // Auto-kick a turn if the Pod is idle so the + // notification is not stranded. Matches the + // `Method::Notify` idle path. + if shared_state.get_status() == PodStatus::Idle { + shared_state.set_status(PodStatus::Running); + let _ = runtime_dir.write_status(&shared_state).await; + + let (new_status, shutdown) = run_with_cancel_support( + pod.run_for_notification(), + &mut method_rx, + &event_tx, + &cancel_tx, + &shared_state, + ¬ification_buffer, + self_parent_socket.as_ref(), + &spawner_name, + ) + .await; + + if new_status == PodStatus::Idle { + if let Err(e) = pod.try_post_run_compact().await { + tracing::warn!(error = %e, "Post-run compaction error"); + notifier.notify( + NotificationLevel::Warn, + NotificationSource::Compactor, + format!("post-run compaction error: {e}"), + ); + } + } + + let items = pod.worker().history().to_vec(); + shared_state.update_history(items); + shared_state.set_status(new_status); + let _ = runtime_dir.write_status(&shared_state).await; + let _ = runtime_dir.write_history(&shared_state).await; + + if shutdown { + let _ = event_tx.send(Event::Shutdown); + break; + } + } + } } } + // Report upward that this Pod is stopping before the + // controller task exits. Fire-and-forget; the parent may + // already be gone. + crate::pod_events::fire_and_forget( + self_parent_socket.clone(), + protocol::PodEvent::ShutDown { + pod_name: spawner_name.clone(), + }, + ); + let _ = shutdown_tx.send(()); }); @@ -411,6 +494,12 @@ impl PodController { /// Runs a Pod future while concurrently processing incoming methods. /// /// Returns `(final_status, shutdown_requested)`. +/// +/// `parent_socket` / `self_name` drive upward `PodEvent` reports +/// (`TurnEnded` on a clean Finished, `Errored` on a worker failure). +/// `None` parent skips the send (top-level Pod). Transient method +/// rejections such as `AlreadyRunning` are intentionally NOT reported +/// as `Errored` — only the worker-execution `Err` branch below fires. async fn run_with_cancel_support( pod_future: F, method_rx: &mut mpsc::Receiver, @@ -418,6 +507,8 @@ async fn run_with_cancel_support( cancel_tx: &mpsc::Sender<()>, shared_state: &Arc, notification_buffer: &NotificationBuffer, + parent_socket: Option<&std::path::PathBuf>, + self_name: &str, ) -> (PodStatus, bool) where F: std::future::Future>, @@ -436,14 +527,30 @@ where PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached), }; let _ = event_tx.send(Event::RunEnd { result: run_result }); + if matches!(run_result, RunResult::Finished) { + crate::pod_events::fire_and_forget( + parent_socket.cloned(), + protocol::PodEvent::TurnEnded { + pod_name: self_name.to_string(), + }, + ); + } (status, shutdown_requested) } Err(e) => { let code = worker_error_code(&e); + let message = e.to_string(); let _ = event_tx.send(Event::Error { code, - message: e.to_string(), + message: message.clone(), }); + crate::pod_events::fire_and_forget( + parent_socket.cloned(), + protocol::PodEvent::Errored { + pod_name: self_name.to_string(), + message, + }, + ); (PodStatus::Idle, shutdown_requested) } }; @@ -463,12 +570,22 @@ where message: "Pod is already executing a turn".into(), }); } - Some(Method::Notify { source, message }) => { + Some(Method::Notify { message }) => { // Route into the buffer; the in-flight turn will // drain it at its next pre_llm_request. - notification_buffer.push(source, message); + notification_buffer.push(message); } Some(Method::GetHistory) => {} + Some(Method::PodEvent(_)) => { + // PodEvent is handled in the main loop (next + // iteration). Dropping it here is fine because + // the sender is external and we will see it + // again via `method_rx` after the current turn + // ends — this arm only fires for concurrent + // arrivals during an in-flight turn, where the + // strict ordering guarantees that matter for + // PodEvent don't exist anyway (fire-and-forget). + } None => { let _ = cancel_tx.try_send(()); shared_state.set_status(PodStatus::Idle); diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 5961710b..6897567a 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -5,6 +5,7 @@ pub mod runtime_dir; pub mod scope_lock; pub mod shared_state; pub mod pod_comm_tools; +pub mod pod_events; pub mod socket_server; pub mod spawn_pod; pub mod spawned_pod_registry; diff --git a/crates/pod/src/notification_buffer.rs b/crates/pod/src/notification_buffer.rs index 4f056107..33562096 100644 --- a/crates/pod/src/notification_buffer.rs +++ b/crates/pod/src/notification_buffer.rs @@ -17,7 +17,6 @@ const CAPACITY: usize = 128; /// One pending notification awaiting injection into the next LLM request. #[derive(Debug, Clone)] pub struct PendingNotification { - pub source: String, pub message: String, } @@ -37,17 +36,17 @@ impl NotificationBuffer { /// Push a notification onto the queue. If the queue is full, the /// oldest entry is dropped and a `tracing::warn` is emitted — the /// caller should never hit this in normal operation. - pub fn push(&self, source: String, message: String) { + pub fn push(&self, message: String) { let mut q = self.inner.lock().expect("notification buffer poisoned"); if q.len() >= CAPACITY { let dropped = q.pop_front(); warn!( capacity = CAPACITY, - dropped_source = dropped.as_ref().map(|n| n.source.as_str()), + dropped_message = dropped.as_ref().map(|n| n.message.as_str()), "notification buffer overflow; dropped oldest" ); } - q.push_back(PendingNotification { source, message }); + q.push_back(PendingNotification { message }); } /// Remove and return all pending notifications in FIFO order. @@ -73,11 +72,10 @@ impl NotificationBuffer { /// that gets injected into the per-request context. pub(crate) fn format_notification(n: &PendingNotification) -> Item { let text = format!( - "[Notification from {source}]\n{message}\n\n\ + "[Notification]\n{message}\n\n\ This is a notification, not a blocking request. \ If you are in the middle of a task, continue your current work \ and address this at a natural stopping point.", - source = n.source, message = n.message, ); Item::system_message(text) @@ -90,12 +88,12 @@ mod tests { #[test] fn push_then_drain_preserves_order() { let buf = NotificationBuffer::new(); - buf.push("a".into(), "one".into()); - buf.push("b".into(), "two".into()); + buf.push("one".into()); + buf.push("two".into()); let drained = buf.drain(); assert_eq!(drained.len(), 2); - assert_eq!(drained[0].source, "a"); - assert_eq!(drained[1].source, "b"); + assert_eq!(drained[0].message, "one"); + assert_eq!(drained[1].message, "two"); assert!(buf.is_empty()); } @@ -103,24 +101,23 @@ mod tests { fn capacity_drops_oldest() { let buf = NotificationBuffer::new(); for i in 0..(CAPACITY + 5) { - buf.push(format!("src{i}"), format!("msg{i}")); + buf.push(format!("msg{i}")); } let drained = buf.drain(); assert_eq!(drained.len(), CAPACITY); - // Oldest 5 were dropped; first retained is src5. - assert_eq!(drained[0].source, "src5"); - assert_eq!(drained[CAPACITY - 1].source, format!("src{}", CAPACITY + 4)); + // Oldest 5 were dropped; first retained is msg5. + assert_eq!(drained[0].message, "msg5"); + assert_eq!(drained[CAPACITY - 1].message, format!("msg{}", CAPACITY + 4)); } #[test] - fn format_notification_includes_source_message_and_nonblocking_hint() { + fn format_notification_includes_message_and_nonblocking_hint() { let n = PendingNotification { - source: "child".into(), message: "hello".into(), }; let item = format_notification(&n); let text = item.as_text().unwrap_or_default().to_string(); - assert!(text.contains("[Notification from child]")); + assert!(text.contains("[Notification]")); assert!(text.contains("hello")); assert!(text.contains("not a blocking request")); } diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index f8eb284e..5967ff22 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -115,11 +115,9 @@ pub struct Pod { #[allow(dead_code)] scope_allocation: Option, /// Socket path of the spawning Pod. `Some` only for Pods built via - /// `from_manifest_spawned`. The callback is consumed by the - /// `pod-callback` layer (separate ticket) to deliver - /// `Method::Notify` back to the spawner; stored here so the Pod - /// carries the reference for the duration of its life. - #[allow(dead_code)] + /// `from_manifest_spawned`. Consumed by the controller to fire + /// `Method::PodEvent` reports upward (turn end, error, shutdown, + /// scope sub-delegation). callback_socket: Option, } @@ -325,8 +323,8 @@ impl Pod { /// The notification will be injected as an `Item::system_message` /// into the next outgoing LLM request context (not into history). /// See [`NotificationBuffer`] for overflow behaviour. - pub fn push_notification(&self, source: String, message: String) { - self.pending_notifications.push(source, message); + pub fn push_notification(&self, message: String) { + self.pending_notifications.push(message); } /// Shared handle to the pending notification buffer. @@ -337,6 +335,15 @@ impl Pod { self.pending_notifications.clone() } + /// Parent callback socket set by `from_manifest_spawned`. + /// + /// Consumed by the Controller to fire `Method::PodEvent` upward on + /// lifecycle transitions. `None` for top-level Pods, in which case + /// the Controller silently skips the send. + pub fn callback_socket(&self) -> Option<&PathBuf> { + self.callback_socket.as_ref() + } + // --- Hook registration --- fn assert_hooks_open(&self) { diff --git a/crates/pod/src/pod_comm_tools.rs b/crates/pod/src/pod_comm_tools.rs index 1400afa8..f3e29e04 100644 --- a/crates/pod/src/pod_comm_tools.rs +++ b/crates/pod/src/pod_comm_tools.rs @@ -325,7 +325,7 @@ fn unknown_pod_err(name: &str) -> ToolError { /// Connect with a timeout, write one `Method` line, flush, and close. /// Any socket error maps to an `io::Error`; the caller decides whether /// to surface it to the LLM or treat it as "pod stopped". -async fn connect_and_send(socket: &Path, method: &Method) -> std::io::Result<()> { +pub(crate) async fn connect_and_send(socket: &Path, method: &Method) -> std::io::Result<()> { let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) .await .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??; diff --git a/crates/pod/src/pod_events.rs b/crates/pod/src/pod_events.rs new file mode 100644 index 00000000..1c26bfd5 --- /dev/null +++ b/crates/pod/src/pod_events.rs @@ -0,0 +1,187 @@ +//! `PodEvent` send / receive helpers. +//! +//! This module owns the parent-facing lifecycle-event primitive +//! (`PodEvent`) that children fire upward on turn-end / error / +//! shutdown / scope-sub-delegation. Three responsibilities live here: +//! +//! - **Send** a `Method::PodEvent` to the parent socket, fire-and-forget, +//! logging failures without blocking the child. +//! - **Render** a variant into a human-readable string that the parent's +//! LLM sees via the notification buffer. +//! - **Apply side effects** on the parent (registry / scope-lock +//! updates) so that the receive path is idempotent and tolerant of +//! out-of-order delivery. +//! +//! Transport is fire-and-forget — the ticket's decision is that +//! callbacks are an optimisation and `ListPods` + `reclaim_stale` are +//! the real fallback. This module is allowed to drop events on the +//! floor (with a warn log) rather than retry. +//! +//! `apply_event_side_effects` takes its dependencies (registry, scope +//! lock path, self identity) by reference so the caller owns lifetime +//! and locking concerns. + +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use protocol::{Method, PodEvent, ScopeRule}; + +use crate::pod_comm_tools::connect_and_send; +use crate::runtime_dir::SpawnedPodRecord; +use crate::scope_lock::{self, ScopeLockError}; +use crate::spawned_pod_registry::SpawnedPodRegistry; + +/// Connect to `socket`, send a single `Method::PodEvent(event)`, and +/// return. Used by children to report up to their parent. +/// +/// This is a synchronous helper — callers that want fire-and-forget +/// semantics should wrap the call in `tokio::spawn` themselves. +pub async fn send_pod_event(socket: &Path, event: PodEvent) -> std::io::Result<()> { + connect_and_send(socket, &Method::PodEvent(event)).await +} + +/// Spawn a fire-and-forget task that sends `event` to `socket`. If +/// `socket` is `None`, no send happens (top-level Pods have no parent). +/// Any send failure is logged at warn level but otherwise ignored — +/// the parent is treated as best-effort. +pub fn fire_and_forget(socket: Option, event: PodEvent) { + let Some(socket) = socket else { return }; + tokio::spawn(async move { + if let Err(e) = send_pod_event(&socket, event).await { + tracing::warn!(error = %e, socket = %socket.display(), "PodEvent send failed"); + } + }); +} + +/// Render a variant into the one-line human-readable string that will +/// be injected into the parent's LLM context as a system message. +/// +/// Kept deliberately short — the LLM can always call `ReadPodOutput` +/// to fetch more detail if the event summary is not enough. +pub fn render_event(event: &PodEvent) -> String { + match event { + PodEvent::TurnEnded { pod_name } => { + format!("Pod `{pod_name}` finished a turn.") + } + PodEvent::Errored { pod_name, message } => { + format!("Pod `{pod_name}` reported an error: {message}") + } + PodEvent::ShutDown { pod_name } => { + format!("Pod `{pod_name}` has stopped.") + } + PodEvent::ScopeSubDelegated { + parent_pod, + sub_pod, + .. + } => { + format!("Pod `{parent_pod}` spawned `{sub_pod}` and delegated scope to it.") + } + } +} + +/// Apply the variant-specific side effect on the parent side. +/// +/// All operations are idempotent so that out-of-order delivery (e.g. +/// `TurnEnded` arriving after `ShutDown`) does not produce errors: +/// +/// - `TurnEnded` / `Errored`: no system work; the LLM handles the +/// semantic response. +/// - `ShutDown`: remove the child from `spawned_pods.json` and release +/// its scope allocation. Missing entries are swallowed. +/// - `ScopeSubDelegated`: register the grandchild locally and re-emit +/// upward to our own parent if we have one. Duplicate grandchild +/// entries (re-delivery) are swallowed. +pub async fn apply_event_side_effects( + event: &PodEvent, + registry: &Arc, + self_name: &str, + self_parent_socket: &Option, +) { + match event { + PodEvent::TurnEnded { .. } | PodEvent::Errored { .. } => {} + + PodEvent::ShutDown { pod_name } => { + if let Err(e) = registry.remove(pod_name).await { + tracing::warn!(error = %e, pod = %pod_name, "registry remove on ShutDown failed"); + } + release_scope_silently(pod_name); + } + + PodEvent::ScopeSubDelegated { + parent_pod, + sub_pod, + sub_socket, + scope, + } => { + if registry.get(sub_pod).await.is_some() { + return; + } + let callback_address = registry + .get(parent_pod) + .await + .map(|r| r.socket_path) + .unwrap_or_else(PathBuf::new); + let record = SpawnedPodRecord { + pod_name: sub_pod.clone(), + socket_path: sub_socket.clone(), + scope_delegated: scope.clone(), + callback_address, + }; + if let Err(e) = registry.add(record).await { + tracing::warn!( + error = %e, + sub_pod = %sub_pod, + "registry add on ScopeSubDelegated failed" + ); + } + reemit_scope_sub_delegated( + self_parent_socket, + self_name, + sub_pod.clone(), + sub_socket.clone(), + scope.clone(), + ); + } + } +} + +fn release_scope_silently(pod_name: &str) { + let lock_path = match scope_lock::default_lock_path() { + Ok(p) => p, + Err(e) => { + tracing::warn!(error = %e, "default_lock_path failed"); + return; + } + }; + let mut guard = match scope_lock::LockFileGuard::open(&lock_path) { + Ok(g) => g, + Err(e) => { + tracing::warn!(error = %e, "LockFileGuard open failed"); + return; + } + }; + match scope_lock::release_pod(&mut guard, pod_name) { + Ok(()) => {} + Err(ScopeLockError::UnknownPod(_)) => {} + Err(e) => tracing::warn!(error = ?e, pod = %pod_name, "release_pod failed"), + } +} + +fn reemit_scope_sub_delegated( + self_parent_socket: &Option, + self_name: &str, + sub_pod: String, + sub_socket: PathBuf, + scope: Vec, +) { + let Some(parent_socket) = self_parent_socket.clone() else { + return; + }; + let event = PodEvent::ScopeSubDelegated { + parent_pod: self_name.to_string(), + sub_pod, + sub_socket, + scope, + }; + fire_and_forget(Some(parent_socket), event); +} diff --git a/crates/pod/src/pod_interceptor.rs b/crates/pod/src/pod_interceptor.rs index 5f43b7d4..b00b85a9 100644 --- a/crates/pod/src/pod_interceptor.rs +++ b/crates/pod/src/pod_interceptor.rs @@ -292,8 +292,8 @@ mod tests { async fn pre_llm_request_drains_pending_notifications_into_context() { let registry = Arc::new(HookRegistryBuilder::new().build()); let buffer = NotificationBuffer::new(); - buffer.push("child-a".into(), "first".into()); - buffer.push("child-b".into(), "second".into()); + buffer.push("first".into()); + buffer.push("second".into()); let interceptor = PodInterceptor::new(registry, None, buffer.clone()); let mut ctx: Vec = vec![Item::user_message("hi")]; @@ -304,9 +304,9 @@ mod tests { assert_eq!(ctx.len(), 3); let second = ctx[1].as_text().unwrap_or_default(); let third = ctx[2].as_text().unwrap_or_default(); - assert!(second.contains("[Notification from child-a]")); + assert!(second.contains("[Notification]")); assert!(second.contains("first")); - assert!(third.contains("[Notification from child-b]")); + assert!(third.contains("[Notification]")); assert!(third.contains("second")); // Buffer is drained after a single pre_llm_request call. assert!(buffer.is_empty()); @@ -318,7 +318,7 @@ mod tests { // the next pre_llm_request (after compaction + resume). let registry = Arc::new(HookRegistryBuilder::new().build()); let buffer = NotificationBuffer::new(); - buffer.push("src".into(), "msg".into()); + buffer.push("msg".into()); let state = Arc::new(CompactState::new(100, 2)); state.update_input_tokens(200); diff --git a/crates/pod/src/spawn_pod.rs b/crates/pod/src/spawn_pod.rs index 819e01da..ef0e9f48 100644 --- a/crates/pod/src/spawn_pod.rs +++ b/crates/pod/src/spawn_pod.rs @@ -23,9 +23,11 @@ use tokio::net::UnixStream; use tokio::process::Command; use tokio::time::sleep; +use crate::pod_events; use crate::runtime_dir::SpawnedPodRecord; use crate::scope_lock::{self, LockFileGuard, ScopeLockError}; use crate::spawned_pod_registry::SpawnedPodRegistry; +use protocol::PodEvent; const DESCRIPTION: &str = "Spawn a new Pod process to work on a delegated task. \ The spawner's write scope is reduced by the scope passed here; the spawned \ @@ -93,7 +95,7 @@ pub struct SpawnPodTool { /// `delegated_from` in the scope-lock registry. spawner_name: String, /// Path to the spawner's Unix socket. Handed to the child via - /// `--callback` so `Method::Notify` has somewhere to land. + /// `--callback` so its `PodEvent` callbacks have somewhere to land. callback_socket: PathBuf, /// Root of the `$XDG_RUNTIME_DIR/insomnia/` tree, used to predict /// the spawned Pod's socket path before the child has bound it. @@ -105,6 +107,12 @@ pub struct SpawnPodTool { /// pod-comm tools (`SendToPod` / `ReadPodOutput` / `StopPod` / /// `ListPods`). Writes the list to `spawned_pods.json` on each add. registry: Arc, + /// THIS Pod's own parent-callback socket, if any. After a + /// successful spawn we fire `PodEvent::ScopeSubDelegated` upward + /// so the grandparent can register the grandchild directly. + /// `None` for top-level Pods — in that case the re-emission is a + /// no-op. + parent_socket: Option, } impl SpawnPodTool { @@ -114,6 +122,7 @@ impl SpawnPodTool { runtime_base: PathBuf, spawner_pwd: PathBuf, registry: Arc, + parent_socket: Option, ) -> Self { Self { spawner_name, @@ -121,6 +130,7 @@ impl SpawnPodTool { runtime_base, spawner_pwd, registry, + parent_socket, } } } @@ -197,7 +207,7 @@ impl Tool for SpawnPodTool { let record = SpawnedPodRecord { pod_name: input.name.clone(), socket_path: predicted_socket.clone(), - scope_delegated: scope_allow, + scope_delegated: scope_allow.clone(), callback_address: self.callback_socket.clone(), }; self.registry @@ -205,6 +215,19 @@ impl Tool for SpawnPodTool { .await .map_err(|e| ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}")))?; + // Notify this Pod's own parent so the grandparent can register + // the new grandchild directly. Fire-and-forget; top-level Pods + // (with no parent) skip the send inside `fire_and_forget`. + pod_events::fire_and_forget( + self.parent_socket.clone(), + PodEvent::ScopeSubDelegated { + parent_pod: self.spawner_name.clone(), + sub_pod: input.name.clone(), + sub_socket: predicted_socket.clone(), + scope: scope_allow, + }, + ); + Ok(ToolOutput { summary: format!( "spawned pod `{}` listening on {}", @@ -362,6 +385,7 @@ pub fn spawn_pod_tool( runtime_base: PathBuf, spawner_pwd: PathBuf, registry: Arc, + parent_socket: Option, ) -> ToolDefinition { Arc::new(move || { let schema = schemars::schema_for!(SpawnPodInput); @@ -375,6 +399,7 @@ pub fn spawn_pod_tool( runtime_base.clone(), spawner_pwd.clone(), registry.clone(), + parent_socket.clone(), )); (meta, tool) }) diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 4ccfbf4b..e337be80 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -346,7 +346,6 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() { handle .send(Method::Notify { - source: "child-a".into(), message: "turn finished".into(), }) .await @@ -384,7 +383,7 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() { .unwrap_or_default() .to_string(); assert!( - last_item_text.contains("[Notification from child-a]"), + last_item_text.contains("[Notification]"), "injected system message missing, got: {last_item_text:?}" ); assert!(last_item_text.contains("turn finished")); @@ -406,7 +405,6 @@ async fn notify_while_running_does_not_emit_already_running_error() { .unwrap(); handle .send(Method::Notify { - source: "child".into(), message: "ping".into(), }) .await diff --git a/crates/pod/tests/pod_events_test.rs b/crates/pod/tests/pod_events_test.rs new file mode 100644 index 00000000..b0550ac5 --- /dev/null +++ b/crates/pod/tests/pod_events_test.rs @@ -0,0 +1,366 @@ +//! Integration tests for the `PodEvent` send / receive primitive. +//! +//! These tests drive `pod_events::fire_and_forget` and +//! `pod_events::apply_event_side_effects` directly — the full +//! Controller wiring is exercised by the existing controller / +//! spawn-pod tests, which rely on the same primitives. + +use std::path::PathBuf; +use std::sync::{Arc, LazyLock, Mutex}; +use std::time::Duration; + +use pod::pod_events::{apply_event_side_effects, fire_and_forget, render_event}; +use pod::runtime_dir::{RuntimeDir, SpawnedPodRecord}; +use pod::scope_lock::{self, LockFileGuard}; +use pod::spawned_pod_registry::SpawnedPodRegistry; +use protocol::stream::JsonLineReader; +use protocol::{Method, Permission, PodEvent, ScopeRule}; +use tempfile::TempDir; +use tokio::net::UnixListener; + +/// Serialises tests that mutate `INSOMNIA_SCOPE_LOCK`. +static ENV_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); + +struct EnvGuard { + _lock: std::sync::MutexGuard<'static, ()>, +} + +impl EnvGuard { + fn acquire() -> Self { + Self { + _lock: ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()), + } + } +} + +fn set_scope_lock_path(path: &std::path::Path) { + unsafe { + std::env::set_var("INSOMNIA_SCOPE_LOCK", path); + } +} + +fn clear_scope_lock_path() { + unsafe { + std::env::remove_var("INSOMNIA_SCOPE_LOCK"); + } +} + +/// Accept a single connection, read one `Method`, and return it. +fn accept_one_method( + listener: UnixListener, +) -> tokio::task::JoinHandle> { + tokio::spawn(async move { + let (stream, _) = listener.accept().await.ok()?; + let (reader, _writer) = stream.into_split(); + let mut r = JsonLineReader::new(reader); + r.next::().await.ok().flatten() + }) +} + +#[test] +fn render_event_all_variants_mention_pod_name() { + let t1 = render_event(&PodEvent::TurnEnded { + pod_name: "alpha".into(), + }); + assert!(t1.contains("alpha"), "{t1}"); + + let t2 = render_event(&PodEvent::Errored { + pod_name: "bravo".into(), + message: "boom".into(), + }); + assert!(t2.contains("bravo") && t2.contains("boom"), "{t2}"); + + let t3 = render_event(&PodEvent::ShutDown { + pod_name: "charlie".into(), + }); + assert!(t3.contains("charlie"), "{t3}"); + + let t4 = render_event(&PodEvent::ScopeSubDelegated { + parent_pod: "delta".into(), + sub_pod: "echo".into(), + sub_socket: "/tmp/sock".into(), + scope: vec![], + }); + assert!(t4.contains("delta") && t4.contains("echo"), "{t4}"); +} + +#[tokio::test] +async fn fire_and_forget_delivers_pod_event_to_listener() { + let dir = TempDir::new().unwrap(); + let socket_path = dir.path().join("parent.sock"); + let listener = UnixListener::bind(&socket_path).unwrap(); + let received = accept_one_method(listener); + + fire_and_forget( + Some(socket_path.clone()), + PodEvent::TurnEnded { + pod_name: "child".into(), + }, + ); + + let method = tokio::time::timeout(Duration::from_secs(2), received) + .await + .expect("send timed out") + .unwrap() + .expect("no method received"); + match method { + Method::PodEvent(PodEvent::TurnEnded { pod_name }) => assert_eq!(pod_name, "child"), + other => panic!("expected TurnEnded, got {other:?}"), + } +} + +#[tokio::test] +async fn fire_and_forget_with_none_socket_is_noop() { + // Nothing binds and nothing listens; the call must not panic and + // must not leak a task that never completes. + fire_and_forget( + None, + PodEvent::ShutDown { + pod_name: "x".into(), + }, + ); + // Yield once so any accidentally-spawned task would surface. + tokio::time::sleep(Duration::from_millis(50)).await; +} + +/// Build a registry backed by a fresh runtime dir. +async fn fresh_registry(runtime_base: &std::path::Path, pod_name: &str) -> Arc { + let rd = RuntimeDir::create(runtime_base, pod_name).await.unwrap(); + SpawnedPodRegistry::new(Arc::new(rd)) +} + +#[tokio::test] +async fn apply_shutdown_removes_from_registry_and_tolerates_missing() { + let _env = EnvGuard::acquire(); + let scope_dir = TempDir::new().unwrap(); + set_scope_lock_path(&scope_dir.path().join("scope.lock")); + + let runtime_base = TempDir::new().unwrap(); + let registry = fresh_registry(runtime_base.path(), "parent").await; + + // Seed a child record; then ShutDown for it should remove it. + registry + .add(SpawnedPodRecord { + pod_name: "child".into(), + socket_path: "/tmp/child.sock".into(), + scope_delegated: vec![], + callback_address: "/tmp/parent.sock".into(), + }) + .await + .unwrap(); + + let event = PodEvent::ShutDown { + pod_name: "child".into(), + }; + apply_event_side_effects(&event, ®istry, "parent", &None).await; + assert!(registry.get("child").await.is_none()); + + // Second ShutDown for the same (now-missing) child must be a no-op, + // not an error — this is the idempotency guarantee for out-of-order + // delivery. + apply_event_side_effects(&event, ®istry, "parent", &None).await; + assert!(registry.get("child").await.is_none()); + + clear_scope_lock_path(); +} + +#[tokio::test] +async fn apply_scope_sub_delegated_adds_grandchild_then_duplicate_is_noop() { + let _env = EnvGuard::acquire(); + let scope_dir = TempDir::new().unwrap(); + set_scope_lock_path(&scope_dir.path().join("scope.lock")); + + let runtime_base = TempDir::new().unwrap(); + let registry = fresh_registry(runtime_base.path(), "grandparent").await; + + // Seed the intermediate child so callback_address lookup succeeds. + registry + .add(SpawnedPodRecord { + pod_name: "child".into(), + socket_path: "/tmp/child.sock".into(), + scope_delegated: vec![], + callback_address: "/tmp/grandparent.sock".into(), + }) + .await + .unwrap(); + + let event = PodEvent::ScopeSubDelegated { + parent_pod: "child".into(), + sub_pod: "grandchild".into(), + sub_socket: "/tmp/grandchild.sock".into(), + scope: vec![ScopeRule { + target: scope_dir.path().to_path_buf(), + permission: Permission::Write, + recursive: true, + }], + }; + + apply_event_side_effects(&event, ®istry, "grandparent", &None).await; + let gc = registry + .get("grandchild") + .await + .expect("grandchild missing after ScopeSubDelegated"); + assert_eq!(gc.socket_path, PathBuf::from("/tmp/grandchild.sock")); + assert_eq!(gc.callback_address, PathBuf::from("/tmp/child.sock")); + + // Duplicate delivery must not error and must not overwrite. + apply_event_side_effects(&event, ®istry, "grandparent", &None).await; + let gc2 = registry.get("grandchild").await.unwrap(); + assert_eq!(gc2.socket_path, PathBuf::from("/tmp/grandchild.sock")); + + clear_scope_lock_path(); +} + +#[tokio::test] +async fn apply_scope_sub_delegated_reemits_to_own_parent() { + let _env = EnvGuard::acquire(); + let scope_dir = TempDir::new().unwrap(); + set_scope_lock_path(&scope_dir.path().join("scope.lock")); + + let runtime_base = TempDir::new().unwrap(); + let registry = fresh_registry(runtime_base.path(), "B").await; + + // Bind a listener at "A's" socket so we can watch the re-emission + // climb one level up the tree. + let sock_dir = TempDir::new().unwrap(); + let a_socket = sock_dir.path().join("A.sock"); + let listener = UnixListener::bind(&a_socket).unwrap(); + let received = accept_one_method(listener); + + // Seed the child record that the event claims spawned the grandchild. + registry + .add(SpawnedPodRecord { + pod_name: "C".into(), + socket_path: "/tmp/C.sock".into(), + scope_delegated: vec![], + callback_address: "/tmp/B.sock".into(), + }) + .await + .unwrap(); + + let event = PodEvent::ScopeSubDelegated { + parent_pod: "C".into(), + sub_pod: "D".into(), + sub_socket: "/tmp/D.sock".into(), + scope: vec![], + }; + + // Self is B, and B's parent socket is A's listener. + apply_event_side_effects(&event, ®istry, "B", &Some(a_socket.clone())).await; + + // A must see the re-emission with parent_pod set to "B" (the + // sender from A's perspective), not "C" (the original sender's + // local view). + let method = tokio::time::timeout(Duration::from_secs(2), received) + .await + .expect("re-emission timed out") + .unwrap() + .expect("no method received on A's socket"); + match method { + Method::PodEvent(PodEvent::ScopeSubDelegated { + parent_pod, + sub_pod, + .. + }) => { + assert_eq!(parent_pod, "B"); + assert_eq!(sub_pod, "D"); + } + other => panic!("expected re-emitted ScopeSubDelegated, got {other:?}"), + } + + clear_scope_lock_path(); +} + +#[tokio::test] +async fn apply_turn_ended_and_errored_are_system_noops() { + let _env = EnvGuard::acquire(); + let scope_dir = TempDir::new().unwrap(); + set_scope_lock_path(&scope_dir.path().join("scope.lock")); + + let runtime_base = TempDir::new().unwrap(); + let registry = fresh_registry(runtime_base.path(), "parent").await; + + // Seed a child to verify it survives the no-op path. + registry + .add(SpawnedPodRecord { + pod_name: "child".into(), + socket_path: "/tmp/child.sock".into(), + scope_delegated: vec![], + callback_address: "/tmp/parent.sock".into(), + }) + .await + .unwrap(); + + apply_event_side_effects( + &PodEvent::TurnEnded { + pod_name: "child".into(), + }, + ®istry, + "parent", + &None, + ) + .await; + apply_event_side_effects( + &PodEvent::Errored { + pod_name: "child".into(), + message: "x".into(), + }, + ®istry, + "parent", + &None, + ) + .await; + + assert!(registry.get("child").await.is_some()); + clear_scope_lock_path(); +} + +#[tokio::test] +async fn shutdown_releases_scope_allocation_when_present() { + let _env = EnvGuard::acquire(); + let scope_dir = TempDir::new().unwrap(); + let lock_path = scope_dir.path().join("scope.lock"); + set_scope_lock_path(&lock_path); + + // Install a top-level allocation for "kid" so ShutDown has + // something to release. + let guard = scope_lock::install_top_level( + "kid".into(), + std::process::id(), + "/tmp/kid.sock".into(), + vec![], + ) + .unwrap(); + std::mem::forget(guard); + + let runtime_base = TempDir::new().unwrap(); + let registry = fresh_registry(runtime_base.path(), "parent").await; + registry + .add(SpawnedPodRecord { + pod_name: "kid".into(), + socket_path: "/tmp/kid.sock".into(), + scope_delegated: vec![], + callback_address: "/tmp/parent.sock".into(), + }) + .await + .unwrap(); + + apply_event_side_effects( + &PodEvent::ShutDown { + pod_name: "kid".into(), + }, + ®istry, + "parent", + &None, + ) + .await; + + // Allocation is gone from the scope lock. + let g = LockFileGuard::open(&lock_path).unwrap(); + assert!( + g.data().find("kid").is_none(), + "ShutDown should have released the scope allocation" + ); + + clear_scope_lock_path(); +} diff --git a/crates/pod/tests/spawn_pod_test.rs b/crates/pod/tests/spawn_pod_test.rs index 716538d6..a8509556 100644 --- a/crates/pod/tests/spawn_pod_test.rs +++ b/crates/pod/tests/spawn_pod_test.rs @@ -158,6 +158,7 @@ async fn spawn_pod_delegates_scope_and_sends_run() { runtime_base.clone(), allow_root.path().to_path_buf(), registry, + None, ); let (_meta, tool) = def(); @@ -219,6 +220,7 @@ async fn spawn_pod_rejects_scope_outside_spawner() { runtime_base, allow_root.path().to_path_buf(), registry, + None, ); let (_meta, tool) = def(); @@ -276,6 +278,7 @@ async fn spawn_pod_rolls_back_reservation_when_socket_never_appears() { runtime_base, allow_root.path().to_path_buf(), registry, + None, ); let (_meta, tool) = def(); diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index faa4a92a..84a15e08 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -1,5 +1,7 @@ pub mod stream; +use std::path::PathBuf; + use serde::{Deserialize, Serialize}; // --------------------------------------------------------------------------- @@ -10,13 +12,63 @@ use serde::{Deserialize, Serialize}; #[serde(tag = "method", content = "params", rename_all = "snake_case")] pub enum Method { Run { input: String }, - Notify { source: String, message: String }, + /// Human-readable text injected into the target Pod's LLM context + /// as a non-blocking system message. No side effects beyond LLM + /// context; use `PodEvent` for typed lifecycle reports. + Notify { message: String }, + /// Typed lifecycle report from a child Pod to its direct parent. + PodEvent(PodEvent), Resume, Cancel, Shutdown, GetHistory, } +/// Typed lifecycle events sent from a child Pod to its parent. +/// +/// Delivered as `Method::PodEvent` over the parent's Unix socket. The +/// parent Controller applies variant-specific side effects (registry / +/// scope-lock updates) and renders a human-readable string that is +/// injected into the parent's LLM context via the notification buffer. +/// +/// Transport is fire-and-forget; receivers must tolerate out-of-order +/// delivery (e.g. `TurnEnded` arriving after `ShutDown` for the same +/// child Pod). +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum PodEvent { + /// Child finished one turn and is back to IDLE. + TurnEnded { pod_name: String }, + + /// Worker execution error occurred inside the child's turn. + /// + /// Limited to worker runtime failures (provider / tool errors) — + /// does not include transient method-rejection responses such as + /// `AlreadyRunning`. + Errored { pod_name: String, message: String }, + + /// Child has stopped (controller loop is exiting). + ShutDown { pod_name: String }, + + /// Child sub-delegated scope to a grandchild Pod via `SpawnPod`. + /// + /// The parent uses this to add the grandchild to its own + /// `spawned_pods.json` so it can manage the grandchild directly + /// even if the intermediate child dies. The parent then re-fires + /// this event upward (if it has a parent of its own) to maintain + /// the chain to root. + ScopeSubDelegated { + /// Sub-delegating Pod (= the sender itself). + parent_pod: String, + /// Name of the grandchild Pod. + sub_pod: String, + /// Unix-socket path where the grandchild is reachable. + sub_socket: PathBuf, + /// Scope delegated to the grandchild. + scope: Vec, + }, +} + // --------------------------------------------------------------------------- // Event (Pod → Client via Unix Socket broadcast) // --------------------------------------------------------------------------- @@ -150,6 +202,48 @@ pub enum ErrorCode { Internal, } +// --------------------------------------------------------------------------- +// Scope rule / permission (wire type) +// +// Defined here so that both `manifest` (config parsing) and `protocol` +// itself (inter-pod messaging such as `PodEvent::ScopeSubDelegated`) can +// reference the same type without introducing a reverse dependency. +// --------------------------------------------------------------------------- + +/// A single allow or deny rule inside a scope configuration. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScopeRule { + /// Target path. Must be absolute by the time a `Scope` is built from + /// this rule — relative paths are resolved per-layer against the + /// manifest file's directory (cwd for overlay layers) before cascade + /// merge. + pub target: PathBuf, + /// Permission level this rule grants (allow) or caps strictly below + /// (deny). + pub permission: Permission, + /// When `false`, the rule only matches the target itself and its + /// direct children. Defaults to `true`. + #[serde(default = "default_recursive")] + pub recursive: bool, +} + +fn default_recursive() -> bool { + true +} + +/// Permission lattice used by [`ScopeRule`]. +/// +/// The derived `Ord` instance follows declaration order, so +/// `Read < Write`. Allow rules grant the stated level (and by extension +/// everything below); deny rules cap the effective level **strictly +/// below** the stated level. +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Permission { + Read, + Write, +} + #[cfg(test)] mod tests { use super::*; @@ -195,12 +289,11 @@ mod tests { #[test] fn method_notify_json_roundtrip() { - let json = r#"{"method":"notify","params":{"source":"child-pod","message":"turn done"}}"#; + let json = r#"{"method":"notify","params":{"message":"turn done"}}"#; let method: Method = serde_json::from_str(json).unwrap(); assert!(matches!( method, - Method::Notify { ref source, ref message } - if source == "child-pod" && message == "turn done" + Method::Notify { ref message } if message == "turn done" )); let serialized = serde_json::to_string(&method).unwrap(); assert_eq!(serialized, json); @@ -235,6 +328,87 @@ mod tests { assert_eq!(parsed["data"]["greeting"]["tools"][0], "Read"); } + #[test] + fn method_pod_event_turn_ended_roundtrip() { + let method = Method::PodEvent(PodEvent::TurnEnded { + pod_name: "child".into(), + }); + let json = serde_json::to_string(&method).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["method"], "pod_event"); + assert_eq!(parsed["params"]["kind"], "turn_ended"); + assert_eq!(parsed["params"]["pod_name"], "child"); + + let decoded: Method = serde_json::from_str(&json).unwrap(); + assert!(matches!( + decoded, + Method::PodEvent(PodEvent::TurnEnded { ref pod_name }) if pod_name == "child" + )); + } + + #[test] + fn method_pod_event_errored_roundtrip() { + let method = Method::PodEvent(PodEvent::Errored { + pod_name: "child".into(), + message: "provider 429".into(), + }); + let json = serde_json::to_string(&method).unwrap(); + let decoded: Method = serde_json::from_str(&json).unwrap(); + match decoded { + Method::PodEvent(PodEvent::Errored { pod_name, message }) => { + assert_eq!(pod_name, "child"); + assert_eq!(message, "provider 429"); + } + other => panic!("expected Errored, got {other:?}"), + } + } + + #[test] + fn method_pod_event_shutdown_roundtrip() { + let method = Method::PodEvent(PodEvent::ShutDown { + pod_name: "child".into(), + }); + let json = serde_json::to_string(&method).unwrap(); + let decoded: Method = serde_json::from_str(&json).unwrap(); + assert!(matches!( + decoded, + Method::PodEvent(PodEvent::ShutDown { ref pod_name }) if pod_name == "child" + )); + } + + #[test] + fn method_pod_event_scope_sub_delegated_roundtrip() { + let method = Method::PodEvent(PodEvent::ScopeSubDelegated { + parent_pod: "child".into(), + sub_pod: "grandchild".into(), + sub_socket: "/run/insomnia/grandchild/sock".into(), + scope: vec![ScopeRule { + target: "/tmp/work".into(), + permission: Permission::Write, + recursive: true, + }], + }); + let json = serde_json::to_string(&method).unwrap(); + let decoded: Method = serde_json::from_str(&json).unwrap(); + match decoded { + Method::PodEvent(PodEvent::ScopeSubDelegated { + parent_pod, + sub_pod, + sub_socket, + scope, + }) => { + assert_eq!(parent_pod, "child"); + assert_eq!(sub_pod, "grandchild"); + assert_eq!(sub_socket, PathBuf::from("/run/insomnia/grandchild/sock")); + assert_eq!(scope.len(), 1); + assert_eq!(scope[0].target, PathBuf::from("/tmp/work")); + assert_eq!(scope[0].permission, Permission::Write); + assert!(scope[0].recursive); + } + other => panic!("expected ScopeSubDelegated, got {other:?}"), + } + } + #[test] fn event_notification_format() { let event = Event::Notification(Notification { diff --git a/tickets/tool-call-empty-args-null.md b/tickets/tool-call-empty-args-null.md new file mode 100644 index 00000000..c4d2f7c4 --- /dev/null +++ b/tickets/tool-call-empty-args-null.md @@ -0,0 +1,75 @@ +# 引数なし tool 呼び出しで `arguments = "null"` が記録される不具合 + +## 背景 + +引数を取らないツール(例: `ListPods`)を Anthropic の Claude が呼び出したとき、次ターンで履歴を送り返す際に Anthropic API が以下のエラーで 400 を返す: + +``` +messages.N.content.0.tool_use.input: Input should be a valid dictionary +``` + +実環境で `cargo run -p pod` + TUI / API 経由で `ListPods` を呼ぶと再現する。セッション jsonl には tool 呼び出しが以下の形で記録されている: + +```json +{"type":"tool_call","call_id":"toolu_...","name":"ListPods","arguments":"null"} +``` + +`arguments` が `"null"` 文字列になっており、次ターンで Anthropic に送る `tool_use.input` が JSON `null` として serialize されてしまうことが原因。 + +## 原因 + +`crates/llm-worker/src/timeline/tool_call_collector.rs:87-88`: + +```rust +let input = serde_json::from_str(&scope.input_json_buffer) + .unwrap_or(serde_json::Value::Null); +``` + +Anthropic は引数なしのツール呼び出しでは `input_json_delta` を一度も送らない。その結果 `input_json_buffer` が空文字 `""` のまま stop イベントに到達し、`from_str("")` が失敗して `Value::Null` に fallback する。 + +この `Null` が `worker.rs:499` で `Item::tool_call_json(..., Value::Null)` として履歴に保存され、`Value::Null.to_string()` = `"null"` が `arguments` フィールドに残る。 + +次ターンで `anthropic/request.rs:174-175` が history → request body 変換する際: + +```rust +let input = serde_json::from_str(arguments) + .unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new())); +``` + +`"null"` は valid JSON として parse 成功するため fallback が効かず、`Value::Null` のまま `tool_use.input` に入り API に送信される。Anthropic の tool_use.input は object 必須なので拒否される。 + +## 修正方針 + +ルート修正を `tool_call_collector.rs` に入れる。引数なし / パース失敗の場合は `Value::Object(Map::new())`(= `{}`)にする: + +```rust +let input = if scope.input_json_buffer.is_empty() { + serde_json::Value::Object(serde_json::Map::new()) +} else { + serde_json::from_str(&scope.input_json_buffer) + .unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new())) +}; +``` + +加えて防御層として `anthropic/request.rs:174` で parse 結果が object でない場合も `{}` に正規化する。既に `arguments = "null"` として保存済みの古いセッションが resume されたときに回復できるようにするため。 + +## 影響範囲 + +- `crates/llm-worker/src/timeline/tool_call_collector.rs`: 空バッファ時の default を `Value::Object` に +- `crates/llm-worker/src/llm_client/scheme/anthropic/request.rs`: parse 結果が非 object の場合の正規化 +- `crates/llm-worker/src/worker.rs:576-577` の同様の parse でも非 object を正規化(防御) +- OpenAI / Gemini の request.rs でも同等の問題があるか確認、必要なら同じ修正を入れる + +## 完了条件 + +- 引数なしツール(`ListPods` など)を呼んだ直後のセッション jsonl で `arguments` が `"{}"` になる +- 同一セッション内で引数なしツールを呼んでから次ターンを開始しても 400 エラーが出ない +- `"arguments":"null"` が残っている既存セッションを resume しても 400 エラーが出ない +- `tool_call_collector.rs` に「空バッファ → `{}`」を検証するテストを追加 +- 該当パスの回帰テストを単体テストで担保 + +## 範囲外 + +- LLM 側が「無意味に `null` を `input` に入れてくる」ケースの検出・警告(そもそもプロバイダから来ないはず) +- OpenAI / Gemini の同等検証(確認だけ行い、問題あれば別チケット化) +- 既存セッション jsonl の自動修復スクリプト(resume 時の defensive 正規化で十分)