diff --git a/crates/manifest/src/config.rs b/crates/manifest/src/config.rs index 6af96e46..3b13d440 100644 --- a/crates/manifest/src/config.rs +++ b/crates/manifest/src/config.rs @@ -252,6 +252,9 @@ impl MemoryConfig { extract_worker_max_input_tokens: upper .extract_worker_max_input_tokens .or(self.extract_worker_max_input_tokens), + extract_worker_max_turns: upper + .extract_worker_max_turns + .or(self.extract_worker_max_turns), consolidation_model: upper.consolidation_model.or(self.consolidation_model), consolidation_threshold_files: upper .consolidation_threshold_files diff --git a/crates/manifest/src/defaults.rs b/crates/manifest/src/defaults.rs index 4ac587a5..c3d6022d 100644 --- a/crates/manifest/src/defaults.rs +++ b/crates/manifest/src/defaults.rs @@ -50,7 +50,11 @@ pub const COMPACT_WORKER_MAX_TURNS: Option = Some(20); /// default references. pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5; -/// Cumulative input-token cap for the memory Phase 1 (extract) worker's -/// own LLM calls. Exceeding this aborts the extract run. -/// See [`crate::MemoryConfig::extract_worker_max_input_tokens`]. +/// Current prompt-occupancy cap for the memory extract worker's own +/// LLM requests. Exceeding this aborts the extract run (circuit-breaker +/// path). See [`crate::MemoryConfig::extract_worker_max_input_tokens`]. pub const MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS: u64 = 30_000; + +/// Optional maximum extract-worker tool-loop depth. `None` means unlimited. +/// See [`crate::MemoryConfig::extract_worker_max_turns`]. +pub const MEMORY_EXTRACT_WORKER_MAX_TURNS: Option = Some(8); diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index 92505139..5d79d035 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -108,11 +108,17 @@ pub struct MemoryConfig { /// the auto-extract trigger is dormant. #[serde(default)] pub extract_threshold: Option, - /// Cumulative input-token cap for the extract worker's own LLM - /// calls. 
Exceeding this aborts the extract run. `None` ⇒ + /// Current prompt-occupancy cap for the extract worker's own LLM + /// requests. Exceeding this aborts the extract run. `None` ⇒ /// [`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`]. #[serde(default)] pub extract_worker_max_input_tokens: Option, + /// Optional maximum extract-worker tool-loop depth. `None` (unset) + /// falls through to [`defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS`], + /// which bounds runaway short-context loops; the worker is unlimited + /// only when that default is itself `None`. + #[serde(default)] + pub extract_worker_max_turns: Option, /// Optional model for the Phase 2 (consolidation) worker. When /// `None`, the main pod model is cloned via `clone_boxed()`. /// Reasoning-class models are recommended. diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index d15b8f13..46cd0c46 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1915,6 +1915,9 @@ impl Pod { let cap = memory_cfg .extract_worker_max_input_tokens .unwrap_or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS); + let extract_worker_max_turns = memory_cfg + .extract_worker_max_turns + .or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS); let client = self.build_extractor_client(memory_cfg)?; let extract_system_prompt = self @@ -1924,22 +1927,22 @@ impl Pod { let mut extract_worker = Worker::new(client).system_prompt(extract_system_prompt); extract_worker.set_cache_key(Some(self.session_id.to_string())); - // Cumulative input-token meter + interceptor (mirror of - // CompactWorkerInterceptor). Aborts the extract worker if its - // own input usage crosses the cap. - let input_so_far = Arc::new(std::sync::atomic::AtomicU64::new(0)); + // Occupancy-based input-token meter + interceptor. The tracker pairs + // each pre-request history length with the following UsageEvent, then + // the interceptor projects current prompt occupancy with the same + // UsageRecord counter used by the main Pod thresholds.
+ let extract_usage_tracker = Arc::new(UsageTracker::new()); { - let acc = input_so_far.clone(); + let tracker = extract_usage_tracker.clone(); extract_worker.on_usage(move |event| { - if let Some(tokens) = event.input_tokens { - acc.fetch_add(tokens, Ordering::Relaxed); - } + tracker.record_usage(event); }); } extract_worker.set_interceptor(MemoryExtractWorkerInterceptor { - input_so_far: input_so_far.clone(), + usage_tracker: extract_usage_tracker, max_input_tokens: cap, }); + extract_worker.set_max_turns(extract_worker_max_turns); let ctx = Arc::new(extract::ExtractWorkerContext::new()); extract_worker.register_tool(extract::write_extracted_tool(ctx.clone())); @@ -2173,12 +2176,15 @@ enum ExtractDecision { Completed, } -/// Pre-request interceptor for the Phase 1 extract worker. Aborts when -/// cumulative input tokens cross `max_input_tokens`. Mirror of -/// `compact::worker::CompactWorkerInterceptor`; kept separate so each -/// subsystem can tune its own message and budget. +/// Pre-request interceptor for the extract worker. Aborts when current +/// prompt occupancy crosses `max_input_tokens`. Uses the same +/// `UsageRecord` + `llm_worker::token_counter::total_tokens` projection +/// as the main Pod compaction thresholds, so prompt-cache hits are not +/// counted cumulatively across turns. Kept separate from +/// `compact::worker::CompactWorkerInterceptor` so each subsystem can +/// tune its own cancel message and budget. 
struct MemoryExtractWorkerInterceptor { - input_so_far: Arc, + usage_tracker: Arc, max_input_tokens: u64, } @@ -2186,14 +2192,18 @@ struct MemoryExtractWorkerInterceptor { impl llm_worker::interceptor::Interceptor for MemoryExtractWorkerInterceptor { async fn pre_llm_request( &self, - _context: &mut Vec, + context: &mut Vec, ) -> llm_worker::interceptor::PreRequestAction { - if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens { + let records = self.usage_tracker.records(); + let estimate = llm_worker::token_counter::total_tokens(context, &records); + if estimate.tokens > self.max_input_tokens { return llm_worker::interceptor::PreRequestAction::Cancel(format!( - "Phase 1 extract worker input exceeded {} tokens", + "extract worker input occupancy exceeded {} tokens", self.max_input_tokens )); } + + self.usage_tracker.note_request(context.len()); llm_worker::interceptor::PreRequestAction::Continue } } @@ -3079,3 +3089,70 @@ permission = "write" assert!(shadows.iter().all(|s| s.slug.as_str() != "alpha")); } } + +#[cfg(test)] +mod memory_extract_interceptor_tests { + use super::*; + use llm_worker::interceptor::{Interceptor, PreRequestAction}; + use llm_worker::timeline::event::UsageEvent; + + fn make_usage(input: u64) -> UsageEvent { + UsageEvent { + input_tokens: Some(input), + output_tokens: Some(0), + total_tokens: Some(input), + cache_read_input_tokens: None, + cache_creation_input_tokens: None, + } + } + + #[tokio::test] + async fn extract_interceptor_uses_occupancy_not_cumulative_usage() { + let tracker = Arc::new(UsageTracker::new()); + let interceptor = MemoryExtractWorkerInterceptor { + usage_tracker: tracker.clone(), + max_input_tokens: 150, + }; + let mut context = vec![Item::user_message("hello")]; + + assert!(matches!( + interceptor.pre_llm_request(&mut context).await, + PreRequestAction::Continue + )); + tracker.record_usage(&make_usage(100)); + + assert!(matches!( + interceptor.pre_llm_request(&mut context).await, + 
PreRequestAction::Continue + )); + tracker.record_usage(&make_usage(100)); + + // Two 100-token requests would exceed a cumulative 150-token cap, but + // current occupancy is still the latest 100-token measurement. + assert!(matches!( + interceptor.pre_llm_request(&mut context).await, + PreRequestAction::Continue + )); + } + + #[tokio::test] + async fn extract_interceptor_cancels_when_occupancy_exceeds_cap() { + let tracker = Arc::new(UsageTracker::new()); + let interceptor = MemoryExtractWorkerInterceptor { + usage_tracker: tracker.clone(), + max_input_tokens: 99, + }; + let mut context = vec![Item::user_message("hello")]; + + assert!(matches!( + interceptor.pre_llm_request(&mut context).await, + PreRequestAction::Continue + )); + tracker.record_usage(&make_usage(100)); + + assert!(matches!( + interceptor.pre_llm_request(&mut context).await, + PreRequestAction::Cancel(message) if message.contains("occupancy") + )); + } +} diff --git a/docs/manifest.toml b/docs/manifest.toml index 0f5a2db0..1ce81afc 100644 --- a/docs/manifest.toml +++ b/docs/manifest.toml @@ -258,9 +258,13 @@ permission = "write" # extract_threshold = 30000 # # # 任意。デフォルト: 30000 (`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`)。 -# # extract worker 自身の累積入力 token cap (超過で abort)。 +# # extract worker 自身の現在占有 token cap (超過で abort)。 # extract_worker_max_input_tokens = 30000 # +# # 任意。デフォルト: 8 (`defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS`)。 +# # extract worker 自身の tool loop 上限。未指定時は上記デフォルトが適用され、デフォルト自体が None の場合のみ無制限。 +# extract_worker_max_turns = 8 +# # # 任意。デフォルト: メインモデルを `clone_boxed()` で複製。 # # Phase 2 (consolidation) ワーカーのモデル。reasoning クラス推奨。 # # [memory.consolidation_model]