merge: scope subdelegation control only
This commit is contained in:
commit
39874d92dc
|
|
@ -728,7 +728,7 @@ async fn controller_loop<C, St>(
|
||||||
// RUNNING / Paused: the buffer push is the entire
|
// RUNNING / Paused: the buffer push is the entire
|
||||||
// operation; an in-flight turn (or the next
|
// operation; an in-flight turn (or the next
|
||||||
// Resume/Run) will drain it at its next
|
// Resume/Run) will drain it at its next
|
||||||
// pre_llm_request. IDLE: auto-start a turn so the LLM
|
// pending_history_appends. IDLE: auto-start a turn so the LLM
|
||||||
// sees the buffered notification(s) without a human
|
// sees the buffered notification(s) without a human
|
||||||
// Run.
|
// Run.
|
||||||
if shared_state.get_status() == PodStatus::Idle {
|
if shared_state.get_status() == PodStatus::Idle {
|
||||||
|
|
@ -900,11 +900,12 @@ async fn controller_loop<C, St>(
|
||||||
Method::ListCompletions { .. } => {}
|
Method::ListCompletions { .. } => {}
|
||||||
|
|
||||||
Method::PodEvent(event) => {
|
Method::PodEvent(event) => {
|
||||||
// Live echo travels through the SystemItem lane: once
|
// For agent-visible PodEvents, live echo travels through the
|
||||||
// the interceptor drains the notify buffer, the
|
// SystemItem lane: once the interceptor drains the notify buffer,
|
||||||
// typed `SystemItem::PodEvent` lands as a
|
// the typed `SystemItem::PodEvent` lands as a
|
||||||
// `LogEntry::SystemItem` entry and the sink forwards it
|
// `LogEntry::SystemItem` entry and the sink forwards it
|
||||||
// to clients as `Event::SystemItem`.
|
// to clients as `Event::SystemItem`. Control-plane-only
|
||||||
|
// PodEvents use this same receive path only for side effects.
|
||||||
//
|
//
|
||||||
// (1) system side effects — idempotent and tolerant of
|
// (1) system side effects — idempotent and tolerant of
|
||||||
// out-of-order delivery (e.g. `TurnEnded` arriving
|
// out-of-order delivery (e.g. `TurnEnded` arriving
|
||||||
|
|
@ -916,17 +917,19 @@ async fn controller_loop<C, St>(
|
||||||
&self_parent_socket,
|
&self_parent_socket,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
// (2) queue the typed event in the notification buffer;
|
// (2) agent-visible events enter the notification/history lane.
|
||||||
// the next LLM request will inject it as a typed
|
// Control-plane-only events (currently ScopeSubDelegated)
|
||||||
// `SystemItem::PodEvent` via the interceptor drain.
|
// stop after side effects so they do not wake or notify the LLM.
|
||||||
pod.push_pod_event_notify(event);
|
if event.should_notify_agent() {
|
||||||
// Auto-kick a turn if the Pod is idle so the
|
pod.push_pod_event_notify(event);
|
||||||
// notification is not stranded. Matches the
|
// Auto-kick a turn if the Pod is idle so the
|
||||||
// `Method::Notify` idle path.
|
// notification is not stranded. Matches the
|
||||||
if shared_state.get_status() == PodStatus::Idle {
|
// `Method::Notify` idle path.
|
||||||
pending = Some(PendingRun::RunForNotification(
|
if shared_state.get_status() == PodStatus::Idle {
|
||||||
protocol::InvokeKind::PodEvent,
|
pending = Some(PendingRun::RunForNotification(
|
||||||
));
|
protocol::InvokeKind::PodEvent,
|
||||||
|
));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1072,7 +1075,7 @@ where
|
||||||
}
|
}
|
||||||
Some(Method::Notify { message }) => {
|
Some(Method::Notify { message }) => {
|
||||||
// Live echo arrives via `Event::SystemItem` once
|
// Live echo arrives via `Event::SystemItem` once
|
||||||
// the in-flight turn's next `pre_llm_request`
|
// the in-flight turn's next `pending_history_appends`
|
||||||
// drains this entry through the interceptor.
|
// drains this entry through the interceptor.
|
||||||
notify_buffer.push_notify(message);
|
notify_buffer.push_notify(message);
|
||||||
}
|
}
|
||||||
|
|
@ -1093,10 +1096,11 @@ where
|
||||||
// to the next main-loop iteration — drop here
|
// to the next main-loop iteration — drop here
|
||||||
// would lose the event entirely (children fire
|
// would lose the event entirely (children fire
|
||||||
// and forget). Apply the side effects inline
|
// and forget). Apply the side effects inline
|
||||||
// and stage the typed event on the notification
|
// and, for agent-visible variants, stage the typed
|
||||||
// buffer so the in-flight turn's next
|
// event on the notification buffer so the in-flight
|
||||||
// `pre_llm_request` surfaces it as a typed
|
// turn's next `pending_history_appends` surfaces it
|
||||||
// `SystemItem::PodEvent`.
|
// as a typed `SystemItem::PodEvent`. Control-plane-only
|
||||||
|
// variants stop after side effects.
|
||||||
let self_parent_socket = parent_socket.cloned();
|
let self_parent_socket = parent_socket.cloned();
|
||||||
crate::ipc::event::apply_event_side_effects(
|
crate::ipc::event::apply_event_side_effects(
|
||||||
&event,
|
&event,
|
||||||
|
|
@ -1105,7 +1109,9 @@ where
|
||||||
&self_parent_socket,
|
&self_parent_socket,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
notify_buffer.push_pod_event(event);
|
if event.should_notify_agent() {
|
||||||
|
notify_buffer.push_pod_event(event);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
let _ = cancel_tx.try_send(());
|
let _ = cancel_tx.try_send(());
|
||||||
|
|
@ -1253,6 +1259,7 @@ fn worker_error_code(e: &PodError) -> ErrorCode {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::runtime::dir::SpawnedPodRecord;
|
||||||
use protocol::PodEvent;
|
use protocol::PodEvent;
|
||||||
use protocol::stream::{JsonLineReader, JsonLineWriter};
|
use protocol::stream::{JsonLineReader, JsonLineWriter};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
@ -1490,6 +1497,91 @@ mod tests {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn running_scope_sub_delegated_applies_side_effects_without_notify_buffer() {
|
||||||
|
let mut env = make_env().await;
|
||||||
|
env.spawned_registry
|
||||||
|
.add(SpawnedPodRecord {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
socket_path: "/tmp/child.sock".into(),
|
||||||
|
scope_delegated: vec![],
|
||||||
|
callback_address: "/tmp/parent.sock".into(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.expect("seed child record");
|
||||||
|
env._method_tx
|
||||||
|
.send(Method::PodEvent(PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod: "child".into(),
|
||||||
|
sub_pod: "grandchild".into(),
|
||||||
|
sub_socket: "/tmp/grandchild.sock".into(),
|
||||||
|
scope: vec![],
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.expect("send pod event");
|
||||||
|
|
||||||
|
let pod_future = async {
|
||||||
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||||
|
Ok::<_, PodError>(PodRunResult::Finished)
|
||||||
|
};
|
||||||
|
let (status, shutdown) = drive_turn(
|
||||||
|
pod_future,
|
||||||
|
&mut env.method_rx,
|
||||||
|
&env.event_tx,
|
||||||
|
&env.cancel_tx,
|
||||||
|
&env.shared_state,
|
||||||
|
&env.notify_buffer,
|
||||||
|
Some(&env.parent_socket_path),
|
||||||
|
"parent",
|
||||||
|
&env.spawned_registry,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert_eq!(status, PodStatus::Idle);
|
||||||
|
assert!(!shutdown);
|
||||||
|
assert!(
|
||||||
|
env.spawned_registry.get("grandchild").await.is_some(),
|
||||||
|
"ScopeSubDelegated side effects must still register the grandchild"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
env.notify_buffer.is_empty(),
|
||||||
|
"control-plane-only ScopeSubDelegated must not enter the agent-visible notify buffer"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn running_visible_pod_event_enters_notify_buffer() {
|
||||||
|
let mut env = make_env().await;
|
||||||
|
env._method_tx
|
||||||
|
.send(Method::PodEvent(PodEvent::TurnEnded {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.expect("send pod event");
|
||||||
|
|
||||||
|
let pod_future = async {
|
||||||
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||||
|
Ok::<_, PodError>(PodRunResult::Finished)
|
||||||
|
};
|
||||||
|
let (status, shutdown) = drive_turn(
|
||||||
|
pod_future,
|
||||||
|
&mut env.method_rx,
|
||||||
|
&env.event_tx,
|
||||||
|
&env.cancel_tx,
|
||||||
|
&env.shared_state,
|
||||||
|
&env.notify_buffer,
|
||||||
|
Some(&env.parent_socket_path),
|
||||||
|
"parent",
|
||||||
|
&env.spawned_registry,
|
||||||
|
false,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert_eq!(status, PodStatus::Idle);
|
||||||
|
assert!(!shutdown);
|
||||||
|
assert_eq!(env.notify_buffer.len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn compact_method_is_rejected_while_running() {
|
async fn compact_method_is_rejected_while_running() {
|
||||||
let mut env = make_env().await;
|
let mut env = make_env().await;
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,10 @@
|
||||||
//!
|
//!
|
||||||
//! - **Send** a `Method::PodEvent` to the parent socket, fire-and-forget,
|
//! - **Send** a `Method::PodEvent` to the parent socket, fire-and-forget,
|
||||||
//! logging failures without blocking the child.
|
//! logging failures without blocking the child.
|
||||||
//! - **Render** a variant into a human-readable string that the parent's
|
//! - **Render** agent-visible variants into human-readable strings for the
|
||||||
//! LLM sees via the notification buffer.
|
//! parent's notification buffer. Control-plane-only variants may still have
|
||||||
|
//! a renderer for diagnostics, but receive-side classification keeps them
|
||||||
|
//! out of LLM history/context.
|
||||||
//! - **Apply side effects** on the parent (registry / pod-registry
|
//! - **Apply side effects** on the parent (registry / pod-registry
|
||||||
//! updates) so that the receive path is idempotent and tolerant of
|
//! updates) so that the receive path is idempotent and tolerant of
|
||||||
//! out-of-order delivery.
|
//! out-of-order delivery.
|
||||||
|
|
@ -52,11 +54,13 @@ pub fn fire_and_forget(socket: Option<PathBuf>, event: PodEvent) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Render a variant into the one-line human-readable string that will
|
/// Render a variant into a one-line human-readable string.
|
||||||
/// be injected into the parent's LLM context as a system message.
|
|
||||||
///
|
///
|
||||||
/// Kept deliberately short — the LLM can always call `ReadPodOutput`
|
/// Only events classified by `PodEvent::should_notify_agent` are injected
|
||||||
/// to fetch more detail if the event summary is not enough.
|
/// into the parent's LLM context as system messages; control-plane-only events
|
||||||
|
/// keep this renderer for diagnostics/tests. Agent-visible summaries are kept
|
||||||
|
/// deliberately short — the LLM can always call `ReadPodOutput` to fetch more
|
||||||
|
/// detail if the event summary is not enough.
|
||||||
pub fn render_event(event: &PodEvent) -> String {
|
pub fn render_event(event: &PodEvent) -> String {
|
||||||
match event {
|
match event {
|
||||||
PodEvent::TurnEnded { pod_name } => {
|
PodEvent::TurnEnded { pod_name } => {
|
||||||
|
|
|
||||||
|
|
@ -11,8 +11,9 @@
|
||||||
//! persistent history.
|
//! persistent history.
|
||||||
//!
|
//!
|
||||||
//! This is the **single lane** for "system messages produced by Pod
|
//! This is the **single lane** for "system messages produced by Pod
|
||||||
//! state that should land in the next LLM request": Notify, PodEvent,
|
//! state that should land in the next LLM request": Notify,
|
||||||
//! and any future `<system-reminder>` injection all ride this queue.
|
//! agent-visible PodEvent variants, and any future `<system-reminder>`
|
||||||
|
//! injection all ride this queue.
|
||||||
//! Per `tickets/notify-history-persist.md` and `AGENTS.md` (LLM
|
//! Per `tickets/notify-history-persist.md` and `AGENTS.md` (LLM
|
||||||
//! context の加工原則), there is **no** "transient, history-skipping"
|
//! context の加工原則), there is **no** "transient, history-skipping"
|
||||||
//! lane — everything injected into a request is also committed to
|
//! lane — everything injected into a request is also committed to
|
||||||
|
|
|
||||||
|
|
@ -1081,8 +1081,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push a `Method::Notify` (or rendered `Method::PodEvent`) entry
|
/// Push a `Method::Notify` entry onto the pending buffer.
|
||||||
/// onto the pending buffer.
|
|
||||||
///
|
///
|
||||||
/// The notification will be appended to `worker.history` as an
|
/// The notification will be appended to `worker.history` as an
|
||||||
/// `Item::system_message` just before the next LLM request, via
|
/// `Item::system_message` just before the next LLM request, via
|
||||||
|
|
@ -1092,8 +1091,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
self.pending_notifies.push_notify(message);
|
self.pending_notifies.push_notify(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Push a typed `PodEvent` entry onto the pending buffer.
|
/// Push an agent-visible typed `PodEvent` entry onto the pending buffer.
|
||||||
///
|
///
|
||||||
|
/// Callers must classify control-plane-only PodEvents before invoking this.
|
||||||
/// Same lifecycle as [`push_notify`](Self::push_notify) but
|
/// Same lifecycle as [`push_notify`](Self::push_notify) but
|
||||||
/// preserves the typed `PodEvent` payload so the IPC layer can
|
/// preserves the typed `PodEvent` payload so the IPC layer can
|
||||||
/// emit `SystemItem::PodEvent { event, body }` with structured
|
/// emit `SystemItem::PodEvent { event, body }` with structured
|
||||||
|
|
|
||||||
|
|
@ -971,6 +971,54 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn pod_event_scope_sub_delegated_while_idle_stays_control_plane_only() {
|
||||||
|
let client = MockClient::new(simple_text_events());
|
||||||
|
let client_for_assert = client.clone();
|
||||||
|
let pod = make_pod(client).await;
|
||||||
|
let handle = spawn_controller(pod).await;
|
||||||
|
|
||||||
|
handle
|
||||||
|
.send(Method::PodEvent(protocol::PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod: "child".into(),
|
||||||
|
sub_pod: "grandchild".into(),
|
||||||
|
sub_socket: "/tmp/grandchild.sock".into(),
|
||||||
|
scope: vec![],
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
handle.shared_state.get_status(),
|
||||||
|
PodStatus::Idle,
|
||||||
|
"control-plane ScopeSubDelegated must not auto-start the parent LLM"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
client_for_assert.captured_requests().is_empty(),
|
||||||
|
"ScopeSubDelegated must not issue an LLM request"
|
||||||
|
);
|
||||||
|
|
||||||
|
let (entries, _) = handle.sink.subscribe_with_snapshot();
|
||||||
|
let saw_scope_event_in_mirror = entries.iter().any(|entry| {
|
||||||
|
matches!(
|
||||||
|
entry,
|
||||||
|
session_store::LogEntry::SystemItem {
|
||||||
|
item: session_store::SystemItem::PodEvent {
|
||||||
|
event: protocol::PodEvent::ScopeSubDelegated { .. },
|
||||||
|
..
|
||||||
|
},
|
||||||
|
..
|
||||||
|
}
|
||||||
|
)
|
||||||
|
});
|
||||||
|
assert!(
|
||||||
|
!saw_scope_event_in_mirror,
|
||||||
|
"ScopeSubDelegated must not create an agent-visible SystemItem::PodEvent; mirror = {entries:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn notify_while_running_does_not_emit_already_running_error() {
|
async fn notify_while_running_does_not_emit_already_running_error() {
|
||||||
let client = MockClient::new(simple_text_events());
|
let client = MockClient::new(simple_text_events());
|
||||||
|
|
|
||||||
|
|
@ -73,9 +73,10 @@ pub enum Method {
|
||||||
/// Typed lifecycle events sent from a child Pod to its parent.
|
/// Typed lifecycle events sent from a child Pod to its parent.
|
||||||
///
|
///
|
||||||
/// Delivered as `Method::PodEvent` over the parent's Unix socket. The
|
/// Delivered as `Method::PodEvent` over the parent's Unix socket. The
|
||||||
/// parent Controller applies variant-specific side effects (registry /
|
/// parent Controller always applies variant-specific side effects
|
||||||
/// pod-registry updates) and renders a human-readable string that is
|
/// (registry / pod-registry updates). Agent-visible variants are also
|
||||||
/// injected into the parent's LLM context via the notification buffer.
|
/// queued into the notification buffer; control-plane-only variants are
|
||||||
|
/// not injected into the parent's LLM context.
|
||||||
///
|
///
|
||||||
/// Transport is fire-and-forget; receivers must tolerate out-of-order
|
/// Transport is fire-and-forget; receivers must tolerate out-of-order
|
||||||
/// delivery (e.g. `TurnEnded` arriving after `ShutDown` for the same
|
/// delivery (e.g. `TurnEnded` arriving after `ShutDown` for the same
|
||||||
|
|
@ -98,6 +99,9 @@ pub enum PodEvent {
|
||||||
|
|
||||||
/// Child sub-delegated scope to a grandchild Pod via `SpawnPod`.
|
/// Child sub-delegated scope to a grandchild Pod via `SpawnPod`.
|
||||||
///
|
///
|
||||||
|
/// Control-plane only: receivers apply registry side effects and
|
||||||
|
/// propagate upward, but do not expose this as an agent notification.
|
||||||
|
///
|
||||||
/// The parent uses this to add the grandchild to its own
|
/// The parent uses this to add the grandchild to its own
|
||||||
/// `spawned_pods.json` so it can manage the grandchild directly
|
/// `spawned_pods.json` so it can manage the grandchild directly
|
||||||
/// even if the intermediate child dies. The parent then re-fires
|
/// even if the intermediate child dies. The parent then re-fires
|
||||||
|
|
@ -115,6 +119,22 @@ pub enum PodEvent {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PodEvent {
|
||||||
|
/// Whether this event should become an agent-visible notification/history item.
|
||||||
|
///
|
||||||
|
/// Control-plane-only events still travel over the same wire enum and still
|
||||||
|
/// run receiver side effects, but they must not wake the parent LLM or enter
|
||||||
|
/// the notification buffer.
|
||||||
|
pub fn should_notify_agent(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
PodEvent::TurnEnded { .. } | PodEvent::Errored { .. } | PodEvent::ShutDown { .. } => {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
PodEvent::ScopeSubDelegated { .. } => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Segment — typed pieces of a user submission
|
// Segment — typed pieces of a user submission
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
@ -1209,6 +1229,38 @@ mod tests {
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pod_event_agent_notification_classification() {
|
||||||
|
assert!(
|
||||||
|
PodEvent::TurnEnded {
|
||||||
|
pod_name: "child".into()
|
||||||
|
}
|
||||||
|
.should_notify_agent()
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
PodEvent::Errored {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
message: "boom".into()
|
||||||
|
}
|
||||||
|
.should_notify_agent()
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
PodEvent::ShutDown {
|
||||||
|
pod_name: "child".into()
|
||||||
|
}
|
||||||
|
.should_notify_agent()
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
!PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod: "child".into(),
|
||||||
|
sub_pod: "grandchild".into(),
|
||||||
|
sub_socket: "/tmp/grandchild.sock".into(),
|
||||||
|
scope: vec![],
|
||||||
|
}
|
||||||
|
.should_notify_agent()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn method_pod_event_scope_sub_delegated_roundtrip() {
|
fn method_pod_event_scope_sub_delegated_roundtrip() {
|
||||||
let method = Method::PodEvent(PodEvent::ScopeSubDelegated {
|
let method = Method::PodEvent(PodEvent::ScopeSubDelegated {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user