From b3fe72574257be5e51255a08af5e381effc7e29d Mon Sep 17 00:00:00 2001 From: Hare Date: Sat, 30 May 2026 14:01:09 +0900 Subject: [PATCH] fix: keep scope subdelegation control-only --- crates/pod/src/controller.rs | 136 +++++++++++++++++++++++----- crates/pod/src/ipc/event.rs | 16 ++-- crates/pod/src/ipc/notify_buffer.rs | 5 +- crates/pod/src/pod.rs | 6 +- crates/pod/tests/controller_test.rs | 48 ++++++++++ crates/protocol/src/lib.rs | 58 +++++++++++- 6 files changed, 233 insertions(+), 36 deletions(-) diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 8efb66f6..3ff4ea78 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -728,7 +728,7 @@ async fn controller_loop( // RUNNING / Paused: the buffer push is the entire // operation; an in-flight turn (or the next // Resume/Run) will drain it at its next - // pre_llm_request. IDLE: auto-start a turn so the LLM + // pending_history_appends. IDLE: auto-start a turn so the LLM // sees the buffered notification(s) without a human // Run. if shared_state.get_status() == PodStatus::Idle { @@ -900,11 +900,12 @@ async fn controller_loop( Method::ListCompletions { .. } => {} Method::PodEvent(event) => { - // Live echo travels through the SystemItem lane: once - // the interceptor drains the notify buffer, the - // typed `SystemItem::PodEvent` lands as a + // For agent-visible PodEvents, live echo travels through the + // SystemItem lane: once the interceptor drains the notify buffer, + // the typed `SystemItem::PodEvent` lands as a // `LogEntry::SystemItem` entry and the sink forwards it - // to clients as `Event::SystemItem`. + // to clients as `Event::SystemItem`. Control-plane-only + // PodEvents use this same receive path only for side effects. // // (1) system side effects — idempotent and tolerant of // out-of-order delivery (e.g. `TurnEnded` arriving @@ -916,17 +917,19 @@ async fn controller_loop( &self_parent_socket, ) .await; - // (2) queue the typed event in the notification buffer; - // the next LLM request will inject it as a typed - // `SystemItem::PodEvent` via the interceptor drain. - pod.push_pod_event_notify(event); - // Auto-kick a turn if the Pod is idle so the - // notification is not stranded. Matches the - // `Method::Notify` idle path. - if shared_state.get_status() == PodStatus::Idle { - pending = Some(PendingRun::RunForNotification( - protocol::InvokeKind::PodEvent, - )); + // (2) agent-visible events enter the notification/history lane. + // Control-plane-only events (currently ScopeSubDelegated) + // stop after side effects so they do not wake or notify the LLM. + if event.should_notify_agent() { + pod.push_pod_event_notify(event); + // Auto-kick a turn if the Pod is idle so the + // notification is not stranded. Matches the + // `Method::Notify` idle path. + if shared_state.get_status() == PodStatus::Idle { + pending = Some(PendingRun::RunForNotification( + protocol::InvokeKind::PodEvent, + )); + } } } } @@ -1072,7 +1075,7 @@ where } Some(Method::Notify { message }) => { // Live echo arrives via `Event::SystemItem` once - // the in-flight turn's next `pre_llm_request` + // the in-flight turn's next `pending_history_appends` // drains this entry through the interceptor. notify_buffer.push_notify(message); } @@ -1093,10 +1096,11 @@ where // to the next main-loop iteration — drop here // would lose the event entirely (children fire // and forget). Apply the side effects inline - // and stage the typed event on the notification - // buffer so the in-flight turn's next - // `pre_llm_request` surfaces it as a typed - // `SystemItem::PodEvent`. + // and, for agent-visible variants, stage the typed + // event on the notification buffer so the in-flight + // turn's next `pending_history_appends` surfaces it + // as a typed `SystemItem::PodEvent`. Control-plane-only + // variants stop after side effects. let self_parent_socket = parent_socket.cloned(); crate::ipc::event::apply_event_side_effects( &event, @@ -1105,7 +1109,9 @@ where &self_parent_socket, ) .await; - notify_buffer.push_pod_event(event); + if event.should_notify_agent() { + notify_buffer.push_pod_event(event); + } } None => { let _ = cancel_tx.try_send(()); @@ -1253,6 +1259,7 @@ fn worker_error_code(e: &PodError) -> ErrorCode { #[cfg(test)] mod tests { use super::*; + use crate::runtime::dir::SpawnedPodRecord; use protocol::PodEvent; use protocol::stream::{JsonLineReader, JsonLineWriter}; use std::time::Duration; @@ -1490,6 +1497,91 @@ mod tests { ); } + #[tokio::test] + async fn running_scope_sub_delegated_applies_side_effects_without_notify_buffer() { + let mut env = make_env().await; + env.spawned_registry + .add(SpawnedPodRecord { + pod_name: "child".into(), + socket_path: "/tmp/child.sock".into(), + scope_delegated: vec![], + callback_address: "/tmp/parent.sock".into(), + }) + .await + .expect("seed child record"); + env._method_tx + .send(Method::PodEvent(PodEvent::ScopeSubDelegated { + parent_pod: "child".into(), + sub_pod: "grandchild".into(), + sub_socket: "/tmp/grandchild.sock".into(), + scope: vec![], + })) + .await + .expect("send pod event"); + + let pod_future = async { + tokio::time::sleep(Duration::from_millis(50)).await; + Ok::<_, PodError>(PodRunResult::Finished) + }; + let (status, shutdown) = drive_turn( + pod_future, + &mut env.method_rx, + &env.event_tx, + &env.cancel_tx, + &env.shared_state, + &env.notify_buffer, + Some(&env.parent_socket_path), + "parent", + &env.spawned_registry, + false, + ) + .await; + + assert_eq!(status, PodStatus::Idle); + assert!(!shutdown); + assert!( + env.spawned_registry.get("grandchild").await.is_some(), + "ScopeSubDelegated side effects must still register the grandchild" + ); + assert!( + env.notify_buffer.is_empty(), + "control-plane-only ScopeSubDelegated must not enter the agent-visible notify buffer" + ); + } + + #[tokio::test] + async fn running_visible_pod_event_enters_notify_buffer() { + let mut env = make_env().await; + env._method_tx + .send(Method::PodEvent(PodEvent::TurnEnded { + pod_name: "child".into(), + })) + .await + .expect("send pod event"); + + let pod_future = async { + tokio::time::sleep(Duration::from_millis(50)).await; + Ok::<_, PodError>(PodRunResult::Finished) + }; + let (status, shutdown) = drive_turn( + pod_future, + &mut env.method_rx, + &env.event_tx, + &env.cancel_tx, + &env.shared_state, + &env.notify_buffer, + Some(&env.parent_socket_path), + "parent", + &env.spawned_registry, + false, + ) + .await; + + assert_eq!(status, PodStatus::Idle); + assert!(!shutdown); + assert_eq!(env.notify_buffer.len(), 1); + } + #[tokio::test] async fn compact_method_is_rejected_while_running() { let mut env = make_env().await; diff --git a/crates/pod/src/ipc/event.rs b/crates/pod/src/ipc/event.rs index 78d8e4d3..26d08308 100644 --- a/crates/pod/src/ipc/event.rs +++ b/crates/pod/src/ipc/event.rs @@ -6,8 +6,10 @@ //! //! - **Send** a `Method::PodEvent` to the parent socket, fire-and-forget, //! logging failures without blocking the child. -//! - **Render** a variant into a human-readable string that the parent's -//! LLM sees via the notification buffer. +//! - **Render** agent-visible variants into human-readable strings for the +//! parent's notification buffer. Control-plane-only variants may still have +//! a renderer for diagnostics, but receive-side classification keeps them +//! out of LLM history/context. //! - **Apply side effects** on the parent (registry / pod-registry //! updates) so that the receive path is idempotent and tolerant of //! out-of-order delivery. @@ -52,11 +54,13 @@ pub fn fire_and_forget(socket: Option, event: PodEvent) { }); } -/// Render a variant into the one-line human-readable string that will -/// be injected into the parent's LLM context as a system message. +/// Render a variant into a one-line human-readable string. /// -/// Kept deliberately short — the LLM can always call `ReadPodOutput` -/// to fetch more detail if the event summary is not enough. +/// Only events classified by `PodEvent::should_notify_agent` are injected +/// into the parent's LLM context as system messages; control-plane-only events +/// keep this renderer for diagnostics/tests. Agent-visible summaries are kept +/// deliberately short — the LLM can always call `ReadPodOutput` to fetch more +/// detail if the event summary is not enough. pub fn render_event(event: &PodEvent) -> String { match event { PodEvent::TurnEnded { pod_name } => { diff --git a/crates/pod/src/ipc/notify_buffer.rs b/crates/pod/src/ipc/notify_buffer.rs index 9745b17a..d2ee22e5 100644 --- a/crates/pod/src/ipc/notify_buffer.rs +++ b/crates/pod/src/ipc/notify_buffer.rs @@ -11,8 +11,9 @@ //! persistent history. //! //! This is the **single lane** for "system messages produced by Pod -//! state that should land in the next LLM request": Notify, PodEvent, -//! and any future `` injection all ride this queue. +//! state that should land in the next LLM request": Notify, +//! agent-visible PodEvent variants, and any future `` +//! injection all ride this queue. //! Per `tickets/notify-history-persist.md` and `AGENTS.md` (LLM //! context の加工原則), there is **no** "transient, history-skipping" //! lane — everything injected into a request is also committed to diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index b671d6d9..87d29cab 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1081,8 +1081,7 @@ impl Pod { } } - /// Push a `Method::Notify` (or rendered `Method::PodEvent`) entry - /// onto the pending buffer. + /// Push a `Method::Notify` entry onto the pending buffer. /// /// The notification will be appended to `worker.history` as an /// `Item::system_message` just before the next LLM request, via @@ -1092,8 +1091,9 @@ impl Pod { self.pending_notifies.push_notify(message); } - /// Push a typed `PodEvent` entry onto the pending buffer. + /// Push an agent-visible typed `PodEvent` entry onto the pending buffer. /// + /// Callers must classify control-plane-only PodEvents before invoking this. /// Same lifecycle as [`push_notify`](Self::push_notify) but /// preserves the typed `PodEvent` payload so the IPC layer can /// emit `SystemItem::PodEvent { event, body }` with structured diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index b7933121..fd7c90e7 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -971,6 +971,54 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes ); } +#[tokio::test] +async fn pod_event_scope_sub_delegated_while_idle_stays_control_plane_only() { + let client = MockClient::new(simple_text_events()); + let client_for_assert = client.clone(); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + + handle + .send(Method::PodEvent(protocol::PodEvent::ScopeSubDelegated { + parent_pod: "child".into(), + sub_pod: "grandchild".into(), + sub_socket: "/tmp/grandchild.sock".into(), + scope: vec![], + })) + .await + .unwrap(); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + assert_eq!( + handle.shared_state.get_status(), + PodStatus::Idle, + "control-plane ScopeSubDelegated must not auto-start the parent LLM" + ); + assert!( + client_for_assert.captured_requests().is_empty(), + "ScopeSubDelegated must not issue an LLM request" + ); + + let (entries, _) = handle.sink.subscribe_with_snapshot(); + let saw_scope_event_in_mirror = entries.iter().any(|entry| { + matches!( + entry, + session_store::LogEntry::SystemItem { + item: session_store::SystemItem::PodEvent { + event: protocol::PodEvent::ScopeSubDelegated { .. }, + .. + }, + .. + } + ) + }); + assert!( + !saw_scope_event_in_mirror, + "ScopeSubDelegated must not create an agent-visible SystemItem::PodEvent; mirror = {entries:?}" + ); +} + #[tokio::test] async fn notify_while_running_does_not_emit_already_running_error() { let client = MockClient::new(simple_text_events()); diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 507302a1..3cbc1be9 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -73,9 +73,10 @@ pub enum Method { /// Typed lifecycle events sent from a child Pod to its parent. /// /// Delivered as `Method::PodEvent` over the parent's Unix socket. The -/// parent Controller applies variant-specific side effects (registry / -/// pod-registry updates) and renders a human-readable string that is -/// injected into the parent's LLM context via the notification buffer. +/// parent Controller always applies variant-specific side effects +/// (registry / pod-registry updates). Agent-visible variants are also +/// queued into the notification buffer; control-plane-only variants are +/// not injected into the parent's LLM context. /// /// Transport is fire-and-forget; receivers must tolerate out-of-order /// delivery (e.g. `TurnEnded` arriving after `ShutDown` for the same @@ -98,6 +99,9 @@ pub enum PodEvent { /// Child sub-delegated scope to a grandchild Pod via `SpawnPod`. /// + /// Control-plane only: receivers apply registry side effects and + /// propagate upward, but do not expose this as an agent notification. + /// /// The parent uses this to add the grandchild to its own /// `spawned_pods.json` so it can manage the grandchild directly /// even if the intermediate child dies. The parent then re-fires @@ -115,6 +119,22 @@ pub enum PodEvent { }, } +impl PodEvent { + /// Whether this event should become an agent-visible notification/history item. + /// + /// Control-plane-only events still travel over the same wire enum and still + /// run receiver side effects, but they must not wake the parent LLM or enter + /// the notification buffer. + pub fn should_notify_agent(&self) -> bool { + match self { + PodEvent::TurnEnded { .. } | PodEvent::Errored { .. } | PodEvent::ShutDown { .. } => { + true + } + PodEvent::ScopeSubDelegated { .. } => false, + } + } +} + // --------------------------------------------------------------------------- // Segment — typed pieces of a user submission // --------------------------------------------------------------------------- @@ -1209,6 +1229,38 @@ mod tests { )); } + #[test] + fn pod_event_agent_notification_classification() { + assert!( + PodEvent::TurnEnded { + pod_name: "child".into() + } + .should_notify_agent() + ); + assert!( + PodEvent::Errored { + pod_name: "child".into(), + message: "boom".into() + } + .should_notify_agent() + ); + assert!( + PodEvent::ShutDown { + pod_name: "child".into() + } + .should_notify_agent() + ); + assert!( + !PodEvent::ScopeSubDelegated { + parent_pod: "child".into(), + sub_pod: "grandchild".into(), + sub_socket: "/tmp/grandchild.sock".into(), + scope: vec![], + } + .should_notify_agent() + ); + } + #[test] fn method_pod_event_scope_sub_delegated_roundtrip() { let method = Method::PodEvent(PodEvent::ScopeSubDelegated {