Merge branch 'compact-worker-occupancy-cap' into develop
This commit is contained in:
commit
3fece8749b
|
|
@ -116,6 +116,8 @@ pub struct CompactionConfigPartial {
|
|||
#[serde(default)]
|
||||
pub compact_worker_max_input_tokens: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub compact_worker_max_turns: Option<u32>,
|
||||
#[serde(default)]
|
||||
pub model: Option<ModelManifest>,
|
||||
}
|
||||
|
||||
|
|
@ -325,6 +327,9 @@ impl CompactionConfigPartial {
|
|||
compact_worker_max_input_tokens: upper
|
||||
.compact_worker_max_input_tokens
|
||||
.or(self.compact_worker_max_input_tokens),
|
||||
compact_worker_max_turns: upper
|
||||
.compact_worker_max_turns
|
||||
.or(self.compact_worker_max_turns),
|
||||
model: merge_option(self.model, upper.model, ModelManifest::merge),
|
||||
}
|
||||
}
|
||||
|
|
@ -461,6 +466,9 @@ impl TryFrom<PodManifestConfig> for PodManifest {
|
|||
compact_worker_max_input_tokens: c
|
||||
.compact_worker_max_input_tokens
|
||||
.unwrap_or(defaults::COMPACT_WORKER_MAX_INPUT_TOKENS),
|
||||
compact_worker_max_turns: c
|
||||
.compact_worker_max_turns
|
||||
.or(defaults::COMPACT_WORKER_MAX_TURNS),
|
||||
model: c.model,
|
||||
})
|
||||
})
|
||||
|
|
@ -949,6 +957,32 @@ stop_sequences = ["\n\n", "</stop>"]
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_toml_accepts_compact_worker_max_turns() {
|
||||
let cfg = PodManifestConfig::from_toml(
|
||||
r#"
|
||||
[compaction]
|
||||
compact_worker_max_turns = 7
|
||||
"#,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(cfg.compaction.unwrap().compact_worker_max_turns, Some(7));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn try_from_compaction_defaults_compact_worker_max_turns() {
|
||||
let mut cfg = minimal_valid();
|
||||
cfg.compaction = Some(CompactionConfigPartial::default());
|
||||
|
||||
let manifest = PodManifest::try_from(cfg).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
manifest.compaction.unwrap().compact_worker_max_turns,
|
||||
defaults::COMPACT_WORKER_MAX_TURNS
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn from_toml_partial_layer_succeeds() {
|
||||
// A project-layer manifest with only scope set must parse fine.
|
||||
|
|
@ -1042,7 +1076,10 @@ name = "dbg"
|
|||
fn skills_directories_resolved_against_base() {
|
||||
let mut cfg = minimal_valid();
|
||||
cfg.skills = Some(SkillsConfig {
|
||||
directories: vec![PathBuf::from(".claude/skills"), PathBuf::from("/abs/elsewhere")],
|
||||
directories: vec![
|
||||
PathBuf::from(".claude/skills"),
|
||||
PathBuf::from("/abs/elsewhere"),
|
||||
],
|
||||
});
|
||||
let resolved = cfg.resolve_paths(Path::new("/workspace/proj"));
|
||||
let dirs = resolved.skills.as_ref().unwrap().directories.clone();
|
||||
|
|
|
|||
|
|
@ -36,12 +36,16 @@ pub const DEFAULT_INSTRUCTION: &str = "$insomnia/default";
|
|||
/// [`crate::CompactionConfig::compact_auto_read_budget`].
|
||||
pub const COMPACT_AUTO_READ_BUDGET: u64 = 8000;
|
||||
|
||||
/// Cumulative input-token cap for the compact worker's own LLM
|
||||
/// Current prompt-occupancy cap for the compact worker's own LLM
|
||||
/// calls. Exceeding this aborts the compact run (circuit-breaker
|
||||
/// path). See
|
||||
/// [`crate::CompactionConfig::compact_worker_max_input_tokens`].
|
||||
pub const COMPACT_WORKER_MAX_INPUT_TOKENS: u64 = 50_000;
|
||||
|
||||
/// Optional maximum compact-worker tool-loop depth. `None` means unlimited.
|
||||
/// See [`crate::CompactionConfig::compact_worker_max_turns`].
|
||||
pub const COMPACT_WORKER_MAX_TURNS: Option<u32> = Some(20);
|
||||
|
||||
/// Number of recently-touched files fed to the compact worker as
|
||||
/// default references.
|
||||
pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5;
|
||||
|
|
|
|||
|
|
@ -321,11 +321,16 @@ pub struct CompactionConfig {
|
|||
#[serde(default = "default_compact_auto_read_budget")]
|
||||
pub compact_auto_read_budget: u64,
|
||||
|
||||
/// Cumulative input-token cap for the compact worker's own LLM
|
||||
/// calls. Exceeding this aborts the compact run.
|
||||
/// Current prompt-occupancy cap for the compact worker's own LLM
|
||||
/// requests. Exceeding this aborts the compact run.
|
||||
#[serde(default = "default_compact_worker_max_input_tokens")]
|
||||
pub compact_worker_max_input_tokens: u64,
|
||||
|
||||
/// Optional maximum compact-worker tool-loop depth. `None` leaves the
|
||||
/// worker unlimited; the default bounds runaway short-context loops.
|
||||
#[serde(default = "default_compact_worker_max_turns")]
|
||||
pub compact_worker_max_turns: Option<u32>,
|
||||
|
||||
/// Optional model for the compactor (summary) LLM.
|
||||
/// If omitted, the main model is cloned via `clone_boxed()`.
|
||||
#[serde(default)]
|
||||
|
|
@ -347,6 +352,9 @@ fn default_compact_auto_read_budget() -> u64 {
|
|||
fn default_compact_worker_max_input_tokens() -> u64 {
|
||||
defaults::COMPACT_WORKER_MAX_INPUT_TOKENS
|
||||
}
|
||||
fn default_compact_worker_max_turns() -> Option<u32> {
|
||||
defaults::COMPACT_WORKER_MAX_TURNS
|
||||
}
|
||||
|
||||
impl Default for CompactionConfig {
|
||||
fn default() -> Self {
|
||||
|
|
@ -358,6 +366,7 @@ impl Default for CompactionConfig {
|
|||
compact_retained_tokens: default_compact_retained_tokens(),
|
||||
compact_auto_read_budget: default_compact_auto_read_budget(),
|
||||
compact_worker_max_input_tokens: default_compact_worker_max_input_tokens(),
|
||||
compact_worker_max_turns: default_compact_worker_max_turns(),
|
||||
model: None,
|
||||
}
|
||||
}
|
||||
|
|
@ -521,6 +530,19 @@ model_id = "claude-sonnet-4-20250514"
|
|||
assert_eq!(c.compact_threshold, Some(80000));
|
||||
assert_eq!(c.compact_request_threshold, None);
|
||||
assert_eq!(c.compact_retained_tokens, 8000);
|
||||
assert_eq!(c.compact_worker_max_turns, Some(20));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_compaction_worker_max_turns() {
|
||||
let toml = format!(
|
||||
"{MINIMAL_REQUIRED}\n\
|
||||
[compaction]\n\
|
||||
compact_worker_max_turns = 7\n"
|
||||
);
|
||||
let manifest = PodManifest::from_toml(&toml).unwrap();
|
||||
let c = manifest.compaction.unwrap();
|
||||
assert_eq!(c.compact_worker_max_turns, Some(7));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
|||
|
|
@ -99,6 +99,18 @@ impl UsageTracker {
|
|||
});
|
||||
}
|
||||
|
||||
/// Return a clone of the accumulated `UsageRecord`s without clearing them.
|
||||
/// Used by request-time circuit breakers that need the same occupancy
|
||||
/// projection as Pod persistence while the run is still active.
|
||||
pub(crate) fn records(&self) -> Vec<UsageRecord> {
|
||||
self.pending_records
|
||||
.lock()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|r| r.record.clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Drain accumulated records. Called by Pod after a run completes,
|
||||
/// before persisting the turn.
|
||||
pub(crate) fn drain(&self) -> Vec<RecordedUsage> {
|
||||
|
|
@ -136,6 +148,19 @@ mod tests {
|
|||
assert!(records[0].correlation_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn records_clones_without_clearing() {
|
||||
let tracker = UsageTracker::new();
|
||||
tracker.note_request(1);
|
||||
tracker.record_usage(&make_event(10, 0, 0, 5));
|
||||
|
||||
let records = tracker.records();
|
||||
assert_eq!(records.len(), 1);
|
||||
assert_eq!(records[0].history_len, 1);
|
||||
assert_eq!(records[0].input_total_tokens, 10);
|
||||
assert_eq!(tracker.records().len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn drain_clears_buffer() {
|
||||
let tracker = UsageTracker::new();
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@
|
|||
//! compacted session's opening system messages.
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
|
@ -28,6 +27,7 @@ use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
|||
use serde::Deserialize;
|
||||
use tools::ScopedFs;
|
||||
|
||||
use crate::compact::usage_tracker::UsageTracker;
|
||||
use crate::fs_view::{ReadRequirement, slice_lines};
|
||||
|
||||
/// Aggregated output of a compact worker run.
|
||||
|
|
@ -246,24 +246,29 @@ pub(crate) fn write_summary_tool(ctx: Arc<Mutex<CompactWorkerContext>>) -> ToolD
|
|||
})
|
||||
}
|
||||
|
||||
/// Interceptor that aborts the compact worker as soon as its cumulative
|
||||
/// input-token count crosses `max_input_tokens`. Pairs with the
|
||||
/// `on_usage` callback registered by `Pod::compact`, which is what
|
||||
/// actually accumulates `input_so_far`.
|
||||
/// Interceptor that aborts the compact worker when its current prompt
|
||||
/// occupancy estimate crosses `max_input_tokens`. The estimate uses the same
|
||||
/// `UsageRecord` + `llm_worker::token_counter::total_tokens` path as the main
|
||||
/// Pod compaction thresholds, so prompt-cache hits are not counted cumulatively
|
||||
/// across turns.
|
||||
pub(crate) struct CompactWorkerInterceptor {
|
||||
pub input_so_far: Arc<AtomicU64>,
|
||||
pub usage_tracker: Arc<UsageTracker>,
|
||||
pub max_input_tokens: u64,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Interceptor for CompactWorkerInterceptor {
|
||||
async fn pre_llm_request(&self, _context: &mut Vec<Item>) -> PreRequestAction {
|
||||
if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens {
|
||||
async fn pre_llm_request(&self, context: &mut Vec<Item>) -> PreRequestAction {
|
||||
let records = self.usage_tracker.records();
|
||||
let estimate = llm_worker::token_counter::total_tokens(context, &records);
|
||||
if estimate.tokens > self.max_input_tokens {
|
||||
return PreRequestAction::Cancel(format!(
|
||||
"compact worker input exceeded {} tokens",
|
||||
"compact worker input occupancy exceeded {} tokens",
|
||||
self.max_input_tokens
|
||||
));
|
||||
}
|
||||
|
||||
self.usage_tracker.note_request(context.len());
|
||||
PreRequestAction::Continue
|
||||
}
|
||||
}
|
||||
|
|
@ -283,6 +288,66 @@ mod tests {
|
|||
ScopedFs::new(scope, tmp.to_path_buf())
|
||||
}
|
||||
|
||||
fn make_usage(input: u64) -> llm_worker::timeline::event::UsageEvent {
|
||||
llm_worker::timeline::event::UsageEvent {
|
||||
input_tokens: Some(input),
|
||||
output_tokens: Some(0),
|
||||
total_tokens: Some(input),
|
||||
cache_read_input_tokens: None,
|
||||
cache_creation_input_tokens: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compact_worker_interceptor_uses_occupancy_not_cumulative_usage() {
|
||||
let tracker = Arc::new(UsageTracker::new());
|
||||
let interceptor = CompactWorkerInterceptor {
|
||||
usage_tracker: tracker.clone(),
|
||||
max_input_tokens: 150,
|
||||
};
|
||||
let mut context = vec![Item::user_message("hello")];
|
||||
|
||||
assert!(matches!(
|
||||
interceptor.pre_llm_request(&mut context).await,
|
||||
PreRequestAction::Continue
|
||||
));
|
||||
tracker.record_usage(&make_usage(100));
|
||||
|
||||
assert!(matches!(
|
||||
interceptor.pre_llm_request(&mut context).await,
|
||||
PreRequestAction::Continue
|
||||
));
|
||||
tracker.record_usage(&make_usage(100));
|
||||
|
||||
// Two 100-token requests would exceed a cumulative 150-token cap, but
|
||||
// current occupancy is still the latest 100-token measurement.
|
||||
assert!(matches!(
|
||||
interceptor.pre_llm_request(&mut context).await,
|
||||
PreRequestAction::Continue
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn compact_worker_interceptor_cancels_when_occupancy_exceeds_cap() {
|
||||
let tracker = Arc::new(UsageTracker::new());
|
||||
let interceptor = CompactWorkerInterceptor {
|
||||
usage_tracker: tracker.clone(),
|
||||
max_input_tokens: 99,
|
||||
};
|
||||
let mut context = vec![Item::user_message("hello")];
|
||||
|
||||
assert!(matches!(
|
||||
interceptor.pre_llm_request(&mut context).await,
|
||||
PreRequestAction::Continue
|
||||
));
|
||||
tracker.record_usage(&make_usage(100));
|
||||
|
||||
assert!(matches!(
|
||||
interceptor.pre_llm_request(&mut context).await,
|
||||
PreRequestAction::Cancel(message) if message.contains("occupancy")
|
||||
));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn mark_read_required_records_and_deducts_budget() {
|
||||
let tmp = tempfile::TempDir::new().unwrap();
|
||||
|
|
|
|||
|
|
@ -12,8 +12,8 @@ use session_store::{EntryHash, PodScopeSnapshot, SessionId, SessionStartState, S
|
|||
use tracing::{info, warn};
|
||||
|
||||
use manifest::{
|
||||
Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError, ScopeRule,
|
||||
SharedScope, WorkerManifest,
|
||||
Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError,
|
||||
ScopeRule, SharedScope, WorkerManifest,
|
||||
};
|
||||
|
||||
use crate::compact::state::CompactState;
|
||||
|
|
@ -1456,8 +1456,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
///
|
||||
/// Returns the new session ID.
|
||||
pub async fn compact(&mut self, retained_tokens: u64) -> Result<SessionId, PodError> {
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use crate::compact::worker::{
|
||||
CompactWorkerContext, CompactWorkerInterceptor, add_reference_tool,
|
||||
mark_read_required_tool, write_summary_tool,
|
||||
|
|
@ -1477,7 +1475,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
|
||||
// Compaction-related knobs. Fall through to manifest defaults when
|
||||
// `[compaction]` is omitted entirely.
|
||||
let (auto_read_budget, compact_worker_max_input_tokens) = self
|
||||
let (auto_read_budget, compact_worker_max_input_tokens, compact_worker_max_turns) = self
|
||||
.manifest
|
||||
.compaction
|
||||
.as_ref()
|
||||
|
|
@ -1485,11 +1483,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
(
|
||||
c.compact_auto_read_budget,
|
||||
c.compact_worker_max_input_tokens,
|
||||
c.compact_worker_max_turns,
|
||||
)
|
||||
})
|
||||
.unwrap_or((
|
||||
manifest::defaults::COMPACT_AUTO_READ_BUDGET,
|
||||
manifest::defaults::COMPACT_WORKER_MAX_INPUT_TOKENS,
|
||||
manifest::defaults::COMPACT_WORKER_MAX_TURNS,
|
||||
));
|
||||
|
||||
// Default references: the N most-recently-touched files in the
|
||||
|
|
@ -1530,21 +1530,22 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
let mut summary_worker = Worker::new(summary_client).system_prompt(summary_system_prompt);
|
||||
summary_worker.set_cache_key(Some(self.session_id.to_string()));
|
||||
|
||||
// Cumulative input-token meter + interceptor. The meter is bumped
|
||||
// from the on_usage callback and read on every pre_llm_request.
|
||||
let input_so_far = Arc::new(AtomicU64::new(0));
|
||||
// Occupancy-based input-token meter + interceptor. The tracker pairs
|
||||
// each pre-request history length with the following UsageEvent, then
|
||||
// the interceptor projects current prompt occupancy with the same
|
||||
// UsageRecord counter used by the main Pod thresholds.
|
||||
let summary_usage_tracker = Arc::new(UsageTracker::new());
|
||||
{
|
||||
let acc = input_so_far.clone();
|
||||
let tracker = summary_usage_tracker.clone();
|
||||
summary_worker.on_usage(move |event| {
|
||||
if let Some(tokens) = event.input_tokens {
|
||||
acc.fetch_add(tokens, Ordering::Relaxed);
|
||||
}
|
||||
tracker.record_usage(event);
|
||||
});
|
||||
}
|
||||
summary_worker.set_interceptor(CompactWorkerInterceptor {
|
||||
input_so_far: input_so_far.clone(),
|
||||
usage_tracker: summary_usage_tracker,
|
||||
max_input_tokens: compact_worker_max_input_tokens,
|
||||
});
|
||||
summary_worker.set_max_turns(compact_worker_max_turns);
|
||||
|
||||
// Tools: read_file (shared scope, fresh tracker) + the three
|
||||
// compact-specific tools that populate `ctx`.
|
||||
|
|
@ -3069,7 +3070,11 @@ permission = "write"
|
|||
let shadows = ingest_skills(&mut registry, &manifest);
|
||||
|
||||
// workspace skill `alpha` should be registered (no collision).
|
||||
assert!(registry.get(&memory::Slug::parse("alpha").unwrap()).is_some());
|
||||
assert!(
|
||||
registry
|
||||
.get(&memory::Slug::parse("alpha").unwrap())
|
||||
.is_some()
|
||||
);
|
||||
// No workflow exists to shadow `alpha`, so no shadow event for it.
|
||||
assert!(shadows.iter().all(|s| s.slug.as_str() != "alpha"));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -140,7 +140,8 @@ compact_threshold = 80000 # ターンの合間 (proactive)
|
|||
compact_request_threshold = 90000 # リクエストの合間 (safety net)
|
||||
retained_tokens = 8000 # 直近保護トークン数 (Prune 済みで計測)
|
||||
auto_read_budget = 8000 # compact worker の mark_read_required 合計上限
|
||||
compact_worker_max_input_tokens = 50000 # compact worker 自身の累計入力トークン上限
|
||||
compact_worker_max_input_tokens = 50000 # compact worker 自身の現在占有トークン上限
|
||||
compact_worker_max_turns = 20 # compact worker 自身の tool loop 上限
|
||||
```
|
||||
|
||||
### Auto-Read とリファレンス
|
||||
|
|
|
|||
|
|
@ -212,9 +212,13 @@ permission = "write"
|
|||
# compact_auto_read_budget = 8000
|
||||
#
|
||||
# # 任意。デフォルト: 50000 (`defaults::COMPACT_WORKER_MAX_INPUT_TOKENS`)。
|
||||
# # compact worker 自身の累積入力 token cap。超過で abort (circuit breaker)。
|
||||
# # compact worker 自身の現在占有 token cap。超過で abort (circuit breaker)。
|
||||
# compact_worker_max_input_tokens = 50000
|
||||
#
|
||||
# # 任意。デフォルト: 20 (`defaults::COMPACT_WORKER_MAX_TURNS`)。
|
||||
# # compact worker 自身の tool loop 上限。Rust config で None の場合のみ無制限。
|
||||
# compact_worker_max_turns = 20
|
||||
#
|
||||
# # 任意。デフォルト: メインモデルを `clone_boxed()` で複製。
|
||||
# # compact 専用モデルを使う場合のみ書く ([model] と同じ形式)。
|
||||
# # [compaction.model]
|
||||
|
|
|
|||
|
|
@ -183,6 +183,7 @@ compact_request_threshold = 90000
|
|||
compact_retained_tokens = 8000
|
||||
compact_auto_read_budget = 8000
|
||||
compact_worker_max_input_tokens = 50000
|
||||
compact_worker_max_turns = 20
|
||||
|
||||
[compaction.model]
|
||||
scheme = "gemini"
|
||||
|
|
|
|||
56
tickets/compact-worker-occupancy-cap.md
Normal file
56
tickets/compact-worker-occupancy-cap.md
Normal file
|
|
@ -0,0 +1,56 @@
|
|||
# Compact worker のサーキットブレーカーを占有量ベースに統一
|
||||
|
||||
## 背景
|
||||
|
||||
Compact worker のサーキットブレーカー (`crates/pod/src/compact/worker.rs:253-269` の `CompactWorkerInterceptor`) は `compact_worker_max_input_tokens` を `UsageEvent.input_tokens` の **累積和** で見ている。一方、`UsageEvent.input_tokens` の定義 (`crates/llm-worker/src/llm_client/event.rs:76-94`) は「送信した prompt prefix の総トークン数(占有量、キャッシュ込み)」であり、Anthropic 側でも `cache_read + cache_creation` を加算してこの規約に揃えている。
|
||||
|
||||
結果として現行の累積メトリックは、毎ターン同じ prefix をフルカウントする「context size × ターン数」相当の値を測っており:
|
||||
|
||||
- compact worker は `build_summary_prompt` (`crates/pod/src/pod.rs:2630-2657`) で reasoning / ToolCall.arguments / ToolResult.content を strip した skeleton + 探索ツールという設計なので、初回 input は元 history より大幅に小さい(数十 K 程度)。
|
||||
- それでも cache hit 含む prefix を毎ターン丸ごと足していくため、20-30K の skeleton を入力にツールを 2-3 回叩いた時点で 50K デフォルトに到達する。
|
||||
- prompt cache が 99% ヒットしていても累積値は同じだけ増えるので、コストの近似にも安全マージンの近似にもなっていない。
|
||||
|
||||
メイン Worker 側の対応するしきい値 (`compact_threshold`, `compact_request_threshold`) は `Pod::total_tokens()` (`crates/pod/src/pod.rs:146-149` → `llm_worker::token_counter::total_tokens(history, &usage_records)`) を見ており、これは `UsageRecord` 列を最新測定 + バイト按分で射影した「現在の占有量」(単一の値, 累積ではない)。Compact worker でもこの正規のカウンタに統一すべき。
|
||||
|
||||
サーキットブレーカーとして測るべき軸は二つあり、占有量カウンタは前者だけを担当する:
|
||||
|
||||
1. **占有量** (cost / window 圧迫の相当値): `total_tokens()` を流用。
|
||||
2. **ループ深さ** (短い context でツールを延々叩く暴走): `Worker::set_max_turns` (`crates/llm-worker/src/worker.rs:1369`) で別途上限を入れる。
|
||||
|
||||
## 方針
|
||||
|
||||
`CompactWorkerInterceptor` を、メインと同じ `UsageTracker` + `total_tokens` 機構に乗せ替える。累積メトリックは廃止する。ループ深さ対策として `compact_worker_max_turns` を新設し、`set_max_turns` 経由で compact worker に伝える。
|
||||
|
||||
## 要件
|
||||
|
||||
- `CompactWorkerInterceptor` を削除または書き換え、`pre_llm_request` の判定を `llm_worker::token_counter::total_tokens(worker.history(), &records).tokens > max_input_tokens` に切り替える。`input_so_far: AtomicU64` の累積パスは廃止。
|
||||
- Compact worker にも `UsageTracker` を持たせ、`pre_llm_request` で `note_request(history.len())`、`on_usage` で `record_usage` する。メイン Pod (`pod.rs:777-780`) と同じ配線パターン。
|
||||
- `compact_worker_max_input_tokens` の意味を「compact worker 側の現在占有量しきい値」に変更し、ドキュメントとデフォルト値を更新する。デフォルトは `compact_threshold` と単位が揃うため、現行 50K のままだと typical な main 側設定 (80K) に対して小さく compact 自身がそれを下回るのは妥当な範囲。実値は新セマンティクスで再評価する(要件としては「累積値ではなく占有量を測る」ことのみで固定)。
|
||||
- `CompactionConfig` に `compact_worker_max_turns: Option<u32>` を追加し、`compact()` (`pod.rs:1458-`) で `summary_worker.set_max_turns` に渡す。`None` のときは無制限(既存動作)。デフォルトは要検討(仮: `Some(20)`)。
|
||||
- 後方互換 shim は入れない。`compact_worker_max_input_tokens` はフィールド名を維持しつつセマンティクスだけ差し替えるため、旧設定値はそのまま新セマンティクスで解釈される。閾値のオーダーは大きくは変わらないので運用上の破壊的影響は小さい。
|
||||
|
||||
## 完了条件
|
||||
|
||||
- 通常の compact 実行で、cache hit 込みの prefix がフルカウントされなくなり、`build_summary_prompt` の skeleton + 数回のファイル読み程度では cap に当たらない。
|
||||
- 短い context でツールを延々呼び続ける疑似ケースで、`compact_worker_max_turns` により compact run が打ち切られる。
|
||||
- `Pod::total_tokens()` と compact worker の占有量推定で同じ算出経路 (`llm_worker::token_counter::total_tokens`) が使われている。
|
||||
|
||||
## 範囲外
|
||||
|
||||
- `compact_threshold` / `compact_request_threshold` 自体のセマンティクス・既定値変更。
|
||||
- Compact worker が更に compact をかける(meta-compact)。
|
||||
- `compact_auto_read_budget` ロジックの変更。
|
||||
- token 推定アルゴリズム自体の改善。
|
||||
|
||||
## 影響範囲
|
||||
|
||||
- `crates/pod/src/compact/worker.rs`: `CompactWorkerInterceptor` の実装、`input_so_far` 経路の削除。
|
||||
- `crates/pod/src/pod.rs`: `compact()` の compact worker 構築箇所 (`UsageTracker` 配線、`on_usage` 差し替え、`set_max_turns` 呼び出し)。
|
||||
- `crates/pod/src/compact/usage_tracker.rs`: 既存 `UsageTracker` を compact worker からも使うため、必要なら可視性 / API の調整。
|
||||
- `crates/manifest/src/lib.rs`, `crates/manifest/src/config.rs`, `crates/manifest/src/defaults.rs`: `compact_worker_max_input_tokens` のドキュメント更新、`compact_worker_max_turns` の追加とカスケード。
|
||||
- `crates/pod/src/compact/worker.rs` のユニットテスト、および compact 関連の統合テスト。
|
||||
|
||||
## Review
|
||||
- 状態: Approve with follow-up
|
||||
- レビュー詳細: [./compact-worker-occupancy-cap.review.md](./compact-worker-occupancy-cap.review.md)
|
||||
- 日付: 2026-05-11
|
||||
41
tickets/compact-worker-occupancy-cap.review.md
Normal file
41
tickets/compact-worker-occupancy-cap.review.md
Normal file
|
|
@ -0,0 +1,41 @@
|
|||
# Review: Compact worker のサーキットブレーカーを占有量ベースに統一
|
||||
|
||||
レビュー対象: commit `ef0cdf7` (base `d818b37`).
|
||||
|
||||
## 前提・要件の確認
|
||||
|
||||
- 要件1「`CompactWorkerInterceptor` を `total_tokens` ベースに切り替え、`input_so_far: AtomicU64` 経路を廃止」: 満たされている。`crates/pod/src/compact/worker.rs:249-274` で `usage_tracker.records()` → `llm_worker::token_counter::total_tokens(context, &records)` に置き換わっており、`AtomicU64` import / フィールドも削除されている。`crates/pod/src/pod.rs:1456` 周辺の `use std::sync::atomic` も消えている。
|
||||
- 要件2「Compact worker に `UsageTracker` を持たせ、`pre_llm_request` で `note_request`、`on_usage` で `record_usage`」: 満たされている。`pod.rs:1537-1547` で `summary_usage_tracker = Arc::new(UsageTracker::new())` を作り、`on_usage` で `record_usage(event)`、`CompactWorkerInterceptor::pre_llm_request` 内で `usage_tracker.note_request(context.len())` を呼ぶ (`compact/worker.rs:262-272`)。
|
||||
- 要件3「`compact_worker_max_input_tokens` の意味を「現在占有量しきい値」に変更し doc 更新」: 満たされている。`crates/manifest/src/lib.rs:324`、`crates/manifest/src/defaults.rs:39-43`、`docs/compaction.md:143`、`docs/manifest.toml:215` がすべて「現在占有」「累計ではない」表現に揃っている。フィールド名は維持され、後方互換 shim は入っていない (要件通り)。
|
||||
- 要件4「`compact_worker_max_turns: Option<u32>` を新設し `Worker::set_max_turns` 経由で渡す」: 満たされている。`CompactionConfig` (`lib.rs:329-332`)、`CompactionConfigPartial` (`config.rs:118-119`)、`defaults::COMPACT_WORKER_MAX_TURNS = Some(20)` (`defaults.rs:46-48`)、`pod.rs:1548-1550` で `summary_worker.set_max_turns(...)`。manifest 側の merge / TryFrom / TOML パースもテスト付きで通っている (`config.rs:960-983`, `lib.rs:521-546`)。
|
||||
- 要件5「後方互換 shim 無し」: 満たされている。フィールド名は変更しないため旧 manifest はそのまま新セマンティクスで読まれ、deprecated alias 等は導入されていない。
|
||||
- 完了条件1「cache hit 込み prefix がフルカウントされない」: 単体テスト `compact_worker_interceptor_uses_occupancy_not_cumulative_usage` (`compact/worker.rs:301-328`) で「2 回 100-token 入力 → 累積 200 でも occupancy は最新の 100 のままで cap=150 を通る」を確認。
|
||||
- 完了条件2「`compact_worker_max_turns` で打ち切られる」: `set_max_turns` のロジック自体は llm-worker 側の既存挙動 (`worker.rs:1045-1050`) に乗るのみで、本チケットでは新規ロジックを足していない。配線テスト (manifest 側のパース) は入っているが、compact 経由で実 abort する pod-level の統合テストは無し。Trivial wiring なのでブロッキングではないが、後で run-level テストを足すと安心。
|
||||
- 完了条件3「`Pod::total_tokens()` と同じ算出経路」: 満たされている。`pod.rs:148` (`llm_worker::token_counter::total_tokens(self.history(), &usage)`) と `compact/worker.rs:265` (`llm_worker::token_counter::total_tokens(context, &records)`) が同じ関数を経由する。compact 側は per-request の prune 後 `request_context` を渡すため、メイン側と完全一致ではないが、`pre_llm_request` 時点でのその後 LLM に投げる prefix 占有量という意味では妥当 (むしろ正確)。
|
||||
|
||||
## アーキテクチャ・スコープ
|
||||
|
||||
- 修正は `compact/worker.rs` + `pod.rs` の compact 構築 + manifest 1 フィールド追加に閉じており、ticket 影響範囲表と完全一致。`crates/pod/src/compact/usage_tracker.rs` への変更は「`records()` メソッドを `pub(crate)` で追加して drain せず読む」のみで、API 拡張は最小。
|
||||
- `UsageTracker` は元々「per-LLM-request 計測のペアリング・drain は Pod が persist 時に行う」モデル。compact worker は drain せず read-only で snapshot を覗くだけなので、メインの persist セマンティクスを汚さない。compact worker のループ内で記録しても drain しないままワーカーが破棄される (`pod.rs:1530-1551` で関数ローカル) ため、メインの `Pod::usage_tracker` (turn 永続化用) とは完全に独立した別インスタンスで、相互干渉しない。設計として綺麗。
|
||||
- `note_request` をメインでは `UsageTrackingHook` (`pod.rs:46-59`) として `pre_llm_request` Hook で呼んでいるのに対し、compact 側は `CompactWorkerInterceptor::pre_llm_request` 内で直接呼んでいる。Worker のコール順 (Hook → Interceptor → stream → on_usage) を踏まえると `note_request` は `record_usage` より前に呼ばれていれば意味的に等価で、両者とも同じ tick で呼ばれるため不整合は無い。ただし「メイン: Hook で `note_request`、Interceptor で別判定」「compact: Interceptor で両方」と配線パターンが分岐している点は将来読む人が混乱するかも。今のままでも動くが、補足コメントを Pod 側か Interceptor 側に入れておくと親切。
|
||||
- ループ深さは `Worker::set_max_turns` 既存機構の流用。新規ロジック無し、コードベースを歪めない選択。
|
||||
|
||||
## 指摘事項
|
||||
|
||||
### Blocking
|
||||
無し。
|
||||
|
||||
### Non-blocking / Follow-up
|
||||
- `pod.rs:1548-1550` の `if compact_worker_max_turns.is_some() { summary_worker.set_max_turns(compact_worker_max_turns); }` は不要な分岐。`set_max_turns` は `Option<u32>` を取り、`None` を渡しても初期値 `None` を上書きするだけで no-op (`crates/llm-worker/src/worker.rs:1369-1371` + `worker.rs:1181`)。無条件に `summary_worker.set_max_turns(compact_worker_max_turns);` で十分。条件付けることで「`None` だと何もしない」読者バイアスを生んでむしろ読みづらい。
|
||||
- `crates/pod/src/pod.rs:2178-2201` の `MemoryExtractWorkerInterceptor` は古い累積メトリック方式のまま残っており、コメントには `Mirror of compact::worker::CompactWorkerInterceptor;` と書かれている。今回の変更で「mirror」ではなくなったのでコメントが嘘になっている。memory extract 側のセマンティクス変更は本チケットの範囲外で正しい判断だが、コメント補正 (例: 「以前は CompactWorkerInterceptor のミラーだったが、compact 側は占有量ベースに移行した。memory extract 側は別チケットで追従予定」) を別チケットなり TODO なりで残しておきたい。
|
||||
- `compact_worker_max_turns` が compact 経由で実際に `Aborted/MaxTurnsReached` を引き起こす経路の pod-level 統合テストは含まれていない。配線そのものはトリビアルで、`set_max_turns` の振る舞いは llm-worker 側で別途テストされている前提。後追いで足すかどうかは判断に任せる。
|
||||
- デフォルト `COMPACT_WORKER_MAX_TURNS = Some(20)` は `build_summary_prompt` の skeleton + 数回ファイル読みなら十分余裕がある妥当なライン。ただし auto_read_budget (8000) と read_file の典型呼び出しサイズを考えると、深いツールループが起きる前にトークン cap が先に効きやすい設計なので、20 が上限になる場面はまずレアケース。妥当な保険値として OK。
|
||||
|
||||
### Nits
|
||||
- `compact/worker.rs:249-253` の docstring は新しいセマンティクスを正しく説明できている。良。
|
||||
- `compact/usage_tracker.rs:102-112` `records()` の docstring が「request-time circuit breakers が同じ occupancy projection を見るため」と用途を明示しており、将来の extract worker 側追従の伏線にもなっている。良。
|
||||
- ユニットテスト `compact_worker_interceptor_uses_occupancy_not_cumulative_usage` (`compact/worker.rs:301-328`) は「累積では落ちる量でも occupancy では通る」ケースをピンポイントで掴んでいて、今回の本質的なバグへの回帰防止として的確。
|
||||
|
||||
## 判断
|
||||
|
||||
Approve with follow-up — チケットの 4 要件と完了条件 1/3 はすべて根拠つきで満たされている。残るのは (a) `set_max_turns` 呼び出しの不要な分岐、(b) `MemoryExtractWorkerInterceptor` 側のコメント陳腐化と将来の追従、(c) max_turns の pod-level 統合テスト追加、いずれも非ブロッキング。マージ可。
|
||||
Loading…
Reference in New Issue
Block a user