From 74ee96ef828ea83ce71d89f344f379df15ee9c12 Mon Sep 17 00:00:00 2001 From: Hare Date: Sat, 18 Apr 2026 17:48:35 +0900 Subject: [PATCH] =?UTF-8?q?Notification=E3=81=AE=E5=AE=9F=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- AGENTS.md | 1 + CLAUDE.md | 1 + TODO.md | 1 - crates/pod/src/controller.rs | 62 ++++++++++++- crates/pod/src/lib.rs | 1 + crates/pod/src/notification_buffer.rs | 127 ++++++++++++++++++++++++++ crates/pod/src/pod.rs | 51 ++++++++++- crates/pod/src/pod_interceptor.rs | 66 ++++++++++++- crates/pod/tests/controller_test.rs | 106 ++++++++++++++++++++- crates/protocol/src/lib.rs | 14 +++ tickets/method-notify.md | 116 ----------------------- 11 files changed, 421 insertions(+), 125 deletions(-) create mode 100644 crates/pod/src/notification_buffer.rs delete mode 100644 tickets/method-notify.md diff --git a/AGENTS.md b/AGENTS.md index 321a0b74..e4ebbc9a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -30,3 +30,4 @@ d. 完了: `tickets/foo.md` と `tickets/foo.review.md` を両方削除してcom TODO.mdのリンクは完了後に切れるが、そのリンクを元にgitで消されたファイルを読み、内容を把握できる。 `.review.md` にはレビューの指摘事項と判断結果を記載する。 レビューはdiffの確認だけでなく、チケットはどのような前提・要件であり、それが達成されたかの確認まで含めて行う。 +常に、提出された実装で良いのか、コードベースを歪めていないか、不必要な実装ではないかを確認すること。 diff --git a/CLAUDE.md b/CLAUDE.md index 321a0b74..e4ebbc9a 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -30,3 +30,4 @@ d. 完了: `tickets/foo.md` と `tickets/foo.review.md` を両方削除してcom TODO.mdのリンクは完了後に切れるが、そのリンクを元にgitで消されたファイルを読み、内容を把握できる。 `.review.md` にはレビューの指摘事項と判断結果を記載する。 レビューはdiffの確認だけでなく、チケットはどのような前提・要件であり、それが達成されたかの確認まで含めて行う。 +常に、提出された実装で良いのか、コードベースを歪めていないか、不必要な実装ではないかを確認すること。 diff --git a/TODO.md b/TODO.md index 3c68af4b..b638119d 100644 --- a/TODO.md +++ b/TODO.md @@ -4,7 +4,6 @@ - [ ] Compact の改善(要約品質 + 挙動詳細) → [tickets/compact-improvements.md](tickets/compact-improvements.md) - [ ] Protocol の設計 → [tickets/protocol-design.md](tickets/protocol-design.md) - [ ] パーミッション: パターンベースのツール実行制御 → [tickets/permission-extension-point.md](tickets/permission-extension-point.md) -- [ ] Method::Notify: システム起点のコンテキスト注入 → [tickets/method-notify.md](tickets/method-notify.md) - [ ] Pod オーケストレーション: LLM によるマルチエージェント分業 → [tickets/pod-orchestration.md](tickets/pod-orchestration.md) - [ ] ネイティブ GUI クライアント MVP → [tickets/native-gui-mvp.md](tickets/native-gui-mvp.md) - [ ] TUI 拡充 diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 48f3e046..71385316 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -6,6 +6,7 @@ use llm_worker::llm_client::client::LlmClient; use session_store::Store; use tokio::sync::{broadcast, mpsc, oneshot}; +use crate::notification_buffer::NotificationBuffer; use crate::notifier::Notifier; use crate::pod::{Pod, PodError, PodRunResult}; use crate::runtime_dir::RuntimeDir; @@ -203,8 +204,12 @@ impl PodController { pod.attach_tracker(tracker); } - // Clone cancel sender before moving pod + // Clone cancel sender and notification buffer before moving pod + // into the controller task so the main loop can route + // `Method::Notify` into the buffer even while `pod` is held by + // an in-flight `run_for_notification` / `run` future. let cancel_tx = pod.worker_mut().cancel_sender(); + let notification_buffer = pod.notification_buffer_handle(); tokio::spawn(async move { // Hold socket server alive for the lifetime of the controller task @@ -234,6 +239,54 @@ impl PodController { &event_tx, &cancel_tx, &shared_state, + ¬ification_buffer, + ) + .await; + + if new_status == PodStatus::Idle { + if let Err(e) = pod.try_post_run_compact().await { + tracing::warn!(error = %e, "Post-run compaction error"); + notifier.notify( + NotificationLevel::Warn, + NotificationSource::Compactor, + format!("post-run compaction error: {e}"), + ); + } + } + + let items = pod.worker().history().to_vec(); + shared_state.update_history(items); + shared_state.set_status(new_status); + let _ = runtime_dir.write_status(&shared_state).await; + let _ = runtime_dir.write_history(&shared_state).await; + + if shutdown { + let _ = event_tx.send(Event::Shutdown); + break; + } + } + + Method::Notify { source, message } => { + pod.push_notification(source, message); + if shared_state.get_status() != PodStatus::Idle { + // RUNNING / Paused: the buffer push is the + // entire operation; the in-flight turn (or + // next Resume) will drain the buffer at its + // next pre_llm_request. + continue; + } + // IDLE: auto-start a turn so the LLM sees the + // buffered notification(s) without a human Run. + shared_state.set_status(PodStatus::Running); + let _ = runtime_dir.write_status(&shared_state).await; + + let (new_status, shutdown) = run_with_cancel_support( + pod.run_for_notification(), + &mut method_rx, + &event_tx, + &cancel_tx, + &shared_state, + ¬ification_buffer, ) .await; @@ -277,6 +330,7 @@ impl PodController { &event_tx, &cancel_tx, &shared_state, + ¬ification_buffer, ) .await; @@ -337,6 +391,7 @@ async fn run_with_cancel_support( event_tx: &broadcast::Sender, cancel_tx: &mpsc::Sender<()>, shared_state: &Arc, + notification_buffer: &NotificationBuffer, ) -> (PodStatus, bool) where F: std::future::Future>, @@ -382,6 +437,11 @@ where message: "Pod is already executing a turn".into(), }); } + Some(Method::Notify { source, message }) => { + // Route into the buffer; the in-flight turn will + // drain it at its next pre_llm_request. + notification_buffer.push(source, message); + } Some(Method::GetHistory) => {} None => { let _ = cancel_tx.try_send(()); diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 28bae3ea..8cc5880c 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -8,6 +8,7 @@ pub mod socket_server; mod agents_md; mod compact_state; mod factory; +mod notification_buffer; mod pod; mod pod_interceptor; mod prompt_loader; diff --git a/crates/pod/src/notification_buffer.rs b/crates/pod/src/notification_buffer.rs new file mode 100644 index 00000000..4f056107 --- /dev/null +++ b/crates/pod/src/notification_buffer.rs @@ -0,0 +1,127 @@ +//! Pending-notification buffer for `Method::Notify`. +//! +//! Notifications are queued here by the Controller and drained by +//! `PodInterceptor::pre_llm_request` into the per-request context +//! (never into the Worker's persistent history). Each queued entry +//! becomes one `Item::system_message` in the outgoing request. + +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; + +use llm_worker::Item; +use tracing::warn; + +/// Maximum queued notifications. Oldest entries are dropped beyond this. +const CAPACITY: usize = 128; + +/// One pending notification awaiting injection into the next LLM request. +#[derive(Debug, Clone)] +pub struct PendingNotification { + pub source: String, + pub message: String, +} + +/// Shared, mutex-guarded buffer of pending notifications. +/// +/// Cloned between the Pod (producer) and PodInterceptor (consumer). +#[derive(Clone, Default)] +pub struct NotificationBuffer { + inner: Arc>>, +} + +impl NotificationBuffer { + pub fn new() -> Self { + Self::default() + } + + /// Push a notification onto the queue. If the queue is full, the + /// oldest entry is dropped and a `tracing::warn` is emitted — the + /// caller should never hit this in normal operation. + pub fn push(&self, source: String, message: String) { + let mut q = self.inner.lock().expect("notification buffer poisoned"); + if q.len() >= CAPACITY { + let dropped = q.pop_front(); + warn!( + capacity = CAPACITY, + dropped_source = dropped.as_ref().map(|n| n.source.as_str()), + "notification buffer overflow; dropped oldest" + ); + } + q.push_back(PendingNotification { source, message }); + } + + /// Remove and return all pending notifications in FIFO order. + pub fn drain(&self) -> Vec { + let mut q = self.inner.lock().expect("notification buffer poisoned"); + q.drain(..).collect() + } + + /// Number of pending notifications. Primarily for tests. + pub fn len(&self) -> usize { + self.inner + .lock() + .expect("notification buffer poisoned") + .len() + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +/// Format a single pending notification into the `Item::system_message` +/// that gets injected into the per-request context. +pub(crate) fn format_notification(n: &PendingNotification) -> Item { + let text = format!( + "[Notification from {source}]\n{message}\n\n\ + This is a notification, not a blocking request. \ + If you are in the middle of a task, continue your current work \ + and address this at a natural stopping point.", + source = n.source, + message = n.message, + ); + Item::system_message(text) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_then_drain_preserves_order() { + let buf = NotificationBuffer::new(); + buf.push("a".into(), "one".into()); + buf.push("b".into(), "two".into()); + let drained = buf.drain(); + assert_eq!(drained.len(), 2); + assert_eq!(drained[0].source, "a"); + assert_eq!(drained[1].source, "b"); + assert!(buf.is_empty()); + } + + #[test] + fn capacity_drops_oldest() { + let buf = NotificationBuffer::new(); + for i in 0..(CAPACITY + 5) { + buf.push(format!("src{i}"), format!("msg{i}")); + } + let drained = buf.drain(); + assert_eq!(drained.len(), CAPACITY); + // Oldest 5 were dropped; first retained is src5. + assert_eq!(drained[0].source, "src5"); + assert_eq!(drained[CAPACITY - 1].source, format!("src{}", CAPACITY + 4)); + } + + #[test] + fn format_notification_includes_source_message_and_nonblocking_hint() { + let n = PendingNotification { + source: "child".into(), + message: "hello".into(), + }; + let item = format_notification(&n); + let text = item.as_text().unwrap_or_default().to_string(); + assert!(text.contains("[Notification from child]")); + assert!(text.contains("hello")); + assert!(text.contains("not a blocking request")); + } +} diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 83cd37c6..86c6ae0b 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -19,6 +19,7 @@ use crate::hook::{ Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest, PreRequestInfo, PreToolCall, }; +use crate::notification_buffer::NotificationBuffer; use crate::notifier::Notifier; use crate::pod_interceptor::PodInterceptor; use crate::prompt_loader::PromptLoader; @@ -100,6 +101,10 @@ pub struct Pod { /// User-facing notification sink attached by the Controller at /// spawn time. `None` in tests / direct `Pod::new` usage. notifier: Option, + /// Queue of pending `Method::Notify` notifications awaiting + /// injection into the next LLM request. Shared with the + /// PodInterceptor installed in `ensure_interceptor_installed`. + pending_notifications: NotificationBuffer, } impl Pod { @@ -140,6 +145,7 @@ impl Pod { tracker: None, system_prompt_template: None, notifier: None, + pending_notifications: NotificationBuffer::new(), }; pod.apply_prune_from_manifest(); Ok(pod) @@ -188,6 +194,7 @@ impl Pod { tracker: None, system_prompt_template: None, notifier: None, + pending_notifications: NotificationBuffer::new(), }; pod.apply_prune_from_manifest(); Ok(pod) @@ -293,6 +300,23 @@ impl Pod { } } + /// Push a `Method::Notify` entry onto the pending buffer. + /// + /// The notification will be injected as an `Item::system_message` + /// into the next outgoing LLM request context (not into history). + /// See [`NotificationBuffer`] for overflow behaviour. + pub fn push_notification(&self, source: String, message: String) { + self.pending_notifications.push(source, message); + } + + /// Shared handle to the pending notification buffer. + /// + /// The Controller holds a clone so that `Method::Notify` arriving + /// while `pod.run()` is in flight can still reach the interceptor. + pub fn notification_buffer_handle(&self) -> NotificationBuffer { + self.pending_notifications.clone() + } + // --- Hook registration --- fn assert_hooks_open(&self) { @@ -388,7 +412,8 @@ impl Pod { None }; - let interceptor = PodInterceptor::new(registry, compact_state); + let interceptor = + PodInterceptor::new(registry, compact_state, self.pending_notifications.clone()); self.worker_mut().set_interceptor(interceptor); self.interceptor_installed = true; } @@ -462,6 +487,29 @@ impl Pod { self.handle_worker_result(result, history_before).await } + /// Run a turn triggered by `Method::Notify` while the Pod is idle. + /// + /// Unlike [`run`](Self::run), no user message is appended to + /// history. The `PodInterceptor::pre_llm_request` drains the + /// pending-notification buffer and injects each entry as an + /// `Item::system_message` into the per-request context, then the + /// Worker's resume path issues the LLM request without a new + /// user turn. + pub async fn run_for_notification(&mut self) -> Result { + self.ensure_interceptor_installed(); + self.ensure_system_prompt_materialized()?; + self.ensure_session_head().await?; + + let history_before = self.worker.as_ref().unwrap().history().len(); + + let worker = self.worker.take().expect("worker taken during run"); + let mut locked = worker.lock(); + let result = locked.resume().await; + self.worker = Some(locked.unlock()); + + self.handle_worker_result(result, history_before).await + } + /// Resume from a paused state. pub async fn resume(&mut self) -> Result { self.ensure_interceptor_installed(); @@ -861,6 +909,7 @@ impl Pod, St> { tracker: None, system_prompt_template, notifier: None, + pending_notifications: NotificationBuffer::new(), }; pod.apply_prune_from_manifest(); Ok(pod) diff --git a/crates/pod/src/pod_interceptor.rs b/crates/pod/src/pod_interceptor.rs index da0674d2..5f43b7d4 100644 --- a/crates/pod/src/pod_interceptor.rs +++ b/crates/pod/src/pod_interceptor.rs @@ -24,6 +24,7 @@ use crate::hook::{ AbortInfo, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, ToolResultSummary, TurnEndInfo, }; +use crate::notification_buffer::{NotificationBuffer, format_notification}; /// Maximum number of bytes copied into `TurnEndInfo::final_text_preview`. const FINAL_TEXT_PREVIEW_LIMIT: usize = 512; @@ -31,6 +32,9 @@ const FINAL_TEXT_PREVIEW_LIMIT: usize = 512; pub(crate) struct PodInterceptor { registry: Arc, compact_state: Option>, + /// Pending-notification buffer drained into the per-request + /// context at the head of `pre_llm_request`. + pending_notifications: NotificationBuffer, /// Next turn index assigned by `on_prompt_submit`. next_turn_index: AtomicUsize, /// Tool calls observed in the current turn (reset on each new prompt). @@ -41,10 +45,12 @@ impl PodInterceptor { pub(crate) fn new( registry: Arc, compact_state: Option>, + pending_notifications: NotificationBuffer, ) -> Self { Self { registry, compact_state, + pending_notifications, next_turn_index: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0), } @@ -89,6 +95,14 @@ impl Interceptor for PodInterceptor { } } + // Internal mechanism: drain pending `Method::Notify` notifications + // into the per-request context as transient system messages. + // These are not persisted to the Worker history; they exist only + // for this single LLM request. + for notification in self.pending_notifications.drain() { + context.push(format_notification(¬ification)); + } + let info = PreRequestInfo { item_count: context.len(), estimated_tokens: self.compact_state.as_ref().map(|s| s.last_input_tokens()), @@ -226,7 +240,7 @@ mod tests { let state = Arc::new(CompactState::new(100, 2)); state.update_input_tokens(200); // exceeds turn threshold - let interceptor = PodInterceptor::new(registry, Some(state)); + let interceptor = PodInterceptor::new(registry, Some(state), NotificationBuffer::new()); let mut ctx: Vec = vec![Item::user_message("hi")]; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -243,7 +257,7 @@ mod tests { let state = Arc::new(CompactState::new(100, 2)); // last_input_tokens stays at 0, well below threshold. - let interceptor = PodInterceptor::new(registry, Some(state)); + let interceptor = PodInterceptor::new(registry, Some(state), NotificationBuffer::new()); let mut ctx: Vec = vec![Item::user_message("hi")]; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -256,7 +270,7 @@ mod tests { let count = Arc::new(AtomicUsize::new(0)); let registry = registry_with_pre_llm_hook(count.clone()); - let interceptor = PodInterceptor::new(registry, None); + let interceptor = PodInterceptor::new(registry, None, NotificationBuffer::new()); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; @@ -274,6 +288,50 @@ mod tests { } } + #[tokio::test] + async fn pre_llm_request_drains_pending_notifications_into_context() { + let registry = Arc::new(HookRegistryBuilder::new().build()); + let buffer = NotificationBuffer::new(); + buffer.push("child-a".into(), "first".into()); + buffer.push("child-b".into(), "second".into()); + + let interceptor = PodInterceptor::new(registry, None, buffer.clone()); + let mut ctx: Vec = vec![Item::user_message("hi")]; + let action = interceptor.pre_llm_request(&mut ctx).await; + + assert!(matches!(action, PreRequestAction::Continue)); + // Original user message preserved, two notifications appended in order. + assert_eq!(ctx.len(), 3); + let second = ctx[1].as_text().unwrap_or_default(); + let third = ctx[2].as_text().unwrap_or_default(); + assert!(second.contains("[Notification from child-a]")); + assert!(second.contains("first")); + assert!(third.contains("[Notification from child-b]")); + assert!(third.contains("second")); + // Buffer is drained after a single pre_llm_request call. + assert!(buffer.is_empty()); + } + + #[tokio::test] + async fn pre_llm_request_skips_notification_injection_when_yielding() { + // When compaction yields, notifications remain in the buffer for + // the next pre_llm_request (after compaction + resume). + let registry = Arc::new(HookRegistryBuilder::new().build()); + let buffer = NotificationBuffer::new(); + buffer.push("src".into(), "msg".into()); + + let state = Arc::new(CompactState::new(100, 2)); + state.update_input_tokens(200); + + let interceptor = PodInterceptor::new(registry, Some(state), buffer.clone()); + let mut ctx: Vec = Vec::new(); + let action = interceptor.pre_llm_request(&mut ctx).await; + + assert!(matches!(action, PreRequestAction::Yield)); + assert!(ctx.is_empty()); + assert_eq!(buffer.len(), 1); + } + #[tokio::test] async fn pre_llm_request_short_circuits_on_first_non_continue() { let first_called = Arc::new(AtomicBool::new(false)); @@ -283,7 +341,7 @@ mod tests { builder.add_pre_llm_request(CountingHook(second_count.clone())); let registry = Arc::new(builder.build()); - let interceptor = PodInterceptor::new(registry, None); + let interceptor = PodInterceptor::new(registry, None, NotificationBuffer::new()); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 8bffa29e..4ccfbf4b 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -1,5 +1,5 @@ use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use async_trait::async_trait; @@ -19,6 +19,7 @@ use pod::{Event, Method, Pod, PodController, PodManifest, PodStatus}; struct MockClient { responses: Arc>>, call_count: Arc, + captured: Arc>>, } impl MockClient { @@ -26,8 +27,13 @@ impl MockClient { Self { responses: Arc::new(vec![events]), call_count: Arc::new(AtomicUsize::new(0)), + captured: Arc::new(Mutex::new(Vec::new())), } } + + fn captured_requests(&self) -> Vec { + self.captured.lock().unwrap().clone() + } } #[async_trait] @@ -38,9 +44,10 @@ impl LlmClient for MockClient { async fn stream( &self, - _request: Request, + request: Request, ) -> Result> + Send>>, ClientError> { + self.captured.lock().unwrap().push(request); let count = self.call_count.fetch_add(1, Ordering::SeqCst); if count >= self.responses.len() { return Err(ClientError::Api { @@ -329,6 +336,101 @@ async fn cancel_without_run_returns_error() { assert!(saw_not_running, "should see not_running error"); } +#[tokio::test] +async fn notify_while_idle_auto_starts_turn_and_injects_system_message() { + let client = MockClient::new(simple_text_events()); + let client_for_assert = client.clone(); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + let mut rx = handle.subscribe(); + + handle + .send(Method::Notify { + source: "child-a".into(), + message: "turn finished".into(), + }) + .await + .unwrap(); + + // Wait for the auto-started turn to complete. + let mut saw_turn_end = false; + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + tokio::select! { + event = rx.recv() => { + match event { + Ok(Event::TurnEnd { .. }) => { saw_turn_end = true; break; } + Err(_) => break, + _ => {} + } + } + _ = tokio::time::sleep_until(deadline) => break, + } + } + assert!(saw_turn_end, "auto-triggered turn should complete"); + // Status flips back to Idle on the controller thread after RunEnd. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!(handle.shared_state.get_status(), PodStatus::Idle); + + // Exactly one request was made; it must contain the formatted + // notification as the last item (injected into request_context by + // PodInterceptor::pre_llm_request). + let requests = client_for_assert.captured_requests(); + assert_eq!(requests.len(), 1, "one LLM call expected"); + let last_item_text = requests[0] + .items + .last() + .and_then(|i| i.as_text()) + .unwrap_or_default() + .to_string(); + assert!( + last_item_text.contains("[Notification from child-a]"), + "injected system message missing, got: {last_item_text:?}" + ); + assert!(last_item_text.contains("turn finished")); + assert!(last_item_text.contains("not a blocking request")); +} + +#[tokio::test] +async fn notify_while_running_does_not_emit_already_running_error() { + let client = MockClient::new(simple_text_events()); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + let mut rx = handle.subscribe(); + + handle + .send(Method::Run { + input: "start".into(), + }) + .await + .unwrap(); + handle + .send(Method::Notify { + source: "child".into(), + message: "ping".into(), + }) + .await + .unwrap(); + + // Drain events until the run ends; AlreadyRunning must never appear. + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + tokio::select! { + event = rx.recv() => { + match event { + Ok(Event::Error { code, .. }) if code == pod::ErrorCode::AlreadyRunning => { + panic!("Notify while running must not produce AlreadyRunning"); + } + Ok(Event::TurnEnd { .. }) => break, + Err(_) => break, + _ => {} + } + } + _ = tokio::time::sleep_until(deadline) => break, + } + } +} + #[tokio::test] async fn status_json_reflects_pod_name() { let client = MockClient::new(simple_text_events()); diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 4a24bddb..faa4a92a 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize}; #[serde(tag = "method", content = "params", rename_all = "snake_case")] pub enum Method { Run { input: String }, + Notify { source: String, message: String }, Resume, Cancel, Shutdown, @@ -192,6 +193,19 @@ mod tests { assert_eq!(parsed["data"]["result"], "limit_reached"); } + #[test] + fn method_notify_json_roundtrip() { + let json = r#"{"method":"notify","params":{"source":"child-pod","message":"turn done"}}"#; + let method: Method = serde_json::from_str(json).unwrap(); + assert!(matches!( + method, + Method::Notify { ref source, ref message } + if source == "child-pod" && message == "turn done" + )); + let serialized = serde_json::to_string(&method).unwrap(); + assert_eq!(serialized, json); + } + #[test] fn method_get_history() { let json = r#"{"method":"get_history"}"#; diff --git a/tickets/method-notify.md b/tickets/method-notify.md deleted file mode 100644 index ebb3b437..00000000 --- a/tickets/method-notify.md +++ /dev/null @@ -1,116 +0,0 @@ -# Method::Notify: システム起点のコンテキスト注入と自動ターン開始 - -## 背景 - -現状の Pod の実行サイクルは `Method::Run { input }` が唯一のターン開始手段であり、これは人間(または外部クライアント)が明示的にテキストを送ることを前提としている。 - -Pod オーケストレーション(`tickets/pod-orchestration.md`)では、子 Pod が親 Pod にコールバックで通知を送り、親 Pod が**人間の入力を待たずに**その通知を処理して次のアクションを起こす必要がある。現状のアーキテクチャにはこの経路が無い。 - -また、RUNNING 中の Pod に対しても、リクエストの合間に情報を注入して LLM に認知させたいケースがある(子 Pod からの非同期通知など)。現状は RUNNING 中にコンテキストを追加する手段が無い。 - -## ゴール - -`Method::Notify` を新設し、Pod の状態(IDLE / RUNNING)に応じて適切にコンテキストへの注入とターン制御を行う。 - -## 仕様 - -### `Method::Notify { source: String, message: String }` - -protocol に追加する新しい Method。`Run` とは異なり、ユーザーメッセージではなく**システム通知**としてコンテキストに注入される。 - -### Pod が IDLE のとき - -1. 通知メッセージを system message としてコンテキストに注入 -2. **自動でターンを開始**する(人間の入力を待たない) -3. LLM は通知を見て次のアクションを判断する - -### Pod が RUNNING のとき - -1. 通知メッセージを**次の LLM リクエストの直前**に注入する(tool call の応答を送った後、次のリクエストを組み立てる前) -2. ターンは中断しない。LLM は現在のタスクを続行しつつ、次のレスポンスで通知を認知する -3. LLM は今やっていることを優先し、切りの良いタイミングで通知に対処するかを自分で判断する - -### 注入されるメッセージのフォーマット - -通知は素のテキストではなく、以下の構造で注入される: - -``` -[Notification from {source}] -{message} - -This is a notification, not a blocking request. If you are in the middle of a task, continue your current work and address this at a natural stopping point. -``` - -- `[Notification]` prefix で LLM にこれが通知であることを明示 -- 「ブロッカーではないので直ちに対処しなくてよい」という指示を付与 -- LLM が通知を見て即座にタスクを放棄する(指示追従性の暴走)を防ぐ - -### 複数通知のバッファリング - -- RUNNING 中に複数の `Notify` が到着した場合、バッファに溜めて次の LLM リクエスト直前にまとめて注入する -- 個別の `[Notification]` ブロックとして並べる(1つにマージしない) -- IDLE 中に複数到着した場合、1つの system message にまとめて注入し、ターンを1回だけ開始する - -## 実装に必要な変更 - -### protocol crate - -- `Method::Notify { source: String, message: String }` を `Method` enum に追加 -- 対応する `Event` の追加が必要かは設計時に判断 - -### Worker - -- RUNNING 中に外部からメッセージを注入する仕組みが必要 -- 現状の Worker は turn 実行中にコンテキストの追加手段を持たない -- tool call → tool result → **notification 注入** → 次の LLM リクエスト、というフローを追加 -- 注入ポイントは `execute_tools` 完了後、次の LLM リクエスト組み立て前 - -### Controller - -- `Method::Notify` のハンドリングを追加 -- IDLE 時: 通知を注入 → 内部的に `run()` を開始(`Method::Run` と似た経路だがメッセージ種別が異なる) -- RUNNING 時: Worker の notification buffer に push - -### Pod - -- notification buffer を保持するフィールドを追加 -- `ensure_system_prompt_materialized` 的な「ターン開始前に notification を flush する」フックが要るかもしれない - -## `Method::Run` との対比 - -| | `Method::Run` | `Method::Notify` | -|---|---|---| -| 対象状態 | IDLE のみ(RUNNING 中は AlreadyRunning エラー) | IDLE でも RUNNING でも受け付ける | -| コンテキスト上の見え方 | user message | system message(`[Notification]` prefix 付き) | -| ターン制御 | 新ターンを開始 | IDLE: 自動でターン開始。RUNNING: 現ターンに注入 | -| LLM の期待挙動 | 指示に従って即座に行動 | 現タスク優先、切りの良いタイミングで対処を判断 | -| 送信元 | 人間 / クライアント | システム / 子 Pod のコールバック / Hook | - -## 設計で決めること - -- **notification の注入はどの message type で行うか**: 既存の `Role::User` / `Role::Assistant` とは別の `Role::System` (mid-conversation) を追加するか、`Role::User` に `[Notification]` prefix を付けて実質的に区別するか -- **IDLE 時の自動ターン開始と人間の `Run` の競合**: 通知が到着して自動ターンが始まった直後に人間が `Run` を送った場合の挙動 -- **notification buffer のサイズ上限**: RUNNING が長時間の場合にバッファが無制限に溜まるリスク -- **通知メッセージの prefix / suffix テンプレートの置き場**: ハードコードか、instruction 側でカスタマイズ可能にするか -- **Event の対応**: `Event::NotificationInjected` のような確認イベントを返すか - -## 完了条件 - -- `Method::Notify` が protocol に追加され、Controller が IDLE / RUNNING で適切にハンドリングする -- IDLE 時: 通知が system message として注入され、自動でターンが開始される -- RUNNING 時: 通知が次の LLM リクエスト直前に注入され、LLM が認知できる -- 複数通知のバッファリングが動作する(RUNNING 中に溜まった通知がまとめて注入される) -- `[Notification]` prefix と non-blocking 指示が付与される -- 単体テストで IDLE / RUNNING 両パスが検証される - -## 他チケットとの関係 - -- **tickets/pod-orchestration.md**: 本チケットの主要消費者。子 Pod の `Notify` ツール → 親の callback → 親の `Method::Notify` というフローでオーケストレーションの非同期通知が成立する -- **tickets/protocol-design.md**: protocol への Method 追加。既存の Method 設計パターン(Run / Resume / Cancel / Shutdown)と整合させる -- **tickets/compact-improvements.md**: notification がコンテキストに蓄積した場合の compaction 挙動は別途検討 - -## 範囲外 - -- **通知の routing / addressing**: 誰が誰に通知を送るかはオーケストレーション側の責務。本チケットは「Pod が Notify を受けたときの挙動」だけを扱う -- **通知の優先度 / フィルタリング**: 全通知を等しく注入する。重要度に応じた選別は LLM に任せる -- **通知の永続化**: 通知は session-store に永続化しない。コンテキスト上の message としてのみ存在する