From e664def92035cb58535a6a4dd60e98c700914d8e Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 11 May 2026 00:43:16 +0900 Subject: [PATCH 1/2] =?UTF-8?q?feat:=20compact=20worker=20=E3=82=B5?= =?UTF-8?q?=E3=83=BC=E3=82=AD=E3=83=83=E3=83=88=E3=83=96=E3=83=AC=E3=83=BC?= =?UTF-8?q?=E3=82=AB=E3=83=BC=E3=82=92=E5=8D=A0=E6=9C=89=E9=87=8F=E3=83=99?= =?UTF-8?q?=E3=83=BC=E3=82=B9=E3=81=AB=E7=B5=B1=E4=B8=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/manifest/src/config.rs | 39 +++++++++++- crates/manifest/src/defaults.rs | 6 +- crates/manifest/src/lib.rs | 26 +++++++- crates/pod/src/compact/usage_tracker.rs | 25 ++++++++ crates/pod/src/compact/worker.rs | 83 ++++++++++++++++++++++--- crates/pod/src/pod.rs | 35 ++++++----- docs/compaction.md | 3 +- docs/manifest.toml | 6 +- docs/pod-factory.md | 1 + tickets/compact-worker-occupancy-cap.md | 51 +++++++++++++++ 10 files changed, 246 insertions(+), 29 deletions(-) create mode 100644 tickets/compact-worker-occupancy-cap.md diff --git a/crates/manifest/src/config.rs b/crates/manifest/src/config.rs index 022c82a1..6af96e46 100644 --- a/crates/manifest/src/config.rs +++ b/crates/manifest/src/config.rs @@ -116,6 +116,8 @@ pub struct CompactionConfigPartial { #[serde(default)] pub compact_worker_max_input_tokens: Option, #[serde(default)] + pub compact_worker_max_turns: Option, + #[serde(default)] pub model: Option, } @@ -325,6 +327,9 @@ impl CompactionConfigPartial { compact_worker_max_input_tokens: upper .compact_worker_max_input_tokens .or(self.compact_worker_max_input_tokens), + compact_worker_max_turns: upper + .compact_worker_max_turns + .or(self.compact_worker_max_turns), model: merge_option(self.model, upper.model, ModelManifest::merge), } } @@ -461,6 +466,9 @@ impl TryFrom for PodManifest { compact_worker_max_input_tokens: c .compact_worker_max_input_tokens .unwrap_or(defaults::COMPACT_WORKER_MAX_INPUT_TOKENS), + compact_worker_max_turns: c + .compact_worker_max_turns + .or(defaults::COMPACT_WORKER_MAX_TURNS), model: c.model, }) }) @@ -949,6 +957,32 @@ stop_sequences = ["\n\n", ""] ); } + #[test] + fn from_toml_accepts_compact_worker_max_turns() { + let cfg = PodManifestConfig::from_toml( + r#" +[compaction] +compact_worker_max_turns = 7 +"#, + ) + .unwrap(); + + assert_eq!(cfg.compaction.unwrap().compact_worker_max_turns, Some(7)); + } + + #[test] + fn try_from_compaction_defaults_compact_worker_max_turns() { + let mut cfg = minimal_valid(); + cfg.compaction = Some(CompactionConfigPartial::default()); + + let manifest = PodManifest::try_from(cfg).unwrap(); + + assert_eq!( + manifest.compaction.unwrap().compact_worker_max_turns, + defaults::COMPACT_WORKER_MAX_TURNS + ); + } + #[test] fn from_toml_partial_layer_succeeds() { // A project-layer manifest with only scope set must parse fine. @@ -1042,7 +1076,10 @@ name = "dbg" fn skills_directories_resolved_against_base() { let mut cfg = minimal_valid(); cfg.skills = Some(SkillsConfig { - directories: vec![PathBuf::from(".claude/skills"), PathBuf::from("/abs/elsewhere")], + directories: vec![ + PathBuf::from(".claude/skills"), + PathBuf::from("/abs/elsewhere"), + ], }); let resolved = cfg.resolve_paths(Path::new("/workspace/proj")); let dirs = resolved.skills.as_ref().unwrap().directories.clone(); diff --git a/crates/manifest/src/defaults.rs b/crates/manifest/src/defaults.rs index 102a0832..4ac587a5 100644 --- a/crates/manifest/src/defaults.rs +++ b/crates/manifest/src/defaults.rs @@ -36,12 +36,16 @@ pub const DEFAULT_INSTRUCTION: &str = "$insomnia/default"; /// [`crate::CompactionConfig::compact_auto_read_budget`]. pub const COMPACT_AUTO_READ_BUDGET: u64 = 8000; -/// Cumulative input-token cap for the compact worker's own LLM +/// Current prompt-occupancy cap for the compact worker's own LLM /// calls. Exceeding this aborts the compact run (circuit-breaker /// path). See /// [`crate::CompactionConfig::compact_worker_max_input_tokens`]. pub const COMPACT_WORKER_MAX_INPUT_TOKENS: u64 = 50_000; +/// Optional maximum compact-worker tool-loop depth. `None` means unlimited. +/// See [`crate::CompactionConfig::compact_worker_max_turns`]. +pub const COMPACT_WORKER_MAX_TURNS: Option = Some(20); + /// Number of recently-touched files fed to the compact worker as /// default references. pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5; diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index c400aecb..92505139 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -321,11 +321,16 @@ pub struct CompactionConfig { #[serde(default = "default_compact_auto_read_budget")] pub compact_auto_read_budget: u64, - /// Cumulative input-token cap for the compact worker's own LLM - /// calls. Exceeding this aborts the compact run. + /// Current prompt-occupancy cap for the compact worker's own LLM + /// requests. Exceeding this aborts the compact run. #[serde(default = "default_compact_worker_max_input_tokens")] pub compact_worker_max_input_tokens: u64, + /// Optional maximum compact-worker tool-loop depth. `None` leaves the + /// worker unlimited; the default bounds runaway short-context loops. + #[serde(default = "default_compact_worker_max_turns")] + pub compact_worker_max_turns: Option, + /// Optional model for the compactor (summary) LLM. /// If omitted, the main model is cloned via `clone_boxed()`. #[serde(default)] @@ -347,6 +352,9 @@ fn default_compact_auto_read_budget() -> u64 { fn default_compact_worker_max_input_tokens() -> u64 { defaults::COMPACT_WORKER_MAX_INPUT_TOKENS } +fn default_compact_worker_max_turns() -> Option { + defaults::COMPACT_WORKER_MAX_TURNS +} impl Default for CompactionConfig { fn default() -> Self { @@ -358,6 +366,7 @@ impl Default for CompactionConfig { compact_retained_tokens: default_compact_retained_tokens(), compact_auto_read_budget: default_compact_auto_read_budget(), compact_worker_max_input_tokens: default_compact_worker_max_input_tokens(), + compact_worker_max_turns: default_compact_worker_max_turns(), model: None, } } @@ -521,6 +530,19 @@ model_id = "claude-sonnet-4-20250514" assert_eq!(c.compact_threshold, Some(80000)); assert_eq!(c.compact_request_threshold, None); assert_eq!(c.compact_retained_tokens, 8000); + assert_eq!(c.compact_worker_max_turns, Some(20)); + } + + #[test] + fn parse_compaction_worker_max_turns() { + let toml = format!( + "{MINIMAL_REQUIRED}\n\ + [compaction]\n\ + compact_worker_max_turns = 7\n" + ); + let manifest = PodManifest::from_toml(&toml).unwrap(); + let c = manifest.compaction.unwrap(); + assert_eq!(c.compact_worker_max_turns, Some(7)); } #[test] diff --git a/crates/pod/src/compact/usage_tracker.rs b/crates/pod/src/compact/usage_tracker.rs index 34b87281..e830369a 100644 --- a/crates/pod/src/compact/usage_tracker.rs +++ b/crates/pod/src/compact/usage_tracker.rs @@ -99,6 +99,18 @@ impl UsageTracker { }); } + /// Return a clone of the accumulated `UsageRecord`s without clearing them. + /// Used by request-time circuit breakers that need the same occupancy + /// projection as Pod persistence while the run is still active. + pub(crate) fn records(&self) -> Vec { + self.pending_records + .lock() + .unwrap() + .iter() + .map(|r| r.record.clone()) + .collect() + } + /// Drain accumulated records. Called by Pod after a run completes, /// before persisting the turn. pub(crate) fn drain(&self) -> Vec { @@ -136,6 +148,19 @@ mod tests { assert!(records[0].correlation_id.is_none()); } + #[test] + fn records_clones_without_clearing() { + let tracker = UsageTracker::new(); + tracker.note_request(1); + tracker.record_usage(&make_event(10, 0, 0, 5)); + + let records = tracker.records(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].history_len, 1); + assert_eq!(records[0].input_total_tokens, 10); + assert_eq!(tracker.records().len(), 1); + } + #[test] fn drain_clears_buffer() { let tracker = UsageTracker::new(); diff --git a/crates/pod/src/compact/worker.rs b/crates/pod/src/compact/worker.rs index c9c0dfad..7660a170 100644 --- a/crates/pod/src/compact/worker.rs +++ b/crates/pod/src/compact/worker.rs @@ -18,7 +18,6 @@ //! compacted session's opening system messages. use std::path::PathBuf; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use async_trait::async_trait; @@ -28,6 +27,7 @@ use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use serde::Deserialize; use tools::ScopedFs; +use crate::compact::usage_tracker::UsageTracker; use crate::fs_view::{ReadRequirement, slice_lines}; /// Aggregated output of a compact worker run. @@ -246,24 +246,29 @@ pub(crate) fn write_summary_tool(ctx: Arc>) -> ToolD }) } -/// Interceptor that aborts the compact worker as soon as its cumulative -/// input-token count crosses `max_input_tokens`. Pairs with the -/// `on_usage` callback registered by `Pod::compact`, which is what -/// actually accumulates `input_so_far`. +/// Interceptor that aborts the compact worker when its current prompt +/// occupancy estimate crosses `max_input_tokens`. The estimate uses the same +/// `UsageRecord` + `llm_worker::token_counter::total_tokens` path as the main +/// Pod compaction thresholds, so prompt-cache hits are not counted cumulatively +/// across turns. pub(crate) struct CompactWorkerInterceptor { - pub input_so_far: Arc, + pub usage_tracker: Arc, pub max_input_tokens: u64, } #[async_trait] impl Interceptor for CompactWorkerInterceptor { - async fn pre_llm_request(&self, _context: &mut Vec) -> PreRequestAction { - if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens { + async fn pre_llm_request(&self, context: &mut Vec) -> PreRequestAction { + let records = self.usage_tracker.records(); + let estimate = llm_worker::token_counter::total_tokens(context, &records); + if estimate.tokens > self.max_input_tokens { return PreRequestAction::Cancel(format!( - "compact worker input exceeded {} tokens", + "compact worker input occupancy exceeded {} tokens", self.max_input_tokens )); } + + self.usage_tracker.note_request(context.len()); PreRequestAction::Continue } } @@ -283,6 +288,66 @@ mod tests { ScopedFs::new(scope, tmp.to_path_buf()) } + fn make_usage(input: u64) -> llm_worker::timeline::event::UsageEvent { + llm_worker::timeline::event::UsageEvent { + input_tokens: Some(input), + output_tokens: Some(0), + total_tokens: Some(input), + cache_read_input_tokens: None, + cache_creation_input_tokens: None, + } + } + + #[tokio::test] + async fn compact_worker_interceptor_uses_occupancy_not_cumulative_usage() { + let tracker = Arc::new(UsageTracker::new()); + let interceptor = CompactWorkerInterceptor { + usage_tracker: tracker.clone(), + max_input_tokens: 150, + }; + let mut context = vec![Item::user_message("hello")]; + + assert!(matches!( + interceptor.pre_llm_request(&mut context).await, + PreRequestAction::Continue + )); + tracker.record_usage(&make_usage(100)); + + assert!(matches!( + interceptor.pre_llm_request(&mut context).await, + PreRequestAction::Continue + )); + tracker.record_usage(&make_usage(100)); + + // Two 100-token requests would exceed a cumulative 150-token cap, but + // current occupancy is still the latest 100-token measurement. + assert!(matches!( + interceptor.pre_llm_request(&mut context).await, + PreRequestAction::Continue + )); + } + + #[tokio::test] + async fn compact_worker_interceptor_cancels_when_occupancy_exceeds_cap() { + let tracker = Arc::new(UsageTracker::new()); + let interceptor = CompactWorkerInterceptor { + usage_tracker: tracker.clone(), + max_input_tokens: 99, + }; + let mut context = vec![Item::user_message("hello")]; + + assert!(matches!( + interceptor.pre_llm_request(&mut context).await, + PreRequestAction::Continue + )); + tracker.record_usage(&make_usage(100)); + + assert!(matches!( + interceptor.pre_llm_request(&mut context).await, + PreRequestAction::Cancel(message) if message.contains("occupancy") + )); + } + #[tokio::test] async fn mark_read_required_records_and_deducts_budget() { let tmp = tempfile::TempDir::new().unwrap(); diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 3d277bc1..9cf66e92 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -12,8 +12,8 @@ use session_store::{EntryHash, PodScopeSnapshot, SessionId, SessionStartState, S use tracing::{info, warn}; use manifest::{ - Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError, ScopeRule, - SharedScope, WorkerManifest, + Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError, + ScopeRule, SharedScope, WorkerManifest, }; use crate::compact::state::CompactState; @@ -1456,8 +1456,6 @@ impl Pod { /// /// Returns the new session ID. pub async fn compact(&mut self, retained_tokens: u64) -> Result { - use std::sync::atomic::{AtomicU64, Ordering}; - use crate::compact::worker::{ CompactWorkerContext, CompactWorkerInterceptor, add_reference_tool, mark_read_required_tool, write_summary_tool, @@ -1477,7 +1475,7 @@ impl Pod { // Compaction-related knobs. Fall through to manifest defaults when // `[compaction]` is omitted entirely. - let (auto_read_budget, compact_worker_max_input_tokens) = self + let (auto_read_budget, compact_worker_max_input_tokens, compact_worker_max_turns) = self .manifest .compaction .as_ref() @@ -1485,11 +1483,13 @@ impl Pod { ( c.compact_auto_read_budget, c.compact_worker_max_input_tokens, + c.compact_worker_max_turns, ) }) .unwrap_or(( manifest::defaults::COMPACT_AUTO_READ_BUDGET, manifest::defaults::COMPACT_WORKER_MAX_INPUT_TOKENS, + manifest::defaults::COMPACT_WORKER_MAX_TURNS, )); // Default references: the N most-recently-touched files in the @@ -1530,21 +1530,24 @@ impl Pod { let mut summary_worker = Worker::new(summary_client).system_prompt(summary_system_prompt); summary_worker.set_cache_key(Some(self.session_id.to_string())); - // Cumulative input-token meter + interceptor. The meter is bumped - // from the on_usage callback and read on every pre_llm_request. - let input_so_far = Arc::new(AtomicU64::new(0)); + // Occupancy-based input-token meter + interceptor. The tracker pairs + // each pre-request history length with the following UsageEvent, then + // the interceptor projects current prompt occupancy with the same + // UsageRecord counter used by the main Pod thresholds. + let summary_usage_tracker = Arc::new(UsageTracker::new()); { - let acc = input_so_far.clone(); + let tracker = summary_usage_tracker.clone(); summary_worker.on_usage(move |event| { - if let Some(tokens) = event.input_tokens { - acc.fetch_add(tokens, Ordering::Relaxed); - } + tracker.record_usage(event); }); } summary_worker.set_interceptor(CompactWorkerInterceptor { - input_so_far: input_so_far.clone(), + usage_tracker: summary_usage_tracker, max_input_tokens: compact_worker_max_input_tokens, }); + if compact_worker_max_turns.is_some() { + summary_worker.set_max_turns(compact_worker_max_turns); + } // Tools: read_file (shared scope, fresh tracker) + the three // compact-specific tools that populate `ctx`. @@ -3069,7 +3072,11 @@ permission = "write" let shadows = ingest_skills(&mut registry, &manifest); // workspace skill `alpha` should be registered (no collision). - assert!(registry.get(&memory::Slug::parse("alpha").unwrap()).is_some()); + assert!( + registry + .get(&memory::Slug::parse("alpha").unwrap()) + .is_some() + ); // No workflow exists to shadow `alpha`, so no shadow event for it. assert!(shadows.iter().all(|s| s.slug.as_str() != "alpha")); } diff --git a/docs/compaction.md b/docs/compaction.md index 876ccf54..c81498ff 100644 --- a/docs/compaction.md +++ b/docs/compaction.md @@ -140,7 +140,8 @@ compact_threshold = 80000 # ターンの合間 (proactive) compact_request_threshold = 90000 # リクエストの合間 (safety net) retained_tokens = 8000 # 直近保護トークン数 (Prune 済みで計測) auto_read_budget = 8000 # compact worker の mark_read_required 合計上限 -compact_worker_max_input_tokens = 50000 # compact worker 自身の累計入力トークン上限 +compact_worker_max_input_tokens = 50000 # compact worker 自身の現在占有トークン上限 +compact_worker_max_turns = 20 # compact worker 自身の tool loop 上限 ``` ### Auto-Read とリファレンス diff --git a/docs/manifest.toml b/docs/manifest.toml index baa742cb..0f5a2db0 100644 --- a/docs/manifest.toml +++ b/docs/manifest.toml @@ -212,9 +212,13 @@ permission = "write" # compact_auto_read_budget = 8000 # # # 任意。デフォルト: 50000 (`defaults::COMPACT_WORKER_MAX_INPUT_TOKENS`)。 -# # compact worker 自身の累積入力 token cap。超過で abort (circuit breaker)。 +# # compact worker 自身の現在占有 token cap。超過で abort (circuit breaker)。 # compact_worker_max_input_tokens = 50000 # +# # 任意。デフォルト: 20 (`defaults::COMPACT_WORKER_MAX_TURNS`)。 +# # compact worker 自身の tool loop 上限。Rust config で None の場合のみ無制限。 +# compact_worker_max_turns = 20 +# # # 任意。デフォルト: メインモデルを `clone_boxed()` で複製。 # # compact 専用モデルを使う場合のみ書く ([model] と同じ形式)。 # # [compaction.model] diff --git a/docs/pod-factory.md b/docs/pod-factory.md index 7f23af8a..ab0408a8 100644 --- a/docs/pod-factory.md +++ b/docs/pod-factory.md @@ -183,6 +183,7 @@ compact_request_threshold = 90000 compact_retained_tokens = 8000 compact_auto_read_budget = 8000 compact_worker_max_input_tokens = 50000 +compact_worker_max_turns = 20 [compaction.model] scheme = "gemini" diff --git a/tickets/compact-worker-occupancy-cap.md b/tickets/compact-worker-occupancy-cap.md new file mode 100644 index 00000000..b98cd017 --- /dev/null +++ b/tickets/compact-worker-occupancy-cap.md @@ -0,0 +1,51 @@ +# Compact worker のサーキットブレーカーを占有量ベースに統一 + +## 背景 + +Compact worker のサーキットブレーカー (`crates/pod/src/compact/worker.rs:253-269` の `CompactWorkerInterceptor`) は `compact_worker_max_input_tokens` を `UsageEvent.input_tokens` の **累積和** で見ている。一方、`UsageEvent.input_tokens` の定義 (`crates/llm-worker/src/llm_client/event.rs:76-94`) は「送信した prompt prefix の総トークン数(占有量、キャッシュ込み)」であり、Anthropic 側でも `cache_read + cache_creation` を加算してこの規約に揃えている。 + +結果として現行の累積メトリックは、毎ターン同じ prefix をフルカウントする「context size × ターン数」相当の値を測っており: + +- compact worker は `build_summary_prompt` (`crates/pod/src/pod.rs:2630-2657`) で reasoning / ToolCall.arguments / ToolResult.content を strip した skeleton + 探索ツールという設計なので、初回 input は元 history より大幅に小さい(数十 K 程度)。 +- それでも cache hit 含む prefix を毎ターン丸ごと足していくため、20-30K の skeleton を入力にツールを 2-3 回叩いた時点で 50K デフォルトに到達する。 +- prompt cache が 99% ヒットしていても累積値は同じだけ増えるので、コストの近似にも安全マージンの近似にもなっていない。 + +メイン Worker 側の対応するしきい値 (`compact_threshold`, `compact_request_threshold`) は `Pod::total_tokens()` (`crates/pod/src/pod.rs:146-149` → `llm_worker::token_counter::total_tokens(history, &usage_records)`) を見ており、これは `UsageRecord` 列を最新測定 + バイト按分で射影した「現在の占有量」(単一の値, 累積ではない)。Compact worker でもこの正規のカウンタに統一すべき。 + +サーキットブレーカーとして測るべき軸は二つあり、占有量カウンタは前者だけを担当する: + +1. **占有量** (cost / window 圧迫の相当値): `total_tokens()` を流用。 +2. **ループ深さ** (短い context でツールを延々叩く暴走): `Worker::set_max_turns` (`crates/llm-worker/src/worker.rs:1369`) で別途上限を入れる。 + +## 方針 + +`CompactWorkerInterceptor` を、メインと同じ `UsageTracker` + `total_tokens` 機構に乗せ替える。累積メトリックは廃止する。ループ深さ対策として `compact_worker_max_turns` を新設し、`set_max_turns` 経由で compact worker に伝える。 + +## 要件 + +- `CompactWorkerInterceptor` を削除または書き換え、`pre_llm_request` の判定を `llm_worker::token_counter::total_tokens(worker.history(), &records).tokens > max_input_tokens` に切り替える。`input_so_far: AtomicU64` の累積パスは廃止。 +- Compact worker にも `UsageTracker` を持たせ、`pre_llm_request` で `note_request(history.len())`、`on_usage` で `record_usage` する。メイン Pod (`pod.rs:777-780`) と同じ配線パターン。 +- `compact_worker_max_input_tokens` の意味を「compact worker 側の現在占有量しきい値」に変更し、ドキュメントとデフォルト値を更新する。デフォルトは `compact_threshold` と単位が揃うため、現行 50K のままだと typical な main 側設定 (80K) に対して小さく compact 自身がそれを下回るのは妥当な範囲。実値は新セマンティクスで再評価する(要件としては「累積値ではなく占有量を測る」ことのみで固定)。 +- `CompactionConfig` に `compact_worker_max_turns: Option` を追加し、`compact()` (`pod.rs:1458-`) で `summary_worker.set_max_turns` に渡す。`None` のときは無制限(既存動作)。デフォルトは要検討(仮: `Some(20)`)。 +- 後方互換 shim は入れない。`compact_worker_max_input_tokens` はフィールド名を維持しつつセマンティクスだけ差し替えるため、旧設定値はそのまま新セマンティクスで解釈される。閾値のオーダーは大きくは変わらないので運用上の破壊的影響は小さい。 + +## 完了条件 + +- 通常の compact 実行で、cache hit 込みの prefix がフルカウントされなくなり、`build_summary_prompt` の skeleton + 数回のファイル読み程度では cap に当たらない。 +- 短い context でツールを延々呼び続ける疑似ケースで、`compact_worker_max_turns` により compact run が打ち切られる。 +- `Pod::total_tokens()` と compact worker の占有量推定で同じ算出経路 (`llm_worker::token_counter::total_tokens`) が使われている。 + +## 範囲外 + +- `compact_threshold` / `compact_request_threshold` 自体のセマンティクス・既定値変更。 +- Compact worker が更に compact をかける(meta-compact)。 +- `compact_auto_read_budget` ロジックの変更。 +- token 推定アルゴリズム自体の改善。 + +## 影響範囲 + +- `crates/pod/src/compact/worker.rs`: `CompactWorkerInterceptor` の実装、`input_so_far` 経路の削除。 +- `crates/pod/src/pod.rs`: `compact()` の compact worker 構築箇所 (`UsageTracker` 配線、`on_usage` 差し替え、`set_max_turns` 呼び出し)。 +- `crates/pod/src/compact/usage_tracker.rs`: 既存 `UsageTracker` を compact worker からも使うため、必要なら可視性 / API の調整。 +- `crates/manifest/src/lib.rs`, `crates/manifest/src/config.rs`, `crates/manifest/src/defaults.rs`: `compact_worker_max_input_tokens` のドキュメント更新、`compact_worker_max_turns` の追加とカスケード。 +- `crates/pod/src/compact/worker.rs` のユニットテスト、および compact 関連の統合テスト。 From cac1f4d4fe2b842654807112b187fa95247a6c61 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 11 May 2026 00:56:41 +0900 Subject: [PATCH 2/2] =?UTF-8?q?review:=20compact-worker-occupancy-cap=20(s?= =?UTF-8?q?et=5Fmax=5Fturns=20=E5=88=86=E5=B2=90=E5=89=8A=E9=99=A4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/pod/src/pod.rs | 4 +- tickets/compact-worker-occupancy-cap.md | 5 +++ .../compact-worker-occupancy-cap.review.md | 41 +++++++++++++++++++ 3 files changed, 47 insertions(+), 3 deletions(-) create mode 100644 tickets/compact-worker-occupancy-cap.review.md diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 9cf66e92..d15b8f13 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1545,9 +1545,7 @@ impl Pod { usage_tracker: summary_usage_tracker, max_input_tokens: compact_worker_max_input_tokens, }); - if compact_worker_max_turns.is_some() { - summary_worker.set_max_turns(compact_worker_max_turns); - } + summary_worker.set_max_turns(compact_worker_max_turns); // Tools: read_file (shared scope, fresh tracker) + the three // compact-specific tools that populate `ctx`. diff --git a/tickets/compact-worker-occupancy-cap.md b/tickets/compact-worker-occupancy-cap.md index b98cd017..4dff99cc 100644 --- a/tickets/compact-worker-occupancy-cap.md +++ b/tickets/compact-worker-occupancy-cap.md @@ -49,3 +49,8 @@ Compact worker のサーキットブレーカー (`crates/pod/src/compact/worker - `crates/pod/src/compact/usage_tracker.rs`: 既存 `UsageTracker` を compact worker からも使うため、必要なら可視性 / API の調整。 - `crates/manifest/src/lib.rs`, `crates/manifest/src/config.rs`, `crates/manifest/src/defaults.rs`: `compact_worker_max_input_tokens` のドキュメント更新、`compact_worker_max_turns` の追加とカスケード。 - `crates/pod/src/compact/worker.rs` のユニットテスト、および compact 関連の統合テスト。 + +## Review +- 状態: Approve with follow-up +- レビュー詳細: [./compact-worker-occupancy-cap.review.md](./compact-worker-occupancy-cap.review.md) +- 日付: 2026-05-11 diff --git a/tickets/compact-worker-occupancy-cap.review.md b/tickets/compact-worker-occupancy-cap.review.md new file mode 100644 index 00000000..141baa27 --- /dev/null +++ b/tickets/compact-worker-occupancy-cap.review.md @@ -0,0 +1,41 @@ +# Review: Compact worker のサーキットブレーカーを占有量ベースに統一 + +レビュー対象: commit `ef0cdf7` (base `d818b37`). + +## 前提・要件の確認 + +- 要件1「`CompactWorkerInterceptor` を `total_tokens` ベースに切り替え、`input_so_far: AtomicU64` 経路を廃止」: 満たされている。`crates/pod/src/compact/worker.rs:249-274` で `usage_tracker.records()` → `llm_worker::token_counter::total_tokens(context, &records)` に置き換わっており、`AtomicU64` import / フィールドも削除されている。`crates/pod/src/pod.rs:1456` 周辺の `use std::sync::atomic` も消えている。 +- 要件2「Compact worker に `UsageTracker` を持たせ、`pre_llm_request` で `note_request`、`on_usage` で `record_usage`」: 満たされている。`pod.rs:1537-1547` で `summary_usage_tracker = Arc::new(UsageTracker::new())` を作り、`on_usage` で `record_usage(event)`、`CompactWorkerInterceptor::pre_llm_request` 内で `usage_tracker.note_request(context.len())` を呼ぶ (`compact/worker.rs:262-272`)。 +- 要件3「`compact_worker_max_input_tokens` の意味を「現在占有量しきい値」に変更し doc 更新」: 満たされている。`crates/manifest/src/lib.rs:324`、`crates/manifest/src/defaults.rs:39-43`、`docs/compaction.md:143`、`docs/manifest.toml:215` がすべて「現在占有」「累計ではない」表現に揃っている。フィールド名は維持され、後方互換 shim は入っていない (要件通り)。 +- 要件4「`compact_worker_max_turns: Option` を新設し `Worker::set_max_turns` 経由で渡す」: 満たされている。`CompactionConfig` (`lib.rs:329-332`)、`CompactionConfigPartial` (`config.rs:118-119`)、`defaults::COMPACT_WORKER_MAX_TURNS = Some(20)` (`defaults.rs:46-48`)、`pod.rs:1548-1550` で `summary_worker.set_max_turns(...)`。manifest 側の merge / TryFrom / TOML パースもテスト付きで通っている (`config.rs:960-983`, `lib.rs:521-546`)。 +- 要件5「後方互換 shim 無し」: 満たされている。フィールド名は変更しないため旧 manifest はそのまま新セマンティクスで読まれ、deprecated alias 等は導入されていない。 +- 完了条件1「cache hit 込み prefix がフルカウントされない」: 単体テスト `compact_worker_interceptor_uses_occupancy_not_cumulative_usage` (`compact/worker.rs:301-328`) で「2 回 100-token 入力 → 累積 200 でも occupancy は最新の 100 のままで cap=150 を通る」を確認。 +- 完了条件2「`compact_worker_max_turns` で打ち切られる」: `set_max_turns` のロジック自体は llm-worker 側の既存挙動 (`worker.rs:1045-1050`) に乗るのみで、本チケットでは新規ロジックを足していない。配線テスト (manifest 側のパース) は入っているが、compact 経由で実 abort する pod-level の統合テストは無し。Trivial wiring なのでブロッキングではないが、後で run-level テストを足すと安心。 +- 完了条件3「`Pod::total_tokens()` と同じ算出経路」: 満たされている。`pod.rs:148` (`llm_worker::token_counter::total_tokens(self.history(), &usage)`) と `compact/worker.rs:265` (`llm_worker::token_counter::total_tokens(context, &records)`) が同じ関数を経由する。compact 側は per-request の prune 後 `request_context` を渡すため、メイン側と完全一致ではないが、`pre_llm_request` 時点でのその後 LLM に投げる prefix 占有量という意味では妥当 (むしろ正確)。 + +## アーキテクチャ・スコープ + +- 修正は `compact/worker.rs` + `pod.rs` の compact 構築 + manifest 1 フィールド追加に閉じており、ticket 影響範囲表と完全一致。`crates/pod/src/compact/usage_tracker.rs` への変更は「`records()` メソッドを `pub(crate)` で追加して drain せず読む」のみで、API 拡張は最小。 +- `UsageTracker` は元々「per-LLM-request 計測のペアリング・drain は Pod が persist 時に行う」モデル。compact worker は drain せず read-only で snapshot を覗くだけなので、メインの persist セマンティクスを汚さない。compact worker のループ内で記録しても drain しないままワーカーが破棄される (`pod.rs:1530-1551` で関数ローカル) ため、メインの `Pod::usage_tracker` (turn 永続化用) とは完全に独立した別インスタンスで、相互干渉しない。設計として綺麗。 +- `note_request` をメインでは `UsageTrackingHook` (`pod.rs:46-59`) として `pre_llm_request` Hook で呼んでいるのに対し、compact 側は `CompactWorkerInterceptor::pre_llm_request` 内で直接呼んでいる。Worker のコール順 (Hook → Interceptor → stream → on_usage) を踏まえると `note_request` は `record_usage` より前に呼ばれていれば意味的に等価で、両者とも同じ tick で呼ばれるため不整合は無い。ただし「メイン: Hook で `note_request`、Interceptor で別判定」「compact: Interceptor で両方」と配線パターンが分岐している点は将来読む人が混乱するかも。今のままでも動くが、補足コメントを Pod 側か Interceptor 側に入れておくと親切。 +- ループ深さは `Worker::set_max_turns` 既存機構の流用。新規ロジック無し、コードベースを歪めない選択。 + +## 指摘事項 + +### Blocking +無し。 + +### Non-blocking / Follow-up +- `pod.rs:1548-1550` の `if compact_worker_max_turns.is_some() { summary_worker.set_max_turns(compact_worker_max_turns); }` は不要な分岐。`set_max_turns` は `Option` を取り、`None` を渡しても初期値 `None` を上書きするだけで no-op (`crates/llm-worker/src/worker.rs:1369-1371` + `worker.rs:1181`)。無条件に `summary_worker.set_max_turns(compact_worker_max_turns);` で十分。条件付けることで「`None` だと何もしない」読者バイアスを生んでむしろ読みづらい。 +- `crates/pod/src/pod.rs:2178-2201` の `MemoryExtractWorkerInterceptor` は古い累積メトリック方式のまま残っており、コメントには `Mirror of compact::worker::CompactWorkerInterceptor;` と書かれている。今回の変更で「mirror」ではなくなったのでコメントが嘘になっている。memory extract 側のセマンティクス変更は本チケットの範囲外で正しい判断だが、コメント補正 (例: 「以前は CompactWorkerInterceptor のミラーだったが、compact 側は占有量ベースに移行した。memory extract 側は別チケットで追従予定」) を別チケットなり TODO なりで残しておきたい。 +- `compact_worker_max_turns` が compact 経由で実際に `Aborted/MaxTurnsReached` を引き起こす経路の pod-level 統合テストは含まれていない。配線そのものはトリビアルで、`set_max_turns` の振る舞いは llm-worker 側で別途テストされている前提。後追いで足すかどうかは判断に任せる。 +- デフォルト `COMPACT_WORKER_MAX_TURNS = Some(20)` は `build_summary_prompt` の skeleton + 数回ファイル読みなら十分余裕がある妥当なライン。ただし auto_read_budget (8000) と read_file の典型呼び出しサイズを考えると、深いツールループが起きる前にトークン cap が先に効きやすい設計なので、20 が上限になる場面はまずレアケース。妥当な保険値として OK。 + +### Nits +- `compact/worker.rs:249-253` の docstring は新しいセマンティクスを正しく説明できている。良。 +- `compact/usage_tracker.rs:102-112` `records()` の docstring が「request-time circuit breakers が同じ occupancy projection を見るため」と用途を明示しており、将来の extract worker 側追従の伏線にもなっている。良。 +- ユニットテスト `compact_worker_interceptor_uses_occupancy_not_cumulative_usage` (`compact/worker.rs:301-328`) は「累積では落ちる量でも occupancy では通る」ケースをピンポイントで掴んでいて、今回の本質的なバグへの回帰防止として的確。 + +## 判断 + +Approve with follow-up — チケットの 4 要件と完了条件 1/3 はすべて根拠つきで満たされている。残るのは (a) `set_max_turns` 呼び出しの不要な分岐、(b) `MemoryExtractWorkerInterceptor` 側のコメント陳腐化と将来の追従、(c) max_turns の pod-level 統合テスト追加、いずれも非ブロッキング。マージ可。