From e8045776f2f5c9530034601d7b2f5137f5856ee6 Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 3 May 2026 19:27:22 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20notify-history-persist=E5=AE=9F?= =?UTF-8?q?=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO.md | 2 +- crates/llm-worker/src/interceptor.rs | 31 +++++++- crates/llm-worker/src/worker.rs | 10 +++ crates/pod/src/ipc/interceptor.rs | 104 ++++++++++++++------------- crates/pod/src/ipc/notify_buffer.rs | 30 +++++--- crates/pod/src/pod.rs | 10 +-- crates/pod/tests/controller_test.rs | 71 ++++++++++++------ tickets/session-todo.md | 18 ++--- 8 files changed, 183 insertions(+), 93 deletions(-) diff --git a/TODO.md b/TODO.md index 110beb42..46847bc0 100644 --- a/TODO.md +++ b/TODO.md @@ -16,4 +16,4 @@ - 使用頻度メトリクス + Knowledge 化候補レポート → [tickets/memory-usage-metrics.md](tickets/memory-usage-metrics.md) - セッション内 TODO ツール(注意機構付き) → [tickets/session-todo.md](tickets/session-todo.md) - ワークスペースのメモリーをLintするヘッドレスCLI -- system-reminder 注入機構の汎用化(2件目の利用者が出た時に検討。タグ形式と「履歴を汚さない」原則は session-todo で先行確立) +- system-reminder 注入機構の汎用化(2件目の利用者が出た時に検討。タグ形式 `...` の規約は session-todo-reminder で先行確立。注入された Item は worker.history に append する方針) diff --git a/crates/llm-worker/src/interceptor.rs b/crates/llm-worker/src/interceptor.rs index 4795d798..9ab8323a 100644 --- a/crates/llm-worker/src/interceptor.rs +++ b/crates/llm-worker/src/interceptor.rs @@ -120,8 +120,35 @@ pub trait Interceptor: Send + Sync { PromptAction::Continue } - /// Called before each LLM request. The context can be modified - /// (e.g. for context compaction). + /// Items that should be **committed to `worker.history`** just + /// before the next LLM request. Returned items are `extend`ed into + /// the persistent history (and therefore picked up by the per-turn + /// clone that backs the LLM request, plus the usual + /// history-persistence path). + /// + /// Use this for inputs that arrive from outside the LLM and need + /// to be reflected in the on-disk history — notifications, + /// cross-Pod events, system reminders. Do **not** use + /// [`Self::pre_llm_request`] for that purpose: it mutates a + /// per-request clone, so any committed assistant response that + /// reacts to the injection would have no visible trigger on the + /// next turn (or after resume / compaction). + /// + /// `pre_llm_request` remains the right place for purely + /// reproducible per-request transformations (pruning, content + /// trimming, cache anchors) that depend only on the existing + /// history. + async fn pending_history_appends(&self) -> Vec { + Vec::new() + } + + /// Called before each LLM request. The context starts as a clone + /// of `worker.history` (after `pending_history_appends` and the + /// Worker's own prune projection have been applied) and can be + /// further modified for that single request only — mutations here + /// are **not** persisted back to history. Use + /// [`Self::pending_history_appends`] for inputs that need to land + /// in history. async fn pre_llm_request(&self, _context: &mut Vec) -> PreRequestAction { PreRequestAction::Continue } diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index fd305e42..6337742f 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -856,6 +856,16 @@ impl Worker { cb(current_turn); } + // Drain interceptor-side inputs that are meant to land in + // history (notifications, cross-Pod events, system + // reminders). These are committed *before* the per-request + // clone so they participate in the LLM request below and + // get persisted by the upper layer that owns history.json. + let pending = self.interceptor.pending_history_appends().await; + if !pending.is_empty() { + self.history.extend(pending); + } + // Clone the history into a per-request context. Everything // below (prune projection, interceptor hooks) mutates only // this clone, so the persistent `self.history` stays intact. diff --git a/crates/pod/src/ipc/interceptor.rs b/crates/pod/src/ipc/interceptor.rs index f7b9d9de..7594753d 100644 --- a/crates/pod/src/ipc/interceptor.rs +++ b/crates/pod/src/ipc/interceptor.rs @@ -17,6 +17,7 @@ use llm_worker::interceptor::{ Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction, }; +use tracing::warn; use llm_worker::tool::ToolOutput; use tracing::info; @@ -28,7 +29,6 @@ use crate::hook::{ use crate::ipc::notify_buffer::{NotifyBuffer, format_notify}; use crate::prompt::catalog::PromptCatalog; use llm_worker::token_counter::total_tokens; -use tracing::warn; /// Maximum number of bytes copied into `TurnEndInfo::final_text_preview`. const FINAL_TEXT_PREVIEW_LIMIT: usize = 512; @@ -40,8 +40,10 @@ pub(crate) struct PodInterceptor { /// per-request `context` to estimate current occupancy for threshold /// checks. `None` when compaction is disabled (both thresholds unset). usage_history: Option>>>, - /// Pending-notification buffer drained into the per-request - /// context at the head of `pre_llm_request`. + /// Pending-notification buffer drained into `worker.history` + /// via [`Self::pending_history_appends`] just before the next LLM + /// request. The Worker `extend`s these into its persistent history + /// so the LLM has a visible trigger for any reaction it commits. pending_notifies: NotifyBuffer, /// Submit-scoped stash of resolver-produced system messages. /// Drained inside `on_prompt_submit` and returned via @@ -122,6 +124,27 @@ impl Interceptor for PodInterceptor { } } + async fn pending_history_appends(&self) -> Vec { + let drained = self.pending_notifies.drain(); + if drained.is_empty() { + return Vec::new(); + } + let mut items = Vec::with_capacity(drained.len()); + for n in drained { + match format_notify(&n, &self.prompts) { + Ok(item) => items.push(item), + Err(e) => { + // A render failure here would starve the LLM of + // the notify text. Fall back to the raw message + // so the trigger still lands in history. + warn!(error = %e, "failed to render notify_wrapper; using raw message"); + items.push(Item::system_message(n.message.clone())); + } + } + } + items + } + async fn pre_llm_request(&self, context: &mut Vec) -> PreRequestAction { let current_tokens = self.estimated_tokens(context); @@ -140,24 +163,6 @@ impl Interceptor for PodInterceptor { } } - // Internal mechanism: drain pending `Method::Notify` notifications - // into the per-request context as transient system messages. - // These are not persisted to the Worker history; they exist only - // for this single LLM request. - for n in self.pending_notifies.drain() { - match format_notify(&n, &self.prompts) { - Ok(item) => context.push(item), - Err(e) => { - // A render failure here would starve the LLM of the - // notify text. Fall back to the raw message — - // it still carries the intent, just without the - // wrapper phrasing. - warn!(error = %e, "failed to render notify_wrapper; using raw message"); - context.push(Item::system_message(n.message.clone())); - } - } - } - let info = PreRequestInfo { item_count: context.len(), estimated_tokens: current_tokens, @@ -406,7 +411,7 @@ mod tests { } #[tokio::test] - async fn pre_llm_request_drains_pending_notifies_into_context() { + async fn pending_history_appends_drains_buffer_into_items() { let registry = Arc::new(HookRegistryBuilder::new().build()); let buffer = NotifyBuffer::new(); buffer.push("first".into()); @@ -420,49 +425,52 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), ); - let mut ctx: Vec = vec![Item::user_message("hi")]; - let action = interceptor.pre_llm_request(&mut ctx).await; - assert!(matches!(action, PreRequestAction::Continue)); - // Original user message preserved, two notifications appended in order. - assert_eq!(ctx.len(), 3); - let second = ctx[1].as_text().unwrap_or_default(); - let third = ctx[2].as_text().unwrap_or_default(); + let items = interceptor.pending_history_appends().await; + assert_eq!(items.len(), 2); + let first = items[0].as_text().unwrap_or_default(); + let second = items[1].as_text().unwrap_or_default(); + assert!(first.contains("[Notification]")); + assert!(first.contains("first")); assert!(second.contains("[Notification]")); - assert!(second.contains("first")); - assert!(third.contains("[Notification]")); - assert!(third.contains("second")); - // Buffer is drained after a single pre_llm_request call. - assert!(buffer.is_empty()); + assert!(second.contains("second")); + assert!( + buffer.is_empty(), + "buffer must be drained after pending_history_appends" + ); + + // Empty buffer → empty Vec (no synthesised items). + let again = interceptor.pending_history_appends().await; + assert!(again.is_empty()); } #[tokio::test] - async fn pre_llm_request_skips_notification_injection_when_yielding() { - // When compaction yields, notifications remain in the buffer for - // the next pre_llm_request (after compaction + resume). + async fn pre_llm_request_does_not_touch_pending_notifies() { + // The drain lane has moved to `pending_history_appends`; + // `pre_llm_request` must leave the buffer alone and not inject + // anything itself. let registry = Arc::new(HookRegistryBuilder::new().build()); let buffer = NotifyBuffer::new(); buffer.push("msg".into()); - let state = Arc::new(CompactState::new(None, Some(100), 2)); - let ctx_items = vec![Item::user_message("hi")]; - let history = usage_handle_with(ctx_items.len(), 200); - let interceptor = PodInterceptor::new( registry, - Some(state), - Some(history), + None, + None, buffer.clone(), Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), ); - let mut ctx = ctx_items; + let mut ctx: Vec = vec![Item::user_message("hi")]; let action = interceptor.pre_llm_request(&mut ctx).await; - assert!(matches!(action, PreRequestAction::Yield)); - // Notifications were not drained (still held for post-compact resume). - assert_eq!(ctx.len(), 1); - assert_eq!(buffer.len(), 1); + assert!(matches!(action, PreRequestAction::Continue)); + assert_eq!(ctx.len(), 1, "pre_llm_request must not append notifies"); + assert_eq!( + buffer.len(), + 1, + "pre_llm_request must not drain the notify buffer" + ); } #[tokio::test] diff --git a/crates/pod/src/ipc/notify_buffer.rs b/crates/pod/src/ipc/notify_buffer.rs index b07c1d8d..2700f2eb 100644 --- a/crates/pod/src/ipc/notify_buffer.rs +++ b/crates/pod/src/ipc/notify_buffer.rs @@ -1,9 +1,22 @@ -//! Pending-notify buffer for `Method::Notify`. +//! Pending-notify buffer for `Method::Notify` and `Method::PodEvent`. //! -//! Notify entries are queued here by the Controller and drained by -//! `PodInterceptor::pre_llm_request` into the per-request context -//! (never into the Worker's persistent history). Each queued entry -//! becomes one `Item::system_message` in the outgoing request. +//! Entries are queued here by the Controller (on receipt of the +//! corresponding IPC method) and drained by +//! `PodInterceptor::pending_history_appends`, which the Worker calls +//! at the head of each turn loop iteration to `extend` them into the +//! persistent `worker.history`. Each queued entry becomes one +//! `Item::system_message`. +//! +//! This is the **single lane** for "system messages produced by Pod +//! state that should land in the next LLM request": Notify, PodEvent, +//! and any future `` injection all ride this queue +//! (or a sibling queue with the same lifecycle). Per +//! `tickets/notify-history-persist.md` and `AGENTS.md` (LLM コンテキスト +//! の加工原則), there is **no** "transient, history-skipping" lane — +//! everything injected into a request is also committed to history so +//! that any LLM reaction has a visible trigger across turns, resume, +//! and compaction, and so the Anthropic prompt cache prefix stays +//! stable across requests. use std::collections::VecDeque; use std::sync::{Arc, Mutex}; @@ -68,9 +81,10 @@ impl NotifyBuffer { } /// Format a single pending notify entry into the `Item::system_message` -/// that gets injected into the per-request context. The wrapper body -/// comes from `PodPrompt::NotifyWrapper` so the surrounding phrasing -/// can be customised via a prompt pack (translation, tone, ...). +/// that gets appended to `worker.history` just before the next LLM +/// request. The wrapper body comes from `PodPrompt::NotifyWrapper` so +/// the surrounding phrasing can be customised via a prompt pack +/// (translation, tone, ...). pub(crate) fn format_notify( n: &PendingNotify, prompts: &PromptCatalog, diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index a291885b..35ce31d0 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -539,11 +539,13 @@ impl Pod { } } - /// Push a `Method::Notify` entry onto the pending buffer. + /// Push a `Method::Notify` (or rendered `Method::PodEvent`) entry + /// onto the pending buffer. /// - /// The notification will be injected as an `Item::system_message` - /// into the next outgoing LLM request context (not into history). - /// See [`NotifyBuffer`] for overflow behaviour. + /// The notification will be appended to `worker.history` as an + /// `Item::system_message` just before the next LLM request, via + /// `PodInterceptor::pending_history_appends`. See [`NotifyBuffer`] + /// for overflow behaviour and the lane-of-record rationale. pub fn push_notify(&self, message: String) { self.pending_notifies.push(message); } diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 4603bb16..d8b391e6 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -560,22 +560,37 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() { assert_eq!(handle.shared_state.get_status(), PodStatus::Idle); // Exactly one request was made; it must contain the formatted - // notification as the last item (injected into request_context by - // PodInterceptor::pre_llm_request). + // notification as one of the items (committed to history by + // PodInterceptor::pending_history_appends and cloned into the + // request context for that turn). let requests = client_for_assert.captured_requests(); assert_eq!(requests.len(), 1, "one LLM call expected"); - let last_item_text = requests[0] + let notify_in_request = requests[0] .items - .last() - .and_then(|i| i.as_text()) - .unwrap_or_default() - .to_string(); + .iter() + .any(|i| i.as_text().is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished"))); assert!( - last_item_text.contains("[Notification]"), - "injected system message missing, got: {last_item_text:?}" + notify_in_request, + "injected system message missing from request, got items: {:?}", + requests[0] + .items + .iter() + .filter_map(|i| i.as_text()) + .collect::>() + ); + + // The notification must also be persisted into the Worker history + // (and therefore eventually into history.json), per + // tickets/notify-history-persist.md. + let history = handle.shared_state.history(); + let notify_in_history = history + .iter() + .any(|i| i.as_text().is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished"))); + assert!( + notify_in_history, + "notify must be committed to worker.history, got items: {:?}", + history.iter().filter_map(|i| i.as_text()).collect::>() ); - assert!(last_item_text.contains("turn finished")); - assert!(last_item_text.contains("not a blocking request")); } #[tokio::test] @@ -630,19 +645,33 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes 1, "auto-kick should issue exactly one LLM request" ); - let last_item_text = requests[0] - .items - .last() - .and_then(|i| i.as_text()) - .unwrap_or_default() - .to_string(); + let event_in_request = requests[0].items.iter().any(|i| { + i.as_text().is_some_and(|t| { + t.contains("[Notification]") && t.contains("child") && t.contains("finished a turn") + }) + }); assert!( - last_item_text.contains("[Notification]"), - "injected system message missing, got: {last_item_text:?}" + event_in_request, + "rendered TurnEnded text missing from request, got items: {:?}", + requests[0] + .items + .iter() + .filter_map(|i| i.as_text()) + .collect::>() ); + + // Same item must be present in worker.history (persisted lane), + // not just the per-request clone — see tickets/notify-history-persist.md. + let history = handle.shared_state.history(); + let event_in_history = history.iter().any(|i| { + i.as_text().is_some_and(|t| { + t.contains("[Notification]") && t.contains("child") && t.contains("finished a turn") + }) + }); assert!( - last_item_text.contains("child") && last_item_text.contains("finished a turn"), - "rendered TurnEnded text missing, got: {last_item_text:?}" + event_in_history, + "PodEvent must be committed to worker.history, got items: {:?}", + history.iter().filter_map(|i| i.as_text()).collect::>() ); } diff --git a/tickets/session-todo.md b/tickets/session-todo.md index 81994966..6643db08 100644 --- a/tickets/session-todo.md +++ b/tickets/session-todo.md @@ -14,8 +14,8 @@ - **保存先は `tools` 層の session-lifetime 状態**。`Tracker` と同じ生存スコープで `Pod` が所有。`Arc>>` ベースの `TodoStore` を tool に注入する - **永続化は専用レーンを持たない**。`tool_call.arguments` がセッションログに既に乗っているため、resume 時には履歴 replay の中で最後の `todo_write` 引数を `TodoStore` に再適用すれば状態が復元される -- **注意機構は `pre_llm_request` Interceptor**。直近の user message に `` ブロックを揮発的に append するだけ。履歴・ログには載せない -- **system-reminder 注入の汎用化はやらない**。利用者が TODO 1個しかない段階で抽象を立てない(CLAUDE.md「概念の追加は不在が問題になってから」)。ただし「タグ形式は `...` で揃える」「履歴は汚さない」の2点は本実装で確立し、将来の追加機構が同じ規約に乗れるようにする +- **注意機構は `Interceptor::pending_history_appends`**。未完了 TODO がある場合に新規 system message Item として `worker.history` に append する。Notify / PodEvent と同じ lane に乗せ、`history.json` への永続化と resume 後の読み戻しは worker.history 経由で自動的についてくる(→ `tickets/notify-history-persist.md`) +- **system-reminder 注入の汎用化はやらない**。利用者が TODO 1個しかない段階で抽象を立てない(CLAUDE.md「概念の追加は不在が問題になってから」)。ただし「タグ形式は `...` で揃える」点は本実装で確立し、将来の追加機構が同じ規約に乗れるようにする ## 要件 @@ -43,19 +43,19 @@ ### 注意機構(Interceptor) -- `pre_llm_request` で `Vec` を受け取り、未完了 TODO(`pending` または `in_progress`)が 1 件でも存在する場合に発動 -- 直近の user message の content(または content[最終 text part])の末尾に `` ブロックを append -- ブロック内には現在の TODO リストを、status を含む簡潔な形式で列挙 -- 履歴 (`Worker` の保持する `Vec`) は変更しない。リクエスト送信時の Vec のみ加工 -- TODO が空の場合は何も差し込まない +- `pending_history_appends` で未完了 TODO(`pending` または `in_progress`)が 1 件でも存在する場合に発動し、`` ブロックを含む新規 system message Item を返す +- Worker はこれを `worker.history` に append し、その後の per-request clone でリクエストにも含める。永続化 / resume / compaction は通常 Item と同じ扱い +- ブロック内には現在の TODO リストを、status を含む簡潔な形式で列挙する +- TODO が空の場合は空の `Vec` を返し、何も差し込まない +- cooldown は idle 期間に1回 + 反応で counter リセットの設計上、reminder の連続注入は構造的に起きない(仮に複数回出ても、それぞれが「その時点での active TODO snapshot」として履歴に並ぶのは因果として正しい) ## 完了条件 - `todo_write` ツールが builtin tool として登録され、Pod で利用できる -- LLM が `todo_write` を呼ぶと TodoStore が更新され、その後の `pre_llm_request` で system-reminder として LLM に再提示される +- LLM が `todo_write` を呼ぶと TodoStore が更新され、その後の `pending_history_appends` で system-reminder Item として `worker.history` に append され、リクエストにも含まれる - セッションを resume すると、最後の `todo_write` の状態から再開される - compact を跨いでも、未完了 TODO が新セッション冒頭の system message として残る -- system-reminder の注入は揮発的で、`get_history` / セッションログには現れない +- 注入された system-reminder Item は `worker.history` / `history.json` / `get_history` のいずれにも現れる(揮発レーンは持たない方針 → `tickets/notify-history-persist.md`) - 単体テストで `todo_write` の更新挙動 / replay 復元 / Interceptor の差し込みがカバーされる ## 範囲外