pod-upstream-event実装
This commit is contained in:
parent
cc7bb0b711
commit
7637f0e440
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -1517,6 +1517,7 @@ dependencies = [
|
||||||
name = "manifest"
|
name = "manifest"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"protocol",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_ignored",
|
"serde_ignored",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
|
|
|
||||||
1
TODO.md
1
TODO.md
|
|
@ -1,4 +1,5 @@
|
||||||
- [ ] テスト設計 → [tickets/test-design.md](tickets/test-design.md)
|
- [ ] テスト設計 → [tickets/test-design.md](tickets/test-design.md)
|
||||||
|
- [ ] 引数なし tool 呼び出しで `arguments = "null"` が記録される不具合 → [tickets/tool-call-empty-args-null.md](tickets/tool-call-empty-args-null.md)
|
||||||
- [ ] ツール設計
|
- [ ] ツール設計
|
||||||
- [ ] Bash ツール (Permission 層と統合) → [tickets/bash-tool.md](tickets/bash-tool.md)
|
- [ ] Bash ツール (Permission 層と統合) → [tickets/bash-tool.md](tickets/bash-tool.md)
|
||||||
- [ ] Compact の改善(要約品質 + 挙動詳細) → [tickets/compact-improvements.md](tickets/compact-improvements.md)
|
- [ ] Compact の改善(要約品質 + 挙動詳細) → [tickets/compact-improvements.md](tickets/compact-improvements.md)
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ edition.workspace = true
|
||||||
license.workspace = true
|
license.workspace = true
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
protocol = { version = "0.1.0", path = "../protocol" }
|
||||||
serde = { version = "1.0.228", features = ["derive"] }
|
serde = { version = "1.0.228", features = ["derive"] }
|
||||||
serde_ignored = "0.1.14"
|
serde_ignored = "0.1.14"
|
||||||
thiserror = "2.0.18"
|
thiserror = "2.0.18"
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ pub use config::{
|
||||||
CompactionConfigPartial, PodManifestConfig, PodMetaConfig, ProviderConfigPartial, ResolveError,
|
CompactionConfigPartial, PodManifestConfig, PodMetaConfig, ProviderConfigPartial, ResolveError,
|
||||||
ToolOutputLimitsPartial, WorkerManifestConfig,
|
ToolOutputLimitsPartial, WorkerManifestConfig,
|
||||||
};
|
};
|
||||||
|
pub use protocol::{Permission, ScopeRule};
|
||||||
pub use scope::{Scope, ScopeError};
|
pub use scope::{Scope, ScopeError};
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
@ -159,39 +160,6 @@ pub struct ScopeConfig {
|
||||||
pub deny: Vec<ScopeRule>,
|
pub deny: Vec<ScopeRule>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A single allow or deny rule inside [`ScopeConfig`].
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct ScopeRule {
|
|
||||||
/// Target path. Must be absolute by the time [`Scope::from_config`]
|
|
||||||
/// runs — relative paths are resolved per-layer against the manifest
|
|
||||||
/// file's directory (cwd for overlay layers) before cascade merge.
|
|
||||||
pub target: PathBuf,
|
|
||||||
/// Permission level this rule grants (allow) or caps strictly below
|
|
||||||
/// (deny).
|
|
||||||
pub permission: Permission,
|
|
||||||
/// When `false`, the rule only matches the target itself and its
|
|
||||||
/// direct children. Defaults to `true`.
|
|
||||||
#[serde(default = "default_recursive")]
|
|
||||||
pub recursive: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn default_recursive() -> bool {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Permission lattice used by [`ScopeRule`].
|
|
||||||
///
|
|
||||||
/// The derived `Ord` instance follows declaration order, so
|
|
||||||
/// `Read < Write`. Allow rules grant the stated level (and by extension
|
|
||||||
/// everything below); deny rules cap the effective level **strictly
|
|
||||||
/// below** the stated level.
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
|
||||||
#[serde(rename_all = "lowercase")]
|
|
||||||
pub enum Permission {
|
|
||||||
Read,
|
|
||||||
Write,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Context compaction configuration.
|
/// Context compaction configuration.
|
||||||
///
|
///
|
||||||
/// Controls Prune (content removal from old tool results) and Compact
|
/// Controls Prune (content removal from old tool results) and Compact
|
||||||
|
|
|
||||||
|
|
@ -114,6 +114,17 @@ impl PodController {
|
||||||
let pwd_for_tools = pod.pwd().to_path_buf();
|
let pwd_for_tools = pod.pwd().to_path_buf();
|
||||||
let spawner_name = pod.manifest().pod.name.clone();
|
let spawner_name = pod.manifest().pod.name.clone();
|
||||||
|
|
||||||
|
// Parent callback socket (this Pod's own parent, used for
|
||||||
|
// `PodEvent` upward reports). `None` for top-level Pods.
|
||||||
|
let self_parent_socket = pod.callback_socket().cloned();
|
||||||
|
|
||||||
|
// `SpawnedPodRegistry` is shared between the Pod-orchestration
|
||||||
|
// tools (registered below) and the main loop's `PodEvent`
|
||||||
|
// handler (added later in this function), so hoist its creation
|
||||||
|
// above the worker-borrow block.
|
||||||
|
let spawner_socket = runtime_dir.socket_path();
|
||||||
|
let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone());
|
||||||
|
|
||||||
// Register event bridge callbacks on the worker
|
// Register event bridge callbacks on the worker
|
||||||
{
|
{
|
||||||
let worker = pod.worker_mut();
|
let worker = pod.worker_mut();
|
||||||
|
|
@ -209,24 +220,20 @@ impl PodController {
|
||||||
worker.register_tools(tools::builtin_tools(fs, tracker.clone()));
|
worker.register_tools(tools::builtin_tools(fs, tracker.clone()));
|
||||||
|
|
||||||
// Pod-orchestration tools (SpawnPod + the four comm tools)
|
// Pod-orchestration tools (SpawnPod + the four comm tools)
|
||||||
// share a single `SpawnedPodRegistry`: `SpawnPod` writes to
|
// share the Pod-scoped `SpawnedPodRegistry` hoisted above
|
||||||
// it, the others read/mutate. Wired here rather than in
|
// (also consumed by the main loop's `PodEvent` handler).
|
||||||
// `tools::builtin_tools` because these need Pod-scoped
|
|
||||||
// handles (this Pod's own socket path, runtime_dir, spawner
|
|
||||||
// name) that the generic tools crate has no access to.
|
|
||||||
let spawner_socket = runtime_dir.socket_path();
|
|
||||||
let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone());
|
|
||||||
worker.register_tool(spawn_pod_tool(
|
worker.register_tool(spawn_pod_tool(
|
||||||
spawner_name,
|
spawner_name.clone(),
|
||||||
spawner_socket,
|
spawner_socket.clone(),
|
||||||
runtime_base.to_path_buf(),
|
runtime_base.to_path_buf(),
|
||||||
pwd_for_tools,
|
pwd_for_tools,
|
||||||
spawned_registry.clone(),
|
spawned_registry.clone(),
|
||||||
|
self_parent_socket.clone(),
|
||||||
));
|
));
|
||||||
worker.register_tool(send_to_pod_tool(spawned_registry.clone()));
|
worker.register_tool(send_to_pod_tool(spawned_registry.clone()));
|
||||||
worker.register_tool(read_pod_output_tool(spawned_registry.clone()));
|
worker.register_tool(read_pod_output_tool(spawned_registry.clone()));
|
||||||
worker.register_tool(stop_pod_tool(spawned_registry.clone()));
|
worker.register_tool(stop_pod_tool(spawned_registry.clone()));
|
||||||
worker.register_tool(list_pods_tool(spawned_registry));
|
worker.register_tool(list_pods_tool(spawned_registry.clone()));
|
||||||
pod.attach_tracker(tracker);
|
pod.attach_tracker(tracker);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -266,6 +273,8 @@ impl PodController {
|
||||||
&cancel_tx,
|
&cancel_tx,
|
||||||
&shared_state,
|
&shared_state,
|
||||||
¬ification_buffer,
|
¬ification_buffer,
|
||||||
|
self_parent_socket.as_ref(),
|
||||||
|
&spawner_name,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|
@ -292,8 +301,8 @@ impl PodController {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Method::Notify { source, message } => {
|
Method::Notify { message } => {
|
||||||
pod.push_notification(source, message);
|
pod.push_notification(message);
|
||||||
if shared_state.get_status() != PodStatus::Idle {
|
if shared_state.get_status() != PodStatus::Idle {
|
||||||
// RUNNING / Paused: the buffer push is the
|
// RUNNING / Paused: the buffer push is the
|
||||||
// entire operation; the in-flight turn (or
|
// entire operation; the in-flight turn (or
|
||||||
|
|
@ -313,6 +322,8 @@ impl PodController {
|
||||||
&cancel_tx,
|
&cancel_tx,
|
||||||
&shared_state,
|
&shared_state,
|
||||||
¬ification_buffer,
|
¬ification_buffer,
|
||||||
|
self_parent_socket.as_ref(),
|
||||||
|
&spawner_name,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|
@ -357,6 +368,8 @@ impl PodController {
|
||||||
&cancel_tx,
|
&cancel_tx,
|
||||||
&shared_state,
|
&shared_state,
|
||||||
¬ification_buffer,
|
¬ification_buffer,
|
||||||
|
self_parent_socket.as_ref(),
|
||||||
|
&spawner_name,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|
@ -398,9 +411,79 @@ impl PodController {
|
||||||
// GetHistory is handled at the socket layer (direct response).
|
// GetHistory is handled at the socket layer (direct response).
|
||||||
// If it somehow reaches the controller, ignore it.
|
// If it somehow reaches the controller, ignore it.
|
||||||
Method::GetHistory => {}
|
Method::GetHistory => {}
|
||||||
|
|
||||||
|
Method::PodEvent(event) => {
|
||||||
|
// (1) system side effects — idempotent and
|
||||||
|
// tolerant of out-of-order delivery (e.g.
|
||||||
|
// `TurnEnded` arriving after `ShutDown`).
|
||||||
|
crate::pod_events::apply_event_side_effects(
|
||||||
|
&event,
|
||||||
|
&spawned_registry,
|
||||||
|
&spawner_name,
|
||||||
|
&self_parent_socket,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
// (2) render a one-line summary and push it
|
||||||
|
// into the notification buffer; the next LLM
|
||||||
|
// request will inject it as a system message
|
||||||
|
// via `PodInterceptor::pre_llm_request`.
|
||||||
|
let text = crate::pod_events::render_event(&event);
|
||||||
|
pod.push_notification(text);
|
||||||
|
// Auto-kick a turn if the Pod is idle so the
|
||||||
|
// notification is not stranded. Matches the
|
||||||
|
// `Method::Notify` idle path.
|
||||||
|
if shared_state.get_status() == PodStatus::Idle {
|
||||||
|
shared_state.set_status(PodStatus::Running);
|
||||||
|
let _ = runtime_dir.write_status(&shared_state).await;
|
||||||
|
|
||||||
|
let (new_status, shutdown) = run_with_cancel_support(
|
||||||
|
pod.run_for_notification(),
|
||||||
|
&mut method_rx,
|
||||||
|
&event_tx,
|
||||||
|
&cancel_tx,
|
||||||
|
&shared_state,
|
||||||
|
¬ification_buffer,
|
||||||
|
self_parent_socket.as_ref(),
|
||||||
|
&spawner_name,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
if new_status == PodStatus::Idle {
|
||||||
|
if let Err(e) = pod.try_post_run_compact().await {
|
||||||
|
tracing::warn!(error = %e, "Post-run compaction error");
|
||||||
|
notifier.notify(
|
||||||
|
NotificationLevel::Warn,
|
||||||
|
NotificationSource::Compactor,
|
||||||
|
format!("post-run compaction error: {e}"),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let items = pod.worker().history().to_vec();
|
||||||
|
shared_state.update_history(items);
|
||||||
|
shared_state.set_status(new_status);
|
||||||
|
let _ = runtime_dir.write_status(&shared_state).await;
|
||||||
|
let _ = runtime_dir.write_history(&shared_state).await;
|
||||||
|
|
||||||
|
if shutdown {
|
||||||
|
let _ = event_tx.send(Event::Shutdown);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Report upward that this Pod is stopping before the
|
||||||
|
// controller task exits. Fire-and-forget; the parent may
|
||||||
|
// already be gone.
|
||||||
|
crate::pod_events::fire_and_forget(
|
||||||
|
self_parent_socket.clone(),
|
||||||
|
protocol::PodEvent::ShutDown {
|
||||||
|
pod_name: spawner_name.clone(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
let _ = shutdown_tx.send(());
|
let _ = shutdown_tx.send(());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -411,6 +494,12 @@ impl PodController {
|
||||||
/// Runs a Pod future while concurrently processing incoming methods.
|
/// Runs a Pod future while concurrently processing incoming methods.
|
||||||
///
|
///
|
||||||
/// Returns `(final_status, shutdown_requested)`.
|
/// Returns `(final_status, shutdown_requested)`.
|
||||||
|
///
|
||||||
|
/// `parent_socket` / `self_name` drive upward `PodEvent` reports
|
||||||
|
/// (`TurnEnded` on a clean Finished, `Errored` on a worker failure).
|
||||||
|
/// `None` parent skips the send (top-level Pod). Transient method
|
||||||
|
/// rejections such as `AlreadyRunning` are intentionally NOT reported
|
||||||
|
/// as `Errored` — only the worker-execution `Err` branch below fires.
|
||||||
async fn run_with_cancel_support<F>(
|
async fn run_with_cancel_support<F>(
|
||||||
pod_future: F,
|
pod_future: F,
|
||||||
method_rx: &mut mpsc::Receiver<Method>,
|
method_rx: &mut mpsc::Receiver<Method>,
|
||||||
|
|
@ -418,6 +507,8 @@ async fn run_with_cancel_support<F>(
|
||||||
cancel_tx: &mpsc::Sender<()>,
|
cancel_tx: &mpsc::Sender<()>,
|
||||||
shared_state: &Arc<PodSharedState>,
|
shared_state: &Arc<PodSharedState>,
|
||||||
notification_buffer: &NotificationBuffer,
|
notification_buffer: &NotificationBuffer,
|
||||||
|
parent_socket: Option<&std::path::PathBuf>,
|
||||||
|
self_name: &str,
|
||||||
) -> (PodStatus, bool)
|
) -> (PodStatus, bool)
|
||||||
where
|
where
|
||||||
F: std::future::Future<Output = Result<PodRunResult, PodError>>,
|
F: std::future::Future<Output = Result<PodRunResult, PodError>>,
|
||||||
|
|
@ -436,14 +527,30 @@ where
|
||||||
PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached),
|
PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached),
|
||||||
};
|
};
|
||||||
let _ = event_tx.send(Event::RunEnd { result: run_result });
|
let _ = event_tx.send(Event::RunEnd { result: run_result });
|
||||||
|
if matches!(run_result, RunResult::Finished) {
|
||||||
|
crate::pod_events::fire_and_forget(
|
||||||
|
parent_socket.cloned(),
|
||||||
|
protocol::PodEvent::TurnEnded {
|
||||||
|
pod_name: self_name.to_string(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
(status, shutdown_requested)
|
(status, shutdown_requested)
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let code = worker_error_code(&e);
|
let code = worker_error_code(&e);
|
||||||
|
let message = e.to_string();
|
||||||
let _ = event_tx.send(Event::Error {
|
let _ = event_tx.send(Event::Error {
|
||||||
code,
|
code,
|
||||||
message: e.to_string(),
|
message: message.clone(),
|
||||||
});
|
});
|
||||||
|
crate::pod_events::fire_and_forget(
|
||||||
|
parent_socket.cloned(),
|
||||||
|
protocol::PodEvent::Errored {
|
||||||
|
pod_name: self_name.to_string(),
|
||||||
|
message,
|
||||||
|
},
|
||||||
|
);
|
||||||
(PodStatus::Idle, shutdown_requested)
|
(PodStatus::Idle, shutdown_requested)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -463,12 +570,22 @@ where
|
||||||
message: "Pod is already executing a turn".into(),
|
message: "Pod is already executing a turn".into(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Some(Method::Notify { source, message }) => {
|
Some(Method::Notify { message }) => {
|
||||||
// Route into the buffer; the in-flight turn will
|
// Route into the buffer; the in-flight turn will
|
||||||
// drain it at its next pre_llm_request.
|
// drain it at its next pre_llm_request.
|
||||||
notification_buffer.push(source, message);
|
notification_buffer.push(message);
|
||||||
}
|
}
|
||||||
Some(Method::GetHistory) => {}
|
Some(Method::GetHistory) => {}
|
||||||
|
Some(Method::PodEvent(_)) => {
|
||||||
|
// PodEvent is handled in the main loop (next
|
||||||
|
// iteration). Dropping it here is fine because
|
||||||
|
// the sender is external and we will see it
|
||||||
|
// again via `method_rx` after the current turn
|
||||||
|
// ends — this arm only fires for concurrent
|
||||||
|
// arrivals during an in-flight turn, where the
|
||||||
|
// strict ordering guarantees that matter for
|
||||||
|
// PodEvent don't exist anyway (fire-and-forget).
|
||||||
|
}
|
||||||
None => {
|
None => {
|
||||||
let _ = cancel_tx.try_send(());
|
let _ = cancel_tx.try_send(());
|
||||||
shared_state.set_status(PodStatus::Idle);
|
shared_state.set_status(PodStatus::Idle);
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ pub mod runtime_dir;
|
||||||
pub mod scope_lock;
|
pub mod scope_lock;
|
||||||
pub mod shared_state;
|
pub mod shared_state;
|
||||||
pub mod pod_comm_tools;
|
pub mod pod_comm_tools;
|
||||||
|
pub mod pod_events;
|
||||||
pub mod socket_server;
|
pub mod socket_server;
|
||||||
pub mod spawn_pod;
|
pub mod spawn_pod;
|
||||||
pub mod spawned_pod_registry;
|
pub mod spawned_pod_registry;
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,6 @@ const CAPACITY: usize = 128;
|
||||||
/// One pending notification awaiting injection into the next LLM request.
|
/// One pending notification awaiting injection into the next LLM request.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct PendingNotification {
|
pub struct PendingNotification {
|
||||||
pub source: String,
|
|
||||||
pub message: String,
|
pub message: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -37,17 +36,17 @@ impl NotificationBuffer {
|
||||||
/// Push a notification onto the queue. If the queue is full, the
|
/// Push a notification onto the queue. If the queue is full, the
|
||||||
/// oldest entry is dropped and a `tracing::warn` is emitted — the
|
/// oldest entry is dropped and a `tracing::warn` is emitted — the
|
||||||
/// caller should never hit this in normal operation.
|
/// caller should never hit this in normal operation.
|
||||||
pub fn push(&self, source: String, message: String) {
|
pub fn push(&self, message: String) {
|
||||||
let mut q = self.inner.lock().expect("notification buffer poisoned");
|
let mut q = self.inner.lock().expect("notification buffer poisoned");
|
||||||
if q.len() >= CAPACITY {
|
if q.len() >= CAPACITY {
|
||||||
let dropped = q.pop_front();
|
let dropped = q.pop_front();
|
||||||
warn!(
|
warn!(
|
||||||
capacity = CAPACITY,
|
capacity = CAPACITY,
|
||||||
dropped_source = dropped.as_ref().map(|n| n.source.as_str()),
|
dropped_message = dropped.as_ref().map(|n| n.message.as_str()),
|
||||||
"notification buffer overflow; dropped oldest"
|
"notification buffer overflow; dropped oldest"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
q.push_back(PendingNotification { source, message });
|
q.push_back(PendingNotification { message });
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove and return all pending notifications in FIFO order.
|
/// Remove and return all pending notifications in FIFO order.
|
||||||
|
|
@ -73,11 +72,10 @@ impl NotificationBuffer {
|
||||||
/// that gets injected into the per-request context.
|
/// that gets injected into the per-request context.
|
||||||
pub(crate) fn format_notification(n: &PendingNotification) -> Item {
|
pub(crate) fn format_notification(n: &PendingNotification) -> Item {
|
||||||
let text = format!(
|
let text = format!(
|
||||||
"[Notification from {source}]\n{message}\n\n\
|
"[Notification]\n{message}\n\n\
|
||||||
This is a notification, not a blocking request. \
|
This is a notification, not a blocking request. \
|
||||||
If you are in the middle of a task, continue your current work \
|
If you are in the middle of a task, continue your current work \
|
||||||
and address this at a natural stopping point.",
|
and address this at a natural stopping point.",
|
||||||
source = n.source,
|
|
||||||
message = n.message,
|
message = n.message,
|
||||||
);
|
);
|
||||||
Item::system_message(text)
|
Item::system_message(text)
|
||||||
|
|
@ -90,12 +88,12 @@ mod tests {
|
||||||
#[test]
|
#[test]
|
||||||
fn push_then_drain_preserves_order() {
|
fn push_then_drain_preserves_order() {
|
||||||
let buf = NotificationBuffer::new();
|
let buf = NotificationBuffer::new();
|
||||||
buf.push("a".into(), "one".into());
|
buf.push("one".into());
|
||||||
buf.push("b".into(), "two".into());
|
buf.push("two".into());
|
||||||
let drained = buf.drain();
|
let drained = buf.drain();
|
||||||
assert_eq!(drained.len(), 2);
|
assert_eq!(drained.len(), 2);
|
||||||
assert_eq!(drained[0].source, "a");
|
assert_eq!(drained[0].message, "one");
|
||||||
assert_eq!(drained[1].source, "b");
|
assert_eq!(drained[1].message, "two");
|
||||||
assert!(buf.is_empty());
|
assert!(buf.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -103,24 +101,23 @@ mod tests {
|
||||||
fn capacity_drops_oldest() {
|
fn capacity_drops_oldest() {
|
||||||
let buf = NotificationBuffer::new();
|
let buf = NotificationBuffer::new();
|
||||||
for i in 0..(CAPACITY + 5) {
|
for i in 0..(CAPACITY + 5) {
|
||||||
buf.push(format!("src{i}"), format!("msg{i}"));
|
buf.push(format!("msg{i}"));
|
||||||
}
|
}
|
||||||
let drained = buf.drain();
|
let drained = buf.drain();
|
||||||
assert_eq!(drained.len(), CAPACITY);
|
assert_eq!(drained.len(), CAPACITY);
|
||||||
// Oldest 5 were dropped; first retained is src5.
|
// Oldest 5 were dropped; first retained is msg5.
|
||||||
assert_eq!(drained[0].source, "src5");
|
assert_eq!(drained[0].message, "msg5");
|
||||||
assert_eq!(drained[CAPACITY - 1].source, format!("src{}", CAPACITY + 4));
|
assert_eq!(drained[CAPACITY - 1].message, format!("msg{}", CAPACITY + 4));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn format_notification_includes_source_message_and_nonblocking_hint() {
|
fn format_notification_includes_message_and_nonblocking_hint() {
|
||||||
let n = PendingNotification {
|
let n = PendingNotification {
|
||||||
source: "child".into(),
|
|
||||||
message: "hello".into(),
|
message: "hello".into(),
|
||||||
};
|
};
|
||||||
let item = format_notification(&n);
|
let item = format_notification(&n);
|
||||||
let text = item.as_text().unwrap_or_default().to_string();
|
let text = item.as_text().unwrap_or_default().to_string();
|
||||||
assert!(text.contains("[Notification from child]"));
|
assert!(text.contains("[Notification]"));
|
||||||
assert!(text.contains("hello"));
|
assert!(text.contains("hello"));
|
||||||
assert!(text.contains("not a blocking request"));
|
assert!(text.contains("not a blocking request"));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -115,11 +115,9 @@ pub struct Pod<C: LlmClient, St: Store> {
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
scope_allocation: Option<ScopeAllocationGuard>,
|
scope_allocation: Option<ScopeAllocationGuard>,
|
||||||
/// Socket path of the spawning Pod. `Some` only for Pods built via
|
/// Socket path of the spawning Pod. `Some` only for Pods built via
|
||||||
/// `from_manifest_spawned`. The callback is consumed by the
|
/// `from_manifest_spawned`. Consumed by the controller to fire
|
||||||
/// `pod-callback` layer (separate ticket) to deliver
|
/// `Method::PodEvent` reports upward (turn end, error, shutdown,
|
||||||
/// `Method::Notify` back to the spawner; stored here so the Pod
|
/// scope sub-delegation).
|
||||||
/// carries the reference for the duration of its life.
|
|
||||||
#[allow(dead_code)]
|
|
||||||
callback_socket: Option<PathBuf>,
|
callback_socket: Option<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -325,8 +323,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
/// The notification will be injected as an `Item::system_message`
|
/// The notification will be injected as an `Item::system_message`
|
||||||
/// into the next outgoing LLM request context (not into history).
|
/// into the next outgoing LLM request context (not into history).
|
||||||
/// See [`NotificationBuffer`] for overflow behaviour.
|
/// See [`NotificationBuffer`] for overflow behaviour.
|
||||||
pub fn push_notification(&self, source: String, message: String) {
|
pub fn push_notification(&self, message: String) {
|
||||||
self.pending_notifications.push(source, message);
|
self.pending_notifications.push(message);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Shared handle to the pending notification buffer.
|
/// Shared handle to the pending notification buffer.
|
||||||
|
|
@ -337,6 +335,15 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
self.pending_notifications.clone()
|
self.pending_notifications.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Parent callback socket set by `from_manifest_spawned`.
|
||||||
|
///
|
||||||
|
/// Consumed by the Controller to fire `Method::PodEvent` upward on
|
||||||
|
/// lifecycle transitions. `None` for top-level Pods, in which case
|
||||||
|
/// the Controller silently skips the send.
|
||||||
|
pub fn callback_socket(&self) -> Option<&PathBuf> {
|
||||||
|
self.callback_socket.as_ref()
|
||||||
|
}
|
||||||
|
|
||||||
// --- Hook registration ---
|
// --- Hook registration ---
|
||||||
|
|
||||||
fn assert_hooks_open(&self) {
|
fn assert_hooks_open(&self) {
|
||||||
|
|
|
||||||
|
|
@ -325,7 +325,7 @@ fn unknown_pod_err(name: &str) -> ToolError {
|
||||||
/// Connect with a timeout, write one `Method` line, flush, and close.
|
/// Connect with a timeout, write one `Method` line, flush, and close.
|
||||||
/// Any socket error maps to an `io::Error`; the caller decides whether
|
/// Any socket error maps to an `io::Error`; the caller decides whether
|
||||||
/// to surface it to the LLM or treat it as "pod stopped".
|
/// to surface it to the LLM or treat it as "pod stopped".
|
||||||
async fn connect_and_send(socket: &Path, method: &Method) -> std::io::Result<()> {
|
pub(crate) async fn connect_and_send(socket: &Path, method: &Method) -> std::io::Result<()> {
|
||||||
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
||||||
.await
|
.await
|
||||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??;
|
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??;
|
||||||
|
|
|
||||||
187
crates/pod/src/pod_events.rs
Normal file
187
crates/pod/src/pod_events.rs
Normal file
|
|
@ -0,0 +1,187 @@
|
||||||
|
//! `PodEvent` send / receive helpers.
|
||||||
|
//!
|
||||||
|
//! This module owns the parent-facing lifecycle-event primitive
|
||||||
|
//! (`PodEvent`) that children fire upward on turn-end / error /
|
||||||
|
//! shutdown / scope-sub-delegation. Three responsibilities live here:
|
||||||
|
//!
|
||||||
|
//! - **Send** a `Method::PodEvent` to the parent socket, fire-and-forget,
|
||||||
|
//! logging failures without blocking the child.
|
||||||
|
//! - **Render** a variant into a human-readable string that the parent's
|
||||||
|
//! LLM sees via the notification buffer.
|
||||||
|
//! - **Apply side effects** on the parent (registry / scope-lock
|
||||||
|
//! updates) so that the receive path is idempotent and tolerant of
|
||||||
|
//! out-of-order delivery.
|
||||||
|
//!
|
||||||
|
//! Transport is fire-and-forget — the ticket's decision is that
|
||||||
|
//! callbacks are an optimisation and `ListPods` + `reclaim_stale` are
|
||||||
|
//! the real fallback. This module is allowed to drop events on the
|
||||||
|
//! floor (with a warn log) rather than retry.
|
||||||
|
//!
|
||||||
|
//! `apply_event_side_effects` takes its dependencies (registry, scope
|
||||||
|
//! lock path, self identity) by reference so the caller owns lifetime
|
||||||
|
//! and locking concerns.
|
||||||
|
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use protocol::{Method, PodEvent, ScopeRule};
|
||||||
|
|
||||||
|
use crate::pod_comm_tools::connect_and_send;
|
||||||
|
use crate::runtime_dir::SpawnedPodRecord;
|
||||||
|
use crate::scope_lock::{self, ScopeLockError};
|
||||||
|
use crate::spawned_pod_registry::SpawnedPodRegistry;
|
||||||
|
|
||||||
|
/// Connect to `socket`, send a single `Method::PodEvent(event)`, and
|
||||||
|
/// return. Used by children to report up to their parent.
|
||||||
|
///
|
||||||
|
/// This is a synchronous helper — callers that want fire-and-forget
|
||||||
|
/// semantics should wrap the call in `tokio::spawn` themselves.
|
||||||
|
pub async fn send_pod_event(socket: &Path, event: PodEvent) -> std::io::Result<()> {
|
||||||
|
connect_and_send(socket, &Method::PodEvent(event)).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn a fire-and-forget task that sends `event` to `socket`. If
|
||||||
|
/// `socket` is `None`, no send happens (top-level Pods have no parent).
|
||||||
|
/// Any send failure is logged at warn level but otherwise ignored —
|
||||||
|
/// the parent is treated as best-effort.
|
||||||
|
pub fn fire_and_forget(socket: Option<PathBuf>, event: PodEvent) {
|
||||||
|
let Some(socket) = socket else { return };
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = send_pod_event(&socket, event).await {
|
||||||
|
tracing::warn!(error = %e, socket = %socket.display(), "PodEvent send failed");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Render a variant into the one-line human-readable string that will
|
||||||
|
/// be injected into the parent's LLM context as a system message.
|
||||||
|
///
|
||||||
|
/// Kept deliberately short — the LLM can always call `ReadPodOutput`
|
||||||
|
/// to fetch more detail if the event summary is not enough.
|
||||||
|
pub fn render_event(event: &PodEvent) -> String {
|
||||||
|
match event {
|
||||||
|
PodEvent::TurnEnded { pod_name } => {
|
||||||
|
format!("Pod `{pod_name}` finished a turn.")
|
||||||
|
}
|
||||||
|
PodEvent::Errored { pod_name, message } => {
|
||||||
|
format!("Pod `{pod_name}` reported an error: {message}")
|
||||||
|
}
|
||||||
|
PodEvent::ShutDown { pod_name } => {
|
||||||
|
format!("Pod `{pod_name}` has stopped.")
|
||||||
|
}
|
||||||
|
PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod,
|
||||||
|
sub_pod,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
format!("Pod `{parent_pod}` spawned `{sub_pod}` and delegated scope to it.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Apply the variant-specific side effect on the parent side.
|
||||||
|
///
|
||||||
|
/// All operations are idempotent so that out-of-order delivery (e.g.
|
||||||
|
/// `TurnEnded` arriving after `ShutDown`) does not produce errors:
|
||||||
|
///
|
||||||
|
/// - `TurnEnded` / `Errored`: no system work; the LLM handles the
|
||||||
|
/// semantic response.
|
||||||
|
/// - `ShutDown`: remove the child from `spawned_pods.json` and release
|
||||||
|
/// its scope allocation. Missing entries are swallowed.
|
||||||
|
/// - `ScopeSubDelegated`: register the grandchild locally and re-emit
|
||||||
|
/// upward to our own parent if we have one. Duplicate grandchild
|
||||||
|
/// entries (re-delivery) are swallowed.
|
||||||
|
pub async fn apply_event_side_effects(
|
||||||
|
event: &PodEvent,
|
||||||
|
registry: &Arc<SpawnedPodRegistry>,
|
||||||
|
self_name: &str,
|
||||||
|
self_parent_socket: &Option<PathBuf>,
|
||||||
|
) {
|
||||||
|
match event {
|
||||||
|
PodEvent::TurnEnded { .. } | PodEvent::Errored { .. } => {}
|
||||||
|
|
||||||
|
PodEvent::ShutDown { pod_name } => {
|
||||||
|
if let Err(e) = registry.remove(pod_name).await {
|
||||||
|
tracing::warn!(error = %e, pod = %pod_name, "registry remove on ShutDown failed");
|
||||||
|
}
|
||||||
|
release_scope_silently(pod_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod,
|
||||||
|
sub_pod,
|
||||||
|
sub_socket,
|
||||||
|
scope,
|
||||||
|
} => {
|
||||||
|
if registry.get(sub_pod).await.is_some() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let callback_address = registry
|
||||||
|
.get(parent_pod)
|
||||||
|
.await
|
||||||
|
.map(|r| r.socket_path)
|
||||||
|
.unwrap_or_else(PathBuf::new);
|
||||||
|
let record = SpawnedPodRecord {
|
||||||
|
pod_name: sub_pod.clone(),
|
||||||
|
socket_path: sub_socket.clone(),
|
||||||
|
scope_delegated: scope.clone(),
|
||||||
|
callback_address,
|
||||||
|
};
|
||||||
|
if let Err(e) = registry.add(record).await {
|
||||||
|
tracing::warn!(
|
||||||
|
error = %e,
|
||||||
|
sub_pod = %sub_pod,
|
||||||
|
"registry add on ScopeSubDelegated failed"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
reemit_scope_sub_delegated(
|
||||||
|
self_parent_socket,
|
||||||
|
self_name,
|
||||||
|
sub_pod.clone(),
|
||||||
|
sub_socket.clone(),
|
||||||
|
scope.clone(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn release_scope_silently(pod_name: &str) {
|
||||||
|
let lock_path = match scope_lock::default_lock_path() {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "default_lock_path failed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
let mut guard = match scope_lock::LockFileGuard::open(&lock_path) {
|
||||||
|
Ok(g) => g,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "LockFileGuard open failed");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
match scope_lock::release_pod(&mut guard, pod_name) {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(ScopeLockError::UnknownPod(_)) => {}
|
||||||
|
Err(e) => tracing::warn!(error = ?e, pod = %pod_name, "release_pod failed"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reemit_scope_sub_delegated(
|
||||||
|
self_parent_socket: &Option<PathBuf>,
|
||||||
|
self_name: &str,
|
||||||
|
sub_pod: String,
|
||||||
|
sub_socket: PathBuf,
|
||||||
|
scope: Vec<ScopeRule>,
|
||||||
|
) {
|
||||||
|
let Some(parent_socket) = self_parent_socket.clone() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
let event = PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod: self_name.to_string(),
|
||||||
|
sub_pod,
|
||||||
|
sub_socket,
|
||||||
|
scope,
|
||||||
|
};
|
||||||
|
fire_and_forget(Some(parent_socket), event);
|
||||||
|
}
|
||||||
|
|
@ -292,8 +292,8 @@ mod tests {
|
||||||
async fn pre_llm_request_drains_pending_notifications_into_context() {
|
async fn pre_llm_request_drains_pending_notifications_into_context() {
|
||||||
let registry = Arc::new(HookRegistryBuilder::new().build());
|
let registry = Arc::new(HookRegistryBuilder::new().build());
|
||||||
let buffer = NotificationBuffer::new();
|
let buffer = NotificationBuffer::new();
|
||||||
buffer.push("child-a".into(), "first".into());
|
buffer.push("first".into());
|
||||||
buffer.push("child-b".into(), "second".into());
|
buffer.push("second".into());
|
||||||
|
|
||||||
let interceptor = PodInterceptor::new(registry, None, buffer.clone());
|
let interceptor = PodInterceptor::new(registry, None, buffer.clone());
|
||||||
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
|
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
|
||||||
|
|
@ -304,9 +304,9 @@ mod tests {
|
||||||
assert_eq!(ctx.len(), 3);
|
assert_eq!(ctx.len(), 3);
|
||||||
let second = ctx[1].as_text().unwrap_or_default();
|
let second = ctx[1].as_text().unwrap_or_default();
|
||||||
let third = ctx[2].as_text().unwrap_or_default();
|
let third = ctx[2].as_text().unwrap_or_default();
|
||||||
assert!(second.contains("[Notification from child-a]"));
|
assert!(second.contains("[Notification]"));
|
||||||
assert!(second.contains("first"));
|
assert!(second.contains("first"));
|
||||||
assert!(third.contains("[Notification from child-b]"));
|
assert!(third.contains("[Notification]"));
|
||||||
assert!(third.contains("second"));
|
assert!(third.contains("second"));
|
||||||
// Buffer is drained after a single pre_llm_request call.
|
// Buffer is drained after a single pre_llm_request call.
|
||||||
assert!(buffer.is_empty());
|
assert!(buffer.is_empty());
|
||||||
|
|
@ -318,7 +318,7 @@ mod tests {
|
||||||
// the next pre_llm_request (after compaction + resume).
|
// the next pre_llm_request (after compaction + resume).
|
||||||
let registry = Arc::new(HookRegistryBuilder::new().build());
|
let registry = Arc::new(HookRegistryBuilder::new().build());
|
||||||
let buffer = NotificationBuffer::new();
|
let buffer = NotificationBuffer::new();
|
||||||
buffer.push("src".into(), "msg".into());
|
buffer.push("msg".into());
|
||||||
|
|
||||||
let state = Arc::new(CompactState::new(100, 2));
|
let state = Arc::new(CompactState::new(100, 2));
|
||||||
state.update_input_tokens(200);
|
state.update_input_tokens(200);
|
||||||
|
|
|
||||||
|
|
@ -23,9 +23,11 @@ use tokio::net::UnixStream;
|
||||||
use tokio::process::Command;
|
use tokio::process::Command;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
use crate::pod_events;
|
||||||
use crate::runtime_dir::SpawnedPodRecord;
|
use crate::runtime_dir::SpawnedPodRecord;
|
||||||
use crate::scope_lock::{self, LockFileGuard, ScopeLockError};
|
use crate::scope_lock::{self, LockFileGuard, ScopeLockError};
|
||||||
use crate::spawned_pod_registry::SpawnedPodRegistry;
|
use crate::spawned_pod_registry::SpawnedPodRegistry;
|
||||||
|
use protocol::PodEvent;
|
||||||
|
|
||||||
const DESCRIPTION: &str = "Spawn a new Pod process to work on a delegated task. \
|
const DESCRIPTION: &str = "Spawn a new Pod process to work on a delegated task. \
|
||||||
The spawner's write scope is reduced by the scope passed here; the spawned \
|
The spawner's write scope is reduced by the scope passed here; the spawned \
|
||||||
|
|
@ -93,7 +95,7 @@ pub struct SpawnPodTool {
|
||||||
/// `delegated_from` in the scope-lock registry.
|
/// `delegated_from` in the scope-lock registry.
|
||||||
spawner_name: String,
|
spawner_name: String,
|
||||||
/// Path to the spawner's Unix socket. Handed to the child via
|
/// Path to the spawner's Unix socket. Handed to the child via
|
||||||
/// `--callback` so `Method::Notify` has somewhere to land.
|
/// `--callback` so its `PodEvent` callbacks have somewhere to land.
|
||||||
callback_socket: PathBuf,
|
callback_socket: PathBuf,
|
||||||
/// Root of the `$XDG_RUNTIME_DIR/insomnia/` tree, used to predict
|
/// Root of the `$XDG_RUNTIME_DIR/insomnia/` tree, used to predict
|
||||||
/// the spawned Pod's socket path before the child has bound it.
|
/// the spawned Pod's socket path before the child has bound it.
|
||||||
|
|
@ -105,6 +107,12 @@ pub struct SpawnPodTool {
|
||||||
/// pod-comm tools (`SendToPod` / `ReadPodOutput` / `StopPod` /
|
/// pod-comm tools (`SendToPod` / `ReadPodOutput` / `StopPod` /
|
||||||
/// `ListPods`). Writes the list to `spawned_pods.json` on each add.
|
/// `ListPods`). Writes the list to `spawned_pods.json` on each add.
|
||||||
registry: Arc<SpawnedPodRegistry>,
|
registry: Arc<SpawnedPodRegistry>,
|
||||||
|
/// THIS Pod's own parent-callback socket, if any. After a
|
||||||
|
/// successful spawn we fire `PodEvent::ScopeSubDelegated` upward
|
||||||
|
/// so the grandparent can register the grandchild directly.
|
||||||
|
/// `None` for top-level Pods — in that case the re-emission is a
|
||||||
|
/// no-op.
|
||||||
|
parent_socket: Option<PathBuf>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SpawnPodTool {
|
impl SpawnPodTool {
|
||||||
|
|
@ -114,6 +122,7 @@ impl SpawnPodTool {
|
||||||
runtime_base: PathBuf,
|
runtime_base: PathBuf,
|
||||||
spawner_pwd: PathBuf,
|
spawner_pwd: PathBuf,
|
||||||
registry: Arc<SpawnedPodRegistry>,
|
registry: Arc<SpawnedPodRegistry>,
|
||||||
|
parent_socket: Option<PathBuf>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
spawner_name,
|
spawner_name,
|
||||||
|
|
@ -121,6 +130,7 @@ impl SpawnPodTool {
|
||||||
runtime_base,
|
runtime_base,
|
||||||
spawner_pwd,
|
spawner_pwd,
|
||||||
registry,
|
registry,
|
||||||
|
parent_socket,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -197,7 +207,7 @@ impl Tool for SpawnPodTool {
|
||||||
let record = SpawnedPodRecord {
|
let record = SpawnedPodRecord {
|
||||||
pod_name: input.name.clone(),
|
pod_name: input.name.clone(),
|
||||||
socket_path: predicted_socket.clone(),
|
socket_path: predicted_socket.clone(),
|
||||||
scope_delegated: scope_allow,
|
scope_delegated: scope_allow.clone(),
|
||||||
callback_address: self.callback_socket.clone(),
|
callback_address: self.callback_socket.clone(),
|
||||||
};
|
};
|
||||||
self.registry
|
self.registry
|
||||||
|
|
@ -205,6 +215,19 @@ impl Tool for SpawnPodTool {
|
||||||
.await
|
.await
|
||||||
.map_err(|e| ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}")))?;
|
.map_err(|e| ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}")))?;
|
||||||
|
|
||||||
|
// Notify this Pod's own parent so the grandparent can register
|
||||||
|
// the new grandchild directly. Fire-and-forget; top-level Pods
|
||||||
|
// (with no parent) skip the send inside `fire_and_forget`.
|
||||||
|
pod_events::fire_and_forget(
|
||||||
|
self.parent_socket.clone(),
|
||||||
|
PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod: self.spawner_name.clone(),
|
||||||
|
sub_pod: input.name.clone(),
|
||||||
|
sub_socket: predicted_socket.clone(),
|
||||||
|
scope: scope_allow,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
Ok(ToolOutput {
|
Ok(ToolOutput {
|
||||||
summary: format!(
|
summary: format!(
|
||||||
"spawned pod `{}` listening on {}",
|
"spawned pod `{}` listening on {}",
|
||||||
|
|
@ -362,6 +385,7 @@ pub fn spawn_pod_tool(
|
||||||
runtime_base: PathBuf,
|
runtime_base: PathBuf,
|
||||||
spawner_pwd: PathBuf,
|
spawner_pwd: PathBuf,
|
||||||
registry: Arc<SpawnedPodRegistry>,
|
registry: Arc<SpawnedPodRegistry>,
|
||||||
|
parent_socket: Option<PathBuf>,
|
||||||
) -> ToolDefinition {
|
) -> ToolDefinition {
|
||||||
Arc::new(move || {
|
Arc::new(move || {
|
||||||
let schema = schemars::schema_for!(SpawnPodInput);
|
let schema = schemars::schema_for!(SpawnPodInput);
|
||||||
|
|
@ -375,6 +399,7 @@ pub fn spawn_pod_tool(
|
||||||
runtime_base.clone(),
|
runtime_base.clone(),
|
||||||
spawner_pwd.clone(),
|
spawner_pwd.clone(),
|
||||||
registry.clone(),
|
registry.clone(),
|
||||||
|
parent_socket.clone(),
|
||||||
));
|
));
|
||||||
(meta, tool)
|
(meta, tool)
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -346,7 +346,6 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
|
||||||
|
|
||||||
handle
|
handle
|
||||||
.send(Method::Notify {
|
.send(Method::Notify {
|
||||||
source: "child-a".into(),
|
|
||||||
message: "turn finished".into(),
|
message: "turn finished".into(),
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
|
@ -384,7 +383,7 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.to_string();
|
.to_string();
|
||||||
assert!(
|
assert!(
|
||||||
last_item_text.contains("[Notification from child-a]"),
|
last_item_text.contains("[Notification]"),
|
||||||
"injected system message missing, got: {last_item_text:?}"
|
"injected system message missing, got: {last_item_text:?}"
|
||||||
);
|
);
|
||||||
assert!(last_item_text.contains("turn finished"));
|
assert!(last_item_text.contains("turn finished"));
|
||||||
|
|
@ -406,7 +405,6 @@ async fn notify_while_running_does_not_emit_already_running_error() {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
handle
|
handle
|
||||||
.send(Method::Notify {
|
.send(Method::Notify {
|
||||||
source: "child".into(),
|
|
||||||
message: "ping".into(),
|
message: "ping".into(),
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
|
|
||||||
366
crates/pod/tests/pod_events_test.rs
Normal file
366
crates/pod/tests/pod_events_test.rs
Normal file
|
|
@ -0,0 +1,366 @@
|
||||||
|
//! Integration tests for the `PodEvent` send / receive primitive.
|
||||||
|
//!
|
||||||
|
//! These tests drive `pod_events::fire_and_forget` and
|
||||||
|
//! `pod_events::apply_event_side_effects` directly — the full
|
||||||
|
//! Controller wiring is exercised by the existing controller /
|
||||||
|
//! spawn-pod tests, which rely on the same primitives.
|
||||||
|
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::{Arc, LazyLock, Mutex};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use pod::pod_events::{apply_event_side_effects, fire_and_forget, render_event};
|
||||||
|
use pod::runtime_dir::{RuntimeDir, SpawnedPodRecord};
|
||||||
|
use pod::scope_lock::{self, LockFileGuard};
|
||||||
|
use pod::spawned_pod_registry::SpawnedPodRegistry;
|
||||||
|
use protocol::stream::JsonLineReader;
|
||||||
|
use protocol::{Method, Permission, PodEvent, ScopeRule};
|
||||||
|
use tempfile::TempDir;
|
||||||
|
use tokio::net::UnixListener;
|
||||||
|
|
||||||
|
/// Serialises tests that mutate `INSOMNIA_SCOPE_LOCK`.
|
||||||
|
static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
|
||||||
|
|
||||||
|
struct EnvGuard {
|
||||||
|
_lock: std::sync::MutexGuard<'static, ()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EnvGuard {
|
||||||
|
fn acquire() -> Self {
|
||||||
|
Self {
|
||||||
|
_lock: ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_scope_lock_path(path: &std::path::Path) {
|
||||||
|
unsafe {
|
||||||
|
std::env::set_var("INSOMNIA_SCOPE_LOCK", path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn clear_scope_lock_path() {
|
||||||
|
unsafe {
|
||||||
|
std::env::remove_var("INSOMNIA_SCOPE_LOCK");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Accept a single connection, read one `Method`, and return it.
|
||||||
|
fn accept_one_method(
|
||||||
|
listener: UnixListener,
|
||||||
|
) -> tokio::task::JoinHandle<Option<Method>> {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let (stream, _) = listener.accept().await.ok()?;
|
||||||
|
let (reader, _writer) = stream.into_split();
|
||||||
|
let mut r = JsonLineReader::new(reader);
|
||||||
|
r.next::<Method>().await.ok().flatten()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn render_event_all_variants_mention_pod_name() {
|
||||||
|
let t1 = render_event(&PodEvent::TurnEnded {
|
||||||
|
pod_name: "alpha".into(),
|
||||||
|
});
|
||||||
|
assert!(t1.contains("alpha"), "{t1}");
|
||||||
|
|
||||||
|
let t2 = render_event(&PodEvent::Errored {
|
||||||
|
pod_name: "bravo".into(),
|
||||||
|
message: "boom".into(),
|
||||||
|
});
|
||||||
|
assert!(t2.contains("bravo") && t2.contains("boom"), "{t2}");
|
||||||
|
|
||||||
|
let t3 = render_event(&PodEvent::ShutDown {
|
||||||
|
pod_name: "charlie".into(),
|
||||||
|
});
|
||||||
|
assert!(t3.contains("charlie"), "{t3}");
|
||||||
|
|
||||||
|
let t4 = render_event(&PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod: "delta".into(),
|
||||||
|
sub_pod: "echo".into(),
|
||||||
|
sub_socket: "/tmp/sock".into(),
|
||||||
|
scope: vec![],
|
||||||
|
});
|
||||||
|
assert!(t4.contains("delta") && t4.contains("echo"), "{t4}");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fire_and_forget_delivers_pod_event_to_listener() {
|
||||||
|
let dir = TempDir::new().unwrap();
|
||||||
|
let socket_path = dir.path().join("parent.sock");
|
||||||
|
let listener = UnixListener::bind(&socket_path).unwrap();
|
||||||
|
let received = accept_one_method(listener);
|
||||||
|
|
||||||
|
fire_and_forget(
|
||||||
|
Some(socket_path.clone()),
|
||||||
|
PodEvent::TurnEnded {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let method = tokio::time::timeout(Duration::from_secs(2), received)
|
||||||
|
.await
|
||||||
|
.expect("send timed out")
|
||||||
|
.unwrap()
|
||||||
|
.expect("no method received");
|
||||||
|
match method {
|
||||||
|
Method::PodEvent(PodEvent::TurnEnded { pod_name }) => assert_eq!(pod_name, "child"),
|
||||||
|
other => panic!("expected TurnEnded, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn fire_and_forget_with_none_socket_is_noop() {
|
||||||
|
// Nothing binds and nothing listens; the call must not panic and
|
||||||
|
// must not leak a task that never completes.
|
||||||
|
fire_and_forget(
|
||||||
|
None,
|
||||||
|
PodEvent::ShutDown {
|
||||||
|
pod_name: "x".into(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
// Yield once so any accidentally-spawned task would surface.
|
||||||
|
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a registry backed by a fresh runtime dir.
|
||||||
|
async fn fresh_registry(runtime_base: &std::path::Path, pod_name: &str) -> Arc<SpawnedPodRegistry> {
|
||||||
|
let rd = RuntimeDir::create(runtime_base, pod_name).await.unwrap();
|
||||||
|
SpawnedPodRegistry::new(Arc::new(rd))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn apply_shutdown_removes_from_registry_and_tolerates_missing() {
|
||||||
|
let _env = EnvGuard::acquire();
|
||||||
|
let scope_dir = TempDir::new().unwrap();
|
||||||
|
set_scope_lock_path(&scope_dir.path().join("scope.lock"));
|
||||||
|
|
||||||
|
let runtime_base = TempDir::new().unwrap();
|
||||||
|
let registry = fresh_registry(runtime_base.path(), "parent").await;
|
||||||
|
|
||||||
|
// Seed a child record; then ShutDown for it should remove it.
|
||||||
|
registry
|
||||||
|
.add(SpawnedPodRecord {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
socket_path: "/tmp/child.sock".into(),
|
||||||
|
scope_delegated: vec![],
|
||||||
|
callback_address: "/tmp/parent.sock".into(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let event = PodEvent::ShutDown {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
};
|
||||||
|
apply_event_side_effects(&event, ®istry, "parent", &None).await;
|
||||||
|
assert!(registry.get("child").await.is_none());
|
||||||
|
|
||||||
|
// Second ShutDown for the same (now-missing) child must be a no-op,
|
||||||
|
// not an error — this is the idempotency guarantee for out-of-order
|
||||||
|
// delivery.
|
||||||
|
apply_event_side_effects(&event, ®istry, "parent", &None).await;
|
||||||
|
assert!(registry.get("child").await.is_none());
|
||||||
|
|
||||||
|
clear_scope_lock_path();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn apply_scope_sub_delegated_adds_grandchild_then_duplicate_is_noop() {
|
||||||
|
let _env = EnvGuard::acquire();
|
||||||
|
let scope_dir = TempDir::new().unwrap();
|
||||||
|
set_scope_lock_path(&scope_dir.path().join("scope.lock"));
|
||||||
|
|
||||||
|
let runtime_base = TempDir::new().unwrap();
|
||||||
|
let registry = fresh_registry(runtime_base.path(), "grandparent").await;
|
||||||
|
|
||||||
|
// Seed the intermediate child so callback_address lookup succeeds.
|
||||||
|
registry
|
||||||
|
.add(SpawnedPodRecord {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
socket_path: "/tmp/child.sock".into(),
|
||||||
|
scope_delegated: vec![],
|
||||||
|
callback_address: "/tmp/grandparent.sock".into(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let event = PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod: "child".into(),
|
||||||
|
sub_pod: "grandchild".into(),
|
||||||
|
sub_socket: "/tmp/grandchild.sock".into(),
|
||||||
|
scope: vec![ScopeRule {
|
||||||
|
target: scope_dir.path().to_path_buf(),
|
||||||
|
permission: Permission::Write,
|
||||||
|
recursive: true,
|
||||||
|
}],
|
||||||
|
};
|
||||||
|
|
||||||
|
apply_event_side_effects(&event, ®istry, "grandparent", &None).await;
|
||||||
|
let gc = registry
|
||||||
|
.get("grandchild")
|
||||||
|
.await
|
||||||
|
.expect("grandchild missing after ScopeSubDelegated");
|
||||||
|
assert_eq!(gc.socket_path, PathBuf::from("/tmp/grandchild.sock"));
|
||||||
|
assert_eq!(gc.callback_address, PathBuf::from("/tmp/child.sock"));
|
||||||
|
|
||||||
|
// Duplicate delivery must not error and must not overwrite.
|
||||||
|
apply_event_side_effects(&event, ®istry, "grandparent", &None).await;
|
||||||
|
let gc2 = registry.get("grandchild").await.unwrap();
|
||||||
|
assert_eq!(gc2.socket_path, PathBuf::from("/tmp/grandchild.sock"));
|
||||||
|
|
||||||
|
clear_scope_lock_path();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn apply_scope_sub_delegated_reemits_to_own_parent() {
|
||||||
|
let _env = EnvGuard::acquire();
|
||||||
|
let scope_dir = TempDir::new().unwrap();
|
||||||
|
set_scope_lock_path(&scope_dir.path().join("scope.lock"));
|
||||||
|
|
||||||
|
let runtime_base = TempDir::new().unwrap();
|
||||||
|
let registry = fresh_registry(runtime_base.path(), "B").await;
|
||||||
|
|
||||||
|
// Bind a listener at "A's" socket so we can watch the re-emission
|
||||||
|
// climb one level up the tree.
|
||||||
|
let sock_dir = TempDir::new().unwrap();
|
||||||
|
let a_socket = sock_dir.path().join("A.sock");
|
||||||
|
let listener = UnixListener::bind(&a_socket).unwrap();
|
||||||
|
let received = accept_one_method(listener);
|
||||||
|
|
||||||
|
// Seed the child record that the event claims spawned the grandchild.
|
||||||
|
registry
|
||||||
|
.add(SpawnedPodRecord {
|
||||||
|
pod_name: "C".into(),
|
||||||
|
socket_path: "/tmp/C.sock".into(),
|
||||||
|
scope_delegated: vec![],
|
||||||
|
callback_address: "/tmp/B.sock".into(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let event = PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod: "C".into(),
|
||||||
|
sub_pod: "D".into(),
|
||||||
|
sub_socket: "/tmp/D.sock".into(),
|
||||||
|
scope: vec![],
|
||||||
|
};
|
||||||
|
|
||||||
|
// Self is B, and B's parent socket is A's listener.
|
||||||
|
apply_event_side_effects(&event, ®istry, "B", &Some(a_socket.clone())).await;
|
||||||
|
|
||||||
|
// A must see the re-emission with parent_pod set to "B" (the
|
||||||
|
// sender from A's perspective), not "C" (the original sender's
|
||||||
|
// local view).
|
||||||
|
let method = tokio::time::timeout(Duration::from_secs(2), received)
|
||||||
|
.await
|
||||||
|
.expect("re-emission timed out")
|
||||||
|
.unwrap()
|
||||||
|
.expect("no method received on A's socket");
|
||||||
|
match method {
|
||||||
|
Method::PodEvent(PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod,
|
||||||
|
sub_pod,
|
||||||
|
..
|
||||||
|
}) => {
|
||||||
|
assert_eq!(parent_pod, "B");
|
||||||
|
assert_eq!(sub_pod, "D");
|
||||||
|
}
|
||||||
|
other => panic!("expected re-emitted ScopeSubDelegated, got {other:?}"),
|
||||||
|
}
|
||||||
|
|
||||||
|
clear_scope_lock_path();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn apply_turn_ended_and_errored_are_system_noops() {
|
||||||
|
let _env = EnvGuard::acquire();
|
||||||
|
let scope_dir = TempDir::new().unwrap();
|
||||||
|
set_scope_lock_path(&scope_dir.path().join("scope.lock"));
|
||||||
|
|
||||||
|
let runtime_base = TempDir::new().unwrap();
|
||||||
|
let registry = fresh_registry(runtime_base.path(), "parent").await;
|
||||||
|
|
||||||
|
// Seed a child to verify it survives the no-op path.
|
||||||
|
registry
|
||||||
|
.add(SpawnedPodRecord {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
socket_path: "/tmp/child.sock".into(),
|
||||||
|
scope_delegated: vec![],
|
||||||
|
callback_address: "/tmp/parent.sock".into(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
apply_event_side_effects(
|
||||||
|
&PodEvent::TurnEnded {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
},
|
||||||
|
®istry,
|
||||||
|
"parent",
|
||||||
|
&None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
apply_event_side_effects(
|
||||||
|
&PodEvent::Errored {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
message: "x".into(),
|
||||||
|
},
|
||||||
|
®istry,
|
||||||
|
"parent",
|
||||||
|
&None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(registry.get("child").await.is_some());
|
||||||
|
clear_scope_lock_path();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn shutdown_releases_scope_allocation_when_present() {
|
||||||
|
let _env = EnvGuard::acquire();
|
||||||
|
let scope_dir = TempDir::new().unwrap();
|
||||||
|
let lock_path = scope_dir.path().join("scope.lock");
|
||||||
|
set_scope_lock_path(&lock_path);
|
||||||
|
|
||||||
|
// Install a top-level allocation for "kid" so ShutDown has
|
||||||
|
// something to release.
|
||||||
|
let guard = scope_lock::install_top_level(
|
||||||
|
"kid".into(),
|
||||||
|
std::process::id(),
|
||||||
|
"/tmp/kid.sock".into(),
|
||||||
|
vec![],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
std::mem::forget(guard);
|
||||||
|
|
||||||
|
let runtime_base = TempDir::new().unwrap();
|
||||||
|
let registry = fresh_registry(runtime_base.path(), "parent").await;
|
||||||
|
registry
|
||||||
|
.add(SpawnedPodRecord {
|
||||||
|
pod_name: "kid".into(),
|
||||||
|
socket_path: "/tmp/kid.sock".into(),
|
||||||
|
scope_delegated: vec![],
|
||||||
|
callback_address: "/tmp/parent.sock".into(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
apply_event_side_effects(
|
||||||
|
&PodEvent::ShutDown {
|
||||||
|
pod_name: "kid".into(),
|
||||||
|
},
|
||||||
|
®istry,
|
||||||
|
"parent",
|
||||||
|
&None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Allocation is gone from the scope lock.
|
||||||
|
let g = LockFileGuard::open(&lock_path).unwrap();
|
||||||
|
assert!(
|
||||||
|
g.data().find("kid").is_none(),
|
||||||
|
"ShutDown should have released the scope allocation"
|
||||||
|
);
|
||||||
|
|
||||||
|
clear_scope_lock_path();
|
||||||
|
}
|
||||||
|
|
@ -158,6 +158,7 @@ async fn spawn_pod_delegates_scope_and_sends_run() {
|
||||||
runtime_base.clone(),
|
runtime_base.clone(),
|
||||||
allow_root.path().to_path_buf(),
|
allow_root.path().to_path_buf(),
|
||||||
registry,
|
registry,
|
||||||
|
None,
|
||||||
);
|
);
|
||||||
let (_meta, tool) = def();
|
let (_meta, tool) = def();
|
||||||
|
|
||||||
|
|
@ -219,6 +220,7 @@ async fn spawn_pod_rejects_scope_outside_spawner() {
|
||||||
runtime_base,
|
runtime_base,
|
||||||
allow_root.path().to_path_buf(),
|
allow_root.path().to_path_buf(),
|
||||||
registry,
|
registry,
|
||||||
|
None,
|
||||||
);
|
);
|
||||||
let (_meta, tool) = def();
|
let (_meta, tool) = def();
|
||||||
|
|
||||||
|
|
@ -276,6 +278,7 @@ async fn spawn_pod_rolls_back_reservation_when_socket_never_appears() {
|
||||||
runtime_base,
|
runtime_base,
|
||||||
allow_root.path().to_path_buf(),
|
allow_root.path().to_path_buf(),
|
||||||
registry,
|
registry,
|
||||||
|
None,
|
||||||
);
|
);
|
||||||
let (_meta, tool) = def();
|
let (_meta, tool) = def();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,7 @@
|
||||||
pub mod stream;
|
pub mod stream;
|
||||||
|
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
@ -10,13 +12,63 @@ use serde::{Deserialize, Serialize};
|
||||||
#[serde(tag = "method", content = "params", rename_all = "snake_case")]
|
#[serde(tag = "method", content = "params", rename_all = "snake_case")]
|
||||||
pub enum Method {
|
pub enum Method {
|
||||||
Run { input: String },
|
Run { input: String },
|
||||||
Notify { source: String, message: String },
|
/// Human-readable text injected into the target Pod's LLM context
|
||||||
|
/// as a non-blocking system message. No side effects beyond LLM
|
||||||
|
/// context; use `PodEvent` for typed lifecycle reports.
|
||||||
|
Notify { message: String },
|
||||||
|
/// Typed lifecycle report from a child Pod to its direct parent.
|
||||||
|
PodEvent(PodEvent),
|
||||||
Resume,
|
Resume,
|
||||||
Cancel,
|
Cancel,
|
||||||
Shutdown,
|
Shutdown,
|
||||||
GetHistory,
|
GetHistory,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Typed lifecycle events sent from a child Pod to its parent.
|
||||||
|
///
|
||||||
|
/// Delivered as `Method::PodEvent` over the parent's Unix socket. The
|
||||||
|
/// parent Controller applies variant-specific side effects (registry /
|
||||||
|
/// scope-lock updates) and renders a human-readable string that is
|
||||||
|
/// injected into the parent's LLM context via the notification buffer.
|
||||||
|
///
|
||||||
|
/// Transport is fire-and-forget; receivers must tolerate out-of-order
|
||||||
|
/// delivery (e.g. `TurnEnded` arriving after `ShutDown` for the same
|
||||||
|
/// child Pod).
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||||
|
pub enum PodEvent {
|
||||||
|
/// Child finished one turn and is back to IDLE.
|
||||||
|
TurnEnded { pod_name: String },
|
||||||
|
|
||||||
|
/// Worker execution error occurred inside the child's turn.
|
||||||
|
///
|
||||||
|
/// Limited to worker runtime failures (provider / tool errors) —
|
||||||
|
/// does not include transient method-rejection responses such as
|
||||||
|
/// `AlreadyRunning`.
|
||||||
|
Errored { pod_name: String, message: String },
|
||||||
|
|
||||||
|
/// Child has stopped (controller loop is exiting).
|
||||||
|
ShutDown { pod_name: String },
|
||||||
|
|
||||||
|
/// Child sub-delegated scope to a grandchild Pod via `SpawnPod`.
|
||||||
|
///
|
||||||
|
/// The parent uses this to add the grandchild to its own
|
||||||
|
/// `spawned_pods.json` so it can manage the grandchild directly
|
||||||
|
/// even if the intermediate child dies. The parent then re-fires
|
||||||
|
/// this event upward (if it has a parent of its own) to maintain
|
||||||
|
/// the chain to root.
|
||||||
|
ScopeSubDelegated {
|
||||||
|
/// Sub-delegating Pod (= the sender itself).
|
||||||
|
parent_pod: String,
|
||||||
|
/// Name of the grandchild Pod.
|
||||||
|
sub_pod: String,
|
||||||
|
/// Unix-socket path where the grandchild is reachable.
|
||||||
|
sub_socket: PathBuf,
|
||||||
|
/// Scope delegated to the grandchild.
|
||||||
|
scope: Vec<ScopeRule>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// Event (Pod → Client via Unix Socket broadcast)
|
// Event (Pod → Client via Unix Socket broadcast)
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
@ -150,6 +202,48 @@ pub enum ErrorCode {
|
||||||
Internal,
|
Internal,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Scope rule / permission (wire type)
|
||||||
|
//
|
||||||
|
// Defined here so that both `manifest` (config parsing) and `protocol`
|
||||||
|
// itself (inter-pod messaging such as `PodEvent::ScopeSubDelegated`) can
|
||||||
|
// reference the same type without introducing a reverse dependency.
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
/// A single allow or deny rule inside a scope configuration.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct ScopeRule {
|
||||||
|
/// Target path. Must be absolute by the time a `Scope` is built from
|
||||||
|
/// this rule — relative paths are resolved per-layer against the
|
||||||
|
/// manifest file's directory (cwd for overlay layers) before cascade
|
||||||
|
/// merge.
|
||||||
|
pub target: PathBuf,
|
||||||
|
/// Permission level this rule grants (allow) or caps strictly below
|
||||||
|
/// (deny).
|
||||||
|
pub permission: Permission,
|
||||||
|
/// When `false`, the rule only matches the target itself and its
|
||||||
|
/// direct children. Defaults to `true`.
|
||||||
|
#[serde(default = "default_recursive")]
|
||||||
|
pub recursive: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_recursive() -> bool {
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Permission lattice used by [`ScopeRule`].
|
||||||
|
///
|
||||||
|
/// The derived `Ord` instance follows declaration order, so
|
||||||
|
/// `Read < Write`. Allow rules grant the stated level (and by extension
|
||||||
|
/// everything below); deny rules cap the effective level **strictly
|
||||||
|
/// below** the stated level.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "lowercase")]
|
||||||
|
pub enum Permission {
|
||||||
|
Read,
|
||||||
|
Write,
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
@ -195,12 +289,11 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn method_notify_json_roundtrip() {
|
fn method_notify_json_roundtrip() {
|
||||||
let json = r#"{"method":"notify","params":{"source":"child-pod","message":"turn done"}}"#;
|
let json = r#"{"method":"notify","params":{"message":"turn done"}}"#;
|
||||||
let method: Method = serde_json::from_str(json).unwrap();
|
let method: Method = serde_json::from_str(json).unwrap();
|
||||||
assert!(matches!(
|
assert!(matches!(
|
||||||
method,
|
method,
|
||||||
Method::Notify { ref source, ref message }
|
Method::Notify { ref message } if message == "turn done"
|
||||||
if source == "child-pod" && message == "turn done"
|
|
||||||
));
|
));
|
||||||
let serialized = serde_json::to_string(&method).unwrap();
|
let serialized = serde_json::to_string(&method).unwrap();
|
||||||
assert_eq!(serialized, json);
|
assert_eq!(serialized, json);
|
||||||
|
|
@ -235,6 +328,87 @@ mod tests {
|
||||||
assert_eq!(parsed["data"]["greeting"]["tools"][0], "Read");
|
assert_eq!(parsed["data"]["greeting"]["tools"][0], "Read");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn method_pod_event_turn_ended_roundtrip() {
|
||||||
|
let method = Method::PodEvent(PodEvent::TurnEnded {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
});
|
||||||
|
let json = serde_json::to_string(&method).unwrap();
|
||||||
|
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||||
|
assert_eq!(parsed["method"], "pod_event");
|
||||||
|
assert_eq!(parsed["params"]["kind"], "turn_ended");
|
||||||
|
assert_eq!(parsed["params"]["pod_name"], "child");
|
||||||
|
|
||||||
|
let decoded: Method = serde_json::from_str(&json).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
decoded,
|
||||||
|
Method::PodEvent(PodEvent::TurnEnded { ref pod_name }) if pod_name == "child"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn method_pod_event_errored_roundtrip() {
|
||||||
|
let method = Method::PodEvent(PodEvent::Errored {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
message: "provider 429".into(),
|
||||||
|
});
|
||||||
|
let json = serde_json::to_string(&method).unwrap();
|
||||||
|
let decoded: Method = serde_json::from_str(&json).unwrap();
|
||||||
|
match decoded {
|
||||||
|
Method::PodEvent(PodEvent::Errored { pod_name, message }) => {
|
||||||
|
assert_eq!(pod_name, "child");
|
||||||
|
assert_eq!(message, "provider 429");
|
||||||
|
}
|
||||||
|
other => panic!("expected Errored, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn method_pod_event_shutdown_roundtrip() {
|
||||||
|
let method = Method::PodEvent(PodEvent::ShutDown {
|
||||||
|
pod_name: "child".into(),
|
||||||
|
});
|
||||||
|
let json = serde_json::to_string(&method).unwrap();
|
||||||
|
let decoded: Method = serde_json::from_str(&json).unwrap();
|
||||||
|
assert!(matches!(
|
||||||
|
decoded,
|
||||||
|
Method::PodEvent(PodEvent::ShutDown { ref pod_name }) if pod_name == "child"
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn method_pod_event_scope_sub_delegated_roundtrip() {
|
||||||
|
let method = Method::PodEvent(PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod: "child".into(),
|
||||||
|
sub_pod: "grandchild".into(),
|
||||||
|
sub_socket: "/run/insomnia/grandchild/sock".into(),
|
||||||
|
scope: vec![ScopeRule {
|
||||||
|
target: "/tmp/work".into(),
|
||||||
|
permission: Permission::Write,
|
||||||
|
recursive: true,
|
||||||
|
}],
|
||||||
|
});
|
||||||
|
let json = serde_json::to_string(&method).unwrap();
|
||||||
|
let decoded: Method = serde_json::from_str(&json).unwrap();
|
||||||
|
match decoded {
|
||||||
|
Method::PodEvent(PodEvent::ScopeSubDelegated {
|
||||||
|
parent_pod,
|
||||||
|
sub_pod,
|
||||||
|
sub_socket,
|
||||||
|
scope,
|
||||||
|
}) => {
|
||||||
|
assert_eq!(parent_pod, "child");
|
||||||
|
assert_eq!(sub_pod, "grandchild");
|
||||||
|
assert_eq!(sub_socket, PathBuf::from("/run/insomnia/grandchild/sock"));
|
||||||
|
assert_eq!(scope.len(), 1);
|
||||||
|
assert_eq!(scope[0].target, PathBuf::from("/tmp/work"));
|
||||||
|
assert_eq!(scope[0].permission, Permission::Write);
|
||||||
|
assert!(scope[0].recursive);
|
||||||
|
}
|
||||||
|
other => panic!("expected ScopeSubDelegated, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn event_notification_format() {
|
fn event_notification_format() {
|
||||||
let event = Event::Notification(Notification {
|
let event = Event::Notification(Notification {
|
||||||
|
|
|
||||||
75
tickets/tool-call-empty-args-null.md
Normal file
75
tickets/tool-call-empty-args-null.md
Normal file
|
|
@ -0,0 +1,75 @@
|
||||||
|
# 引数なし tool 呼び出しで `arguments = "null"` が記録される不具合
|
||||||
|
|
||||||
|
## 背景
|
||||||
|
|
||||||
|
引数を取らないツール(例: `ListPods`)を Anthropic の Claude が呼び出したとき、次ターンで履歴を送り返す際に Anthropic API が以下のエラーで 400 を返す:
|
||||||
|
|
||||||
|
```
|
||||||
|
messages.N.content.0.tool_use.input: Input should be a valid dictionary
|
||||||
|
```
|
||||||
|
|
||||||
|
実環境で `cargo run -p pod` + TUI / API 経由で `ListPods` を呼ぶと再現する。セッション jsonl には tool 呼び出しが以下の形で記録されている:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{"type":"tool_call","call_id":"toolu_...","name":"ListPods","arguments":"null"}
|
||||||
|
```
|
||||||
|
|
||||||
|
`arguments` が `"null"` 文字列になっており、次ターンで Anthropic に送る `tool_use.input` が JSON `null` として serialize されてしまうことが原因。
|
||||||
|
|
||||||
|
## 原因
|
||||||
|
|
||||||
|
`crates/llm-worker/src/timeline/tool_call_collector.rs:87-88`:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let input = serde_json::from_str(&scope.input_json_buffer)
|
||||||
|
.unwrap_or(serde_json::Value::Null);
|
||||||
|
```
|
||||||
|
|
||||||
|
Anthropic は引数なしのツール呼び出しでは `input_json_delta` を一度も送らない。その結果 `input_json_buffer` が空文字 `""` のまま stop イベントに到達し、`from_str("")` が失敗して `Value::Null` に fallback する。
|
||||||
|
|
||||||
|
この `Null` が `worker.rs:499` で `Item::tool_call_json(..., Value::Null)` として履歴に保存され、`Value::Null.to_string()` = `"null"` が `arguments` フィールドに残る。
|
||||||
|
|
||||||
|
次ターンで `anthropic/request.rs:174-175` が history → request body 変換する際:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let input = serde_json::from_str(arguments)
|
||||||
|
.unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new()));
|
||||||
|
```
|
||||||
|
|
||||||
|
`"null"` は valid JSON として parse 成功するため fallback が効かず、`Value::Null` のまま `tool_use.input` に入り API に送信される。Anthropic の tool_use.input は object 必須なので拒否される。
|
||||||
|
|
||||||
|
## 修正方針
|
||||||
|
|
||||||
|
ルート修正を `tool_call_collector.rs` に入れる。引数なし / パース失敗の場合は `Value::Object(Map::new())`(= `{}`)にする:
|
||||||
|
|
||||||
|
```rust
|
||||||
|
let input = if scope.input_json_buffer.is_empty() {
|
||||||
|
serde_json::Value::Object(serde_json::Map::new())
|
||||||
|
} else {
|
||||||
|
serde_json::from_str(&scope.input_json_buffer)
|
||||||
|
.unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new()))
|
||||||
|
};
|
||||||
|
```
|
||||||
|
|
||||||
|
加えて防御層として `anthropic/request.rs:174` で parse 結果が object でない場合も `{}` に正規化する。既に `arguments = "null"` として保存済みの古いセッションが resume されたときに回復できるようにするため。
|
||||||
|
|
||||||
|
## 影響範囲
|
||||||
|
|
||||||
|
- `crates/llm-worker/src/timeline/tool_call_collector.rs`: 空バッファ時の default を `Value::Object` に
|
||||||
|
- `crates/llm-worker/src/llm_client/scheme/anthropic/request.rs`: parse 結果が非 object の場合の正規化
|
||||||
|
- `crates/llm-worker/src/worker.rs:576-577` の同様の parse でも非 object を正規化(防御)
|
||||||
|
- OpenAI / Gemini の request.rs でも同等の問題があるか確認、必要なら同じ修正を入れる
|
||||||
|
|
||||||
|
## 完了条件
|
||||||
|
|
||||||
|
- 引数なしツール(`ListPods` など)を呼んだ直後のセッション jsonl で `arguments` が `"{}"` になる
|
||||||
|
- 同一セッション内で引数なしツールを呼んでから次ターンを開始しても 400 エラーが出ない
|
||||||
|
- `"arguments":"null"` が残っている既存セッションを resume しても 400 エラーが出ない
|
||||||
|
- `tool_call_collector.rs` に「空バッファ → `{}`」を検証するテストを追加
|
||||||
|
- 該当パスの回帰テストを単体テストで担保
|
||||||
|
|
||||||
|
## 範囲外
|
||||||
|
|
||||||
|
- LLM 側が「無意味に `null` を `input` に入れてくる」ケースの検出・警告(そもそもプロバイダから来ないはず)
|
||||||
|
- OpenAI / Gemini の同等検証(確認だけ行い、問題あれば別チケット化)
|
||||||
|
- 既存セッション jsonl の自動修復スクリプト(resume 時の defensive 正規化で十分)
|
||||||
Loading…
Reference in New Issue
Block a user