Notificationの実装
This commit is contained in:
parent
b538c2f1ea
commit
74ee96ef82
|
|
@ -30,3 +30,4 @@ d. 完了: `tickets/foo.md` と `tickets/foo.review.md` を両方削除してcom
|
|||
TODO.mdのリンクは完了後に切れるが、そのリンクを元にgitで消されたファイルを読み、内容を把握できる。
|
||||
`.review.md` にはレビューの指摘事項と判断結果を記載する。
|
||||
レビューはdiffの確認だけでなく、チケットはどのような前提・要件であり、それが達成されたかの確認まで含めて行う。
|
||||
常に、提出された実装で良いのか、コードベースを歪めていないか、不必要な実装ではないかを確認すること。
|
||||
|
|
|
|||
|
|
@ -30,3 +30,4 @@ d. 完了: `tickets/foo.md` と `tickets/foo.review.md` を両方削除してcom
|
|||
TODO.mdのリンクは完了後に切れるが、そのリンクを元にgitで消されたファイルを読み、内容を把握できる。
|
||||
`.review.md` にはレビューの指摘事項と判断結果を記載する。
|
||||
レビューはdiffの確認だけでなく、チケットはどのような前提・要件であり、それが達成されたかの確認まで含めて行う。
|
||||
常に、提出された実装で良いのか、コードベースを歪めていないか、不必要な実装ではないかを確認すること。
|
||||
|
|
|
|||
1
TODO.md
1
TODO.md
|
|
@ -4,7 +4,6 @@
|
|||
- [ ] Compact の改善(要約品質 + 挙動詳細) → [tickets/compact-improvements.md](tickets/compact-improvements.md)
|
||||
- [ ] Protocol の設計 → [tickets/protocol-design.md](tickets/protocol-design.md)
|
||||
- [ ] パーミッション: パターンベースのツール実行制御 → [tickets/permission-extension-point.md](tickets/permission-extension-point.md)
|
||||
- [ ] Method::Notify: システム起点のコンテキスト注入 → [tickets/method-notify.md](tickets/method-notify.md)
|
||||
- [ ] Pod オーケストレーション: LLM によるマルチエージェント分業 → [tickets/pod-orchestration.md](tickets/pod-orchestration.md)
|
||||
- [ ] ネイティブ GUI クライアント MVP → [tickets/native-gui-mvp.md](tickets/native-gui-mvp.md)
|
||||
- [ ] TUI 拡充
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ use llm_worker::llm_client::client::LlmClient;
|
|||
use session_store::Store;
|
||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||
|
||||
use crate::notification_buffer::NotificationBuffer;
|
||||
use crate::notifier::Notifier;
|
||||
use crate::pod::{Pod, PodError, PodRunResult};
|
||||
use crate::runtime_dir::RuntimeDir;
|
||||
|
|
@ -203,8 +204,12 @@ impl PodController {
|
|||
pod.attach_tracker(tracker);
|
||||
}
|
||||
|
||||
// Clone cancel sender before moving pod
|
||||
// Clone cancel sender and notification buffer before moving pod
|
||||
// into the controller task so the main loop can route
|
||||
// `Method::Notify` into the buffer even while `pod` is held by
|
||||
// an in-flight `run_for_notification` / `run` future.
|
||||
let cancel_tx = pod.worker_mut().cancel_sender();
|
||||
let notification_buffer = pod.notification_buffer_handle();
|
||||
|
||||
tokio::spawn(async move {
|
||||
// Hold socket server alive for the lifetime of the controller task
|
||||
|
|
@ -234,6 +239,54 @@ impl PodController {
|
|||
&event_tx,
|
||||
&cancel_tx,
|
||||
&shared_state,
|
||||
&notification_buffer,
|
||||
)
|
||||
.await;
|
||||
|
||||
if new_status == PodStatus::Idle {
|
||||
if let Err(e) = pod.try_post_run_compact().await {
|
||||
tracing::warn!(error = %e, "Post-run compaction error");
|
||||
notifier.notify(
|
||||
NotificationLevel::Warn,
|
||||
NotificationSource::Compactor,
|
||||
format!("post-run compaction error: {e}"),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let items = pod.worker().history().to_vec();
|
||||
shared_state.update_history(items);
|
||||
shared_state.set_status(new_status);
|
||||
let _ = runtime_dir.write_status(&shared_state).await;
|
||||
let _ = runtime_dir.write_history(&shared_state).await;
|
||||
|
||||
if shutdown {
|
||||
let _ = event_tx.send(Event::Shutdown);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Method::Notify { source, message } => {
|
||||
pod.push_notification(source, message);
|
||||
if shared_state.get_status() != PodStatus::Idle {
|
||||
// RUNNING / Paused: the buffer push is the
|
||||
// entire operation; the in-flight turn (or
|
||||
// next Resume) will drain the buffer at its
|
||||
// next pre_llm_request.
|
||||
continue;
|
||||
}
|
||||
// IDLE: auto-start a turn so the LLM sees the
|
||||
// buffered notification(s) without a human Run.
|
||||
shared_state.set_status(PodStatus::Running);
|
||||
let _ = runtime_dir.write_status(&shared_state).await;
|
||||
|
||||
let (new_status, shutdown) = run_with_cancel_support(
|
||||
pod.run_for_notification(),
|
||||
&mut method_rx,
|
||||
&event_tx,
|
||||
&cancel_tx,
|
||||
&shared_state,
|
||||
&notification_buffer,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
|
@ -277,6 +330,7 @@ impl PodController {
|
|||
&event_tx,
|
||||
&cancel_tx,
|
||||
&shared_state,
|
||||
&notification_buffer,
|
||||
)
|
||||
.await;
|
||||
|
||||
|
|
@ -337,6 +391,7 @@ async fn run_with_cancel_support<F>(
|
|||
event_tx: &broadcast::Sender<Event>,
|
||||
cancel_tx: &mpsc::Sender<()>,
|
||||
shared_state: &Arc<PodSharedState>,
|
||||
notification_buffer: &NotificationBuffer,
|
||||
) -> (PodStatus, bool)
|
||||
where
|
||||
F: std::future::Future<Output = Result<PodRunResult, PodError>>,
|
||||
|
|
@ -382,6 +437,11 @@ where
|
|||
message: "Pod is already executing a turn".into(),
|
||||
});
|
||||
}
|
||||
Some(Method::Notify { source, message }) => {
|
||||
// Route into the buffer; the in-flight turn will
|
||||
// drain it at its next pre_llm_request.
|
||||
notification_buffer.push(source, message);
|
||||
}
|
||||
Some(Method::GetHistory) => {}
|
||||
None => {
|
||||
let _ = cancel_tx.try_send(());
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ pub mod socket_server;
|
|||
mod agents_md;
|
||||
mod compact_state;
|
||||
mod factory;
|
||||
mod notification_buffer;
|
||||
mod pod;
|
||||
mod pod_interceptor;
|
||||
mod prompt_loader;
|
||||
|
|
|
|||
127
crates/pod/src/notification_buffer.rs
Normal file
127
crates/pod/src/notification_buffer.rs
Normal file
|
|
@ -0,0 +1,127 @@
|
|||
//! Pending-notification buffer for `Method::Notify`.
|
||||
//!
|
||||
//! Notifications are queued here by the Controller and drained by
|
||||
//! `PodInterceptor::pre_llm_request` into the per-request context
|
||||
//! (never into the Worker's persistent history). Each queued entry
|
||||
//! becomes one `Item::system_message` in the outgoing request.
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use llm_worker::Item;
|
||||
use tracing::warn;
|
||||
|
||||
/// Maximum queued notifications. Oldest entries are dropped beyond this.
|
||||
const CAPACITY: usize = 128;
|
||||
|
||||
/// One pending notification awaiting injection into the next LLM request.
///
/// Both fields are rendered verbatim by `format_notification`. The type
/// derives `PartialEq`/`Eq` so queued entries can be compared directly
/// (for example in tests) instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PendingNotification {
    /// Label of the sender; shown in the `[Notification from …]` header.
    pub source: String,
    /// Free-form notification text placed below the header.
    pub message: String,
}
|
||||
|
||||
/// Shared, mutex-guarded buffer of pending notifications.
|
||||
///
|
||||
/// Cloned between the Pod (producer) and PodInterceptor (consumer).
|
||||
#[derive(Clone, Default)]
|
||||
pub struct NotificationBuffer {
|
||||
inner: Arc<Mutex<VecDeque<PendingNotification>>>,
|
||||
}
|
||||
|
||||
impl NotificationBuffer {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Push a notification onto the queue. If the queue is full, the
|
||||
/// oldest entry is dropped and a `tracing::warn` is emitted — the
|
||||
/// caller should never hit this in normal operation.
|
||||
pub fn push(&self, source: String, message: String) {
|
||||
let mut q = self.inner.lock().expect("notification buffer poisoned");
|
||||
if q.len() >= CAPACITY {
|
||||
let dropped = q.pop_front();
|
||||
warn!(
|
||||
capacity = CAPACITY,
|
||||
dropped_source = dropped.as_ref().map(|n| n.source.as_str()),
|
||||
"notification buffer overflow; dropped oldest"
|
||||
);
|
||||
}
|
||||
q.push_back(PendingNotification { source, message });
|
||||
}
|
||||
|
||||
/// Remove and return all pending notifications in FIFO order.
|
||||
pub fn drain(&self) -> Vec<PendingNotification> {
|
||||
let mut q = self.inner.lock().expect("notification buffer poisoned");
|
||||
q.drain(..).collect()
|
||||
}
|
||||
|
||||
/// Number of pending notifications. Primarily for tests.
|
||||
pub fn len(&self) -> usize {
|
||||
self.inner
|
||||
.lock()
|
||||
.expect("notification buffer poisoned")
|
||||
.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.len() == 0
|
||||
}
|
||||
}
|
||||
|
||||
/// Format a single pending notification into the `Item::system_message`
|
||||
/// that gets injected into the per-request context.
|
||||
pub(crate) fn format_notification(n: &PendingNotification) -> Item {
|
||||
let text = format!(
|
||||
"[Notification from {source}]\n{message}\n\n\
|
||||
This is a notification, not a blocking request. \
|
||||
If you are in the middle of a task, continue your current work \
|
||||
and address this at a natural stopping point.",
|
||||
source = n.source,
|
||||
message = n.message,
|
||||
);
|
||||
Item::system_message(text)
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Entries come back out of `drain` in the order they were pushed, and
    /// the buffer is empty afterwards.
    #[test]
    fn push_then_drain_preserves_order() {
        let buf = NotificationBuffer::new();
        for (source, message) in [("a", "one"), ("b", "two")] {
            buf.push(source.into(), message.into());
        }

        let sources: Vec<String> = buf.drain().into_iter().map(|n| n.source).collect();
        assert_eq!(sources, ["a", "b"]);
        assert!(buf.is_empty());
    }

    /// Pushing past `CAPACITY` evicts from the front, keeping the newest
    /// `CAPACITY` entries.
    #[test]
    fn capacity_drops_oldest() {
        let buf = NotificationBuffer::new();
        let total = CAPACITY + 5;
        (0..total).for_each(|i| buf.push(format!("src{i}"), format!("msg{i}")));

        let drained = buf.drain();
        assert_eq!(drained.len(), CAPACITY);
        // Oldest 5 were dropped; first retained is src5.
        assert_eq!(drained.first().map(|n| n.source.as_str()), Some("src5"));
        assert_eq!(
            drained.last().map(|n| n.source.clone()),
            Some(format!("src{}", total - 1))
        );
    }

    /// The rendered text carries the header, the message and the
    /// non-blocking hint.
    #[test]
    fn format_notification_includes_source_message_and_nonblocking_hint() {
        let pending = PendingNotification {
            source: "child".into(),
            message: "hello".into(),
        };
        let text = format_notification(&pending)
            .as_text()
            .unwrap_or_default()
            .to_string();

        for needle in [
            "[Notification from child]",
            "hello",
            "not a blocking request",
        ] {
            assert!(text.contains(needle));
        }
    }
}
|
||||
|
|
@ -19,6 +19,7 @@ use crate::hook::{
|
|||
Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest,
|
||||
PreRequestInfo, PreToolCall,
|
||||
};
|
||||
use crate::notification_buffer::NotificationBuffer;
|
||||
use crate::notifier::Notifier;
|
||||
use crate::pod_interceptor::PodInterceptor;
|
||||
use crate::prompt_loader::PromptLoader;
|
||||
|
|
@ -100,6 +101,10 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
/// User-facing notification sink attached by the Controller at
|
||||
/// spawn time. `None` in tests / direct `Pod::new` usage.
|
||||
notifier: Option<Notifier>,
|
||||
/// Queue of pending `Method::Notify` notifications awaiting
|
||||
/// injection into the next LLM request. Shared with the
|
||||
/// PodInterceptor installed in `ensure_interceptor_installed`.
|
||||
pending_notifications: NotificationBuffer,
|
||||
}
|
||||
|
||||
impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||
|
|
@ -140,6 +145,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
tracker: None,
|
||||
system_prompt_template: None,
|
||||
notifier: None,
|
||||
pending_notifications: NotificationBuffer::new(),
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
Ok(pod)
|
||||
|
|
@ -188,6 +194,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
tracker: None,
|
||||
system_prompt_template: None,
|
||||
notifier: None,
|
||||
pending_notifications: NotificationBuffer::new(),
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
Ok(pod)
|
||||
|
|
@ -293,6 +300,23 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Push a `Method::Notify` entry onto the pending buffer.
|
||||
///
|
||||
/// The notification will be injected as an `Item::system_message`
|
||||
/// into the next outgoing LLM request context (not into history).
|
||||
/// See [`NotificationBuffer`] for overflow behaviour.
|
||||
pub fn push_notification(&self, source: String, message: String) {
|
||||
self.pending_notifications.push(source, message);
|
||||
}
|
||||
|
||||
/// Shared handle to the pending notification buffer.
|
||||
///
|
||||
/// The Controller holds a clone so that `Method::Notify` arriving
|
||||
/// while `pod.run()` is in flight can still reach the interceptor.
|
||||
pub fn notification_buffer_handle(&self) -> NotificationBuffer {
|
||||
self.pending_notifications.clone()
|
||||
}
|
||||
|
||||
// --- Hook registration ---
|
||||
|
||||
fn assert_hooks_open(&self) {
|
||||
|
|
@ -388,7 +412,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
None
|
||||
};
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, compact_state);
|
||||
let interceptor =
|
||||
PodInterceptor::new(registry, compact_state, self.pending_notifications.clone());
|
||||
self.worker_mut().set_interceptor(interceptor);
|
||||
self.interceptor_installed = true;
|
||||
}
|
||||
|
|
@ -462,6 +487,29 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
self.handle_worker_result(result, history_before).await
|
||||
}
|
||||
|
||||
/// Run a turn triggered by `Method::Notify` while the Pod is idle.
|
||||
///
|
||||
/// Unlike [`run`](Self::run), no user message is appended to
|
||||
/// history. The `PodInterceptor::pre_llm_request` drains the
|
||||
/// pending-notification buffer and injects each entry as an
|
||||
/// `Item::system_message` into the per-request context, then the
|
||||
/// Worker's resume path issues the LLM request without a new
|
||||
/// user turn.
|
||||
pub async fn run_for_notification(&mut self) -> Result<PodRunResult, PodError> {
|
||||
self.ensure_interceptor_installed();
|
||||
self.ensure_system_prompt_materialized()?;
|
||||
self.ensure_session_head().await?;
|
||||
|
||||
let history_before = self.worker.as_ref().unwrap().history().len();
|
||||
|
||||
let worker = self.worker.take().expect("worker taken during run");
|
||||
let mut locked = worker.lock();
|
||||
let result = locked.resume().await;
|
||||
self.worker = Some(locked.unlock());
|
||||
|
||||
self.handle_worker_result(result, history_before).await
|
||||
}
|
||||
|
||||
/// Resume from a paused state.
|
||||
pub async fn resume(&mut self) -> Result<PodRunResult, PodError> {
|
||||
self.ensure_interceptor_installed();
|
||||
|
|
@ -861,6 +909,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
tracker: None,
|
||||
system_prompt_template,
|
||||
notifier: None,
|
||||
pending_notifications: NotificationBuffer::new(),
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
Ok(pod)
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ use crate::hook::{
|
|||
AbortInfo, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, ToolResultSummary,
|
||||
TurnEndInfo,
|
||||
};
|
||||
use crate::notification_buffer::{NotificationBuffer, format_notification};
|
||||
|
||||
/// Maximum number of bytes copied into `TurnEndInfo::final_text_preview`.
|
||||
const FINAL_TEXT_PREVIEW_LIMIT: usize = 512;
|
||||
|
|
@ -31,6 +32,9 @@ const FINAL_TEXT_PREVIEW_LIMIT: usize = 512;
|
|||
pub(crate) struct PodInterceptor {
|
||||
registry: Arc<HookRegistry>,
|
||||
compact_state: Option<Arc<CompactState>>,
|
||||
/// Pending-notification buffer drained into the per-request
|
||||
/// context at the head of `pre_llm_request`.
|
||||
pending_notifications: NotificationBuffer,
|
||||
/// Next turn index assigned by `on_prompt_submit`.
|
||||
next_turn_index: AtomicUsize,
|
||||
/// Tool calls observed in the current turn (reset on each new prompt).
|
||||
|
|
@ -41,10 +45,12 @@ impl PodInterceptor {
|
|||
pub(crate) fn new(
|
||||
registry: Arc<HookRegistry>,
|
||||
compact_state: Option<Arc<CompactState>>,
|
||||
pending_notifications: NotificationBuffer,
|
||||
) -> Self {
|
||||
Self {
|
||||
registry,
|
||||
compact_state,
|
||||
pending_notifications,
|
||||
next_turn_index: AtomicUsize::new(0),
|
||||
tool_calls_this_turn: AtomicUsize::new(0),
|
||||
}
|
||||
|
|
@ -89,6 +95,14 @@ impl Interceptor for PodInterceptor {
|
|||
}
|
||||
}
|
||||
|
||||
// Internal mechanism: drain pending `Method::Notify` notifications
|
||||
// into the per-request context as transient system messages.
|
||||
// These are not persisted to the Worker history; they exist only
|
||||
// for this single LLM request.
|
||||
for notification in self.pending_notifications.drain() {
|
||||
context.push(format_notification(&notification));
|
||||
}
|
||||
|
||||
let info = PreRequestInfo {
|
||||
item_count: context.len(),
|
||||
estimated_tokens: self.compact_state.as_ref().map(|s| s.last_input_tokens()),
|
||||
|
|
@ -226,7 +240,7 @@ mod tests {
|
|||
let state = Arc::new(CompactState::new(100, 2));
|
||||
state.update_input_tokens(200); // exceeds turn threshold
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, Some(state));
|
||||
let interceptor = PodInterceptor::new(registry, Some(state), NotificationBuffer::new());
|
||||
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
|
|
@ -243,7 +257,7 @@ mod tests {
|
|||
let state = Arc::new(CompactState::new(100, 2));
|
||||
// last_input_tokens stays at 0, well below threshold.
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, Some(state));
|
||||
let interceptor = PodInterceptor::new(registry, Some(state), NotificationBuffer::new());
|
||||
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
|
|
@ -256,7 +270,7 @@ mod tests {
|
|||
let count = Arc::new(AtomicUsize::new(0));
|
||||
let registry = registry_with_pre_llm_hook(count.clone());
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, None);
|
||||
let interceptor = PodInterceptor::new(registry, None, NotificationBuffer::new());
|
||||
let mut ctx: Vec<Item> = Vec::new();
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
|
|
@ -274,6 +288,50 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pre_llm_request_drains_pending_notifications_into_context() {
|
||||
let registry = Arc::new(HookRegistryBuilder::new().build());
|
||||
let buffer = NotificationBuffer::new();
|
||||
buffer.push("child-a".into(), "first".into());
|
||||
buffer.push("child-b".into(), "second".into());
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, None, buffer.clone());
|
||||
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
assert!(matches!(action, PreRequestAction::Continue));
|
||||
// Original user message preserved, two notifications appended in order.
|
||||
assert_eq!(ctx.len(), 3);
|
||||
let second = ctx[1].as_text().unwrap_or_default();
|
||||
let third = ctx[2].as_text().unwrap_or_default();
|
||||
assert!(second.contains("[Notification from child-a]"));
|
||||
assert!(second.contains("first"));
|
||||
assert!(third.contains("[Notification from child-b]"));
|
||||
assert!(third.contains("second"));
|
||||
// Buffer is drained after a single pre_llm_request call.
|
||||
assert!(buffer.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pre_llm_request_skips_notification_injection_when_yielding() {
|
||||
// When compaction yields, notifications remain in the buffer for
|
||||
// the next pre_llm_request (after compaction + resume).
|
||||
let registry = Arc::new(HookRegistryBuilder::new().build());
|
||||
let buffer = NotificationBuffer::new();
|
||||
buffer.push("src".into(), "msg".into());
|
||||
|
||||
let state = Arc::new(CompactState::new(100, 2));
|
||||
state.update_input_tokens(200);
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, Some(state), buffer.clone());
|
||||
let mut ctx: Vec<Item> = Vec::new();
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
assert!(matches!(action, PreRequestAction::Yield));
|
||||
assert!(ctx.is_empty());
|
||||
assert_eq!(buffer.len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pre_llm_request_short_circuits_on_first_non_continue() {
|
||||
let first_called = Arc::new(AtomicBool::new(false));
|
||||
|
|
@ -283,7 +341,7 @@ mod tests {
|
|||
builder.add_pre_llm_request(CountingHook(second_count.clone()));
|
||||
let registry = Arc::new(builder.build());
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, None);
|
||||
let interceptor = PodInterceptor::new(registry, None, NotificationBuffer::new());
|
||||
let mut ctx: Vec<Item> = Vec::new();
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
|
@ -19,6 +19,7 @@ use pod::{Event, Method, Pod, PodController, PodManifest, PodStatus};
|
|||
struct MockClient {
|
||||
responses: Arc<Vec<Vec<LlmEvent>>>,
|
||||
call_count: Arc<AtomicUsize>,
|
||||
captured: Arc<Mutex<Vec<Request>>>,
|
||||
}
|
||||
|
||||
impl MockClient {
|
||||
|
|
@ -26,8 +27,13 @@ impl MockClient {
|
|||
Self {
|
||||
responses: Arc::new(vec![events]),
|
||||
call_count: Arc::new(AtomicUsize::new(0)),
|
||||
captured: Arc::new(Mutex::new(Vec::new())),
|
||||
}
|
||||
}
|
||||
|
||||
/// Snapshot of every request recorded so far, in call order.
fn captured_requests(&self) -> Vec<Request> {
    let recorded = self.captured.lock().unwrap();
    recorded.to_vec()
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
|
@ -38,9 +44,10 @@ impl LlmClient for MockClient {
|
|||
|
||||
async fn stream(
|
||||
&self,
|
||||
_request: Request,
|
||||
request: Request,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<LlmEvent, ClientError>> + Send>>, ClientError>
|
||||
{
|
||||
self.captured.lock().unwrap().push(request);
|
||||
let count = self.call_count.fetch_add(1, Ordering::SeqCst);
|
||||
if count >= self.responses.len() {
|
||||
return Err(ClientError::Api {
|
||||
|
|
@ -329,6 +336,101 @@ async fn cancel_without_run_returns_error() {
|
|||
assert!(saw_not_running, "should see not_running error");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
|
||||
let client = MockClient::new(simple_text_events());
|
||||
let client_for_assert = client.clone();
|
||||
let pod = make_pod(client).await;
|
||||
let handle = spawn_controller(pod).await;
|
||||
let mut rx = handle.subscribe();
|
||||
|
||||
handle
|
||||
.send(Method::Notify {
|
||||
source: "child-a".into(),
|
||||
message: "turn finished".into(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Wait for the auto-started turn to complete.
|
||||
let mut saw_turn_end = false;
|
||||
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = rx.recv() => {
|
||||
match event {
|
||||
Ok(Event::TurnEnd { .. }) => { saw_turn_end = true; break; }
|
||||
Err(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
_ = tokio::time::sleep_until(deadline) => break,
|
||||
}
|
||||
}
|
||||
assert!(saw_turn_end, "auto-triggered turn should complete");
|
||||
// Status flips back to Idle on the controller thread after RunEnd.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);
|
||||
|
||||
// Exactly one request was made; it must contain the formatted
|
||||
// notification as the last item (injected into request_context by
|
||||
// PodInterceptor::pre_llm_request).
|
||||
let requests = client_for_assert.captured_requests();
|
||||
assert_eq!(requests.len(), 1, "one LLM call expected");
|
||||
let last_item_text = requests[0]
|
||||
.items
|
||||
.last()
|
||||
.and_then(|i| i.as_text())
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
assert!(
|
||||
last_item_text.contains("[Notification from child-a]"),
|
||||
"injected system message missing, got: {last_item_text:?}"
|
||||
);
|
||||
assert!(last_item_text.contains("turn finished"));
|
||||
assert!(last_item_text.contains("not a blocking request"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn notify_while_running_does_not_emit_already_running_error() {
|
||||
let client = MockClient::new(simple_text_events());
|
||||
let pod = make_pod(client).await;
|
||||
let handle = spawn_controller(pod).await;
|
||||
let mut rx = handle.subscribe();
|
||||
|
||||
handle
|
||||
.send(Method::Run {
|
||||
input: "start".into(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
handle
|
||||
.send(Method::Notify {
|
||||
source: "child".into(),
|
||||
message: "ping".into(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Drain events until the run ends; AlreadyRunning must never appear.
|
||||
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = rx.recv() => {
|
||||
match event {
|
||||
Ok(Event::Error { code, .. }) if code == pod::ErrorCode::AlreadyRunning => {
|
||||
panic!("Notify while running must not produce AlreadyRunning");
|
||||
}
|
||||
Ok(Event::TurnEnd { .. }) => break,
|
||||
Err(_) => break,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
_ = tokio::time::sleep_until(deadline) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn status_json_reflects_pod_name() {
|
||||
let client = MockClient::new(simple_text_events());
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ use serde::{Deserialize, Serialize};
|
|||
#[serde(tag = "method", content = "params", rename_all = "snake_case")]
|
||||
pub enum Method {
|
||||
Run { input: String },
|
||||
Notify { source: String, message: String },
|
||||
Resume,
|
||||
Cancel,
|
||||
Shutdown,
|
||||
|
|
@ -192,6 +193,19 @@ mod tests {
|
|||
assert_eq!(parsed["data"]["result"], "limit_reached");
|
||||
}
|
||||
|
||||
/// `Method::Notify` deserializes from its wire form and serializes back to
/// exactly the same JSON text.
#[test]
fn method_notify_json_roundtrip() {
    let json = r#"{"method":"notify","params":{"source":"child-pod","message":"turn done"}}"#;

    let parsed: Method = serde_json::from_str(json).unwrap();
    let is_expected_notify = matches!(
        &parsed,
        Method::Notify { source, message }
            if source == "child-pod" && message == "turn done"
    );
    assert!(is_expected_notify);

    let reencoded = serde_json::to_string(&parsed).unwrap();
    assert_eq!(reencoded, json);
}
|
||||
|
||||
#[test]
|
||||
fn method_get_history() {
|
||||
let json = r#"{"method":"get_history"}"#;
|
||||
|
|
|
|||
|
|
@ -1,116 +0,0 @@
|
|||
# Method::Notify: システム起点のコンテキスト注入と自動ターン開始
|
||||
|
||||
## 背景
|
||||
|
||||
現状の Pod の実行サイクルは `Method::Run { input }` が唯一のターン開始手段であり、これは人間(または外部クライアント)が明示的にテキストを送ることを前提としている。
|
||||
|
||||
Pod オーケストレーション(`tickets/pod-orchestration.md`)では、子 Pod が親 Pod にコールバックで通知を送り、親 Pod が**人間の入力を待たずに**その通知を処理して次のアクションを起こす必要がある。現状のアーキテクチャにはこの経路が無い。
|
||||
|
||||
また、RUNNING 中の Pod に対しても、リクエストの合間に情報を注入して LLM に認知させたいケースがある(子 Pod からの非同期通知など)。現状は RUNNING 中にコンテキストを追加する手段が無い。
|
||||
|
||||
## ゴール
|
||||
|
||||
`Method::Notify` を新設し、Pod の状態(IDLE / RUNNING)に応じて適切にコンテキストへの注入とターン制御を行う。
|
||||
|
||||
## 仕様
|
||||
|
||||
### `Method::Notify { source: String, message: String }`
|
||||
|
||||
protocol に追加する新しい Method。`Run` とは異なり、ユーザーメッセージではなく**システム通知**としてコンテキストに注入される。
|
||||
|
||||
### Pod が IDLE のとき
|
||||
|
||||
1. 通知メッセージを system message としてコンテキストに注入
|
||||
2. **自動でターンを開始**する(人間の入力を待たない)
|
||||
3. LLM は通知を見て次のアクションを判断する
|
||||
|
||||
### Pod が RUNNING のとき
|
||||
|
||||
1. 通知メッセージを**次の LLM リクエストの直前**に注入する(tool call の応答を送った後、次のリクエストを組み立てる前)
|
||||
2. ターンは中断しない。LLM は現在のタスクを続行しつつ、次のレスポンスで通知を認知する
|
||||
3. LLM は今やっていることを優先し、切りの良いタイミングで通知に対処するかを自分で判断する
|
||||
|
||||
### 注入されるメッセージのフォーマット
|
||||
|
||||
通知は素のテキストではなく、以下の構造で注入される:
|
||||
|
||||
```
|
||||
[Notification from {source}]
|
||||
{message}
|
||||
|
||||
This is a notification, not a blocking request. If you are in the middle of a task, continue your current work and address this at a natural stopping point.
|
||||
```
|
||||
|
||||
- `[Notification]` prefix で LLM にこれが通知であることを明示
|
||||
- 「ブロッカーではないので直ちに対処しなくてよい」という指示を付与
|
||||
- LLM が通知を見て即座にタスクを放棄する(指示追従性の暴走)を防ぐ
|
||||
|
||||
### 複数通知のバッファリング
|
||||
|
||||
- RUNNING 中に複数の `Notify` が到着した場合、バッファに溜めて次の LLM リクエスト直前にまとめて注入する
|
||||
- 個別の `[Notification]` ブロックとして並べる(1つにマージしない)
|
||||
- IDLE 中に複数到着した場合、1つの system message にまとめて注入し、ターンを1回だけ開始する
|
||||
|
||||
## 実装に必要な変更
|
||||
|
||||
### protocol crate
|
||||
|
||||
- `Method::Notify { source: String, message: String }` を `Method` enum に追加
|
||||
- 対応する `Event` の追加が必要かは設計時に判断
|
||||
|
||||
### Worker
|
||||
|
||||
- RUNNING 中に外部からメッセージを注入する仕組みが必要
|
||||
- 現状の Worker は turn 実行中にコンテキストの追加手段を持たない
|
||||
- tool call → tool result → **notification 注入** → 次の LLM リクエスト、というフローを追加
|
||||
- 注入ポイントは `execute_tools` 完了後、次の LLM リクエスト組み立て前
|
||||
|
||||
### Controller
|
||||
|
||||
- `Method::Notify` のハンドリングを追加
|
||||
- IDLE 時: 通知を注入 → 内部的に `run()` を開始(`Method::Run` と似た経路だがメッセージ種別が異なる)
|
||||
- RUNNING 時: Worker の notification buffer に push
|
||||
|
||||
### Pod
|
||||
|
||||
- notification buffer を保持するフィールドを追加
|
||||
- `ensure_system_prompt_materialized` 的な「ターン開始前に notification を flush する」フックが要るかもしれない
|
||||
|
||||
## `Method::Run` との対比
|
||||
|
||||
| | `Method::Run` | `Method::Notify` |
|
||||
|---|---|---|
|
||||
| 対象状態 | IDLE のみ(RUNNING 中は AlreadyRunning エラー) | IDLE でも RUNNING でも受け付ける |
|
||||
| コンテキスト上の見え方 | user message | system message(`[Notification]` prefix 付き) |
|
||||
| ターン制御 | 新ターンを開始 | IDLE: 自動でターン開始。RUNNING: 現ターンに注入 |
|
||||
| LLM の期待挙動 | 指示に従って即座に行動 | 現タスク優先、切りの良いタイミングで対処を判断 |
|
||||
| 送信元 | 人間 / クライアント | システム / 子 Pod のコールバック / Hook |
|
||||
|
||||
## 設計で決めること
|
||||
|
||||
- **notification の注入はどの message type で行うか**: 既存の `Role::User` / `Role::Assistant` とは別の `Role::System` (mid-conversation) を追加するか、`Role::User` に `[Notification]` prefix を付けて実質的に区別するか
|
||||
- **IDLE 時の自動ターン開始と人間の `Run` の競合**: 通知が到着して自動ターンが始まった直後に人間が `Run` を送った場合の挙動
|
||||
- **notification buffer のサイズ上限**: RUNNING が長時間の場合にバッファが無制限に溜まるリスク
|
||||
- **通知メッセージの prefix / suffix テンプレートの置き場**: ハードコードか、instruction 側でカスタマイズ可能にするか
|
||||
- **Event の対応**: `Event::NotificationInjected` のような確認イベントを返すか
|
||||
|
||||
## 完了条件
|
||||
|
||||
- `Method::Notify` が protocol に追加され、Controller が IDLE / RUNNING で適切にハンドリングする
|
||||
- IDLE 時: 通知が system message として注入され、自動でターンが開始される
|
||||
- RUNNING 時: 通知が次の LLM リクエスト直前に注入され、LLM が認知できる
|
||||
- 複数通知のバッファリングが動作する(RUNNING 中に溜まった通知がまとめて注入される)
|
||||
- `[Notification]` prefix と non-blocking 指示が付与される
|
||||
- 単体テストで IDLE / RUNNING 両パスが検証される
|
||||
|
||||
## 他チケットとの関係
|
||||
|
||||
- **tickets/pod-orchestration.md**: 本チケットの主要消費者。子 Pod の `Notify` ツール → 親の callback → 親の `Method::Notify` というフローでオーケストレーションの非同期通知が成立する
|
||||
- **tickets/protocol-design.md**: protocol への Method 追加。既存の Method 設計パターン(Run / Resume / Cancel / Shutdown)と整合させる
|
||||
- **tickets/compact-improvements.md**: notification がコンテキストに蓄積した場合の compaction 挙動は別途検討
|
||||
|
||||
## 範囲外
|
||||
|
||||
- **通知の routing / addressing**: 誰が誰に通知を送るかはオーケストレーション側の責務。本チケットは「Pod が Notify を受けたときの挙動」だけを扱う
|
||||
- **通知の優先度 / フィルタリング**: 全通知を等しく注入する。重要度に応じた選別は LLM に任せる
|
||||
- **通知の永続化**: 通知は session-store に永続化しない。コンテキスト上の message としてのみ存在する
|
||||
Loading…
Reference in New Issue
Block a user