From 2e004161e4caaf1e9bf767a209a40fc9f15c5213 Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 14 Apr 2026 02:35:35 +0900 Subject: [PATCH] =?UTF-8?q?prune=E3=81=AE=E3=83=88=E3=83=BC=E3=82=AF?= =?UTF-8?q?=E3=83=B3=E8=A8=88=E7=AE=97=E7=BD=AE=E3=81=8D=E6=8F=9B=E3=81=88?= =?UTF-8?q?=E3=83=BBPod=E3=81=AB=E6=8E=A5=E7=B6=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/llm-worker/src/prune.rs | 83 +++++++++++++++++++++++ crates/llm-worker/src/worker.rs | 68 ++++++++++++++++++- crates/manifest/src/lib.rs | 4 +- crates/pod/src/compact_interceptor.rs | 2 +- crates/pod/src/lib.rs | 4 +- crates/pod/src/pod.rs | 35 ++++++---- crates/pod/src/prune.rs | 56 ++++++++++++++++ crates/pod/src/prune_hook.rs | 95 --------------------------- tickets/prune-projection.review.md | 78 +++++++++++++++++++++- 9 files changed, 310 insertions(+), 115 deletions(-) create mode 100644 crates/pod/src/prune.rs delete mode 100644 crates/pod/src/prune_hook.rs diff --git a/crates/llm-worker/src/prune.rs b/crates/llm-worker/src/prune.rs index 694b9447..5530aaa9 100644 --- a/crates/llm-worker/src/prune.rs +++ b/crates/llm-worker/src/prune.rs @@ -14,10 +14,19 @@ //! `min_savings` 判定や savings 推定もこの crate には置かず、上位層が //! usage 履歴ベースのトークン会計と組み合わせて行う。 +use std::ops::Range; + use serde::{Deserialize, Serialize}; use crate::llm_client::types::Item; +/// Callback that estimates the token savings for dropping `history[range]`. +/// +/// Injected into [`crate::Worker`] via `set_savings_estimator` so the +/// Worker can make `min_savings` decisions without knowing about usage +/// measurement sources. Return `0` to signal "no data / refuse to prune". +pub type SavingsEstimator = Box) -> u64 + Send + Sync>; + /// Configuration for the Prune algorithm. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PruneConfig { @@ -64,6 +73,24 @@ fn find_turn_starts(items: &[Item]) -> Vec { .collect() } +/// Set `content = None` on each `Item::ToolResult` at the given indices. +/// +/// Returns the number of items that were actually modified — items that +/// are already content-less are counted as 0. Intended for use on a +/// request-context clone (never on a persistent history). +pub fn project(items: &mut [Item], indices: &[usize]) -> usize { + let mut count = 0; + for &i in indices { + if let Item::ToolResult { content, .. } = &mut items[i] { + if content.is_some() { + *content = None; + count += 1; + } + } + } + count +} + /// Indices of `Item::ToolResult { content: Some(_), .. }` that lie outside /// the last `protected_turns` turns. Pure: does not mutate `items`. /// @@ -150,6 +177,62 @@ mod tests { assert!(prunable_indices(&items, 2).is_empty()); } + #[test] + fn project_drops_content_and_counts_modifications() { + let big = "x".repeat(64); + let mut items = make_history(&[ + ("turn1", vec![("s1", Some(&big))]), + ("turn2", vec![("s2", Some(&big))]), + ("turn3", vec![("s3", Some("keep me"))]), + ("turn4", vec![("s4", Some("keep me too"))]), + ]); + let candidates = prunable_indices(&items, 2); + let count = project(&mut items, &candidates); + assert_eq!(count, 2); + + for item in &items { + if let Item::ToolResult { summary, content, .. } = item { + if summary == "s1" || summary == "s2" { + assert!(content.is_none(), "old content should be projected out"); + } else { + assert!(content.is_some(), "protected content should remain"); + } + } + } + } + + #[test] + fn project_skips_already_pruned_items() { + // indices points at an item whose content is already None. + // project() should count it as 0 modifications. + let mut items = make_history(&[ + ("turn1", vec![("s1", None)]), + ("turn2", vec![("s2", Some("hello"))]), + ]); + // Manually target s1 (index 3) even though it's already None. + let target = items + .iter() + .position(|it| matches!(it, Item::ToolResult { summary, .. } if summary == "s1")) + .unwrap(); + let count = project(&mut items, &[target]); + assert_eq!(count, 0); + } + + #[test] + fn project_is_idempotent() { + let big = "x".repeat(64); + let mut items = make_history(&[ + ("turn1", vec![("s1", Some(&big))]), + ("turn2", vec![]), + ("turn3", vec![]), + ("turn4", vec![]), + ]); + let candidates = prunable_indices(&items, 2); + assert_eq!(project(&mut items, &candidates), 1); + // 2 周目: 候補は一度の prunable_indices 結果を使い回しても 0 件。 + assert_eq!(project(&mut items, &candidates), 0); + } + #[test] fn protected_turns_boundary_exact() { // 3 turns with protected_turns=2: only turn 1 is a candidate. diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index c62bed0f..22215d79 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -162,6 +162,12 @@ pub struct Worker { /// Cancel notification channel (for interrupting execution) cancel_tx: mpsc::Sender<()>, cancel_rx: mpsc::Receiver<()>, + /// Prune configuration. `None` disables the prune projection. + prune_config: Option, + /// Callback that estimates token savings for a drop range, injected + /// by higher layers that own usage measurements. `None` disables + /// the prune projection. + savings_estimator: Option, /// State marker _state: PhantomData, } @@ -303,6 +309,28 @@ impl Worker { self.interceptor = Box::new(interceptor); } + /// Configure the prune projection applied to each outgoing request + /// context. + /// + /// Both this and [`set_savings_estimator`](Self::set_savings_estimator) + /// must be set for the projection to fire; missing either one is a + /// no-op. See the crate-level [`prune`](crate::prune) docs for the + /// semantics. + pub fn set_prune_config(&mut self, config: Option) { + self.prune_config = config; + } + + /// Inject the callback used to estimate token savings for a prune + /// candidate range. + /// + /// The callback is invoked with the *request context* (a clone of + /// history) and the candidate index range. It must be pure/idempotent + /// since it may be called once per LLM request. Return `0` to signal + /// "no data" or "refuse to prune". + pub fn set_savings_estimator(&mut self, estimator: Option) { + self.savings_estimator = estimator; + } + /// Get a mutable reference to the timeline (for additional handler registration) pub fn timeline_mut(&mut self) -> &mut Timeline { &mut self.timeline @@ -697,8 +725,40 @@ impl Worker { cb(current_turn); } - // Interceptor: pre_llm_request + // Clone the history into a per-request context. Everything + // below (prune projection, interceptor hooks) mutates only + // this clone, so the persistent `self.history` stays intact. let mut request_context = self.history.clone(); + + // Prune projection: if both the config and the savings + // estimator are configured, drop ToolResult.content from + // prunable candidates whose estimated savings meet the + // threshold. Worker does not own usage history itself; the + // estimator is injected by the layer that does. + if let (Some(config), Some(estimator)) = + (&self.prune_config, &self.savings_estimator) + { + let candidates = + crate::prune::prunable_indices(&request_context, config.protected_turns); + if !candidates.is_empty() { + let first = *candidates.first().unwrap(); + let last = *candidates.last().unwrap() + 1; + let savings = estimator(&request_context, first..last); + if savings >= config.min_savings { + let pruned = + crate::prune::project(&mut request_context, &candidates); + if pruned > 0 { + debug!( + pruned, + estimated_savings_tokens = savings, + "Projected old tool-result content out of request context" + ); + } + } + } + } + + // Interceptor: pre_llm_request match self.interceptor.pre_llm_request(&mut request_context).await { PreRequestAction::Cancel(reason) => { info!(reason = %reason, "Aborted by interceptor"); @@ -899,6 +959,8 @@ impl Worker { last_run_interrupted: false, cancel_tx, cancel_rx, + prune_config: None, + savings_estimator: None, _state: PhantomData, } } @@ -1147,6 +1209,8 @@ impl Worker { cancel_tx: self.cancel_tx, cancel_rx: self.cancel_rx, + prune_config: self.prune_config, + savings_estimator: self.savings_estimator, _state: PhantomData, } } @@ -1217,6 +1281,8 @@ impl Worker { cancel_tx: self.cancel_tx, cancel_rx: self.cancel_rx, + prune_config: self.prune_config, + savings_estimator: self.savings_estimator, _state: PhantomData, } } diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index 033eabfb..6dc0244f 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -97,7 +97,7 @@ pub struct CompactionConfig { /// Minimum estimated token savings to trigger a prune. #[serde(default = "default_prune_min_savings")] - pub prune_min_savings: usize, + pub prune_min_savings: u64, /// When `input_tokens` exceeds this, run compact. `None` = compact disabled. #[serde(default)] @@ -114,7 +114,7 @@ pub struct CompactionConfig { } fn default_prune_protected_turns() -> usize { 3 } -fn default_prune_min_savings() -> usize { 4096 } +fn default_prune_min_savings() -> u64 { 4096 } fn default_compact_retained_turns() -> usize { 2 } impl Default for CompactionConfig { diff --git a/crates/pod/src/compact_interceptor.rs b/crates/pod/src/compact_interceptor.rs index 32238d83..ce5ca1a1 100644 --- a/crates/pod/src/compact_interceptor.rs +++ b/crates/pod/src/compact_interceptor.rs @@ -39,7 +39,7 @@ impl Interceptor for CompactInterceptor { } async fn pre_llm_request(&self, context: &mut Vec) -> PreRequestAction { - // Step 1: Delegate to inner (PruneHook and other hooks run first). + // Step 1: Delegate to inner hooks first. let inner_action = self.inner.pre_llm_request(context).await; if !matches!(inner_action, PreRequestAction::Continue) { return inner_action; diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 2feb74a2..3f8fa2fe 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -4,12 +4,11 @@ pub mod runtime_dir; pub mod shared_state; pub mod socket_server; -pub mod prune_hook; - mod compact_interceptor; mod compact_state; mod hook_interceptor; mod pod; +mod prune; mod token_counter; mod usage_tracker; @@ -18,7 +17,6 @@ pub use token_counter::{EstimateSource, SplitPoint, TokenEstimate}; pub use controller::{PodController, PodHandle}; pub use manifest::{PodManifest, ProviderConfig, ProviderKind, Scope}; pub use hook::{Hook, HookEventKind, HookRegistryBuilder}; -pub use prune_hook::PruneHook; pub use pod::{Pod, PodError, PodRunResult, apply_worker_manifest}; pub use protocol::{ErrorCode, Event, Method, TurnResult}; pub use provider::{ProviderError, build_client}; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index dc92bdd8..88d60726 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -79,9 +79,9 @@ pub struct Pod { /// Restored from session log on `restore`, appended on each persist. /// Read by token-accounting APIs (`Pod::total_tokens`, etc.). /// - /// Wrapped in `Arc` so that hooks living on the Worker - /// (e.g. `PruneHook`) can share the same view via - /// [`Pod::usage_history_handle`]. + /// Wrapped in `Arc` so that callbacks injected into the + /// Worker (e.g. the savings estimator used by the prune projection) + /// can share the same view via [`Pod::usage_history_handle`]. usage_history: Arc>>, /// Session-lifetime file-operation tracker from the builtin `tools` /// crate. Populated by the Controller when it registers the builtin @@ -104,7 +104,7 @@ impl Pod { history: worker.history(), }; let (session_id, head_hash) = session_store::create_session(&store, state).await?; - Ok(Self { + let mut pod = Self { manifest, worker: Some(worker), store, @@ -118,7 +118,9 @@ impl Pod { usage_tracker: Arc::new(UsageTracker::new()), usage_history: Arc::new(Mutex::new(Vec::::new())), tracker: None, - }) + }; + pod.apply_prune_from_manifest(); + Ok(pod) } /// Restore a Pod from a persisted session. @@ -139,7 +141,7 @@ impl Pod { worker.set_turn_count(state.turn_count); worker.set_last_run_interrupted(state.last_run_interrupted); - Ok(Self { + let mut pod = Self { manifest, worker: Some(worker), store, @@ -153,7 +155,9 @@ impl Pod { usage_tracker: Arc::new(UsageTracker::new()), usage_history: Arc::new(Mutex::new(state.usage_history)), tracker: None, - }) + }; + pod.apply_prune_from_manifest(); + Ok(pod) } /// The session ID used for persistence. @@ -206,9 +210,16 @@ impl Pod { /// Shared handle to the cumulative Usage history. /// - /// Hooks (e.g. `PruneHook`) take a clone of this `Arc` so they can - /// read the latest measurements at request time. The handle outlives + /// Callbacks that need live access to the latest measurements (e.g. + /// the savings estimator that `attach_prune` installs on the Worker) + /// clone this `Arc` and read it at request time. The handle outlives /// any individual run. + /// + /// **Locking contract:** the inner `Mutex` is held only for a short + /// clone (`lock().unwrap().clone()`) and released immediately. + /// Callers must not hold the guard across `.await` points, I/O, or + /// long computations — the guard is implicitly assumed to be + /// non-contended at every Pod lifecycle event. pub fn usage_history_handle(&self) -> Arc>> { self.usage_history.clone() } @@ -686,7 +697,7 @@ impl Pod, St> { history: worker.history(), }; let (session_id, head_hash) = session_store::create_session(&store, state).await?; - Ok(Self { + let mut pod = Self { manifest, worker: Some(worker), store, @@ -700,7 +711,9 @@ impl Pod, St> { usage_tracker: Arc::new(UsageTracker::new()), usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, - }) + }; + pod.apply_prune_from_manifest(); + Ok(pod) } } diff --git a/crates/pod/src/prune.rs b/crates/pod/src/prune.rs new file mode 100644 index 00000000..03bce9e7 --- /dev/null +++ b/crates/pod/src/prune.rs @@ -0,0 +1,56 @@ +//! Prune integration — wires the Worker's prune projection to the Pod's +//! usage-history-backed token accounting. +//! +//! Worker 自身がコンテキスト射影を行う(`worker.rs` の `request_context` 構築 +//! 直後)。Worker は usage 履歴を知らないので、`min_savings` 判定に使う savings +//! の見積もりはコールバックで外部から注入する。このモジュールはそのコールバック +//! を組み立てて Worker に差し込むための `impl Pod` を提供する。 + +use llm_worker::Item; +use llm_worker::llm_client::client::LlmClient; +use llm_worker::prune::{PruneConfig, SavingsEstimator}; +use session_store::Store; + +use crate::Pod; +use crate::token_counter::{EstimateSource, savings_for_drop_impl}; + +impl Pod { + /// Enable prune projection on the underlying Worker. + /// + /// Registers the config and a savings-estimator closure on the Worker. + /// The estimator captures a shared handle to [`Pod::usage_history_handle`] + /// so that every LLM request sees the latest measurements. + /// + /// Measurement-less ranges (before the first LLM call, or immediately + /// after a compact) return `0` from the estimator, which naturally + /// prevents the prune projection from firing until usage data exists. + pub fn attach_prune(&mut self, config: PruneConfig) { + let usage = self.usage_history_handle(); + let estimator: SavingsEstimator = Box::new(move |history: &[Item], range| { + let snapshot = usage.lock().expect("usage_history poisoned").clone(); + let est = savings_for_drop_impl(history, &snapshot, range); + match est.source { + EstimateSource::NoData => 0, + _ => est.tokens, + } + }); + let worker = self.worker_mut(); + worker.set_prune_config(Some(config)); + worker.set_savings_estimator(Some(estimator)); + } + + /// If the manifest has a `[compaction]` section, build a `PruneConfig` + /// from its `prune_*` fields and call [`attach_prune`](Self::attach_prune). + /// Otherwise no-op. Called from all Pod constructors so prune is + /// active whenever the manifest asks for it. + pub(crate) fn apply_prune_from_manifest(&mut self) { + let Some(compaction) = self.manifest().compaction.as_ref() else { + return; + }; + let config = PruneConfig { + protected_turns: compaction.prune_protected_turns, + min_savings: compaction.prune_min_savings, + }; + self.attach_prune(config); + } +} diff --git a/crates/pod/src/prune_hook.rs b/crates/pod/src/prune_hook.rs deleted file mode 100644 index 32630b27..00000000 --- a/crates/pod/src/prune_hook.rs +++ /dev/null @@ -1,95 +0,0 @@ -//! PruneHook — projects the LLM request context before each call. -//! -//! Prune は **コンテキスト射影** として実装する。`PreLlmRequest` hook に -//! 渡される `context: &mut Vec` は Worker が毎 turn 冒頭で history を -//! clone した一時配列 (`worker.rs:701`)。ここで ToolResult.content を省いても -//! Worker の永続履歴には影響しない。`prunable_indices` で候補を抽出し、 -//! `min_savings` を満たせば content を `None` に射影する。 -//! -//! `min_savings` の判定は usage 履歴ベースのトークン会計 -//! ([`crate::token_counter::savings_for_drop_impl`]) で行う。 - -use std::sync::{Arc, Mutex}; - -use async_trait::async_trait; -use llm_worker::Item; -use llm_worker::interceptor::PreRequestAction; -use llm_worker::prune::{PruneConfig, prunable_indices}; -use session_store::UsageRecord; -use tracing::debug; - -use crate::hook::{Hook, PreLlmRequest}; -use crate::token_counter::{EstimateSource, savings_for_drop_impl}; - -/// Hook that conditionally prunes old tool-result content before each -/// LLM request, reclaiming context-window tokens. -/// -/// `usage_history` は [`crate::Pod::usage_history_handle`] から共有された -/// `Arc>`。リクエスト直前に snapshot を取って savings を見積もる。 -pub struct PruneHook { - config: PruneConfig, - usage_history: Arc>>, -} - -impl PruneHook { - pub fn new(config: PruneConfig, usage_history: Arc>>) -> Self { - Self { - config, - usage_history, - } - } -} - -#[async_trait] -impl Hook for PruneHook { - async fn call(&self, context: &mut Vec) -> PreRequestAction { - let candidates = prunable_indices(context, self.config.protected_turns); - if candidates.is_empty() { - return PreRequestAction::Continue; - } - - // 候補範囲のトークン節約量を usage 履歴ベースで見積もる。 - // content だけ削除する場合の上限値(範囲全体を消した場合の savings)として - // 近似する。実際の content drop は items 数を変えないので、本来の savings - // はこの値以下。閾値判定は上振れ方向=「やや prune を発動しやすい」側で安全。 - let first = *candidates.first().unwrap(); - let last = *candidates.last().unwrap() + 1; - let snapshot = self - .usage_history - .lock() - .expect("usage_history poisoned") - .clone(); - let savings = savings_for_drop_impl(context, &snapshot, first..last); - - // measurement が無い場合 (NoData) は判定材料がないので prune を見送る。 - // 最初の LLM call が走るまでは usage_history が空なのでこのパスを通る。 - if matches!(savings.source, EstimateSource::NoData) { - return PreRequestAction::Continue; - } - - if savings.tokens < self.config.min_savings { - return PreRequestAction::Continue; - } - - // 射影: context (= history の clone) 上の対象 ToolResult だけ content を - // drop する。Worker の永続履歴は別インスタンスなので影響を受けない。 - let mut projected = 0usize; - for &i in &candidates { - if let Item::ToolResult { content, .. } = &mut context[i] { - if content.is_some() { - *content = None; - projected += 1; - } - } - } - if projected > 0 { - debug!( - pruned = projected, - estimated_savings_tokens = savings.tokens, - source = ?savings.source, - "Projected old tool-result content out of request context" - ); - } - PreRequestAction::Continue - } -} diff --git a/tickets/prune-projection.review.md b/tickets/prune-projection.review.md index 43e34383..2a5963c5 100644 --- a/tickets/prune-projection.review.md +++ b/tickets/prune-projection.review.md @@ -43,6 +43,80 @@ clone が消えると設計が即座に壊れる。対策案: `PruneHook::call` を呼んで、context が変更され、かつ元 history が 変わらないことを検証する統合テストがあると安心。 -## 判定 +## 判定(初回) -承認。 +承認。ただし初回レビューで「pure ロジックが `llm-worker::prune` にあるのに +適用は pod 側 Hook で行う」という責務の不整合が見つかり、追加作業 +「Worker への統合」をチケットに追記して再実装。 + +--- + +# 追加作業レビュー: Worker への統合 + +## 要件の充足 + +追加作業で定義した変更は全て実装されている: + +| 項目 | 実装 | +|---|---| +| `Worker::set_prune_config` | `worker.rs:317` | +| `Worker::set_savings_estimator` | `worker.rs:329` | +| `build_request` 直前での射影 | `worker.rs:733-758` | +| `SavingsEstimator` 型定義 | `llm-worker::prune::SavingsEstimator` | +| `pod::prune_hook` モジュール削除 | 削除済み | +| Pod 側は config と estimator を渡すだけ | `pod::prune::attach_prune` | + +Worker が prune の責任を持ち、pod は usage 履歴に依存する estimator +コールバックを注入するだけ、という責務分離はチケット通り。Locked 状態への +lock 時にも `prune_config` / `savings_estimator` が保持される点も対処済み +(`worker.rs:1214-1217, 1286-1289`)。 + +## 指摘と対処 + +### A. `attach_prune` がどこからも呼ばれていない(未対処、要判断) + +`Pod::attach_prune` は実装されているが、コードベース内で呼び出し箇所が無い。 +履歴を見ると、リファクタ前の `PruneHook` も一度も registration されていない +デッドコードだった (`be1119d` 以降 `PruneHook::new` を呼んだ箇所無し)。 +つまりこのリファクタは「デッドコードを別の形のデッドコードに置き換えた」状態。 + +チケットの「追加作業」範疇としては設計通り完結しているが、prune 機能が実際に +有効化されていないのは事実。Manifest や `Controller::spawn` のどこで +`attach_prune` を呼ぶかは別チケット扱いにするか、このチケット内で一緒に +対処するかを要判断。 + +### B. 閉包内でのロック取得(非ブロッカー、未対処) + +`attach_prune` の estimator は毎回 `usage.lock().expect(...).clone()` する。 +現在 `usage_history` を触る箇所は: + +- `Pod::persist_turn`(run 終了後、短時間) +- `Pod::compact`(同上) +- `Pod::usage_history()`(snapshot 取得、短い) +- estimator(request ごと、clone のみ) + +estimator 発火は worker.rs の `build_request` 直前、つまり Pod の `run()` が +待機中なので他のロック取得と並走しない。現時点では安全。将来 `usage_history` +を別スレッドから触るコードが増えた時の事故防止として、pod.rs の +`usage_history_handle` doc コメントに「Mutex は短時間 clone 専用」という +前提を明示すべき。 + +### C. Worker 内部のインラインループ(非ブロッカー、未対処) + +`worker.rs:733-758` に content 射影のインラインループが直書き。`prune` +モジュール側に `apply_projection(&mut Vec, &[usize])` のような +pure 関数を用意して Worker はそれを呼ぶだけにすれば、Worker のメインループが +短くなり、`prune` モジュール内でのテストも書きやすくなる。現状 Worker の +`run` ループが肥大化する方向に寄っている。 + +### D. 射影ロジックに単体テストが無い(初回指摘2から継続、未対処) + +Worker に統合したことで Worker のテストとして書きやすくなったはずだが、 +テストは追加されていない。指摘 C で `apply_projection` に切り出せば、 +その pure 関数単位でテストが書ける。 + +## 判定(追加作業) + +条件付き承認。指摘 A が機能有効化の観点で重要。意図通り(別チケットで +manifest 配線)なら承認、このチケット内で対処するなら未完了扱い。 +指摘 B/C/D はコード品質の改善で非ブロッカー。