feat: extract worker サーキットブレーカーを占有量ベースに統一

This commit is contained in:
Keisuke Hirata 2026-05-11 01:20:37 +09:00
parent c8871ec4fe
commit 0b79e0ed65
5 changed files with 117 additions and 23 deletions

View File

@ -252,6 +252,9 @@ impl MemoryConfig {
extract_worker_max_input_tokens: upper
.extract_worker_max_input_tokens
.or(self.extract_worker_max_input_tokens),
extract_worker_max_turns: upper
.extract_worker_max_turns
.or(self.extract_worker_max_turns),
consolidation_model: upper.consolidation_model.or(self.consolidation_model),
consolidation_threshold_files: upper
.consolidation_threshold_files

View File

@ -50,7 +50,11 @@ pub const COMPACT_WORKER_MAX_TURNS: Option<u32> = Some(20);
/// default references.
pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5;
/// Cumulative input-token cap for the memory Phase 1 (extract) worker's
/// own LLM calls. Exceeding this aborts the extract run.
/// See [`crate::MemoryConfig::extract_worker_max_input_tokens`].
/// Current prompt-occupancy cap for the memory extract worker's own
/// LLM requests. Exceeding this aborts the extract run (circuit-breaker
/// path). See [`crate::MemoryConfig::extract_worker_max_input_tokens`].
pub const MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS: u64 = 30_000;
/// Optional maximum extract-worker tool-loop depth. `None` means unlimited.
/// See [`crate::MemoryConfig::extract_worker_max_turns`].
pub const MEMORY_EXTRACT_WORKER_MAX_TURNS: Option<u32> = Some(8);

View File

@ -108,11 +108,17 @@ pub struct MemoryConfig {
/// the auto-extract trigger is dormant.
#[serde(default)]
pub extract_threshold: Option<u64>,
/// Cumulative input-token cap for the extract worker's own LLM
/// calls. Exceeding this aborts the extract run. `None` ⇒
/// Current prompt-occupancy cap for the extract worker's own LLM
/// requests. Exceeding this aborts the extract run. `None` ⇒
/// [`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`].
#[serde(default)]
pub extract_worker_max_input_tokens: Option<u64>,
/// Optional maximum extract-worker tool-loop depth. When unset
/// (`None`), the value falls through to
/// [`defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS`], which bounds
/// runaway short-context loops; the worker is unlimited only if
/// that default is itself `None`.
#[serde(default)]
pub extract_worker_max_turns: Option<u32>,
/// Optional model for the Phase 2 (consolidation) worker. When
/// `None`, the main pod model is cloned via `clone_boxed()`.
/// Reasoning-class models are recommended.

View File

@ -1915,6 +1915,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let cap = memory_cfg
.extract_worker_max_input_tokens
.unwrap_or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS);
let extract_worker_max_turns = memory_cfg
.extract_worker_max_turns
.or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS);
let client = self.build_extractor_client(memory_cfg)?;
let extract_system_prompt = self
@ -1924,22 +1927,22 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let mut extract_worker = Worker::new(client).system_prompt(extract_system_prompt);
extract_worker.set_cache_key(Some(self.session_id.to_string()));
// Cumulative input-token meter + interceptor (mirror of
// CompactWorkerInterceptor). Aborts the extract worker if its
// own input usage crosses the cap.
let input_so_far = Arc::new(std::sync::atomic::AtomicU64::new(0));
// Occupancy-based input-token meter + interceptor. The tracker pairs
// each pre-request history length with the following UsageEvent, then
// the interceptor projects current prompt occupancy with the same
// UsageRecord counter used by the main Pod thresholds.
let extract_usage_tracker = Arc::new(UsageTracker::new());
{
let acc = input_so_far.clone();
let tracker = extract_usage_tracker.clone();
extract_worker.on_usage(move |event| {
if let Some(tokens) = event.input_tokens {
acc.fetch_add(tokens, Ordering::Relaxed);
}
tracker.record_usage(event);
});
}
extract_worker.set_interceptor(MemoryExtractWorkerInterceptor {
input_so_far: input_so_far.clone(),
usage_tracker: extract_usage_tracker,
max_input_tokens: cap,
});
extract_worker.set_max_turns(extract_worker_max_turns);
let ctx = Arc::new(extract::ExtractWorkerContext::new());
extract_worker.register_tool(extract::write_extracted_tool(ctx.clone()));
@ -2173,12 +2176,15 @@ enum ExtractDecision {
Completed,
}
/// Pre-request interceptor for the Phase 1 extract worker. Aborts when
/// cumulative input tokens cross `max_input_tokens`. Mirror of
/// `compact::worker::CompactWorkerInterceptor`; kept separate so each
/// subsystem can tune its own message and budget.
/// Pre-request interceptor for the extract worker. Aborts when current
/// prompt occupancy crosses `max_input_tokens`. Uses the same
/// `UsageRecord` + `llm_worker::token_counter::total_tokens` projection
/// as the main Pod compaction thresholds, so prompt-cache hits are not
/// counted cumulatively across turns. Kept separate from
/// `compact::worker::CompactWorkerInterceptor` so each subsystem can
/// tune its own cancel message and budget.
struct MemoryExtractWorkerInterceptor {
input_so_far: Arc<std::sync::atomic::AtomicU64>,
usage_tracker: Arc<UsageTracker>,
max_input_tokens: u64,
}
@ -2186,14 +2192,18 @@ struct MemoryExtractWorkerInterceptor {
impl llm_worker::interceptor::Interceptor for MemoryExtractWorkerInterceptor {
async fn pre_llm_request(
&self,
_context: &mut Vec<Item>,
context: &mut Vec<Item>,
) -> llm_worker::interceptor::PreRequestAction {
if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens {
let records = self.usage_tracker.records();
let estimate = llm_worker::token_counter::total_tokens(context, &records);
if estimate.tokens > self.max_input_tokens {
return llm_worker::interceptor::PreRequestAction::Cancel(format!(
"Phase 1 extract worker input exceeded {} tokens",
"extract worker input occupancy exceeded {} tokens",
self.max_input_tokens
));
}
self.usage_tracker.note_request(context.len());
llm_worker::interceptor::PreRequestAction::Continue
}
}
@ -3079,3 +3089,70 @@ permission = "write"
assert!(shadows.iter().all(|s| s.slug.as_str() != "alpha"));
}
}
// Unit tests for `MemoryExtractWorkerInterceptor::pre_llm_request`: they drive
// the interceptor and its shared `UsageTracker` by hand and check whether the
// pre-request hook continues or cancels relative to `max_input_tokens`.
#[cfg(test)]
mod memory_extract_interceptor_tests {
use super::*;
use llm_worker::interceptor::{Interceptor, PreRequestAction};
use llm_worker::timeline::event::UsageEvent;
/// Builds a `UsageEvent` that reports `input` input tokens (also used as the
/// total), zero output tokens, and no cache read/creation counters.
fn make_usage(input: u64) -> UsageEvent {
UsageEvent {
input_tokens: Some(input),
output_tokens: Some(0),
total_tokens: Some(input),
cache_read_input_tokens: None,
cache_creation_input_tokens: None,
}
}
/// With a 150-token cap, two successive 100-token usage reports must not
/// cancel the worker: the cap is compared against the projected current
/// occupancy, not against the running sum of reported input tokens.
// NOTE(review): relies on `llm_worker::token_counter::total_tokens` projecting
// from the most recent usage record rather than summing records — confirm
// against that function's implementation.
#[tokio::test]
async fn extract_interceptor_uses_occupancy_not_cumulative_usage() {
let tracker = Arc::new(UsageTracker::new());
let interceptor = MemoryExtractWorkerInterceptor {
usage_tracker: tracker.clone(),
max_input_tokens: 150,
};
let mut context = vec![Item::user_message("hello")];
// First request: no usage has been recorded yet; expected to pass.
assert!(matches!(
interceptor.pre_llm_request(&mut context).await,
PreRequestAction::Continue
));
// Simulate the usage report that follows the first request.
tracker.record_usage(&make_usage(100));
// Second request: one 100-token measurement against a 150-token cap.
assert!(matches!(
interceptor.pre_llm_request(&mut context).await,
PreRequestAction::Continue
));
// Simulate the usage report that follows the second request.
tracker.record_usage(&make_usage(100));
// Two 100-token requests would exceed a cumulative 150-token cap, but
// current occupancy is still the latest 100-token measurement.
assert!(matches!(
interceptor.pre_llm_request(&mut context).await,
PreRequestAction::Continue
));
}
/// With a 99-token cap, a single 100-token usage report makes the next
/// pre-request check cancel, and the cancel message mentions "occupancy".
#[tokio::test]
async fn extract_interceptor_cancels_when_occupancy_exceeds_cap() {
let tracker = Arc::new(UsageTracker::new());
let interceptor = MemoryExtractWorkerInterceptor {
usage_tracker: tracker.clone(),
max_input_tokens: 99,
};
let mut context = vec![Item::user_message("hello")];
// First request: nothing measured yet, so the interceptor is expected
// to let it through (and note the request on the tracker).
assert!(matches!(
interceptor.pre_llm_request(&mut context).await,
PreRequestAction::Continue
));
// Report 100 input tokens for that request — one more than the cap.
tracker.record_usage(&make_usage(100));
// The next pre-request check must cancel with the occupancy message.
assert!(matches!(
interceptor.pre_llm_request(&mut context).await,
PreRequestAction::Cancel(message) if message.contains("occupancy")
));
}
}

View File

@ -258,9 +258,13 @@ permission = "write"
# extract_threshold = 30000
#
# # 任意。デフォルト: 30000 (`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`)。
# # extract worker 自身の累積入力 token cap (超過で abort)。
# # extract worker 自身の現在占有 token cap (超過で abort)。
# extract_worker_max_input_tokens = 30000
#
# # 任意。デフォルト: 8 (`defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS`)。
# # extract worker 自身の tool loop 上限。未指定時は上記デフォルトが適用される (無制限になるのはデフォルト自体が None の場合のみ)。
# extract_worker_max_turns = 8
#
# # 任意。デフォルト: メインモデルを `clone_boxed()` で複製。
# # Phase 2 (consolidation) ワーカーのモデル。reasoning クラス推奨。
# # [memory.consolidation_model]