From f0a865552c5a71ef292522e7bb5c16fa177966ba Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 19 Apr 2026 08:49:25 +0900 Subject: [PATCH] =?UTF-8?q?compact:=20=E9=96=BE=E5=80=A4=E3=82=92=E5=80=8B?= =?UTF-8?q?=E5=88=A5=E6=8C=87=E5=AE=9A=E5=8C=96=E3=81=97=E5=8D=A0=E6=9C=89?= =?UTF-8?q?=E9=87=8F=E3=82=BD=E3=83=BC=E3=82=B9=E3=82=92=20UsageRecord=20?= =?UTF-8?q?=E3=81=AB=E4=B8=80=E6=9C=AC=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - manifest に compact_request_threshold を追加 (proactive と safety net を個別指定) - CompactState の両閾値を Option 化、last_input_tokens を撤去 - 閾値判定は Pod::total_tokens() / usage_history 経由の実測値ベースに切替 - turn_threshold → request_threshold にリネーム、Between-requests のログへ --- crates/manifest/src/config.rs | 6 ++ crates/manifest/src/lib.rs | 48 ++++++++++- crates/pod/src/compact_state.rs | 129 +++++++++++++++++------------- crates/pod/src/pod.rs | 72 +++++++++++------ crates/pod/src/pod_interceptor.rs | 128 ++++++++++++++++++++++------- crates/pod/src/token_counter.rs | 2 +- 6 files changed, 274 insertions(+), 111 deletions(-) diff --git a/crates/manifest/src/config.rs b/crates/manifest/src/config.rs index f6fddcf8..0d736e9f 100644 --- a/crates/manifest/src/config.rs +++ b/crates/manifest/src/config.rs @@ -84,6 +84,8 @@ pub struct CompactionConfigPartial { #[serde(default)] pub compact_threshold: Option, #[serde(default)] + pub compact_request_threshold: Option, + #[serde(default)] pub compact_retained_turns: Option, #[serde(default)] pub provider: Option, @@ -236,6 +238,9 @@ impl CompactionConfigPartial { prune_protected_turns: upper.prune_protected_turns.or(self.prune_protected_turns), prune_min_savings: upper.prune_min_savings.or(self.prune_min_savings), compact_threshold: upper.compact_threshold.or(self.compact_threshold), + compact_request_threshold: upper + .compact_request_threshold + .or(self.compact_request_threshold), compact_retained_turns: upper .compact_retained_turns .or(self.compact_retained_turns), @@ -365,6 +370,7 @@ impl TryFrom for PodManifest { .prune_min_savings .unwrap_or(defaults::PRUNE_MIN_SAVINGS), compact_threshold: c.compact_threshold, + compact_request_threshold: c.compact_request_threshold, compact_retained_turns: c .compact_retained_turns .unwrap_or(defaults::COMPACT_RETAINED_TURNS), diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index d7aef1ed..4c07f043 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -174,10 +174,27 @@ pub struct CompactionConfig { #[serde(default = "default_prune_min_savings")] pub prune_min_savings: u64, - /// When `input_tokens` exceeds this, run compact. `None` = compact disabled. + /// Proactive (between-turns) compaction threshold. + /// + /// Checked by the Controller after each run. When current occupancy + /// exceeds this value, compact runs before the next turn. `None` + /// disables the between-turns check. #[serde(default)] pub compact_threshold: Option, + /// Safety-net (between-requests) compaction threshold. + /// + /// Checked by `PodInterceptor::pre_llm_request` inside a turn. When + /// current occupancy exceeds this value, the run yields so that the + /// Controller can compact before the next LLM request. `None` + /// disables the between-requests check. + /// + /// Expected relation: `compact_threshold < compact_request_threshold` + /// (proactive triggers before safety net). A reversed configuration + /// is accepted but logged as a warning. + #[serde(default)] + pub compact_request_threshold: Option, + /// Number of recent turns retained after compaction. #[serde(default = "default_compact_retained_turns")] pub compact_retained_turns: usize, @@ -204,6 +221,7 @@ impl Default for CompactionConfig { prune_protected_turns: default_prune_protected_turns(), prune_min_savings: default_prune_min_savings(), compact_threshold: None, + compact_request_threshold: None, compact_retained_turns: default_compact_retained_turns(), provider: None, } @@ -338,9 +356,37 @@ model = "claude-sonnet-4-20250514" assert_eq!(c.prune_protected_turns, 3); assert_eq!(c.prune_min_savings, 4096); assert_eq!(c.compact_threshold, Some(80000)); + assert_eq!(c.compact_request_threshold, None); assert_eq!(c.compact_retained_turns, 2); } + #[test] + fn parse_compaction_both_thresholds() { + let toml = format!( + "{MINIMAL_REQUIRED}\n\ + [compaction]\n\ + compact_threshold = 80000\n\ + compact_request_threshold = 90000\n" + ); + let manifest = PodManifest::from_toml(&toml).unwrap(); + let c = manifest.compaction.unwrap(); + assert_eq!(c.compact_threshold, Some(80000)); + assert_eq!(c.compact_request_threshold, Some(90000)); + } + + #[test] + fn parse_compaction_request_threshold_only() { + let toml = format!( + "{MINIMAL_REQUIRED}\n\ + [compaction]\n\ + compact_request_threshold = 90000\n" + ); + let manifest = PodManifest::from_toml(&toml).unwrap(); + let c = manifest.compaction.unwrap(); + assert_eq!(c.compact_threshold, None); + assert_eq!(c.compact_request_threshold, Some(90000)); + } + #[test] fn parse_compaction_with_provider() { let toml = format!( diff --git a/crates/pod/src/compact_state.rs b/crates/pod/src/compact_state.rs index 30e0da37..a01fa5f4 100644 --- a/crates/pod/src/compact_state.rs +++ b/crates/pod/src/compact_state.rs @@ -1,22 +1,30 @@ //! Shared state for compaction decisions. //! -//! Holds atomic counters shared between: -//! - `on_usage` callback (writes `last_input_tokens`) -//! - `CompactInterceptor` (reads token count, checks thresholds) -//! - `Pod::run()`/`resume()` (circuit breaker, thrash detection) +//! Holds the two configured thresholds and circuit-breaker / thrash-detection +//! flags shared between: +//! - `PodInterceptor` (reads `request_threshold` — the *safety net* for +//! between-requests yielding) +//! - `Pod::try_post_run_compact` (reads `post_run_threshold` — the +//! *proactive* check between turns) +//! - `Pod::run()` / `resume()` (circuit breaker, thrash detection) +//! +//! Current occupancy (input-token count) is **not** stored here. The single +//! source of truth is `session_store::UsageRecord` (persisted per LLM call) +//! projected through `Pod::total_tokens()`. Callers pass the current +//! occupancy to `exceeds_*` at check time. -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; const MAX_COMPACT_FAILURES: usize = 3; /// Shared mutable state for compaction decisions. pub(crate) struct CompactState { - /// Last observed input_tokens from `on_usage` callback. - last_input_tokens: AtomicU64, - /// Proactive threshold — checked in `pre_llm_request` (between turns). - turn_threshold: u64, - /// Post-run threshold — checked by Controller after run completes. - post_run_threshold: u64, + /// Between-turns threshold (proactive). Checked by the Controller + /// after a run completes. `None` disables the post-run check. + post_run_threshold: Option, + /// Between-requests threshold (safety net). Checked inside a turn + /// before each LLM request. `None` disables the request check. + request_threshold: Option, /// Number of recent turns to retain after compaction. retained_turns: usize, /// Consecutive compact failures. At `MAX_COMPACT_FAILURES`, compaction is disabled. @@ -28,15 +36,14 @@ pub(crate) struct CompactState { } impl CompactState { - /// Create a new CompactState. - /// - /// `turn_threshold` is the proactive (80%) threshold from the manifest. - /// `post_run_threshold` is derived as `turn_threshold * 9 / 8` (≈90%). - pub(crate) fn new(turn_threshold: u64, retained_turns: usize) -> Self { + pub(crate) fn new( + post_run_threshold: Option, + request_threshold: Option, + retained_turns: usize, + ) -> Self { Self { - last_input_tokens: AtomicU64::new(0), - turn_threshold, - post_run_threshold: turn_threshold * 9 / 8, + post_run_threshold, + request_threshold, retained_turns, consecutive_failures: AtomicUsize::new(0), just_compacted: AtomicBool::new(false), @@ -44,19 +51,9 @@ impl CompactState { } } - /// Update the last observed input_tokens (called from `on_usage`). - pub(crate) fn update_input_tokens(&self, tokens: u64) { - self.last_input_tokens.store(tokens, Ordering::Relaxed); - } - - /// Read the last observed input_tokens. - pub(crate) fn last_input_tokens(&self) -> u64 { - self.last_input_tokens.load(Ordering::Relaxed) - } - - /// The between-turns threshold value. - pub(crate) fn turn_threshold(&self) -> u64 { - self.turn_threshold + /// Configured between-requests threshold (if any). + pub(crate) fn request_threshold(&self) -> Option { + self.request_threshold } /// Number of turns to retain after compaction. @@ -69,14 +66,20 @@ impl CompactState { self.disabled.load(Ordering::Relaxed) } - /// Whether `last_input_tokens` exceeds the between-turns threshold. - pub(crate) fn exceeds_turn(&self) -> bool { - self.last_input_tokens() > self.turn_threshold + /// Whether `current_tokens` exceeds the between-requests threshold. + /// Returns `false` when `request_threshold` is unset. + pub(crate) fn exceeds_request(&self, current_tokens: u64) -> bool { + self.request_threshold + .map(|t| current_tokens > t) + .unwrap_or(false) } - /// Whether `last_input_tokens` exceeds the post-run threshold. - pub(crate) fn exceeds_post_run(&self) -> bool { - self.last_input_tokens() > self.post_run_threshold + /// Whether `current_tokens` exceeds the post-run threshold. + /// Returns `false` when `post_run_threshold` is unset. + pub(crate) fn exceeds_post_run(&self, current_tokens: u64) -> bool { + self.post_run_threshold + .map(|t| current_tokens > t) + .unwrap_or(false) } /// Whether a compact just completed (for thrash detection). @@ -109,31 +112,46 @@ mod tests { use super::*; #[test] - fn threshold_derivation() { - let state = CompactState::new(80_000, 2); - assert_eq!(state.turn_threshold, 80_000); - assert_eq!(state.post_run_threshold, 90_000); + fn both_thresholds_configured() { + let state = CompactState::new(Some(80_000), Some(90_000), 2); + assert_eq!(state.request_threshold(), Some(90_000)); assert_eq!(state.retained_turns(), 2); + + assert!(!state.exceeds_request(70_000)); + assert!(!state.exceeds_post_run(70_000)); + + assert!(!state.exceeds_request(85_000)); + assert!(state.exceeds_post_run(85_000)); + + assert!(state.exceeds_request(95_000)); + assert!(state.exceeds_post_run(95_000)); } #[test] - fn exceeds_checks() { - let state = CompactState::new(80_000, 2); - assert!(!state.exceeds_turn()); - assert!(!state.exceeds_post_run()); + fn post_run_only() { + let state = CompactState::new(Some(80_000), None, 2); + // request check always false when threshold is None. + assert!(!state.exceeds_request(1_000_000)); + assert!(state.exceeds_post_run(85_000)); + } - state.update_input_tokens(85_000); - assert!(state.exceeds_turn()); - assert!(!state.exceeds_post_run()); + #[test] + fn request_only() { + let state = CompactState::new(None, Some(90_000), 2); + assert!(!state.exceeds_post_run(1_000_000)); + assert!(state.exceeds_request(95_000)); + } - state.update_input_tokens(95_000); - assert!(state.exceeds_turn()); - assert!(state.exceeds_post_run()); + #[test] + fn both_none_disables_all_checks() { + let state = CompactState::new(None, None, 2); + assert!(!state.exceeds_request(1_000_000)); + assert!(!state.exceeds_post_run(1_000_000)); } #[test] fn circuit_breaker_trips_after_max_failures() { - let state = CompactState::new(80_000, 2); + let state = CompactState::new(Some(80_000), Some(90_000), 2); assert!(!state.is_disabled()); state.record_compact_failure(); @@ -146,7 +164,7 @@ mod tests { #[test] fn success_resets_failure_count() { - let state = CompactState::new(80_000, 2); + let state = CompactState::new(Some(80_000), Some(90_000), 2); state.record_compact_failure(); state.record_compact_failure(); assert!(!state.is_disabled()); @@ -154,7 +172,6 @@ mod tests { state.record_compact_success(); assert!(state.just_compacted()); - // After success + 2 more failures, still not disabled (count was reset). state.record_compact_failure(); state.record_compact_failure(); assert!(!state.is_disabled()); @@ -162,7 +179,7 @@ mod tests { #[test] fn just_compacted_lifecycle() { - let state = CompactState::new(80_000, 2); + let state = CompactState::new(Some(80_000), Some(90_000), 2); assert!(!state.just_compacted()); state.record_compact_success(); diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 5967ff22..4fbbb012 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -391,9 +391,10 @@ impl Pod { /// Install the hook-based interceptor on the Worker if not already done. /// - /// When `compact_threshold` is configured in the manifest, wraps the - /// `HookInterceptor` in a [`CompactInterceptor`] and registers an - /// `on_usage` callback to track `input_tokens`. + /// When either compaction threshold (`compact_threshold` or + /// `compact_request_threshold`) is configured in the manifest, allocates + /// a shared [`CompactState`] and wires the interceptor to read current + /// occupancy through the `UsageRecord` timeline. fn ensure_interceptor_installed(&mut self) { if !self.interceptor_installed { // Pre-LLM-request hook: record the item count at send time @@ -406,41 +407,56 @@ impl Pod { let builder = std::mem::take(&mut self.hook_builder); let registry = Arc::new(builder.build()); - let compact_threshold = self + let (post_run_threshold, request_threshold, retained) = self .manifest .compaction .as_ref() - .and_then(|c| c.compact_threshold); + .map(|c| { + ( + c.compact_threshold, + c.compact_request_threshold, + c.compact_retained_turns, + ) + }) + .unwrap_or((None, None, 2)); let tracker_for_usage = self.usage_tracker.clone(); + self.worker_mut().on_usage(move |event| { + tracker_for_usage.record_usage(event); + }); - let compact_state = if let Some(threshold) = compact_threshold { - let retained = self - .manifest - .compaction - .as_ref() - .map(|c| c.compact_retained_turns) - .unwrap_or(2); - - let state = Arc::new(CompactState::new(threshold, retained)); - let state_for_usage = state.clone(); - self.worker_mut().on_usage(move |event| { - if let Some(tokens) = event.input_tokens { - state_for_usage.update_input_tokens(tokens); + let compact_state = if post_run_threshold.is_some() || request_threshold.is_some() { + if let (Some(post), Some(req)) = (post_run_threshold, request_threshold) { + if post > req { + warn!( + post_run_threshold = post, + request_threshold = req, + "compact_threshold > compact_request_threshold; \ + proactive check will never fire before the safety net" + ); } - tracker_for_usage.record_usage(event); - }); + } + let state = Arc::new(CompactState::new( + post_run_threshold, + request_threshold, + retained, + )); self.compact_state = Some(state.clone()); Some(state) } else { - self.worker_mut().on_usage(move |event| { - tracker_for_usage.record_usage(event); - }); None }; - let interceptor = - PodInterceptor::new(registry, compact_state, self.pending_notifications.clone()); + let usage_history_handle = compact_state + .as_ref() + .map(|_| self.usage_history.clone()); + + let interceptor = PodInterceptor::new( + registry, + compact_state, + usage_history_handle, + self.pending_notifications.clone(), + ); self.worker_mut().set_interceptor(interceptor); self.interceptor_installed = true; } @@ -667,9 +683,13 @@ impl Pod { /// Best-effort: failures are logged but do not propagate. pub async fn try_post_run_compact(&mut self) -> Result<(), PodError> { let state = match self.compact_state.as_ref() { - Some(s) if !s.is_disabled() && s.exceeds_post_run() && !s.just_compacted() => s.clone(), + Some(s) if !s.is_disabled() && !s.just_compacted() => s.clone(), _ => return Ok(()), }; + let current_tokens = self.total_tokens().tokens; + if !state.exceeds_post_run(current_tokens) { + return Ok(()); + } let retained = state.retained_turns(); match self.compact(retained).await { diff --git a/crates/pod/src/pod_interceptor.rs b/crates/pod/src/pod_interceptor.rs index b00b85a9..be803086 100644 --- a/crates/pod/src/pod_interceptor.rs +++ b/crates/pod/src/pod_interceptor.rs @@ -7,8 +7,8 @@ //! read-only summary information and only return control-flow //! decisions (continue / skip / abort / pause). -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; use llm_worker::Item; @@ -17,6 +17,7 @@ use llm_worker::interceptor::{ ToolResultInfo, TurnEndAction, }; use llm_worker::tool::ToolOutput; +use session_store::UsageRecord; use tracing::info; use crate::compact_state::CompactState; @@ -25,6 +26,7 @@ use crate::hook::{ TurnEndInfo, }; use crate::notification_buffer::{NotificationBuffer, format_notification}; +use crate::token_counter::total_tokens_impl; /// Maximum number of bytes copied into `TurnEndInfo::final_text_preview`. const FINAL_TEXT_PREVIEW_LIMIT: usize = 512; @@ -32,6 +34,10 @@ const FINAL_TEXT_PREVIEW_LIMIT: usize = 512; pub(crate) struct PodInterceptor { registry: Arc, compact_state: Option>, + /// Shared view of the cumulative UsageRecord timeline. Used with the + /// per-request `context` to estimate current occupancy for threshold + /// checks. `None` when compaction is disabled (both thresholds unset). + usage_history: Option>>>, /// Pending-notification buffer drained into the per-request /// context at the head of `pre_llm_request`. pending_notifications: NotificationBuffer, @@ -45,11 +51,13 @@ impl PodInterceptor { pub(crate) fn new( registry: Arc, compact_state: Option>, + usage_history: Option>>>, pending_notifications: NotificationBuffer, ) -> Self { Self { registry, compact_state, + usage_history, pending_notifications, next_turn_index: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0), @@ -61,6 +69,15 @@ impl PodInterceptor { .load(Ordering::Relaxed) .saturating_sub(1) } + + /// Estimate current input-token occupancy for `context`, projected + /// through the shared UsageRecord timeline. Returns `None` when + /// `usage_history` is not attached (compaction fully disabled). + fn estimated_tokens(&self, context: &[Item]) -> Option { + let handle = self.usage_history.as_ref()?; + let records = handle.lock().expect("usage_history poisoned").clone(); + Some(total_tokens_impl(context, &records).tokens) + } } #[async_trait] @@ -83,15 +100,20 @@ impl Interceptor for PodInterceptor { } async fn pre_llm_request(&self, context: &mut Vec) -> PreRequestAction { - // Internal mechanism: between-turns compaction trigger. + let current_tokens = self.estimated_tokens(context); + + // Internal mechanism: between-requests compaction trigger (safety net). if let Some(state) = self.compact_state.as_ref() { - if !state.is_disabled() && state.exceeds_turn() { - info!( - input_tokens = state.last_input_tokens(), - threshold = state.turn_threshold(), - "Between-turns compaction threshold exceeded, yielding" - ); - return PreRequestAction::Yield; + if !state.is_disabled() { + let current = current_tokens.unwrap_or(0); + if state.exceeds_request(current) { + info!( + input_tokens = current, + threshold = state.request_threshold().unwrap_or(0), + "Between-requests compaction threshold exceeded, yielding" + ); + return PreRequestAction::Yield; + } } } @@ -105,7 +127,7 @@ impl Interceptor for PodInterceptor { let info = PreRequestInfo { item_count: context.len(), - estimated_tokens: self.compact_state.as_ref().map(|s| s.last_input_tokens()), + estimated_tokens: current_tokens, turn_index: self.current_turn_index(), tool_calls_this_turn: self.tool_calls_this_turn.load(Ordering::Relaxed), }; @@ -232,16 +254,35 @@ mod tests { Arc::new(builder.build()) } + /// Build a usage_history handle with a single record pinned at the + /// current `context_len` so that `total_tokens_impl` returns exactly + /// `tokens` (Measured, no interpolation or byte-based fallback). + fn usage_handle_with(context_len: usize, tokens: u64) -> Arc>> { + Arc::new(Mutex::new(vec![UsageRecord { + history_len: context_len, + input_total_tokens: tokens, + cache_read_tokens: 0, + cache_write_tokens: 0, + output_tokens: 0, + }])) + } + #[tokio::test] - async fn pre_llm_request_yields_and_skips_hooks_when_compact_threshold_exceeded() { + async fn pre_llm_request_yields_and_skips_hooks_when_request_threshold_exceeded() { let count = Arc::new(AtomicUsize::new(0)); let registry = registry_with_pre_llm_hook(count.clone()); - let state = Arc::new(CompactState::new(100, 2)); - state.update_input_tokens(200); // exceeds turn threshold + let state = Arc::new(CompactState::new(None, Some(100), 2)); + let ctx_items = vec![Item::user_message("hi")]; + let history = usage_handle_with(ctx_items.len(), 200); - let interceptor = PodInterceptor::new(registry, Some(state), NotificationBuffer::new()); - let mut ctx: Vec = vec![Item::user_message("hi")]; + let interceptor = PodInterceptor::new( + registry, + Some(state), + Some(history), + NotificationBuffer::new(), + ); + let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; assert!(matches!(action, PreRequestAction::Yield)); @@ -254,11 +295,41 @@ mod tests { let count = Arc::new(AtomicUsize::new(0)); let registry = registry_with_pre_llm_hook(count.clone()); - let state = Arc::new(CompactState::new(100, 2)); - // last_input_tokens stays at 0, well below threshold. + let state = Arc::new(CompactState::new(None, Some(100), 2)); + let ctx_items = vec![Item::user_message("hi")]; + let history = usage_handle_with(ctx_items.len(), 50); - let interceptor = PodInterceptor::new(registry, Some(state), NotificationBuffer::new()); - let mut ctx: Vec = vec![Item::user_message("hi")]; + let interceptor = PodInterceptor::new( + registry, + Some(state), + Some(history), + NotificationBuffer::new(), + ); + let mut ctx = ctx_items; + let action = interceptor.pre_llm_request(&mut ctx).await; + + assert!(matches!(action, PreRequestAction::Continue)); + assert_eq!(count.load(Ordering::Relaxed), 1); + } + + #[tokio::test] + async fn pre_llm_request_does_not_yield_when_only_post_run_threshold_set() { + // request_threshold = None → safety-net check is inert inside the turn + // even if current occupancy is huge. Post-run check runs elsewhere. + let count = Arc::new(AtomicUsize::new(0)); + let registry = registry_with_pre_llm_hook(count.clone()); + + let state = Arc::new(CompactState::new(Some(100), None, 2)); + let ctx_items = vec![Item::user_message("hi")]; + let history = usage_handle_with(ctx_items.len(), 10_000); + + let interceptor = PodInterceptor::new( + registry, + Some(state), + Some(history), + NotificationBuffer::new(), + ); + let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; assert!(matches!(action, PreRequestAction::Continue)); @@ -270,7 +341,7 @@ mod tests { let count = Arc::new(AtomicUsize::new(0)); let registry = registry_with_pre_llm_hook(count.clone()); - let interceptor = PodInterceptor::new(registry, None, NotificationBuffer::new()); + let interceptor = PodInterceptor::new(registry, None, None, NotificationBuffer::new()); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; @@ -295,7 +366,7 @@ mod tests { buffer.push("first".into()); buffer.push("second".into()); - let interceptor = PodInterceptor::new(registry, None, buffer.clone()); + let interceptor = PodInterceptor::new(registry, None, None, buffer.clone()); let mut ctx: Vec = vec![Item::user_message("hi")]; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -320,15 +391,18 @@ mod tests { let buffer = NotificationBuffer::new(); buffer.push("msg".into()); - let state = Arc::new(CompactState::new(100, 2)); - state.update_input_tokens(200); + let state = Arc::new(CompactState::new(None, Some(100), 2)); + let ctx_items = vec![Item::user_message("hi")]; + let history = usage_handle_with(ctx_items.len(), 200); - let interceptor = PodInterceptor::new(registry, Some(state), buffer.clone()); - let mut ctx: Vec = Vec::new(); + let interceptor = + PodInterceptor::new(registry, Some(state), Some(history), buffer.clone()); + let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; assert!(matches!(action, PreRequestAction::Yield)); - assert!(ctx.is_empty()); + // Notifications were not drained (still held for post-compact resume). + assert_eq!(ctx.len(), 1); assert_eq!(buffer.len(), 1); } @@ -341,7 +415,7 @@ mod tests { builder.add_pre_llm_request(CountingHook(second_count.clone())); let registry = Arc::new(builder.build()); - let interceptor = PodInterceptor::new(registry, None, NotificationBuffer::new()); + let interceptor = PodInterceptor::new(registry, None, None, NotificationBuffer::new()); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; diff --git a/crates/pod/src/token_counter.rs b/crates/pod/src/token_counter.rs index 3ca4dc76..f698a867 100644 --- a/crates/pod/src/token_counter.rs +++ b/crates/pod/src/token_counter.rs @@ -169,7 +169,7 @@ fn tokens_at( } } -fn total_tokens_impl(history: &[Item], records: &[UsageRecord]) -> TokenEstimate { +pub(crate) fn total_tokens_impl(history: &[Item], records: &[UsageRecord]) -> TokenEstimate { let prefix = prefix_bytes(history); tokens_at(history, records, history.len(), &prefix) }