feat: compact worker サーキットブレーカーを占有量ベースに統一

This commit is contained in:
Keisuke Hirata 2026-05-11 00:43:16 +09:00
parent d6f27f7c45
commit 8100a5dfd1
10 changed files with 246 additions and 29 deletions

View File

@ -116,6 +116,8 @@ pub struct CompactionConfigPartial {
#[serde(default)]
pub compact_worker_max_input_tokens: Option<u64>,
#[serde(default)]
pub compact_worker_max_turns: Option<u32>,
#[serde(default)]
pub model: Option<ModelManifest>,
}
@ -325,6 +327,9 @@ impl CompactionConfigPartial {
compact_worker_max_input_tokens: upper
.compact_worker_max_input_tokens
.or(self.compact_worker_max_input_tokens),
compact_worker_max_turns: upper
.compact_worker_max_turns
.or(self.compact_worker_max_turns),
model: merge_option(self.model, upper.model, ModelManifest::merge),
}
}
@ -461,6 +466,9 @@ impl TryFrom<PodManifestConfig> for PodManifest {
compact_worker_max_input_tokens: c
.compact_worker_max_input_tokens
.unwrap_or(defaults::COMPACT_WORKER_MAX_INPUT_TOKENS),
compact_worker_max_turns: c
.compact_worker_max_turns
.or(defaults::COMPACT_WORKER_MAX_TURNS),
model: c.model,
})
})
@ -949,6 +957,32 @@ stop_sequences = ["\n\n", "</stop>"]
);
}
#[test]
fn from_toml_accepts_compact_worker_max_turns() {
let cfg = PodManifestConfig::from_toml(
r#"
[compaction]
compact_worker_max_turns = 7
"#,
)
.unwrap();
assert_eq!(cfg.compaction.unwrap().compact_worker_max_turns, Some(7));
}
#[test]
fn try_from_compaction_defaults_compact_worker_max_turns() {
let mut cfg = minimal_valid();
cfg.compaction = Some(CompactionConfigPartial::default());
let manifest = PodManifest::try_from(cfg).unwrap();
assert_eq!(
manifest.compaction.unwrap().compact_worker_max_turns,
defaults::COMPACT_WORKER_MAX_TURNS
);
}
#[test]
fn from_toml_partial_layer_succeeds() {
// A project-layer manifest with only scope set must parse fine.
@ -1042,7 +1076,10 @@ name = "dbg"
fn skills_directories_resolved_against_base() {
let mut cfg = minimal_valid();
cfg.skills = Some(SkillsConfig {
directories: vec![PathBuf::from(".claude/skills"), PathBuf::from("/abs/elsewhere")],
directories: vec![
PathBuf::from(".claude/skills"),
PathBuf::from("/abs/elsewhere"),
],
});
let resolved = cfg.resolve_paths(Path::new("/workspace/proj"));
let dirs = resolved.skills.as_ref().unwrap().directories.clone();

View File

@ -36,12 +36,16 @@ pub const DEFAULT_INSTRUCTION: &str = "$insomnia/default";
/// [`crate::CompactionConfig::compact_auto_read_budget`].
pub const COMPACT_AUTO_READ_BUDGET: u64 = 8000;
/// Cumulative input-token cap for the compact worker's own LLM
/// Current prompt-occupancy cap for the compact worker's own LLM
/// calls. Exceeding this aborts the compact run (circuit-breaker
/// path). See
/// [`crate::CompactionConfig::compact_worker_max_input_tokens`].
pub const COMPACT_WORKER_MAX_INPUT_TOKENS: u64 = 50_000;
/// Optional maximum compact-worker tool-loop depth. `None` means unlimited.
/// See [`crate::CompactionConfig::compact_worker_max_turns`].
pub const COMPACT_WORKER_MAX_TURNS: Option<u32> = Some(20);
/// Number of recently-touched files fed to the compact worker as
/// default references.
pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5;

View File

@ -321,11 +321,16 @@ pub struct CompactionConfig {
#[serde(default = "default_compact_auto_read_budget")]
pub compact_auto_read_budget: u64,
/// Cumulative input-token cap for the compact worker's own LLM
/// calls. Exceeding this aborts the compact run.
/// Current prompt-occupancy cap for the compact worker's own LLM
/// requests. Exceeding this aborts the compact run.
#[serde(default = "default_compact_worker_max_input_tokens")]
pub compact_worker_max_input_tokens: u64,
/// Optional maximum compact-worker tool-loop depth. `None` leaves the
/// worker unlimited; the default bounds runaway short-context loops.
#[serde(default = "default_compact_worker_max_turns")]
pub compact_worker_max_turns: Option<u32>,
/// Optional model for the compactor (summary) LLM.
/// If omitted, the main model is cloned via `clone_boxed()`.
#[serde(default)]
@ -347,6 +352,9 @@ fn default_compact_auto_read_budget() -> u64 {
fn default_compact_worker_max_input_tokens() -> u64 {
defaults::COMPACT_WORKER_MAX_INPUT_TOKENS
}
fn default_compact_worker_max_turns() -> Option<u32> {
defaults::COMPACT_WORKER_MAX_TURNS
}
impl Default for CompactionConfig {
fn default() -> Self {
@ -358,6 +366,7 @@ impl Default for CompactionConfig {
compact_retained_tokens: default_compact_retained_tokens(),
compact_auto_read_budget: default_compact_auto_read_budget(),
compact_worker_max_input_tokens: default_compact_worker_max_input_tokens(),
compact_worker_max_turns: default_compact_worker_max_turns(),
model: None,
}
}
@ -521,6 +530,19 @@ model_id = "claude-sonnet-4-20250514"
assert_eq!(c.compact_threshold, Some(80000));
assert_eq!(c.compact_request_threshold, None);
assert_eq!(c.compact_retained_tokens, 8000);
assert_eq!(c.compact_worker_max_turns, Some(20));
}
#[test]
fn parse_compaction_worker_max_turns() {
let toml = format!(
"{MINIMAL_REQUIRED}\n\
[compaction]\n\
compact_worker_max_turns = 7\n"
);
let manifest = PodManifest::from_toml(&toml).unwrap();
let c = manifest.compaction.unwrap();
assert_eq!(c.compact_worker_max_turns, Some(7));
}
#[test]

View File

@ -99,6 +99,18 @@ impl UsageTracker {
});
}
/// Return a clone of the accumulated `UsageRecord`s without clearing them.
/// Used by request-time circuit breakers that need the same occupancy
/// projection as Pod persistence while the run is still active.
pub(crate) fn records(&self) -> Vec<UsageRecord> {
self.pending_records
.lock()
.unwrap()
.iter()
.map(|r| r.record.clone())
.collect()
}
/// Drain accumulated records. Called by Pod after a run completes,
/// before persisting the turn.
pub(crate) fn drain(&self) -> Vec<RecordedUsage> {
@ -136,6 +148,19 @@ mod tests {
assert!(records[0].correlation_id.is_none());
}
#[test]
fn records_clones_without_clearing() {
let tracker = UsageTracker::new();
tracker.note_request(1);
tracker.record_usage(&make_event(10, 0, 0, 5));
let records = tracker.records();
assert_eq!(records.len(), 1);
assert_eq!(records[0].history_len, 1);
assert_eq!(records[0].input_total_tokens, 10);
assert_eq!(tracker.records().len(), 1);
}
#[test]
fn drain_clears_buffer() {
let tracker = UsageTracker::new();

View File

@ -18,7 +18,6 @@
//! compacted session's opening system messages.
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
@ -28,6 +27,7 @@ use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use serde::Deserialize;
use tools::ScopedFs;
use crate::compact::usage_tracker::UsageTracker;
use crate::fs_view::{ReadRequirement, slice_lines};
/// Aggregated output of a compact worker run.
@ -246,24 +246,29 @@ pub(crate) fn write_summary_tool(ctx: Arc<Mutex<CompactWorkerContext>>) -> ToolD
})
}
/// Interceptor that aborts the compact worker as soon as its cumulative
/// input-token count crosses `max_input_tokens`. Pairs with the
/// `on_usage` callback registered by `Pod::compact`, which is what
/// actually accumulates `input_so_far`.
/// Interceptor that aborts the compact worker when its current prompt
/// occupancy estimate crosses `max_input_tokens`. The estimate uses the same
/// `UsageRecord` + `llm_worker::token_counter::total_tokens` path as the main
/// Pod compaction thresholds, so prompt-cache hits are not counted cumulatively
/// across turns.
pub(crate) struct CompactWorkerInterceptor {
pub input_so_far: Arc<AtomicU64>,
pub usage_tracker: Arc<UsageTracker>,
pub max_input_tokens: u64,
}
#[async_trait]
impl Interceptor for CompactWorkerInterceptor {
async fn pre_llm_request(&self, _context: &mut Vec<Item>) -> PreRequestAction {
if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens {
async fn pre_llm_request(&self, context: &mut Vec<Item>) -> PreRequestAction {
let records = self.usage_tracker.records();
let estimate = llm_worker::token_counter::total_tokens(context, &records);
if estimate.tokens > self.max_input_tokens {
return PreRequestAction::Cancel(format!(
"compact worker input exceeded {} tokens",
"compact worker input occupancy exceeded {} tokens",
self.max_input_tokens
));
}
self.usage_tracker.note_request(context.len());
PreRequestAction::Continue
}
}
@ -283,6 +288,66 @@ mod tests {
ScopedFs::new(scope, tmp.to_path_buf())
}
fn make_usage(input: u64) -> llm_worker::timeline::event::UsageEvent {
llm_worker::timeline::event::UsageEvent {
input_tokens: Some(input),
output_tokens: Some(0),
total_tokens: Some(input),
cache_read_input_tokens: None,
cache_creation_input_tokens: None,
}
}
#[tokio::test]
async fn compact_worker_interceptor_uses_occupancy_not_cumulative_usage() {
let tracker = Arc::new(UsageTracker::new());
let interceptor = CompactWorkerInterceptor {
usage_tracker: tracker.clone(),
max_input_tokens: 150,
};
let mut context = vec![Item::user_message("hello")];
assert!(matches!(
interceptor.pre_llm_request(&mut context).await,
PreRequestAction::Continue
));
tracker.record_usage(&make_usage(100));
assert!(matches!(
interceptor.pre_llm_request(&mut context).await,
PreRequestAction::Continue
));
tracker.record_usage(&make_usage(100));
// Two 100-token requests would exceed a cumulative 150-token cap, but
// current occupancy is still the latest 100-token measurement.
assert!(matches!(
interceptor.pre_llm_request(&mut context).await,
PreRequestAction::Continue
));
}
#[tokio::test]
async fn compact_worker_interceptor_cancels_when_occupancy_exceeds_cap() {
let tracker = Arc::new(UsageTracker::new());
let interceptor = CompactWorkerInterceptor {
usage_tracker: tracker.clone(),
max_input_tokens: 99,
};
let mut context = vec![Item::user_message("hello")];
assert!(matches!(
interceptor.pre_llm_request(&mut context).await,
PreRequestAction::Continue
));
tracker.record_usage(&make_usage(100));
assert!(matches!(
interceptor.pre_llm_request(&mut context).await,
PreRequestAction::Cancel(message) if message.contains("occupancy")
));
}
#[tokio::test]
async fn mark_read_required_records_and_deducts_budget() {
let tmp = tempfile::TempDir::new().unwrap();

View File

@ -12,8 +12,8 @@ use session_store::{EntryHash, PodScopeSnapshot, SessionId, SessionStartState, S
use tracing::{info, warn};
use manifest::{
Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError, ScopeRule,
SharedScope, WorkerManifest,
Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError,
ScopeRule, SharedScope, WorkerManifest,
};
use crate::compact::state::CompactState;
@ -1456,8 +1456,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
///
/// Returns the new session ID.
pub async fn compact(&mut self, retained_tokens: u64) -> Result<SessionId, PodError> {
use std::sync::atomic::{AtomicU64, Ordering};
use crate::compact::worker::{
CompactWorkerContext, CompactWorkerInterceptor, add_reference_tool,
mark_read_required_tool, write_summary_tool,
@ -1477,7 +1475,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// Compaction-related knobs. Fall through to manifest defaults when
// `[compaction]` is omitted entirely.
let (auto_read_budget, compact_worker_max_input_tokens) = self
let (auto_read_budget, compact_worker_max_input_tokens, compact_worker_max_turns) = self
.manifest
.compaction
.as_ref()
@ -1485,11 +1483,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
(
c.compact_auto_read_budget,
c.compact_worker_max_input_tokens,
c.compact_worker_max_turns,
)
})
.unwrap_or((
manifest::defaults::COMPACT_AUTO_READ_BUDGET,
manifest::defaults::COMPACT_WORKER_MAX_INPUT_TOKENS,
manifest::defaults::COMPACT_WORKER_MAX_TURNS,
));
// Default references: the N most-recently-touched files in the
@ -1530,21 +1530,24 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let mut summary_worker = Worker::new(summary_client).system_prompt(summary_system_prompt);
summary_worker.set_cache_key(Some(self.session_id.to_string()));
// Cumulative input-token meter + interceptor. The meter is bumped
// from the on_usage callback and read on every pre_llm_request.
let input_so_far = Arc::new(AtomicU64::new(0));
// Occupancy-based input-token meter + interceptor. The tracker pairs
// each pre-request history length with the following UsageEvent, then
// the interceptor projects current prompt occupancy with the same
// UsageRecord counter used by the main Pod thresholds.
let summary_usage_tracker = Arc::new(UsageTracker::new());
{
let acc = input_so_far.clone();
let tracker = summary_usage_tracker.clone();
summary_worker.on_usage(move |event| {
if let Some(tokens) = event.input_tokens {
acc.fetch_add(tokens, Ordering::Relaxed);
}
tracker.record_usage(event);
});
}
summary_worker.set_interceptor(CompactWorkerInterceptor {
input_so_far: input_so_far.clone(),
usage_tracker: summary_usage_tracker,
max_input_tokens: compact_worker_max_input_tokens,
});
if compact_worker_max_turns.is_some() {
summary_worker.set_max_turns(compact_worker_max_turns);
}
// Tools: read_file (shared scope, fresh tracker) + the three
// compact-specific tools that populate `ctx`.
@ -3069,7 +3072,11 @@ permission = "write"
let shadows = ingest_skills(&mut registry, &manifest);
// workspace skill `alpha` should be registered (no collision).
assert!(registry.get(&memory::Slug::parse("alpha").unwrap()).is_some());
assert!(
registry
.get(&memory::Slug::parse("alpha").unwrap())
.is_some()
);
// No workflow exists to shadow `alpha`, so no shadow event for it.
assert!(shadows.iter().all(|s| s.slug.as_str() != "alpha"));
}

View File

@ -140,7 +140,8 @@ compact_threshold = 80000 # ターンの合間 (proactive)
compact_request_threshold = 90000 # リクエストの合間 (safety net)
retained_tokens = 8000 # 直近保護トークン数 (Prune 済みで計測)
auto_read_budget = 8000 # compact worker の mark_read_required 合計上限
compact_worker_max_input_tokens = 50000 # compact worker 自身の累計入力トークン上限
compact_worker_max_input_tokens = 50000 # compact worker 自身の現在占有トークン上限
compact_worker_max_turns = 20 # compact worker 自身の tool loop 上限
```
### Auto-Read とリファレンス

View File

@ -212,9 +212,13 @@ permission = "write"
# compact_auto_read_budget = 8000
#
# # 任意。デフォルト: 50000 (`defaults::COMPACT_WORKER_MAX_INPUT_TOKENS`)。
# # compact worker 自身の累積入力 token cap。超過で abort (circuit breaker)。
# # compact worker 自身の現在占有 token cap。超過で abort (circuit breaker)。
# compact_worker_max_input_tokens = 50000
#
# # 任意。デフォルト: 20 (`defaults::COMPACT_WORKER_MAX_TURNS`)。
# # compact worker 自身の tool loop 上限。Rust config で None の場合のみ無制限。
# compact_worker_max_turns = 20
#
# # 任意。デフォルト: メインモデルを `clone_boxed()` で複製。
# # compact 専用モデルを使う場合のみ書く ([model] と同じ形式)。
# # [compaction.model]

View File

@ -183,6 +183,7 @@ compact_request_threshold = 90000
compact_retained_tokens = 8000
compact_auto_read_budget = 8000
compact_worker_max_input_tokens = 50000
compact_worker_max_turns = 20
[compaction.model]
scheme = "gemini"

View File

@ -0,0 +1,51 @@
# Compact worker のサーキットブレーカーを占有量ベースに統一
## 背景
Compact worker のサーキットブレーカー (`crates/pod/src/compact/worker.rs:253-269` の `CompactWorkerInterceptor`) は `compact_worker_max_input_tokens``UsageEvent.input_tokens`**累積和** で見ている。一方、`UsageEvent.input_tokens` の定義 (`crates/llm-worker/src/llm_client/event.rs:76-94`) は「送信した prompt prefix の総トークン数占有量、キャッシュ込み」であり、Anthropic 側でも `cache_read + cache_creation` を加算してこの規約に揃えている。
結果として現行の累積メトリックは、毎ターン同じ prefix をフルカウントする「context size × ターン数」相当の値を測っており:
- compact worker は `build_summary_prompt` (`crates/pod/src/pod.rs:2630-2657`) で reasoning / ToolCall.arguments / ToolResult.content を strip した skeleton + 探索ツールという設計なので、初回 input は元 history より大幅に小さい(数十 K 程度)。
- それでも cache hit 含む prefix を毎ターン丸ごと足していくため、20-30K の skeleton を入力にツールを 2-3 回叩いた時点で 50K デフォルトに到達する。
- prompt cache が 99% ヒットしていても累積値は同じだけ増えるので、コストの近似にも安全マージンの近似にもなっていない。
メイン Worker 側の対応するしきい値 (`compact_threshold`, `compact_request_threshold`) は `Pod::total_tokens()` (`crates/pod/src/pod.rs:146-149` → `llm_worker::token_counter::total_tokens(history, &usage_records)`) を見ており、これは `UsageRecord` 列を最新測定 + バイト按分で射影した「現在の占有量」(単一の値, 累積ではない)。Compact worker でもこの正規のカウンタに統一すべき。
サーキットブレーカーとして測るべき軸は二つあり、占有量カウンタは前者だけを担当する:
1. **占有量** (cost / window 圧迫の相当値): `total_tokens()` を流用。
2. **ループ深さ** (短い context でツールを延々叩く暴走): `Worker::set_max_turns` (`crates/llm-worker/src/worker.rs:1369`) で別途上限を入れる。
## 方針
`CompactWorkerInterceptor` を、メインと同じ `UsageTracker` + `total_tokens` 機構に乗せ替える。累積メトリックは廃止する。ループ深さ対策として `compact_worker_max_turns` を新設し、`set_max_turns` 経由で compact worker に伝える。
## 要件
- `CompactWorkerInterceptor` を削除または書き換え、`pre_llm_request` の判定を `llm_worker::token_counter::total_tokens(worker.history(), &records).tokens > max_input_tokens` に切り替える。`input_so_far: AtomicU64` の累積パスは廃止。
- Compact worker にも `UsageTracker` を持たせ、`pre_llm_request` で `note_request(history.len())`、`on_usage` で `record_usage` する。メイン Pod (`pod.rs:777-780`) と同じ配線パターン。
- `compact_worker_max_input_tokens` の意味を「compact worker 側の現在占有量しきい値」に変更し、ドキュメントとデフォルト値を更新する。デフォルトは `compact_threshold` と単位が揃うため、現行 50K のままだと typical な main 側設定 (80K) に対して小さく compact 自身がそれを下回るのは妥当な範囲。実値は新セマンティクスで再評価する(要件としては「累積値ではなく占有量を測る」ことのみで固定)。
- `CompactionConfig``compact_worker_max_turns: Option<u32>` を追加し、`compact()` (`pod.rs:1458-`) で `summary_worker.set_max_turns` に渡す。`None` のときは無制限(既存動作)。デフォルトは要検討(仮: `Some(20)`)。
- 後方互換 shim は入れない。`compact_worker_max_input_tokens` はフィールド名を維持しつつセマンティクスだけ差し替えるため、旧設定値はそのまま新セマンティクスで解釈される。閾値のオーダーは大きくは変わらないので運用上の破壊的影響は小さい。
## 完了条件
- 通常の compact 実行で、cache hit 込みの prefix がフルカウントされなくなり、`build_summary_prompt` の skeleton + 数回のファイル読み程度では cap に当たらない。
- 短い context でツールを延々呼び続ける疑似ケースで、`compact_worker_max_turns` により compact run が打ち切られる。
- `Pod::total_tokens()` と compact worker の占有量推定で同じ算出経路 (`llm_worker::token_counter::total_tokens`) が使われている。
## 範囲外
- `compact_threshold` / `compact_request_threshold` 自体のセマンティクス・既定値変更。
- Compact worker が更に compact をかけるmeta-compact
- `compact_auto_read_budget` ロジックの変更。
- token 推定アルゴリズム自体の改善。
## 影響範囲
- `crates/pod/src/compact/worker.rs`: `CompactWorkerInterceptor` の実装、`input_so_far` 経路の削除。
- `crates/pod/src/pod.rs`: `compact()` の compact worker 構築箇所 (`UsageTracker` 配線、`on_usage` 差し替え、`set_max_turns` 呼び出し)。
- `crates/pod/src/compact/usage_tracker.rs`: 既存 `UsageTracker` を compact worker からも使うため、必要なら可視性 / API の調整。
- `crates/manifest/src/lib.rs`, `crates/manifest/src/config.rs`, `crates/manifest/src/defaults.rs`: `compact_worker_max_input_tokens` のドキュメント更新、`compact_worker_max_turns` の追加とカスケード。
- `crates/pod/src/compact/worker.rs` のユニットテスト、および compact 関連の統合テスト。