compact: 閾値を個別指定化し占有量ソースを UsageRecord に一本化
- manifest に compact_request_threshold を追加 (proactive と safety net を個別指定) - CompactState の両閾値を Option<u64> 化、last_input_tokens を撤去 - 閾値判定は Pod::total_tokens() / usage_history 経由の実測値ベースに切替 - turn_threshold → request_threshold にリネーム、Between-requests のログへ
This commit is contained in:
parent
255e370856
commit
967acd23ee
|
|
@ -84,6 +84,8 @@ pub struct CompactionConfigPartial {
|
|||
#[serde(default)]
|
||||
pub compact_threshold: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub compact_request_threshold: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub compact_retained_turns: Option<usize>,
|
||||
#[serde(default)]
|
||||
pub provider: Option<ProviderConfigPartial>,
|
||||
|
|
@ -236,6 +238,9 @@ impl CompactionConfigPartial {
|
|||
prune_protected_turns: upper.prune_protected_turns.or(self.prune_protected_turns),
|
||||
prune_min_savings: upper.prune_min_savings.or(self.prune_min_savings),
|
||||
compact_threshold: upper.compact_threshold.or(self.compact_threshold),
|
||||
compact_request_threshold: upper
|
||||
.compact_request_threshold
|
||||
.or(self.compact_request_threshold),
|
||||
compact_retained_turns: upper
|
||||
.compact_retained_turns
|
||||
.or(self.compact_retained_turns),
|
||||
|
|
@ -365,6 +370,7 @@ impl TryFrom<PodManifestConfig> for PodManifest {
|
|||
.prune_min_savings
|
||||
.unwrap_or(defaults::PRUNE_MIN_SAVINGS),
|
||||
compact_threshold: c.compact_threshold,
|
||||
compact_request_threshold: c.compact_request_threshold,
|
||||
compact_retained_turns: c
|
||||
.compact_retained_turns
|
||||
.unwrap_or(defaults::COMPACT_RETAINED_TURNS),
|
||||
|
|
|
|||
|
|
@ -174,10 +174,27 @@ pub struct CompactionConfig {
|
|||
#[serde(default = "default_prune_min_savings")]
|
||||
pub prune_min_savings: u64,
|
||||
|
||||
/// When `input_tokens` exceeds this, run compact. `None` = compact disabled.
|
||||
/// Proactive (between-turns) compaction threshold.
|
||||
///
|
||||
/// Checked by the Controller after each run. When current occupancy
|
||||
/// exceeds this value, compact runs before the next turn. `None`
|
||||
/// disables the between-turns check.
|
||||
#[serde(default)]
|
||||
pub compact_threshold: Option<u64>,
|
||||
|
||||
/// Safety-net (between-requests) compaction threshold.
|
||||
///
|
||||
/// Checked by `PodInterceptor::pre_llm_request` inside a turn. When
|
||||
/// current occupancy exceeds this value, the run yields so that the
|
||||
/// Controller can compact before the next LLM request. `None`
|
||||
/// disables the between-requests check.
|
||||
///
|
||||
/// Expected relation: `compact_threshold < compact_request_threshold`
|
||||
/// (proactive triggers before safety net). A reversed configuration
|
||||
/// is accepted but logged as a warning.
|
||||
#[serde(default)]
|
||||
pub compact_request_threshold: Option<u64>,
|
||||
|
||||
/// Number of recent turns retained after compaction.
|
||||
#[serde(default = "default_compact_retained_turns")]
|
||||
pub compact_retained_turns: usize,
|
||||
|
|
@ -204,6 +221,7 @@ impl Default for CompactionConfig {
|
|||
prune_protected_turns: default_prune_protected_turns(),
|
||||
prune_min_savings: default_prune_min_savings(),
|
||||
compact_threshold: None,
|
||||
compact_request_threshold: None,
|
||||
compact_retained_turns: default_compact_retained_turns(),
|
||||
provider: None,
|
||||
}
|
||||
|
|
@ -338,9 +356,37 @@ model = "claude-sonnet-4-20250514"
|
|||
assert_eq!(c.prune_protected_turns, 3);
|
||||
assert_eq!(c.prune_min_savings, 4096);
|
||||
assert_eq!(c.compact_threshold, Some(80000));
|
||||
assert_eq!(c.compact_request_threshold, None);
|
||||
assert_eq!(c.compact_retained_turns, 2);
|
||||
}
|
||||
|
||||
/// A `[compaction]` table that sets both thresholds must surface both values
/// on the parsed manifest.
#[test]
fn parse_compaction_both_thresholds() {
    let source = format!(
        "{MINIMAL_REQUIRED}\n\
         [compaction]\n\
         compact_threshold = 80000\n\
         compact_request_threshold = 90000\n"
    );

    let compaction = PodManifest::from_toml(&source)
        .unwrap()
        .compaction
        .unwrap();

    assert_eq!(compaction.compact_threshold, Some(80000));
    assert_eq!(compaction.compact_request_threshold, Some(90000));
}
|
||||
|
||||
/// Setting only `compact_request_threshold` must leave `compact_threshold`
/// as `None` while the request threshold is parsed.
#[test]
fn parse_compaction_request_threshold_only() {
    let source = format!(
        "{MINIMAL_REQUIRED}\n\
         [compaction]\n\
         compact_request_threshold = 90000\n"
    );

    let compaction = PodManifest::from_toml(&source)
        .unwrap()
        .compaction
        .unwrap();

    assert_eq!(compaction.compact_threshold, None);
    assert_eq!(compaction.compact_request_threshold, Some(90000));
}
|
||||
|
||||
#[test]
|
||||
fn parse_compaction_with_provider() {
|
||||
let toml = format!(
|
||||
|
|
|
|||
|
|
@ -1,22 +1,30 @@
|
|||
//! Shared state for compaction decisions.
|
||||
//!
|
||||
//! Holds atomic counters shared between:
|
||||
//! - `on_usage` callback (writes `last_input_tokens`)
|
||||
//! - `CompactInterceptor` (reads token count, checks thresholds)
|
||||
//! - `Pod::run()`/`resume()` (circuit breaker, thrash detection)
|
||||
//! Holds the two configured thresholds and circuit-breaker / thrash-detection
|
||||
//! flags shared between:
|
||||
//! - `PodInterceptor` (reads `request_threshold` — the *safety net* for
|
||||
//! between-requests yielding)
|
||||
//! - `Pod::try_post_run_compact` (reads `post_run_threshold` — the
|
||||
//! *proactive* check between turns)
|
||||
//! - `Pod::run()` / `resume()` (circuit breaker, thrash detection)
|
||||
//!
|
||||
//! Current occupancy (input-token count) is **not** stored here. The single
|
||||
//! source of truth is `session_store::UsageRecord` (persisted per LLM call)
|
||||
//! projected through `Pod::total_tokens()`. Callers pass the current
|
||||
//! occupancy to `exceeds_*` at check time.
|
||||
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
|
||||
const MAX_COMPACT_FAILURES: usize = 3;
|
||||
|
||||
/// Shared mutable state for compaction decisions.
|
||||
pub(crate) struct CompactState {
|
||||
/// Last observed input_tokens from `on_usage` callback.
|
||||
last_input_tokens: AtomicU64,
|
||||
/// Proactive threshold — checked in `pre_llm_request` (between turns).
|
||||
turn_threshold: u64,
|
||||
/// Post-run threshold — checked by Controller after run completes.
|
||||
post_run_threshold: u64,
|
||||
/// Between-turns threshold (proactive). Checked by the Controller
|
||||
/// after a run completes. `None` disables the post-run check.
|
||||
post_run_threshold: Option<u64>,
|
||||
/// Between-requests threshold (safety net). Checked inside a turn
|
||||
/// before each LLM request. `None` disables the request check.
|
||||
request_threshold: Option<u64>,
|
||||
/// Number of recent turns to retain after compaction.
|
||||
retained_turns: usize,
|
||||
/// Consecutive compact failures. At `MAX_COMPACT_FAILURES`, compaction is disabled.
|
||||
|
|
@ -28,15 +36,14 @@ pub(crate) struct CompactState {
|
|||
}
|
||||
|
||||
impl CompactState {
|
||||
/// Create a new CompactState.
|
||||
///
|
||||
/// `turn_threshold` is the proactive (80%) threshold from the manifest.
|
||||
/// `post_run_threshold` is derived as `turn_threshold * 9 / 8` (≈90%).
|
||||
pub(crate) fn new(turn_threshold: u64, retained_turns: usize) -> Self {
|
||||
pub(crate) fn new(
|
||||
post_run_threshold: Option<u64>,
|
||||
request_threshold: Option<u64>,
|
||||
retained_turns: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
last_input_tokens: AtomicU64::new(0),
|
||||
turn_threshold,
|
||||
post_run_threshold: turn_threshold * 9 / 8,
|
||||
post_run_threshold,
|
||||
request_threshold,
|
||||
retained_turns,
|
||||
consecutive_failures: AtomicUsize::new(0),
|
||||
just_compacted: AtomicBool::new(false),
|
||||
|
|
@ -44,19 +51,9 @@ impl CompactState {
|
|||
}
|
||||
}
|
||||
|
||||
/// Update the last observed input_tokens (called from `on_usage`).
|
||||
pub(crate) fn update_input_tokens(&self, tokens: u64) {
|
||||
self.last_input_tokens.store(tokens, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Read the last observed input_tokens.
|
||||
pub(crate) fn last_input_tokens(&self) -> u64 {
|
||||
self.last_input_tokens.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// The between-turns threshold value.
|
||||
pub(crate) fn turn_threshold(&self) -> u64 {
|
||||
self.turn_threshold
|
||||
/// Configured between-requests threshold (if any).
///
/// Returns the stored `request_threshold` field unchanged; `None` means no
/// between-requests threshold was supplied at construction.
|
||||
pub(crate) fn request_threshold(&self) -> Option<u64> {
|
||||
self.request_threshold
|
||||
}
|
||||
|
||||
/// Number of turns to retain after compaction.
|
||||
|
|
@ -69,14 +66,20 @@ impl CompactState {
|
|||
self.disabled.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// Whether `last_input_tokens` exceeds the between-turns threshold.
|
||||
pub(crate) fn exceeds_turn(&self) -> bool {
|
||||
self.last_input_tokens() > self.turn_threshold
|
||||
/// Whether `current_tokens` exceeds the between-requests threshold.
|
||||
/// Returns `false` when `request_threshold` is unset.
|
||||
pub(crate) fn exceeds_request(&self, current_tokens: u64) -> bool {
|
||||
self.request_threshold
|
||||
.map(|t| current_tokens > t)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Whether `last_input_tokens` exceeds the post-run threshold.
|
||||
pub(crate) fn exceeds_post_run(&self) -> bool {
|
||||
self.last_input_tokens() > self.post_run_threshold
|
||||
/// Whether `current_tokens` exceeds the post-run threshold.
|
||||
/// Returns `false` when `post_run_threshold` is unset.
|
||||
pub(crate) fn exceeds_post_run(&self, current_tokens: u64) -> bool {
|
||||
self.post_run_threshold
|
||||
.map(|t| current_tokens > t)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
/// Whether a compact just completed (for thrash detection).
|
||||
|
|
@ -109,31 +112,46 @@ mod tests {
|
|||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn threshold_derivation() {
|
||||
let state = CompactState::new(80_000, 2);
|
||||
assert_eq!(state.turn_threshold, 80_000);
|
||||
assert_eq!(state.post_run_threshold, 90_000);
|
||||
fn both_thresholds_configured() {
|
||||
let state = CompactState::new(Some(80_000), Some(90_000), 2);
|
||||
assert_eq!(state.request_threshold(), Some(90_000));
|
||||
assert_eq!(state.retained_turns(), 2);
|
||||
|
||||
assert!(!state.exceeds_request(70_000));
|
||||
assert!(!state.exceeds_post_run(70_000));
|
||||
|
||||
assert!(!state.exceeds_request(85_000));
|
||||
assert!(state.exceeds_post_run(85_000));
|
||||
|
||||
assert!(state.exceeds_request(95_000));
|
||||
assert!(state.exceeds_post_run(95_000));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exceeds_checks() {
|
||||
let state = CompactState::new(80_000, 2);
|
||||
assert!(!state.exceeds_turn());
|
||||
assert!(!state.exceeds_post_run());
|
||||
fn post_run_only() {
|
||||
let state = CompactState::new(Some(80_000), None, 2);
|
||||
// request check always false when threshold is None.
|
||||
assert!(!state.exceeds_request(1_000_000));
|
||||
assert!(state.exceeds_post_run(85_000));
|
||||
}
|
||||
|
||||
state.update_input_tokens(85_000);
|
||||
assert!(state.exceeds_turn());
|
||||
assert!(!state.exceeds_post_run());
|
||||
/// Only the request threshold is configured: the post-run check stays inert
/// while the request check still trips.
#[test]
fn request_only() {
    let compact = CompactState::new(None, Some(90_000), 2);

    let post_run_tripped = compact.exceeds_post_run(1_000_000);
    assert!(!post_run_tripped);

    let request_tripped = compact.exceeds_request(95_000);
    assert!(request_tripped);
}
|
||||
|
||||
state.update_input_tokens(95_000);
|
||||
assert!(state.exceeds_turn());
|
||||
assert!(state.exceeds_post_run());
|
||||
/// With neither threshold configured, neither check trips.
#[test]
fn both_none_disables_all_checks() {
    let compact = CompactState::new(None, None, 2);
    let huge = 1_000_000;

    assert!(!compact.exceeds_request(huge));
    assert!(!compact.exceeds_post_run(huge));
}
|
||||
|
||||
#[test]
|
||||
fn circuit_breaker_trips_after_max_failures() {
|
||||
let state = CompactState::new(80_000, 2);
|
||||
let state = CompactState::new(Some(80_000), Some(90_000), 2);
|
||||
assert!(!state.is_disabled());
|
||||
|
||||
state.record_compact_failure();
|
||||
|
|
@ -146,7 +164,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn success_resets_failure_count() {
|
||||
let state = CompactState::new(80_000, 2);
|
||||
let state = CompactState::new(Some(80_000), Some(90_000), 2);
|
||||
state.record_compact_failure();
|
||||
state.record_compact_failure();
|
||||
assert!(!state.is_disabled());
|
||||
|
|
@ -154,7 +172,6 @@ mod tests {
|
|||
state.record_compact_success();
|
||||
assert!(state.just_compacted());
|
||||
|
||||
// After success + 2 more failures, still not disabled (count was reset).
|
||||
state.record_compact_failure();
|
||||
state.record_compact_failure();
|
||||
assert!(!state.is_disabled());
|
||||
|
|
@ -162,7 +179,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn just_compacted_lifecycle() {
|
||||
let state = CompactState::new(80_000, 2);
|
||||
let state = CompactState::new(Some(80_000), Some(90_000), 2);
|
||||
assert!(!state.just_compacted());
|
||||
|
||||
state.record_compact_success();
|
||||
|
|
|
|||
|
|
@ -391,9 +391,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
|
||||
/// Install the hook-based interceptor on the Worker if not already done.
|
||||
///
|
||||
/// When `compact_threshold` is configured in the manifest, wraps the
|
||||
/// `HookInterceptor` in a [`CompactInterceptor`] and registers an
|
||||
/// `on_usage` callback to track `input_tokens`.
|
||||
/// When either compaction threshold (`compact_threshold` or
|
||||
/// `compact_request_threshold`) is configured in the manifest, allocates
|
||||
/// a shared [`CompactState`] and wires the interceptor to read current
|
||||
/// occupancy through the `UsageRecord` timeline.
|
||||
fn ensure_interceptor_installed(&mut self) {
|
||||
if !self.interceptor_installed {
|
||||
// Pre-LLM-request hook: record the item count at send time
|
||||
|
|
@ -406,41 +407,56 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
let builder = std::mem::take(&mut self.hook_builder);
|
||||
let registry = Arc::new(builder.build());
|
||||
|
||||
let compact_threshold = self
|
||||
let (post_run_threshold, request_threshold, retained) = self
|
||||
.manifest
|
||||
.compaction
|
||||
.as_ref()
|
||||
.and_then(|c| c.compact_threshold);
|
||||
.map(|c| {
|
||||
(
|
||||
c.compact_threshold,
|
||||
c.compact_request_threshold,
|
||||
c.compact_retained_turns,
|
||||
)
|
||||
})
|
||||
.unwrap_or((None, None, 2));
|
||||
|
||||
let tracker_for_usage = self.usage_tracker.clone();
|
||||
self.worker_mut().on_usage(move |event| {
|
||||
tracker_for_usage.record_usage(event);
|
||||
});
|
||||
|
||||
let compact_state = if let Some(threshold) = compact_threshold {
|
||||
let retained = self
|
||||
.manifest
|
||||
.compaction
|
||||
.as_ref()
|
||||
.map(|c| c.compact_retained_turns)
|
||||
.unwrap_or(2);
|
||||
|
||||
let state = Arc::new(CompactState::new(threshold, retained));
|
||||
let state_for_usage = state.clone();
|
||||
self.worker_mut().on_usage(move |event| {
|
||||
if let Some(tokens) = event.input_tokens {
|
||||
state_for_usage.update_input_tokens(tokens);
|
||||
let compact_state = if post_run_threshold.is_some() || request_threshold.is_some() {
|
||||
if let (Some(post), Some(req)) = (post_run_threshold, request_threshold) {
|
||||
if post > req {
|
||||
warn!(
|
||||
post_run_threshold = post,
|
||||
request_threshold = req,
|
||||
"compact_threshold > compact_request_threshold; \
|
||||
proactive check will never fire before the safety net"
|
||||
);
|
||||
}
|
||||
tracker_for_usage.record_usage(event);
|
||||
});
|
||||
}
|
||||
let state = Arc::new(CompactState::new(
|
||||
post_run_threshold,
|
||||
request_threshold,
|
||||
retained,
|
||||
));
|
||||
self.compact_state = Some(state.clone());
|
||||
Some(state)
|
||||
} else {
|
||||
self.worker_mut().on_usage(move |event| {
|
||||
tracker_for_usage.record_usage(event);
|
||||
});
|
||||
None
|
||||
};
|
||||
|
||||
let interceptor =
|
||||
PodInterceptor::new(registry, compact_state, self.pending_notifications.clone());
|
||||
let usage_history_handle = compact_state
|
||||
.as_ref()
|
||||
.map(|_| self.usage_history.clone());
|
||||
|
||||
let interceptor = PodInterceptor::new(
|
||||
registry,
|
||||
compact_state,
|
||||
usage_history_handle,
|
||||
self.pending_notifications.clone(),
|
||||
);
|
||||
self.worker_mut().set_interceptor(interceptor);
|
||||
self.interceptor_installed = true;
|
||||
}
|
||||
|
|
@ -667,9 +683,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// Best-effort: failures are logged but do not propagate.
|
||||
pub async fn try_post_run_compact(&mut self) -> Result<(), PodError> {
|
||||
let state = match self.compact_state.as_ref() {
|
||||
Some(s) if !s.is_disabled() && s.exceeds_post_run() && !s.just_compacted() => s.clone(),
|
||||
Some(s) if !s.is_disabled() && !s.just_compacted() => s.clone(),
|
||||
_ => return Ok(()),
|
||||
};
|
||||
let current_tokens = self.total_tokens().tokens;
|
||||
if !state.exceeds_post_run(current_tokens) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let retained = state.retained_turns();
|
||||
match self.compact(retained).await {
|
||||
|
|
|
|||
|
|
@ -7,8 +7,8 @@
|
|||
//! read-only summary information and only return control-flow
|
||||
//! decisions (continue / skip / abort / pause).
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use llm_worker::Item;
|
||||
|
|
@ -17,6 +17,7 @@ use llm_worker::interceptor::{
|
|||
ToolResultInfo, TurnEndAction,
|
||||
};
|
||||
use llm_worker::tool::ToolOutput;
|
||||
use session_store::UsageRecord;
|
||||
use tracing::info;
|
||||
|
||||
use crate::compact_state::CompactState;
|
||||
|
|
@ -25,6 +26,7 @@ use crate::hook::{
|
|||
TurnEndInfo,
|
||||
};
|
||||
use crate::notification_buffer::{NotificationBuffer, format_notification};
|
||||
use crate::token_counter::total_tokens_impl;
|
||||
|
||||
/// Maximum number of bytes copied into `TurnEndInfo::final_text_preview`.
|
||||
const FINAL_TEXT_PREVIEW_LIMIT: usize = 512;
|
||||
|
|
@ -32,6 +34,10 @@ const FINAL_TEXT_PREVIEW_LIMIT: usize = 512;
|
|||
pub(crate) struct PodInterceptor {
|
||||
registry: Arc<HookRegistry>,
|
||||
compact_state: Option<Arc<CompactState>>,
|
||||
/// Shared view of the cumulative UsageRecord timeline. Used with the
|
||||
/// per-request `context` to estimate current occupancy for threshold
|
||||
/// checks. `None` when compaction is disabled (both thresholds unset).
|
||||
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
|
||||
/// Pending-notification buffer drained into the per-request
|
||||
/// context at the head of `pre_llm_request`.
|
||||
pending_notifications: NotificationBuffer,
|
||||
|
|
@ -45,11 +51,13 @@ impl PodInterceptor {
|
|||
pub(crate) fn new(
|
||||
registry: Arc<HookRegistry>,
|
||||
compact_state: Option<Arc<CompactState>>,
|
||||
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
|
||||
pending_notifications: NotificationBuffer,
|
||||
) -> Self {
|
||||
Self {
|
||||
registry,
|
||||
compact_state,
|
||||
usage_history,
|
||||
pending_notifications,
|
||||
next_turn_index: AtomicUsize::new(0),
|
||||
tool_calls_this_turn: AtomicUsize::new(0),
|
||||
|
|
@ -61,6 +69,15 @@ impl PodInterceptor {
|
|||
.load(Ordering::Relaxed)
|
||||
.saturating_sub(1)
|
||||
}
|
||||
|
||||
/// Estimate current input-token occupancy for `context`, projected
|
||||
/// through the shared UsageRecord timeline. Returns `None` when
|
||||
/// `usage_history` is not attached (compaction fully disabled).
|
||||
fn estimated_tokens(&self, context: &[Item]) -> Option<u64> {
|
||||
let handle = self.usage_history.as_ref()?;
|
||||
let records = handle.lock().expect("usage_history poisoned").clone();
|
||||
Some(total_tokens_impl(context, &records).tokens)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
|
@ -83,15 +100,20 @@ impl Interceptor for PodInterceptor {
|
|||
}
|
||||
|
||||
async fn pre_llm_request(&self, context: &mut Vec<Item>) -> PreRequestAction {
|
||||
// Internal mechanism: between-turns compaction trigger.
|
||||
let current_tokens = self.estimated_tokens(context);
|
||||
|
||||
// Internal mechanism: between-requests compaction trigger (safety net).
|
||||
if let Some(state) = self.compact_state.as_ref() {
|
||||
if !state.is_disabled() && state.exceeds_turn() {
|
||||
info!(
|
||||
input_tokens = state.last_input_tokens(),
|
||||
threshold = state.turn_threshold(),
|
||||
"Between-turns compaction threshold exceeded, yielding"
|
||||
);
|
||||
return PreRequestAction::Yield;
|
||||
if !state.is_disabled() {
|
||||
let current = current_tokens.unwrap_or(0);
|
||||
if state.exceeds_request(current) {
|
||||
info!(
|
||||
input_tokens = current,
|
||||
threshold = state.request_threshold().unwrap_or(0),
|
||||
"Between-requests compaction threshold exceeded, yielding"
|
||||
);
|
||||
return PreRequestAction::Yield;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -105,7 +127,7 @@ impl Interceptor for PodInterceptor {
|
|||
|
||||
let info = PreRequestInfo {
|
||||
item_count: context.len(),
|
||||
estimated_tokens: self.compact_state.as_ref().map(|s| s.last_input_tokens()),
|
||||
estimated_tokens: current_tokens,
|
||||
turn_index: self.current_turn_index(),
|
||||
tool_calls_this_turn: self.tool_calls_this_turn.load(Ordering::Relaxed),
|
||||
};
|
||||
|
|
@ -232,16 +254,35 @@ mod tests {
|
|||
Arc::new(builder.build())
|
||||
}
|
||||
|
||||
/// Build a usage_history handle with a single record pinned at the
|
||||
/// current `context_len` so that `total_tokens_impl` returns exactly
|
||||
/// `tokens` (Measured, no interpolation or byte-based fallback).
|
||||
fn usage_handle_with(context_len: usize, tokens: u64) -> Arc<Mutex<Vec<UsageRecord>>> {
|
||||
Arc::new(Mutex::new(vec![UsageRecord {
|
||||
history_len: context_len,
|
||||
input_total_tokens: tokens,
|
||||
cache_read_tokens: 0,
|
||||
cache_write_tokens: 0,
|
||||
output_tokens: 0,
|
||||
}]))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pre_llm_request_yields_and_skips_hooks_when_compact_threshold_exceeded() {
|
||||
async fn pre_llm_request_yields_and_skips_hooks_when_request_threshold_exceeded() {
|
||||
let count = Arc::new(AtomicUsize::new(0));
|
||||
let registry = registry_with_pre_llm_hook(count.clone());
|
||||
|
||||
let state = Arc::new(CompactState::new(100, 2));
|
||||
state.update_input_tokens(200); // exceeds turn threshold
|
||||
let state = Arc::new(CompactState::new(None, Some(100), 2));
|
||||
let ctx_items = vec![Item::user_message("hi")];
|
||||
let history = usage_handle_with(ctx_items.len(), 200);
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, Some(state), NotificationBuffer::new());
|
||||
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
|
||||
let interceptor = PodInterceptor::new(
|
||||
registry,
|
||||
Some(state),
|
||||
Some(history),
|
||||
NotificationBuffer::new(),
|
||||
);
|
||||
let mut ctx = ctx_items;
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
assert!(matches!(action, PreRequestAction::Yield));
|
||||
|
|
@ -254,11 +295,41 @@ mod tests {
|
|||
let count = Arc::new(AtomicUsize::new(0));
|
||||
let registry = registry_with_pre_llm_hook(count.clone());
|
||||
|
||||
let state = Arc::new(CompactState::new(100, 2));
|
||||
// last_input_tokens stays at 0, well below threshold.
|
||||
let state = Arc::new(CompactState::new(None, Some(100), 2));
|
||||
let ctx_items = vec![Item::user_message("hi")];
|
||||
let history = usage_handle_with(ctx_items.len(), 50);
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, Some(state), NotificationBuffer::new());
|
||||
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
|
||||
let interceptor = PodInterceptor::new(
|
||||
registry,
|
||||
Some(state),
|
||||
Some(history),
|
||||
NotificationBuffer::new(),
|
||||
);
|
||||
let mut ctx = ctx_items;
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
assert!(matches!(action, PreRequestAction::Continue));
|
||||
assert_eq!(count.load(Ordering::Relaxed), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pre_llm_request_does_not_yield_when_only_post_run_threshold_set() {
|
||||
// request_threshold = None → safety-net check is inert inside the turn
|
||||
// even if current occupancy is huge. Post-run check runs elsewhere.
|
||||
let count = Arc::new(AtomicUsize::new(0));
|
||||
let registry = registry_with_pre_llm_hook(count.clone());
|
||||
|
||||
let state = Arc::new(CompactState::new(Some(100), None, 2));
|
||||
let ctx_items = vec![Item::user_message("hi")];
|
||||
let history = usage_handle_with(ctx_items.len(), 10_000);
|
||||
|
||||
let interceptor = PodInterceptor::new(
|
||||
registry,
|
||||
Some(state),
|
||||
Some(history),
|
||||
NotificationBuffer::new(),
|
||||
);
|
||||
let mut ctx = ctx_items;
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
assert!(matches!(action, PreRequestAction::Continue));
|
||||
|
|
@ -270,7 +341,7 @@ mod tests {
|
|||
let count = Arc::new(AtomicUsize::new(0));
|
||||
let registry = registry_with_pre_llm_hook(count.clone());
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, None, NotificationBuffer::new());
|
||||
let interceptor = PodInterceptor::new(registry, None, None, NotificationBuffer::new());
|
||||
let mut ctx: Vec<Item> = Vec::new();
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
|
|
@ -295,7 +366,7 @@ mod tests {
|
|||
buffer.push("first".into());
|
||||
buffer.push("second".into());
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, None, buffer.clone());
|
||||
let interceptor = PodInterceptor::new(registry, None, None, buffer.clone());
|
||||
let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
|
|
@ -320,15 +391,18 @@ mod tests {
|
|||
let buffer = NotificationBuffer::new();
|
||||
buffer.push("msg".into());
|
||||
|
||||
let state = Arc::new(CompactState::new(100, 2));
|
||||
state.update_input_tokens(200);
|
||||
let state = Arc::new(CompactState::new(None, Some(100), 2));
|
||||
let ctx_items = vec![Item::user_message("hi")];
|
||||
let history = usage_handle_with(ctx_items.len(), 200);
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, Some(state), buffer.clone());
|
||||
let mut ctx: Vec<Item> = Vec::new();
|
||||
let interceptor =
|
||||
PodInterceptor::new(registry, Some(state), Some(history), buffer.clone());
|
||||
let mut ctx = ctx_items;
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
assert!(matches!(action, PreRequestAction::Yield));
|
||||
assert!(ctx.is_empty());
|
||||
// Notifications were not drained (still held for post-compact resume).
|
||||
assert_eq!(ctx.len(), 1);
|
||||
assert_eq!(buffer.len(), 1);
|
||||
}
|
||||
|
||||
|
|
@ -341,7 +415,7 @@ mod tests {
|
|||
builder.add_pre_llm_request(CountingHook(second_count.clone()));
|
||||
let registry = Arc::new(builder.build());
|
||||
|
||||
let interceptor = PodInterceptor::new(registry, None, NotificationBuffer::new());
|
||||
let interceptor = PodInterceptor::new(registry, None, None, NotificationBuffer::new());
|
||||
let mut ctx: Vec<Item> = Vec::new();
|
||||
let action = interceptor.pre_llm_request(&mut ctx).await;
|
||||
|
||||
|
|
|
|||
|
|
@ -169,7 +169,7 @@ fn tokens_at(
|
|||
}
|
||||
}
|
||||
|
||||
fn total_tokens_impl(history: &[Item], records: &[UsageRecord]) -> TokenEstimate {
|
||||
pub(crate) fn total_tokens_impl(history: &[Item], records: &[UsageRecord]) -> TokenEstimate {
|
||||
let prefix = prefix_bytes(history);
|
||||
tokens_at(history, records, history.len(), &prefix)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user