feat: notify-history-persist実装

This commit is contained in:
Keisuke Hirata 2026-05-03 19:27:22 +09:00
parent ee9c60bec2
commit e8045776f2
No known key found for this signature in database
8 changed files with 183 additions and 93 deletions

View File

@ -16,4 +16,4 @@
- 使用頻度メトリクス + Knowledge 化候補レポート → [tickets/memory-usage-metrics.md](tickets/memory-usage-metrics.md)
- セッション内 TODO ツール(注意機構付き) → [tickets/session-todo.md](tickets/session-todo.md)
- ワークスペースのメモリーをLintするヘッドレスCLI
- system-reminder 注入機構の汎用化2件目の利用者が出た時に検討。タグ形式と「履歴を汚さない」原則は session-todo で先行確立
- system-reminder 注入機構の汎用化2件目の利用者が出た時に検討。タグ形式 `<system-reminder>...</system-reminder>` の規約は session-todo-reminder で先行確立。注入された Item は worker.history に append する方針

View File

@ -120,8 +120,35 @@ pub trait Interceptor: Send + Sync {
PromptAction::Continue
}
/// Called before each LLM request. The context can be modified
/// (e.g. for context compaction).
/// Items that should be **committed to `worker.history`** just
/// before the next LLM request. Returned items are `extend`ed into
/// the persistent history (and therefore picked up by the per-turn
/// clone that backs the LLM request, plus the usual
/// history-persistence path).
///
/// Use this for inputs that arrive from outside the LLM and need
/// to be reflected in the on-disk history — notifications,
/// cross-Pod events, system reminders. Do **not** use
/// [`Self::pre_llm_request`] for that purpose: it mutates a
/// per-request clone, so any committed assistant response that
/// reacts to the injection would have no visible trigger on the
/// next turn (or after resume / compaction).
///
/// `pre_llm_request` remains the right place for purely
/// reproducible per-request transformations (pruning, content
/// trimming, cache anchors) that depend only on the existing
/// history.
async fn pending_history_appends(&self) -> Vec<Item> {
Vec::new()
}
/// Called before each LLM request. The context starts as a clone
/// of `worker.history` (after `pending_history_appends` and the
/// Worker's own prune projection have been applied) and can be
/// further modified for that single request only — mutations here
/// are **not** persisted back to history. Use
/// [`Self::pending_history_appends`] for inputs that need to land
/// in history.
async fn pre_llm_request(&self, _context: &mut Vec<Item>) -> PreRequestAction {
PreRequestAction::Continue
}

View File

@ -856,6 +856,16 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
cb(current_turn);
}
// Drain interceptor-side inputs that are meant to land in
// history (notifications, cross-Pod events, system
// reminders). These are committed *before* the per-request
// clone so they participate in the LLM request below and
// get persisted by the upper layer that owns history.json.
let pending = self.interceptor.pending_history_appends().await;
if !pending.is_empty() {
self.history.extend(pending);
}
// Clone the history into a per-request context. Everything
// below (prune projection, interceptor hooks) mutates only
// this clone, so the persistent `self.history` stays intact.

View File

@ -17,6 +17,7 @@ use llm_worker::interceptor::{
Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo,
ToolResultInfo, TurnEndAction,
};
use tracing::warn;
use llm_worker::tool::ToolOutput;
use tracing::info;
@ -28,7 +29,6 @@ use crate::hook::{
use crate::ipc::notify_buffer::{NotifyBuffer, format_notify};
use crate::prompt::catalog::PromptCatalog;
use llm_worker::token_counter::total_tokens;
use tracing::warn;
/// Maximum number of bytes copied into `TurnEndInfo::final_text_preview`.
const FINAL_TEXT_PREVIEW_LIMIT: usize = 512;
@ -40,8 +40,10 @@ pub(crate) struct PodInterceptor {
/// per-request `context` to estimate current occupancy for threshold
/// checks. `None` when compaction is disabled (both thresholds unset).
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
/// Pending-notification buffer drained into the per-request
/// context at the head of `pre_llm_request`.
/// Pending-notification buffer drained into `worker.history`
/// via [`Self::pending_history_appends`] just before the next LLM
/// request. The Worker `extend`s these into its persistent history
/// so the LLM has a visible trigger for any reaction it commits.
pending_notifies: NotifyBuffer,
/// Submit-scoped stash of resolver-produced system messages.
/// Drained inside `on_prompt_submit` and returned via
@ -122,6 +124,27 @@ impl Interceptor for PodInterceptor {
}
}
async fn pending_history_appends(&self) -> Vec<Item> {
let drained = self.pending_notifies.drain();
if drained.is_empty() {
return Vec::new();
}
let mut items = Vec::with_capacity(drained.len());
for n in drained {
match format_notify(&n, &self.prompts) {
Ok(item) => items.push(item),
Err(e) => {
// A render failure here would starve the LLM of
// the notify text. Fall back to the raw message
// so the trigger still lands in history.
warn!(error = %e, "failed to render notify_wrapper; using raw message");
items.push(Item::system_message(n.message.clone()));
}
}
}
items
}
async fn pre_llm_request(&self, context: &mut Vec<Item>) -> PreRequestAction {
let current_tokens = self.estimated_tokens(context);
@ -140,24 +163,6 @@ impl Interceptor for PodInterceptor {
}
}
// Internal mechanism: drain pending `Method::Notify` notifications
// into the per-request context as transient system messages.
// These are not persisted to the Worker history; they exist only
// for this single LLM request.
for n in self.pending_notifies.drain() {
match format_notify(&n, &self.prompts) {
Ok(item) => context.push(item),
Err(e) => {
// A render failure here would starve the LLM of the
// notify text. Fall back to the raw message —
// it still carries the intent, just without the
// wrapper phrasing.
warn!(error = %e, "failed to render notify_wrapper; using raw message");
context.push(Item::system_message(n.message.clone()));
}
}
}
let info = PreRequestInfo {
item_count: context.len(),
estimated_tokens: current_tokens,
@ -406,7 +411,7 @@ mod tests {
}
#[tokio::test]
async fn pre_llm_request_drains_pending_notifies_into_context() {
async fn pending_history_appends_drains_buffer_into_items() {
let registry = Arc::new(HookRegistryBuilder::new().build());
let buffer = NotifyBuffer::new();
buffer.push("first".into());
@ -420,49 +425,52 @@ mod tests {
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
);
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
let action = interceptor.pre_llm_request(&mut ctx).await;
assert!(matches!(action, PreRequestAction::Continue));
// Original user message preserved, two notifications appended in order.
assert_eq!(ctx.len(), 3);
let second = ctx[1].as_text().unwrap_or_default();
let third = ctx[2].as_text().unwrap_or_default();
let items = interceptor.pending_history_appends().await;
assert_eq!(items.len(), 2);
let first = items[0].as_text().unwrap_or_default();
let second = items[1].as_text().unwrap_or_default();
assert!(first.contains("[Notification]"));
assert!(first.contains("first"));
assert!(second.contains("[Notification]"));
assert!(second.contains("first"));
assert!(third.contains("[Notification]"));
assert!(third.contains("second"));
// Buffer is drained after a single pre_llm_request call.
assert!(buffer.is_empty());
assert!(second.contains("second"));
assert!(
buffer.is_empty(),
"buffer must be drained after pending_history_appends"
);
// Empty buffer → empty Vec (no synthesised items).
let again = interceptor.pending_history_appends().await;
assert!(again.is_empty());
}
#[tokio::test]
async fn pre_llm_request_skips_notification_injection_when_yielding() {
// When compaction yields, notifications remain in the buffer for
// the next pre_llm_request (after compaction + resume).
async fn pre_llm_request_does_not_touch_pending_notifies() {
// The drain lane has moved to `pending_history_appends`;
// `pre_llm_request` must leave the buffer alone and not inject
// anything itself.
let registry = Arc::new(HookRegistryBuilder::new().build());
let buffer = NotifyBuffer::new();
buffer.push("msg".into());
let state = Arc::new(CompactState::new(None, Some(100), 2));
let ctx_items = vec![Item::user_message("hi")];
let history = usage_handle_with(ctx_items.len(), 200);
let interceptor = PodInterceptor::new(
registry,
Some(state),
Some(history),
None,
None,
buffer.clone(),
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
);
let mut ctx = ctx_items;
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
let action = interceptor.pre_llm_request(&mut ctx).await;
assert!(matches!(action, PreRequestAction::Yield));
// Notifications were not drained (still held for post-compact resume).
assert_eq!(ctx.len(), 1);
assert_eq!(buffer.len(), 1);
assert!(matches!(action, PreRequestAction::Continue));
assert_eq!(ctx.len(), 1, "pre_llm_request must not append notifies");
assert_eq!(
buffer.len(),
1,
"pre_llm_request must not drain the notify buffer"
);
}
#[tokio::test]

View File

@ -1,9 +1,22 @@
//! Pending-notify buffer for `Method::Notify`.
//! Pending-notify buffer for `Method::Notify` and `Method::PodEvent`.
//!
//! Notify entries are queued here by the Controller and drained by
//! `PodInterceptor::pre_llm_request` into the per-request context
//! (never into the Worker's persistent history). Each queued entry
//! becomes one `Item::system_message` in the outgoing request.
//! Entries are queued here by the Controller (on receipt of the
//! corresponding IPC method) and drained by
//! `PodInterceptor::pending_history_appends`, which the Worker calls
//! at the head of each turn loop iteration to `extend` them into the
//! persistent `worker.history`. Each queued entry becomes one
//! `Item::system_message`.
//!
//! This is the **single lane** for "system messages produced by Pod
//! state that should land in the next LLM request": Notify, PodEvent,
//! and any future `<system-reminder>` injection all ride this queue
//! (or a sibling queue with the same lifecycle). Per
//! `tickets/notify-history-persist.md` and `AGENTS.md` (LLM コンテキスト
//! の加工原則), there is **no** "transient, history-skipping" lane —
//! everything injected into a request is also committed to history so
//! that any LLM reaction has a visible trigger across turns, resume,
//! and compaction, and so the Anthropic prompt cache prefix stays
//! stable across requests.
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
@ -68,9 +81,10 @@ impl NotifyBuffer {
}
/// Format a single pending notify entry into the `Item::system_message`
/// that gets injected into the per-request context. The wrapper body
/// comes from `PodPrompt::NotifyWrapper` so the surrounding phrasing
/// can be customised via a prompt pack (translation, tone, ...).
/// that gets appended to `worker.history` just before the next LLM
/// request. The wrapper body comes from `PodPrompt::NotifyWrapper` so
/// the surrounding phrasing can be customised via a prompt pack
/// (translation, tone, ...).
pub(crate) fn format_notify(
n: &PendingNotify,
prompts: &PromptCatalog,

View File

@ -539,11 +539,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
}
}
/// Push a `Method::Notify` entry onto the pending buffer.
/// Push a `Method::Notify` (or rendered `Method::PodEvent`) entry
/// onto the pending buffer.
///
/// The notification will be injected as an `Item::system_message`
/// into the next outgoing LLM request context (not into history).
/// See [`NotifyBuffer`] for overflow behaviour.
/// The notification will be appended to `worker.history` as an
/// `Item::system_message` just before the next LLM request, via
/// `PodInterceptor::pending_history_appends`. See [`NotifyBuffer`]
/// for overflow behaviour and the lane-of-record rationale.
pub fn push_notify(&self, message: String) {
self.pending_notifies.push(message);
}

View File

@ -560,22 +560,37 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);
// Exactly one request was made; it must contain the formatted
// notification as the last item (injected into request_context by
// PodInterceptor::pre_llm_request).
// notification as one of the items (committed to history by
// PodInterceptor::pending_history_appends and cloned into the
// request context for that turn).
let requests = client_for_assert.captured_requests();
assert_eq!(requests.len(), 1, "one LLM call expected");
let last_item_text = requests[0]
let notify_in_request = requests[0]
.items
.last()
.and_then(|i| i.as_text())
.unwrap_or_default()
.to_string();
.iter()
.any(|i| i.as_text().is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished")));
assert!(
last_item_text.contains("[Notification]"),
"injected system message missing, got: {last_item_text:?}"
notify_in_request,
"injected system message missing from request, got items: {:?}",
requests[0]
.items
.iter()
.filter_map(|i| i.as_text())
.collect::<Vec<_>>()
);
// The notification must also be persisted into the Worker history
// (and therefore eventually into history.json), per
// tickets/notify-history-persist.md.
let history = handle.shared_state.history();
let notify_in_history = history
.iter()
.any(|i| i.as_text().is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished")));
assert!(
notify_in_history,
"notify must be committed to worker.history, got items: {:?}",
history.iter().filter_map(|i| i.as_text()).collect::<Vec<_>>()
);
assert!(last_item_text.contains("turn finished"));
assert!(last_item_text.contains("not a blocking request"));
}
#[tokio::test]
@ -630,19 +645,33 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes
1,
"auto-kick should issue exactly one LLM request"
);
let last_item_text = requests[0]
.items
.last()
.and_then(|i| i.as_text())
.unwrap_or_default()
.to_string();
let event_in_request = requests[0].items.iter().any(|i| {
i.as_text().is_some_and(|t| {
t.contains("[Notification]") && t.contains("child") && t.contains("finished a turn")
})
});
assert!(
last_item_text.contains("[Notification]"),
"injected system message missing, got: {last_item_text:?}"
event_in_request,
"rendered TurnEnded text missing from request, got items: {:?}",
requests[0]
.items
.iter()
.filter_map(|i| i.as_text())
.collect::<Vec<_>>()
);
// Same item must be present in worker.history (persisted lane),
// not just the per-request clone — see tickets/notify-history-persist.md.
let history = handle.shared_state.history();
let event_in_history = history.iter().any(|i| {
i.as_text().is_some_and(|t| {
t.contains("[Notification]") && t.contains("child") && t.contains("finished a turn")
})
});
assert!(
last_item_text.contains("child") && last_item_text.contains("finished a turn"),
"rendered TurnEnded text missing, got: {last_item_text:?}"
event_in_history,
"PodEvent must be committed to worker.history, got items: {:?}",
history.iter().filter_map(|i| i.as_text()).collect::<Vec<_>>()
);
}

View File

@ -14,8 +14,8 @@
- **保存先は `tools` 層の session-lifetime 状態**。`Tracker` と同じ生存スコープで `Pod` が所有。`Arc<Mutex<Vec<TodoItem>>>` ベースの `TodoStore` を tool に注入する
- **永続化は専用レーンを持たない**。`tool_call.arguments` がセッションログに既に乗っているため、resume 時には履歴 replay の中で最後の `todo_write` 引数を `TodoStore` に再適用すれば状態が復元される
- **注意機構は `pre_llm_request` Interceptor**。直近の user message に `<system-reminder>` ブロックを揮発的に append するだけ。履歴・ログには載せない
- **system-reminder 注入の汎用化はやらない**。利用者が TODO 1個しかない段階で抽象を立てないCLAUDE.md「概念の追加は不在が問題になってから」。ただし「タグ形式は `<system-reminder>...</system-reminder>` で揃える」「履歴は汚さない」の2点は本実装で確立し、将来の追加機構が同じ規約に乗れるようにする
- **注意機構は `Interceptor::pending_history_appends`**。未完了 TODO がある場合に新規 system message Item として `worker.history` に append する。Notify / PodEvent と同じ lane に乗せ、`history.json` への永続化と resume 後の読み戻しは worker.history 経由で自動的についてくる(→ `tickets/notify-history-persist.md`
- **system-reminder 注入の汎用化はやらない**。利用者が TODO 1個しかない段階で抽象を立てないCLAUDE.md「概念の追加は不在が問題になってから」。ただし「タグ形式は `<system-reminder>...</system-reminder>` で揃える」点は本実装で確立し、将来の追加機構が同じ規約に乗れるようにする
## 要件
@ -43,19 +43,19 @@
### 注意機構Interceptor
- `pre_llm_request` で `Vec<Item>` を受け取り、未完了 TODO`pending` または `in_progress`)が 1 件でも存在する場合に発動
- 直近の user message の contentまたは content[最終 text part])の末尾に `<system-reminder>` ブロックを append
- ブロック内には現在の TODO リストを、status を含む簡潔な形式で列挙
- 履歴 (`Worker` の保持する `Vec<Item>`) は変更しない。リクエスト送信時の Vec のみ加工
- TODO が空の場合は何も差し込まない
- `pending_history_appends` で未完了 TODO`pending` または `in_progress`)が 1 件でも存在する場合に発動し、`<system-reminder>` ブロックを含む新規 system message Item を返す
- Worker はこれを `worker.history` に append し、その後の per-request clone でリクエストにも含める。永続化 / resume / compaction は通常 Item と同じ扱い
- ブロック内には現在の TODO リストを、status を含む簡潔な形式で列挙する
- TODO が空の場合は空の `Vec<Item>` を返し、何も差し込まない
- cooldown は idle 期間に1回 + 反応で counter リセットの設計上、reminder の連続注入は構造的に起きない(仮に複数回出ても、それぞれが「その時点での active TODO snapshot」として履歴に並ぶのは因果として正しい
## 完了条件
- `todo_write` ツールが builtin tool として登録され、Pod で利用できる
- LLM が `todo_write` を呼ぶと TodoStore が更新され、その後の `pre_llm_request` で system-reminder として LLM に再提示される
- LLM が `todo_write` を呼ぶと TodoStore が更新され、その後の `pending_history_appends` で system-reminder Item として `worker.history` に append され、リクエストにも含まれる
- セッションを resume すると、最後の `todo_write` の状態から再開される
- compact を跨いでも、未完了 TODO が新セッション冒頭の system message として残る
- system-reminder の注入は揮発的で、`get_history` / セッションログには現れない
- 注入された system-reminder Item は `worker.history` / `history.json` / `get_history` のいずれにも現れる(揮発レーンは持たない方針 → `tickets/notify-history-persist.md`
- 単体テストで `todo_write` の更新挙動 / replay 復元 / Interceptor の差し込みがカバーされる
## 範囲外