fix: keep scope subdelegation control-only

This commit is contained in:
Keisuke Hirata 2026-05-30 14:01:09 +09:00
parent 08397f3f3b
commit b3fe725742
No known key found for this signature in database
6 changed files with 233 additions and 36 deletions

View File

@ -728,7 +728,7 @@ async fn controller_loop<C, St>(
// RUNNING / Paused: the buffer push is the entire // RUNNING / Paused: the buffer push is the entire
// operation; an in-flight turn (or the next // operation; an in-flight turn (or the next
// Resume/Run) will drain it at its next // Resume/Run) will drain it at its next
// pre_llm_request. IDLE: auto-start a turn so the LLM // pending_history_appends. IDLE: auto-start a turn so the LLM
// sees the buffered notification(s) without a human // sees the buffered notification(s) without a human
// Run. // Run.
if shared_state.get_status() == PodStatus::Idle { if shared_state.get_status() == PodStatus::Idle {
@ -900,11 +900,12 @@ async fn controller_loop<C, St>(
Method::ListCompletions { .. } => {} Method::ListCompletions { .. } => {}
Method::PodEvent(event) => { Method::PodEvent(event) => {
// Live echo travels through the SystemItem lane: once // For agent-visible PodEvents, live echo travels through the
// the interceptor drains the notify buffer, the // SystemItem lane: once the interceptor drains the notify buffer,
// typed `SystemItem::PodEvent` lands as a // the typed `SystemItem::PodEvent` lands as a
// `LogEntry::SystemItem` entry and the sink forwards it // `LogEntry::SystemItem` entry and the sink forwards it
// to clients as `Event::SystemItem`. // to clients as `Event::SystemItem`. Control-plane-only
// PodEvents use this same receive path only for side effects.
// //
// (1) system side effects — idempotent and tolerant of // (1) system side effects — idempotent and tolerant of
// out-of-order delivery (e.g. `TurnEnded` arriving // out-of-order delivery (e.g. `TurnEnded` arriving
@ -916,17 +917,19 @@ async fn controller_loop<C, St>(
&self_parent_socket, &self_parent_socket,
) )
.await; .await;
// (2) queue the typed event in the notification buffer; // (2) agent-visible events enter the notification/history lane.
// the next LLM request will inject it as a typed // Control-plane-only events (currently ScopeSubDelegated)
// `SystemItem::PodEvent` via the interceptor drain. // stop after side effects so they do not wake or notify the LLM.
pod.push_pod_event_notify(event); if event.should_notify_agent() {
// Auto-kick a turn if the Pod is idle so the pod.push_pod_event_notify(event);
// notification is not stranded. Matches the // Auto-kick a turn if the Pod is idle so the
// `Method::Notify` idle path. // notification is not stranded. Matches the
if shared_state.get_status() == PodStatus::Idle { // `Method::Notify` idle path.
pending = Some(PendingRun::RunForNotification( if shared_state.get_status() == PodStatus::Idle {
protocol::InvokeKind::PodEvent, pending = Some(PendingRun::RunForNotification(
)); protocol::InvokeKind::PodEvent,
));
}
} }
} }
} }
@ -1072,7 +1075,7 @@ where
} }
Some(Method::Notify { message }) => { Some(Method::Notify { message }) => {
// Live echo arrives via `Event::SystemItem` once // Live echo arrives via `Event::SystemItem` once
// the in-flight turn's next `pre_llm_request` // the in-flight turn's next `pending_history_appends`
// drains this entry through the interceptor. // drains this entry through the interceptor.
notify_buffer.push_notify(message); notify_buffer.push_notify(message);
} }
@ -1093,10 +1096,11 @@ where
// to the next main-loop iteration — drop here // to the next main-loop iteration — drop here
// would lose the event entirely (children fire // would lose the event entirely (children fire
// and forget). Apply the side effects inline // and forget). Apply the side effects inline
// and stage the typed event on the notification // and, for agent-visible variants, stage the typed
// buffer so the in-flight turn's next // event on the notification buffer so the in-flight
// `pre_llm_request` surfaces it as a typed // turn's next `pending_history_appends` surfaces it
// `SystemItem::PodEvent`. // as a typed `SystemItem::PodEvent`. Control-plane-only
// variants stop after side effects.
let self_parent_socket = parent_socket.cloned(); let self_parent_socket = parent_socket.cloned();
crate::ipc::event::apply_event_side_effects( crate::ipc::event::apply_event_side_effects(
&event, &event,
@ -1105,7 +1109,9 @@ where
&self_parent_socket, &self_parent_socket,
) )
.await; .await;
notify_buffer.push_pod_event(event); if event.should_notify_agent() {
notify_buffer.push_pod_event(event);
}
} }
None => { None => {
let _ = cancel_tx.try_send(()); let _ = cancel_tx.try_send(());
@ -1253,6 +1259,7 @@ fn worker_error_code(e: &PodError) -> ErrorCode {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::runtime::dir::SpawnedPodRecord;
use protocol::PodEvent; use protocol::PodEvent;
use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::stream::{JsonLineReader, JsonLineWriter};
use std::time::Duration; use std::time::Duration;
@ -1490,6 +1497,91 @@ mod tests {
); );
} }
#[tokio::test]
async fn running_scope_sub_delegated_applies_side_effects_without_notify_buffer() {
let mut env = make_env().await;
env.spawned_registry
.add(SpawnedPodRecord {
pod_name: "child".into(),
socket_path: "/tmp/child.sock".into(),
scope_delegated: vec![],
callback_address: "/tmp/parent.sock".into(),
})
.await
.expect("seed child record");
env._method_tx
.send(Method::PodEvent(PodEvent::ScopeSubDelegated {
parent_pod: "child".into(),
sub_pod: "grandchild".into(),
sub_socket: "/tmp/grandchild.sock".into(),
scope: vec![],
}))
.await
.expect("send pod event");
let pod_future = async {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok::<_, PodError>(PodRunResult::Finished)
};
let (status, shutdown) = drive_turn(
pod_future,
&mut env.method_rx,
&env.event_tx,
&env.cancel_tx,
&env.shared_state,
&env.notify_buffer,
Some(&env.parent_socket_path),
"parent",
&env.spawned_registry,
false,
)
.await;
assert_eq!(status, PodStatus::Idle);
assert!(!shutdown);
assert!(
env.spawned_registry.get("grandchild").await.is_some(),
"ScopeSubDelegated side effects must still register the grandchild"
);
assert!(
env.notify_buffer.is_empty(),
"control-plane-only ScopeSubDelegated must not enter the agent-visible notify buffer"
);
}
#[tokio::test]
async fn running_visible_pod_event_enters_notify_buffer() {
let mut env = make_env().await;
env._method_tx
.send(Method::PodEvent(PodEvent::TurnEnded {
pod_name: "child".into(),
}))
.await
.expect("send pod event");
let pod_future = async {
tokio::time::sleep(Duration::from_millis(50)).await;
Ok::<_, PodError>(PodRunResult::Finished)
};
let (status, shutdown) = drive_turn(
pod_future,
&mut env.method_rx,
&env.event_tx,
&env.cancel_tx,
&env.shared_state,
&env.notify_buffer,
Some(&env.parent_socket_path),
"parent",
&env.spawned_registry,
false,
)
.await;
assert_eq!(status, PodStatus::Idle);
assert!(!shutdown);
assert_eq!(env.notify_buffer.len(), 1);
}
#[tokio::test] #[tokio::test]
async fn compact_method_is_rejected_while_running() { async fn compact_method_is_rejected_while_running() {
let mut env = make_env().await; let mut env = make_env().await;

View File

@ -6,8 +6,10 @@
//! //!
//! - **Send** a `Method::PodEvent` to the parent socket, fire-and-forget, //! - **Send** a `Method::PodEvent` to the parent socket, fire-and-forget,
//! logging failures without blocking the child. //! logging failures without blocking the child.
//! - **Render** a variant into a human-readable string that the parent's //! - **Render** agent-visible variants into human-readable strings for the
//! LLM sees via the notification buffer. //! parent's notification buffer. Control-plane-only variants may still have
//! a renderer for diagnostics, but receive-side classification keeps them
//! out of LLM history/context.
//! - **Apply side effects** on the parent (registry / pod-registry //! - **Apply side effects** on the parent (registry / pod-registry
//! updates) so that the receive path is idempotent and tolerant of //! updates) so that the receive path is idempotent and tolerant of
//! out-of-order delivery. //! out-of-order delivery.
@ -52,11 +54,13 @@ pub fn fire_and_forget(socket: Option<PathBuf>, event: PodEvent) {
}); });
} }
/// Render a variant into the one-line human-readable string that will /// Render a variant into a one-line human-readable string.
/// be injected into the parent's LLM context as a system message.
/// ///
/// Kept deliberately short — the LLM can always call `ReadPodOutput` /// Only events classified by `PodEvent::should_notify_agent` are injected
/// to fetch more detail if the event summary is not enough. /// into the parent's LLM context as system messages; control-plane-only events
/// keep this renderer for diagnostics/tests. Agent-visible summaries are kept
/// deliberately short — the LLM can always call `ReadPodOutput` to fetch more
/// detail if the event summary is not enough.
pub fn render_event(event: &PodEvent) -> String { pub fn render_event(event: &PodEvent) -> String {
match event { match event {
PodEvent::TurnEnded { pod_name } => { PodEvent::TurnEnded { pod_name } => {

View File

@ -11,8 +11,9 @@
//! persistent history. //! persistent history.
//! //!
//! This is the **single lane** for "system messages produced by Pod //! This is the **single lane** for "system messages produced by Pod
//! state that should land in the next LLM request": Notify, PodEvent, //! state that should land in the next LLM request": Notify,
//! and any future `<system-reminder>` injection all ride this queue. //! agent-visible PodEvent variants, and any future `<system-reminder>`
//! injection all ride this queue.
//! Per `tickets/notify-history-persist.md` and `AGENTS.md` (LLM //! Per `tickets/notify-history-persist.md` and `AGENTS.md` (LLM
//! context の加工原則), there is **no** "transient, history-skipping" //! context の加工原則), there is **no** "transient, history-skipping"
//! lane — everything injected into a request is also committed to //! lane — everything injected into a request is also committed to

View File

@ -1081,8 +1081,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
} }
} }
/// Push a `Method::Notify` (or rendered `Method::PodEvent`) entry /// Push a `Method::Notify` entry onto the pending buffer.
/// onto the pending buffer.
/// ///
/// The notification will be appended to `worker.history` as an /// The notification will be appended to `worker.history` as an
/// `Item::system_message` just before the next LLM request, via /// `Item::system_message` just before the next LLM request, via
@ -1092,8 +1091,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.pending_notifies.push_notify(message); self.pending_notifies.push_notify(message);
} }
/// Push a typed `PodEvent` entry onto the pending buffer. /// Push an agent-visible typed `PodEvent` entry onto the pending buffer.
/// ///
/// Callers must classify control-plane-only PodEvents before invoking this.
/// Same lifecycle as [`push_notify`](Self::push_notify) but /// Same lifecycle as [`push_notify`](Self::push_notify) but
/// preserves the typed `PodEvent` payload so the IPC layer can /// preserves the typed `PodEvent` payload so the IPC layer can
/// emit `SystemItem::PodEvent { event, body }` with structured /// emit `SystemItem::PodEvent { event, body }` with structured

View File

@ -971,6 +971,54 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes
); );
} }
#[tokio::test]
async fn pod_event_scope_sub_delegated_while_idle_stays_control_plane_only() {
let client = MockClient::new(simple_text_events());
let client_for_assert = client.clone();
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
handle
.send(Method::PodEvent(protocol::PodEvent::ScopeSubDelegated {
parent_pod: "child".into(),
sub_pod: "grandchild".into(),
sub_socket: "/tmp/grandchild.sock".into(),
scope: vec![],
}))
.await
.unwrap();
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
assert_eq!(
handle.shared_state.get_status(),
PodStatus::Idle,
"control-plane ScopeSubDelegated must not auto-start the parent LLM"
);
assert!(
client_for_assert.captured_requests().is_empty(),
"ScopeSubDelegated must not issue an LLM request"
);
let (entries, _) = handle.sink.subscribe_with_snapshot();
let saw_scope_event_in_mirror = entries.iter().any(|entry| {
matches!(
entry,
session_store::LogEntry::SystemItem {
item: session_store::SystemItem::PodEvent {
event: protocol::PodEvent::ScopeSubDelegated { .. },
..
},
..
}
)
});
assert!(
!saw_scope_event_in_mirror,
"ScopeSubDelegated must not create an agent-visible SystemItem::PodEvent; mirror = {entries:?}"
);
}
#[tokio::test] #[tokio::test]
async fn notify_while_running_does_not_emit_already_running_error() { async fn notify_while_running_does_not_emit_already_running_error() {
let client = MockClient::new(simple_text_events()); let client = MockClient::new(simple_text_events());

View File

@ -73,9 +73,10 @@ pub enum Method {
/// Typed lifecycle events sent from a child Pod to its parent. /// Typed lifecycle events sent from a child Pod to its parent.
/// ///
/// Delivered as `Method::PodEvent` over the parent's Unix socket. The /// Delivered as `Method::PodEvent` over the parent's Unix socket. The
/// parent Controller applies variant-specific side effects (registry / /// parent Controller always applies variant-specific side effects
/// pod-registry updates) and renders a human-readable string that is /// (registry / pod-registry updates). Agent-visible variants are also
/// injected into the parent's LLM context via the notification buffer. /// queued into the notification buffer; control-plane-only variants are
/// not injected into the parent's LLM context.
/// ///
/// Transport is fire-and-forget; receivers must tolerate out-of-order /// Transport is fire-and-forget; receivers must tolerate out-of-order
/// delivery (e.g. `TurnEnded` arriving after `ShutDown` for the same /// delivery (e.g. `TurnEnded` arriving after `ShutDown` for the same
@ -98,6 +99,9 @@ pub enum PodEvent {
/// Child sub-delegated scope to a grandchild Pod via `SpawnPod`. /// Child sub-delegated scope to a grandchild Pod via `SpawnPod`.
/// ///
/// Control-plane only: receivers apply registry side effects and
/// propagate upward, but do not expose this as an agent notification.
///
/// The parent uses this to add the grandchild to its own /// The parent uses this to add the grandchild to its own
/// `spawned_pods.json` so it can manage the grandchild directly /// `spawned_pods.json` so it can manage the grandchild directly
/// even if the intermediate child dies. The parent then re-fires /// even if the intermediate child dies. The parent then re-fires
@ -115,6 +119,22 @@ pub enum PodEvent {
}, },
} }
impl PodEvent {
/// Whether this event should become an agent-visible notification/history item.
///
/// Control-plane-only events still travel over the same wire enum and still
/// run receiver side effects, but they must not wake the parent LLM or enter
/// the notification buffer.
pub fn should_notify_agent(&self) -> bool {
match self {
PodEvent::TurnEnded { .. } | PodEvent::Errored { .. } | PodEvent::ShutDown { .. } => {
true
}
PodEvent::ScopeSubDelegated { .. } => false,
}
}
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Segment — typed pieces of a user submission // Segment — typed pieces of a user submission
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -1209,6 +1229,38 @@ mod tests {
)); ));
} }
#[test]
fn pod_event_agent_notification_classification() {
assert!(
PodEvent::TurnEnded {
pod_name: "child".into()
}
.should_notify_agent()
);
assert!(
PodEvent::Errored {
pod_name: "child".into(),
message: "boom".into()
}
.should_notify_agent()
);
assert!(
PodEvent::ShutDown {
pod_name: "child".into()
}
.should_notify_agent()
);
assert!(
!PodEvent::ScopeSubDelegated {
parent_pod: "child".into(),
sub_pod: "grandchild".into(),
sub_socket: "/tmp/grandchild.sock".into(),
scope: vec![],
}
.should_notify_agent()
);
}
#[test] #[test]
fn method_pod_event_scope_sub_delegated_roundtrip() { fn method_pod_event_scope_sub_delegated_roundtrip() {
let method = Method::PodEvent(PodEvent::ScopeSubDelegated { let method = Method::PodEvent(PodEvent::ScopeSubDelegated {