feat: extract worker サーキットブレーカーを占有量ベースに統一

This commit is contained in:
Keisuke Hirata 2026-05-11 01:20:37 +09:00
parent eec33aba98
commit 0356e29707
No known key found for this signature in database
5 changed files with 117 additions and 23 deletions

View File

@ -252,6 +252,9 @@ impl MemoryConfig {
extract_worker_max_input_tokens: upper extract_worker_max_input_tokens: upper
.extract_worker_max_input_tokens .extract_worker_max_input_tokens
.or(self.extract_worker_max_input_tokens), .or(self.extract_worker_max_input_tokens),
extract_worker_max_turns: upper
.extract_worker_max_turns
.or(self.extract_worker_max_turns),
consolidation_model: upper.consolidation_model.or(self.consolidation_model), consolidation_model: upper.consolidation_model.or(self.consolidation_model),
consolidation_threshold_files: upper consolidation_threshold_files: upper
.consolidation_threshold_files .consolidation_threshold_files

View File

@ -50,7 +50,11 @@ pub const COMPACT_WORKER_MAX_TURNS: Option<u32> = Some(20);
/// default references. /// default references.
pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5; pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5;
/// Cumulative input-token cap for the memory Phase 1 (extract) worker's /// Current prompt-occupancy cap for the memory extract worker's own
/// own LLM calls. Exceeding this aborts the extract run. /// LLM requests. Exceeding this aborts the extract run (circuit-breaker
/// See [`crate::MemoryConfig::extract_worker_max_input_tokens`]. /// path). See [`crate::MemoryConfig::extract_worker_max_input_tokens`].
pub const MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS: u64 = 30_000; pub const MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS: u64 = 30_000;
/// Default maximum tool-loop depth for the memory extract worker, applied
/// when [`crate::MemoryConfig::extract_worker_max_turns`] is unset.
/// `None` means unlimited; the shipped value caps the loop at 8 turns.
pub const MEMORY_EXTRACT_WORKER_MAX_TURNS: Option<u32> = Some(8);

View File

@ -108,11 +108,17 @@ pub struct MemoryConfig {
/// the auto-extract trigger is dormant. /// the auto-extract trigger is dormant.
#[serde(default)] #[serde(default)]
pub extract_threshold: Option<u64>, pub extract_threshold: Option<u64>,
/// Cumulative input-token cap for the extract worker's own LLM /// Current prompt-occupancy cap for the extract worker's own LLM
/// calls. Exceeding this aborts the extract run. `None` ⇒ /// requests. Exceeding this aborts the extract run. `None` ⇒
/// [`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`]. /// [`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`].
#[serde(default)] #[serde(default)]
pub extract_worker_max_input_tokens: Option<u64>, pub extract_worker_max_input_tokens: Option<u64>,
/// Optional maximum extract-worker tool-loop depth. When unset
/// (`None`), falls through to
/// [`defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS`], which bounds runaway
/// short-context loops. The worker is unlimited only when the resolved
/// value is `None`, i.e. when that default is itself `None`.
#[serde(default)]
pub extract_worker_max_turns: Option<u32>,
/// Optional model for the Phase 2 (consolidation) worker. When /// Optional model for the Phase 2 (consolidation) worker. When
/// `None`, the main pod model is cloned via `clone_boxed()`. /// `None`, the main pod model is cloned via `clone_boxed()`.
/// Reasoning-class models are recommended. /// Reasoning-class models are recommended.

View File

@ -1915,6 +1915,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let cap = memory_cfg let cap = memory_cfg
.extract_worker_max_input_tokens .extract_worker_max_input_tokens
.unwrap_or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS); .unwrap_or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS);
let extract_worker_max_turns = memory_cfg
.extract_worker_max_turns
.or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS);
let client = self.build_extractor_client(memory_cfg)?; let client = self.build_extractor_client(memory_cfg)?;
let extract_system_prompt = self let extract_system_prompt = self
@ -1924,22 +1927,22 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let mut extract_worker = Worker::new(client).system_prompt(extract_system_prompt); let mut extract_worker = Worker::new(client).system_prompt(extract_system_prompt);
extract_worker.set_cache_key(Some(self.session_id.to_string())); extract_worker.set_cache_key(Some(self.session_id.to_string()));
// Cumulative input-token meter + interceptor (mirror of // Occupancy-based input-token meter + interceptor. The tracker pairs
// CompactWorkerInterceptor). Aborts the extract worker if its // each pre-request history length with the following UsageEvent, then
// own input usage crosses the cap. // the interceptor projects current prompt occupancy with the same
let input_so_far = Arc::new(std::sync::atomic::AtomicU64::new(0)); // UsageRecord counter used by the main Pod thresholds.
let extract_usage_tracker = Arc::new(UsageTracker::new());
{ {
let acc = input_so_far.clone(); let tracker = extract_usage_tracker.clone();
extract_worker.on_usage(move |event| { extract_worker.on_usage(move |event| {
if let Some(tokens) = event.input_tokens { tracker.record_usage(event);
acc.fetch_add(tokens, Ordering::Relaxed);
}
}); });
} }
extract_worker.set_interceptor(MemoryExtractWorkerInterceptor { extract_worker.set_interceptor(MemoryExtractWorkerInterceptor {
input_so_far: input_so_far.clone(), usage_tracker: extract_usage_tracker,
max_input_tokens: cap, max_input_tokens: cap,
}); });
extract_worker.set_max_turns(extract_worker_max_turns);
let ctx = Arc::new(extract::ExtractWorkerContext::new()); let ctx = Arc::new(extract::ExtractWorkerContext::new());
extract_worker.register_tool(extract::write_extracted_tool(ctx.clone())); extract_worker.register_tool(extract::write_extracted_tool(ctx.clone()));
@ -2173,12 +2176,15 @@ enum ExtractDecision {
Completed, Completed,
} }
/// Pre-request interceptor for the Phase 1 extract worker. Aborts when /// Pre-request interceptor for the extract worker. Aborts when current
/// cumulative input tokens cross `max_input_tokens`. Mirror of /// prompt occupancy crosses `max_input_tokens`. Uses the same
/// `compact::worker::CompactWorkerInterceptor`; kept separate so each /// `UsageRecord` + `llm_worker::token_counter::total_tokens` projection
/// subsystem can tune its own message and budget. /// as the main Pod compaction thresholds, so prompt-cache hits are not
/// counted cumulatively across turns. Kept separate from
/// `compact::worker::CompactWorkerInterceptor` so each subsystem can
/// tune its own cancel message and budget.
struct MemoryExtractWorkerInterceptor { struct MemoryExtractWorkerInterceptor {
input_so_far: Arc<std::sync::atomic::AtomicU64>, usage_tracker: Arc<UsageTracker>,
max_input_tokens: u64, max_input_tokens: u64,
} }
@ -2186,14 +2192,18 @@ struct MemoryExtractWorkerInterceptor {
impl llm_worker::interceptor::Interceptor for MemoryExtractWorkerInterceptor { impl llm_worker::interceptor::Interceptor for MemoryExtractWorkerInterceptor {
async fn pre_llm_request( async fn pre_llm_request(
&self, &self,
_context: &mut Vec<Item>, context: &mut Vec<Item>,
) -> llm_worker::interceptor::PreRequestAction { ) -> llm_worker::interceptor::PreRequestAction {
if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens { let records = self.usage_tracker.records();
let estimate = llm_worker::token_counter::total_tokens(context, &records);
if estimate.tokens > self.max_input_tokens {
return llm_worker::interceptor::PreRequestAction::Cancel(format!( return llm_worker::interceptor::PreRequestAction::Cancel(format!(
"Phase 1 extract worker input exceeded {} tokens", "extract worker input occupancy exceeded {} tokens",
self.max_input_tokens self.max_input_tokens
)); ));
} }
self.usage_tracker.note_request(context.len());
llm_worker::interceptor::PreRequestAction::Continue llm_worker::interceptor::PreRequestAction::Continue
} }
} }
@ -3079,3 +3089,70 @@ permission = "write"
assert!(shadows.iter().all(|s| s.slug.as_str() != "alpha")); assert!(shadows.iter().all(|s| s.slug.as_str() != "alpha"));
} }
} }
#[cfg(test)]
mod memory_extract_interceptor_tests {
    use super::*;
    use llm_worker::interceptor::{Interceptor, PreRequestAction};
    use llm_worker::timeline::event::UsageEvent;

    /// Builds a usage event that reports `input` prompt tokens, zero output
    /// tokens, and no cache counters.
    fn make_usage(input: u64) -> UsageEvent {
        UsageEvent {
            input_tokens: Some(input),
            output_tokens: Some(0),
            total_tokens: Some(input),
            cache_read_input_tokens: None,
            cache_creation_input_tokens: None,
        }
    }

    /// Creates an interceptor limited to `cap` input tokens, returning it
    /// together with a handle to the usage tracker it reads from.
    fn interceptor_with_cap(cap: u64) -> (Arc<UsageTracker>, MemoryExtractWorkerInterceptor) {
        let shared_tracker = Arc::new(UsageTracker::new());
        let guard = MemoryExtractWorkerInterceptor {
            usage_tracker: Arc::clone(&shared_tracker),
            max_input_tokens: cap,
        };
        (shared_tracker, guard)
    }

    #[tokio::test]
    async fn extract_interceptor_uses_occupancy_not_cumulative_usage() {
        let (tracker, interceptor) = interceptor_with_cap(150);
        let mut history = vec![Item::user_message("hello")];

        // The very first request has no usage recorded yet.
        let first = interceptor.pre_llm_request(&mut history).await;
        assert!(matches!(first, PreRequestAction::Continue));

        // Two 100-token requests would exceed a cumulative 150-token cap, but
        // current occupancy is still the latest 100-token measurement, so
        // every follow-up request must be allowed through.
        for _ in 0..2 {
            tracker.record_usage(&make_usage(100));
            let verdict = interceptor.pre_llm_request(&mut history).await;
            assert!(matches!(verdict, PreRequestAction::Continue));
        }
    }

    #[tokio::test]
    async fn extract_interceptor_cancels_when_occupancy_exceeds_cap() {
        let (tracker, interceptor) = interceptor_with_cap(99);
        let mut history = vec![Item::user_message("hello")];

        // Nothing has been measured yet, so the opening request passes.
        let first = interceptor.pre_llm_request(&mut history).await;
        assert!(matches!(first, PreRequestAction::Continue));

        // A single 100-token measurement is already above the 99-token cap.
        tracker.record_usage(&make_usage(100));
        let verdict = interceptor.pre_llm_request(&mut history).await;
        assert!(matches!(
            verdict,
            PreRequestAction::Cancel(ref reason) if reason.contains("occupancy")
        ));
    }
}

View File

@ -258,9 +258,13 @@ permission = "write"
# extract_threshold = 30000 # extract_threshold = 30000
# #
# # 任意。デフォルト: 30000 (`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`)。 # # 任意。デフォルト: 30000 (`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`)。
# # extract worker 自身の累積入力 token cap (超過で abort)。 # # extract worker 自身の現在占有 token cap (超過で abort)。
# extract_worker_max_input_tokens = 30000 # extract_worker_max_input_tokens = 30000
# #
# # 任意。デフォルト: 8 (`defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS`)。
# # extract worker 自身の tool loop 上限。未指定 (Rust config で None) の場合は上記デフォルトが適用される (無制限にはならない)。
# extract_worker_max_turns = 8
#
# # 任意。デフォルト: メインモデルを `clone_boxed()` で複製。 # # 任意。デフォルト: メインモデルを `clone_boxed()` で複製。
# # Phase 2 (consolidation) ワーカーのモデル。reasoning クラス推奨。 # # Phase 2 (consolidation) ワーカーのモデル。reasoning クラス推奨。
# # [memory.consolidation_model] # # [memory.consolidation_model]