From f0a865552c5a71ef292522e7bb5c16fa177966ba Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 19 Apr 2026 08:49:25 +0900 Subject: [PATCH 1/6] =?UTF-8?q?compact:=20=E9=96=BE=E5=80=A4=E3=82=92?= =?UTF-8?q?=E5=80=8B=E5=88=A5=E6=8C=87=E5=AE=9A=E5=8C=96=E3=81=97=E5=8D=A0?= =?UTF-8?q?=E6=9C=89=E9=87=8F=E3=82=BD=E3=83=BC=E3=82=B9=E3=82=92=20UsageR?= =?UTF-8?q?ecord=20=E3=81=AB=E4=B8=80=E6=9C=AC=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - manifest に compact_request_threshold を追加 (proactive と safety net を個別指定) - CompactState の両閾値を Option 化、last_input_tokens を撤去 - 閾値判定は Pod::total_tokens() / usage_history 経由の実測値ベースに切替 - turn_threshold → request_threshold にリネーム、Between-requests のログへ --- crates/manifest/src/config.rs | 6 ++ crates/manifest/src/lib.rs | 48 ++++++++++- crates/pod/src/compact_state.rs | 129 +++++++++++++++++------------- crates/pod/src/pod.rs | 72 +++++++++++------ crates/pod/src/pod_interceptor.rs | 128 ++++++++++++++++++++++------- crates/pod/src/token_counter.rs | 2 +- 6 files changed, 274 insertions(+), 111 deletions(-) diff --git a/crates/manifest/src/config.rs b/crates/manifest/src/config.rs index f6fddcf8..0d736e9f 100644 --- a/crates/manifest/src/config.rs +++ b/crates/manifest/src/config.rs @@ -84,6 +84,8 @@ pub struct CompactionConfigPartial { #[serde(default)] pub compact_threshold: Option, #[serde(default)] + pub compact_request_threshold: Option, + #[serde(default)] pub compact_retained_turns: Option, #[serde(default)] pub provider: Option, @@ -236,6 +238,9 @@ impl CompactionConfigPartial { prune_protected_turns: upper.prune_protected_turns.or(self.prune_protected_turns), prune_min_savings: upper.prune_min_savings.or(self.prune_min_savings), compact_threshold: upper.compact_threshold.or(self.compact_threshold), + compact_request_threshold: upper + .compact_request_threshold + .or(self.compact_request_threshold), compact_retained_turns: upper .compact_retained_turns 
.or(self.compact_retained_turns), @@ -365,6 +370,7 @@ impl TryFrom for PodManifest { .prune_min_savings .unwrap_or(defaults::PRUNE_MIN_SAVINGS), compact_threshold: c.compact_threshold, + compact_request_threshold: c.compact_request_threshold, compact_retained_turns: c .compact_retained_turns .unwrap_or(defaults::COMPACT_RETAINED_TURNS), diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index d7aef1ed..4c07f043 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -174,10 +174,27 @@ pub struct CompactionConfig { #[serde(default = "default_prune_min_savings")] pub prune_min_savings: u64, - /// When `input_tokens` exceeds this, run compact. `None` = compact disabled. + /// Proactive (between-turns) compaction threshold. + /// + /// Checked by the Controller after each run. When current occupancy + /// exceeds this value, compact runs before the next turn. `None` + /// disables the between-turns check. #[serde(default)] pub compact_threshold: Option, + /// Safety-net (between-requests) compaction threshold. + /// + /// Checked by `PodInterceptor::pre_llm_request` inside a turn. When + /// current occupancy exceeds this value, the run yields so that the + /// Controller can compact before the next LLM request. `None` + /// disables the between-requests check. + /// + /// Expected relation: `compact_threshold < compact_request_threshold` + /// (proactive triggers before safety net). A reversed configuration + /// is accepted but logged as a warning. + #[serde(default)] + pub compact_request_threshold: Option, + /// Number of recent turns retained after compaction. 
#[serde(default = "default_compact_retained_turns")] pub compact_retained_turns: usize, @@ -204,6 +221,7 @@ impl Default for CompactionConfig { prune_protected_turns: default_prune_protected_turns(), prune_min_savings: default_prune_min_savings(), compact_threshold: None, + compact_request_threshold: None, compact_retained_turns: default_compact_retained_turns(), provider: None, } @@ -338,9 +356,37 @@ model = "claude-sonnet-4-20250514" assert_eq!(c.prune_protected_turns, 3); assert_eq!(c.prune_min_savings, 4096); assert_eq!(c.compact_threshold, Some(80000)); + assert_eq!(c.compact_request_threshold, None); assert_eq!(c.compact_retained_turns, 2); } + #[test] + fn parse_compaction_both_thresholds() { + let toml = format!( + "{MINIMAL_REQUIRED}\n\ + [compaction]\n\ + compact_threshold = 80000\n\ + compact_request_threshold = 90000\n" + ); + let manifest = PodManifest::from_toml(&toml).unwrap(); + let c = manifest.compaction.unwrap(); + assert_eq!(c.compact_threshold, Some(80000)); + assert_eq!(c.compact_request_threshold, Some(90000)); + } + + #[test] + fn parse_compaction_request_threshold_only() { + let toml = format!( + "{MINIMAL_REQUIRED}\n\ + [compaction]\n\ + compact_request_threshold = 90000\n" + ); + let manifest = PodManifest::from_toml(&toml).unwrap(); + let c = manifest.compaction.unwrap(); + assert_eq!(c.compact_threshold, None); + assert_eq!(c.compact_request_threshold, Some(90000)); + } + #[test] fn parse_compaction_with_provider() { let toml = format!( diff --git a/crates/pod/src/compact_state.rs b/crates/pod/src/compact_state.rs index 30e0da37..a01fa5f4 100644 --- a/crates/pod/src/compact_state.rs +++ b/crates/pod/src/compact_state.rs @@ -1,22 +1,30 @@ //! Shared state for compaction decisions. //! -//! Holds atomic counters shared between: -//! - `on_usage` callback (writes `last_input_tokens`) -//! - `CompactInterceptor` (reads token count, checks thresholds) -//! - `Pod::run()`/`resume()` (circuit breaker, thrash detection) +//! 
Holds the two configured thresholds and circuit-breaker / thrash-detection +//! flags shared between: +//! - `PodInterceptor` (reads `request_threshold` — the *safety net* for +//! between-requests yielding) +//! - `Pod::try_post_run_compact` (reads `post_run_threshold` — the +//! *proactive* check between turns) +//! - `Pod::run()` / `resume()` (circuit breaker, thrash detection) +//! +//! Current occupancy (input-token count) is **not** stored here. The single +//! source of truth is `session_store::UsageRecord` (persisted per LLM call) +//! projected through `Pod::total_tokens()`. Callers pass the current +//! occupancy to `exceeds_*` at check time. -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; const MAX_COMPACT_FAILURES: usize = 3; /// Shared mutable state for compaction decisions. pub(crate) struct CompactState { - /// Last observed input_tokens from `on_usage` callback. - last_input_tokens: AtomicU64, - /// Proactive threshold — checked in `pre_llm_request` (between turns). - turn_threshold: u64, - /// Post-run threshold — checked by Controller after run completes. - post_run_threshold: u64, + /// Between-turns threshold (proactive). Checked by the Controller + /// after a run completes. `None` disables the post-run check. + post_run_threshold: Option, + /// Between-requests threshold (safety net). Checked inside a turn + /// before each LLM request. `None` disables the request check. + request_threshold: Option, /// Number of recent turns to retain after compaction. retained_turns: usize, /// Consecutive compact failures. At `MAX_COMPACT_FAILURES`, compaction is disabled. @@ -28,15 +36,14 @@ pub(crate) struct CompactState { } impl CompactState { - /// Create a new CompactState. - /// - /// `turn_threshold` is the proactive (80%) threshold from the manifest. - /// `post_run_threshold` is derived as `turn_threshold * 9 / 8` (≈90%). 
- pub(crate) fn new(turn_threshold: u64, retained_turns: usize) -> Self { + pub(crate) fn new( + post_run_threshold: Option, + request_threshold: Option, + retained_turns: usize, + ) -> Self { Self { - last_input_tokens: AtomicU64::new(0), - turn_threshold, - post_run_threshold: turn_threshold * 9 / 8, + post_run_threshold, + request_threshold, retained_turns, consecutive_failures: AtomicUsize::new(0), just_compacted: AtomicBool::new(false), @@ -44,19 +51,9 @@ impl CompactState { } } - /// Update the last observed input_tokens (called from `on_usage`). - pub(crate) fn update_input_tokens(&self, tokens: u64) { - self.last_input_tokens.store(tokens, Ordering::Relaxed); - } - - /// Read the last observed input_tokens. - pub(crate) fn last_input_tokens(&self) -> u64 { - self.last_input_tokens.load(Ordering::Relaxed) - } - - /// The between-turns threshold value. - pub(crate) fn turn_threshold(&self) -> u64 { - self.turn_threshold + /// Configured between-requests threshold (if any). + pub(crate) fn request_threshold(&self) -> Option { + self.request_threshold } /// Number of turns to retain after compaction. @@ -69,14 +66,20 @@ impl CompactState { self.disabled.load(Ordering::Relaxed) } - /// Whether `last_input_tokens` exceeds the between-turns threshold. - pub(crate) fn exceeds_turn(&self) -> bool { - self.last_input_tokens() > self.turn_threshold + /// Whether `current_tokens` exceeds the between-requests threshold. + /// Returns `false` when `request_threshold` is unset. + pub(crate) fn exceeds_request(&self, current_tokens: u64) -> bool { + self.request_threshold + .map(|t| current_tokens > t) + .unwrap_or(false) } - /// Whether `last_input_tokens` exceeds the post-run threshold. - pub(crate) fn exceeds_post_run(&self) -> bool { - self.last_input_tokens() > self.post_run_threshold + /// Whether `current_tokens` exceeds the post-run threshold. + /// Returns `false` when `post_run_threshold` is unset. 
+ pub(crate) fn exceeds_post_run(&self, current_tokens: u64) -> bool { + self.post_run_threshold + .map(|t| current_tokens > t) + .unwrap_or(false) } /// Whether a compact just completed (for thrash detection). @@ -109,31 +112,46 @@ mod tests { use super::*; #[test] - fn threshold_derivation() { - let state = CompactState::new(80_000, 2); - assert_eq!(state.turn_threshold, 80_000); - assert_eq!(state.post_run_threshold, 90_000); + fn both_thresholds_configured() { + let state = CompactState::new(Some(80_000), Some(90_000), 2); + assert_eq!(state.request_threshold(), Some(90_000)); assert_eq!(state.retained_turns(), 2); + + assert!(!state.exceeds_request(70_000)); + assert!(!state.exceeds_post_run(70_000)); + + assert!(!state.exceeds_request(85_000)); + assert!(state.exceeds_post_run(85_000)); + + assert!(state.exceeds_request(95_000)); + assert!(state.exceeds_post_run(95_000)); } #[test] - fn exceeds_checks() { - let state = CompactState::new(80_000, 2); - assert!(!state.exceeds_turn()); - assert!(!state.exceeds_post_run()); + fn post_run_only() { + let state = CompactState::new(Some(80_000), None, 2); + // request check always false when threshold is None. 
+ assert!(!state.exceeds_request(1_000_000)); + assert!(state.exceeds_post_run(85_000)); + } - state.update_input_tokens(85_000); - assert!(state.exceeds_turn()); - assert!(!state.exceeds_post_run()); + #[test] + fn request_only() { + let state = CompactState::new(None, Some(90_000), 2); + assert!(!state.exceeds_post_run(1_000_000)); + assert!(state.exceeds_request(95_000)); + } - state.update_input_tokens(95_000); - assert!(state.exceeds_turn()); - assert!(state.exceeds_post_run()); + #[test] + fn both_none_disables_all_checks() { + let state = CompactState::new(None, None, 2); + assert!(!state.exceeds_request(1_000_000)); + assert!(!state.exceeds_post_run(1_000_000)); } #[test] fn circuit_breaker_trips_after_max_failures() { - let state = CompactState::new(80_000, 2); + let state = CompactState::new(Some(80_000), Some(90_000), 2); assert!(!state.is_disabled()); state.record_compact_failure(); @@ -146,7 +164,7 @@ mod tests { #[test] fn success_resets_failure_count() { - let state = CompactState::new(80_000, 2); + let state = CompactState::new(Some(80_000), Some(90_000), 2); state.record_compact_failure(); state.record_compact_failure(); assert!(!state.is_disabled()); @@ -154,7 +172,6 @@ mod tests { state.record_compact_success(); assert!(state.just_compacted()); - // After success + 2 more failures, still not disabled (count was reset). state.record_compact_failure(); state.record_compact_failure(); assert!(!state.is_disabled()); @@ -162,7 +179,7 @@ mod tests { #[test] fn just_compacted_lifecycle() { - let state = CompactState::new(80_000, 2); + let state = CompactState::new(Some(80_000), Some(90_000), 2); assert!(!state.just_compacted()); state.record_compact_success(); diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 5967ff22..4fbbb012 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -391,9 +391,10 @@ impl Pod { /// Install the hook-based interceptor on the Worker if not already done. 
/// - /// When `compact_threshold` is configured in the manifest, wraps the - /// `HookInterceptor` in a [`CompactInterceptor`] and registers an - /// `on_usage` callback to track `input_tokens`. + /// When either compaction threshold (`compact_threshold` or + /// `compact_request_threshold`) is configured in the manifest, allocates + /// a shared [`CompactState`] and wires the interceptor to read current + /// occupancy through the `UsageRecord` timeline. fn ensure_interceptor_installed(&mut self) { if !self.interceptor_installed { // Pre-LLM-request hook: record the item count at send time @@ -406,41 +407,56 @@ impl Pod { let builder = std::mem::take(&mut self.hook_builder); let registry = Arc::new(builder.build()); - let compact_threshold = self + let (post_run_threshold, request_threshold, retained) = self .manifest .compaction .as_ref() - .and_then(|c| c.compact_threshold); + .map(|c| { + ( + c.compact_threshold, + c.compact_request_threshold, + c.compact_retained_turns, + ) + }) + .unwrap_or((None, None, 2)); let tracker_for_usage = self.usage_tracker.clone(); + self.worker_mut().on_usage(move |event| { + tracker_for_usage.record_usage(event); + }); - let compact_state = if let Some(threshold) = compact_threshold { - let retained = self - .manifest - .compaction - .as_ref() - .map(|c| c.compact_retained_turns) - .unwrap_or(2); - - let state = Arc::new(CompactState::new(threshold, retained)); - let state_for_usage = state.clone(); - self.worker_mut().on_usage(move |event| { - if let Some(tokens) = event.input_tokens { - state_for_usage.update_input_tokens(tokens); + let compact_state = if post_run_threshold.is_some() || request_threshold.is_some() { + if let (Some(post), Some(req)) = (post_run_threshold, request_threshold) { + if post > req { + warn!( + post_run_threshold = post, + request_threshold = req, + "compact_threshold > compact_request_threshold; \ + proactive check will never fire before the safety net" + ); } - tracker_for_usage.record_usage(event); 
- }); + } + let state = Arc::new(CompactState::new( + post_run_threshold, + request_threshold, + retained, + )); self.compact_state = Some(state.clone()); Some(state) } else { - self.worker_mut().on_usage(move |event| { - tracker_for_usage.record_usage(event); - }); None }; - let interceptor = - PodInterceptor::new(registry, compact_state, self.pending_notifications.clone()); + let usage_history_handle = compact_state + .as_ref() + .map(|_| self.usage_history.clone()); + + let interceptor = PodInterceptor::new( + registry, + compact_state, + usage_history_handle, + self.pending_notifications.clone(), + ); self.worker_mut().set_interceptor(interceptor); self.interceptor_installed = true; } @@ -667,9 +683,13 @@ impl Pod { /// Best-effort: failures are logged but do not propagate. pub async fn try_post_run_compact(&mut self) -> Result<(), PodError> { let state = match self.compact_state.as_ref() { - Some(s) if !s.is_disabled() && s.exceeds_post_run() && !s.just_compacted() => s.clone(), + Some(s) if !s.is_disabled() && !s.just_compacted() => s.clone(), _ => return Ok(()), }; + let current_tokens = self.total_tokens().tokens; + if !state.exceeds_post_run(current_tokens) { + return Ok(()); + } let retained = state.retained_turns(); match self.compact(retained).await { diff --git a/crates/pod/src/pod_interceptor.rs b/crates/pod/src/pod_interceptor.rs index b00b85a9..be803086 100644 --- a/crates/pod/src/pod_interceptor.rs +++ b/crates/pod/src/pod_interceptor.rs @@ -7,8 +7,8 @@ //! read-only summary information and only return control-flow //! decisions (continue / skip / abort / pause). 
-use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; use llm_worker::Item; @@ -17,6 +17,7 @@ use llm_worker::interceptor::{ ToolResultInfo, TurnEndAction, }; use llm_worker::tool::ToolOutput; +use session_store::UsageRecord; use tracing::info; use crate::compact_state::CompactState; @@ -25,6 +26,7 @@ use crate::hook::{ TurnEndInfo, }; use crate::notification_buffer::{NotificationBuffer, format_notification}; +use crate::token_counter::total_tokens_impl; /// Maximum number of bytes copied into `TurnEndInfo::final_text_preview`. const FINAL_TEXT_PREVIEW_LIMIT: usize = 512; @@ -32,6 +34,10 @@ const FINAL_TEXT_PREVIEW_LIMIT: usize = 512; pub(crate) struct PodInterceptor { registry: Arc, compact_state: Option>, + /// Shared view of the cumulative UsageRecord timeline. Used with the + /// per-request `context` to estimate current occupancy for threshold + /// checks. `None` when compaction is disabled (both thresholds unset). + usage_history: Option>>>, /// Pending-notification buffer drained into the per-request /// context at the head of `pre_llm_request`. pending_notifications: NotificationBuffer, @@ -45,11 +51,13 @@ impl PodInterceptor { pub(crate) fn new( registry: Arc, compact_state: Option>, + usage_history: Option>>>, pending_notifications: NotificationBuffer, ) -> Self { Self { registry, compact_state, + usage_history, pending_notifications, next_turn_index: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0), @@ -61,6 +69,15 @@ impl PodInterceptor { .load(Ordering::Relaxed) .saturating_sub(1) } + + /// Estimate current input-token occupancy for `context`, projected + /// through the shared UsageRecord timeline. Returns `None` when + /// `usage_history` is not attached (compaction fully disabled). 
+ fn estimated_tokens(&self, context: &[Item]) -> Option { + let handle = self.usage_history.as_ref()?; + let records = handle.lock().expect("usage_history poisoned").clone(); + Some(total_tokens_impl(context, &records).tokens) + } } #[async_trait] @@ -83,15 +100,20 @@ impl Interceptor for PodInterceptor { } async fn pre_llm_request(&self, context: &mut Vec) -> PreRequestAction { - // Internal mechanism: between-turns compaction trigger. + let current_tokens = self.estimated_tokens(context); + + // Internal mechanism: between-requests compaction trigger (safety net). if let Some(state) = self.compact_state.as_ref() { - if !state.is_disabled() && state.exceeds_turn() { - info!( - input_tokens = state.last_input_tokens(), - threshold = state.turn_threshold(), - "Between-turns compaction threshold exceeded, yielding" - ); - return PreRequestAction::Yield; + if !state.is_disabled() { + let current = current_tokens.unwrap_or(0); + if state.exceeds_request(current) { + info!( + input_tokens = current, + threshold = state.request_threshold().unwrap_or(0), + "Between-requests compaction threshold exceeded, yielding" + ); + return PreRequestAction::Yield; + } } } @@ -105,7 +127,7 @@ impl Interceptor for PodInterceptor { let info = PreRequestInfo { item_count: context.len(), - estimated_tokens: self.compact_state.as_ref().map(|s| s.last_input_tokens()), + estimated_tokens: current_tokens, turn_index: self.current_turn_index(), tool_calls_this_turn: self.tool_calls_this_turn.load(Ordering::Relaxed), }; @@ -232,16 +254,35 @@ mod tests { Arc::new(builder.build()) } + /// Build a usage_history handle with a single record pinned at the + /// current `context_len` so that `total_tokens_impl` returns exactly + /// `tokens` (Measured, no interpolation or byte-based fallback). 
+ fn usage_handle_with(context_len: usize, tokens: u64) -> Arc>> { + Arc::new(Mutex::new(vec![UsageRecord { + history_len: context_len, + input_total_tokens: tokens, + cache_read_tokens: 0, + cache_write_tokens: 0, + output_tokens: 0, + }])) + } + #[tokio::test] - async fn pre_llm_request_yields_and_skips_hooks_when_compact_threshold_exceeded() { + async fn pre_llm_request_yields_and_skips_hooks_when_request_threshold_exceeded() { let count = Arc::new(AtomicUsize::new(0)); let registry = registry_with_pre_llm_hook(count.clone()); - let state = Arc::new(CompactState::new(100, 2)); - state.update_input_tokens(200); // exceeds turn threshold + let state = Arc::new(CompactState::new(None, Some(100), 2)); + let ctx_items = vec![Item::user_message("hi")]; + let history = usage_handle_with(ctx_items.len(), 200); - let interceptor = PodInterceptor::new(registry, Some(state), NotificationBuffer::new()); - let mut ctx: Vec = vec![Item::user_message("hi")]; + let interceptor = PodInterceptor::new( + registry, + Some(state), + Some(history), + NotificationBuffer::new(), + ); + let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; assert!(matches!(action, PreRequestAction::Yield)); @@ -254,11 +295,41 @@ mod tests { let count = Arc::new(AtomicUsize::new(0)); let registry = registry_with_pre_llm_hook(count.clone()); - let state = Arc::new(CompactState::new(100, 2)); - // last_input_tokens stays at 0, well below threshold. 
+ let state = Arc::new(CompactState::new(None, Some(100), 2)); + let ctx_items = vec![Item::user_message("hi")]; + let history = usage_handle_with(ctx_items.len(), 50); - let interceptor = PodInterceptor::new(registry, Some(state), NotificationBuffer::new()); - let mut ctx: Vec = vec![Item::user_message("hi")]; + let interceptor = PodInterceptor::new( + registry, + Some(state), + Some(history), + NotificationBuffer::new(), + ); + let mut ctx = ctx_items; + let action = interceptor.pre_llm_request(&mut ctx).await; + + assert!(matches!(action, PreRequestAction::Continue)); + assert_eq!(count.load(Ordering::Relaxed), 1); + } + + #[tokio::test] + async fn pre_llm_request_does_not_yield_when_only_post_run_threshold_set() { + // request_threshold = None → safety-net check is inert inside the turn + // even if current occupancy is huge. Post-run check runs elsewhere. + let count = Arc::new(AtomicUsize::new(0)); + let registry = registry_with_pre_llm_hook(count.clone()); + + let state = Arc::new(CompactState::new(Some(100), None, 2)); + let ctx_items = vec![Item::user_message("hi")]; + let history = usage_handle_with(ctx_items.len(), 10_000); + + let interceptor = PodInterceptor::new( + registry, + Some(state), + Some(history), + NotificationBuffer::new(), + ); + let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; assert!(matches!(action, PreRequestAction::Continue)); @@ -270,7 +341,7 @@ mod tests { let count = Arc::new(AtomicUsize::new(0)); let registry = registry_with_pre_llm_hook(count.clone()); - let interceptor = PodInterceptor::new(registry, None, NotificationBuffer::new()); + let interceptor = PodInterceptor::new(registry, None, None, NotificationBuffer::new()); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; @@ -295,7 +366,7 @@ mod tests { buffer.push("first".into()); buffer.push("second".into()); - let interceptor = PodInterceptor::new(registry, None, buffer.clone()); + let interceptor = 
PodInterceptor::new(registry, None, None, buffer.clone()); let mut ctx: Vec = vec![Item::user_message("hi")]; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -320,15 +391,18 @@ mod tests { let buffer = NotificationBuffer::new(); buffer.push("msg".into()); - let state = Arc::new(CompactState::new(100, 2)); - state.update_input_tokens(200); + let state = Arc::new(CompactState::new(None, Some(100), 2)); + let ctx_items = vec![Item::user_message("hi")]; + let history = usage_handle_with(ctx_items.len(), 200); - let interceptor = PodInterceptor::new(registry, Some(state), buffer.clone()); - let mut ctx: Vec = Vec::new(); + let interceptor = + PodInterceptor::new(registry, Some(state), Some(history), buffer.clone()); + let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; assert!(matches!(action, PreRequestAction::Yield)); - assert!(ctx.is_empty()); + // Notifications were not drained (still held for post-compact resume). + assert_eq!(ctx.len(), 1); assert_eq!(buffer.len(), 1); } @@ -341,7 +415,7 @@ mod tests { builder.add_pre_llm_request(CountingHook(second_count.clone())); let registry = Arc::new(builder.build()); - let interceptor = PodInterceptor::new(registry, None, NotificationBuffer::new()); + let interceptor = PodInterceptor::new(registry, None, None, NotificationBuffer::new()); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; diff --git a/crates/pod/src/token_counter.rs b/crates/pod/src/token_counter.rs index 3ca4dc76..f698a867 100644 --- a/crates/pod/src/token_counter.rs +++ b/crates/pod/src/token_counter.rs @@ -169,7 +169,7 @@ fn tokens_at( } } -fn total_tokens_impl(history: &[Item], records: &[UsageRecord]) -> TokenEstimate { +pub(crate) fn total_tokens_impl(history: &[Item], records: &[UsageRecord]) -> TokenEstimate { let prefix = prefix_bytes(history); tokens_at(history, records, history.len(), &prefix) } From 83f68e35ade76c220d05b089f9941a28c220cd0e Mon Sep 17 00:00:00 2001 
From: Hare Date: Sun, 19 Apr 2026 08:51:04 +0900 Subject: [PATCH 2/6] =?UTF-8?q?compact:=20=E8=A6=81=E7=B4=84=E5=85=A5?= =?UTF-8?q?=E5=8A=9B=E3=81=8B=E3=82=89=20content/arguments/reasoning=20?= =?UTF-8?q?=E3=82=92=E9=99=A4=E3=81=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ToolCall.arguments, ToolResult.content, Reasoning は auto-read 側の責務。 要約は意思決定と意図のキャプチャに集中させ、コードや tool IO は持ち込まない。 --- crates/pod/src/pod.rs | 77 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 65 insertions(+), 12 deletions(-) diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 4fbbb012..13006a88 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1097,6 +1097,13 @@ impl From for PodRunResult { } /// Format conversation items into a text prompt for the summary Worker. +/// +/// The summary should capture decisions and user intent, not recreate code. +/// File contents and tool IO belong in auto-read / references, not in the +/// summary input. So this strips: +/// - `ToolCall.arguments` (keep only the tool name) +/// - `ToolResult.content` (keep only the summary line) +/// - `Reasoning` entirely (intermediate thought, superseded by decisions) fn build_summary_prompt(items: &[Item]) -> String { let mut lines = Vec::new(); for item in items { @@ -1114,20 +1121,13 @@ fn build_summary_prompt(items: &[Item]) -> String { .join(""); lines.push(format!("[{role_label}] {text}")); } - Item::ToolCall { - name, arguments, .. - } => { - lines.push(format!("[ToolCall] {name}({arguments})")); + Item::ToolCall { name, .. } => { + lines.push(format!("[ToolCall] {name}")); } - Item::ToolResult { - summary, content, .. - } => match content { - Some(c) => lines.push(format!("[ToolResult] {summary}\n{c}")), - None => lines.push(format!("[ToolResult] {summary}")), - }, - Item::Reasoning { text, .. } => { - lines.push(format!("[Reasoning] {text}")); + Item::ToolResult { summary, .. 
} => { + lines.push(format!("[ToolResult] {summary}")); } + Item::Reasoning { .. } => {} } } lines.join("\n\n") @@ -1197,3 +1197,56 @@ fn current_pwd() -> Result { source, }) } + +#[cfg(test)] +mod build_summary_prompt_tests { + use super::*; + + #[test] + fn strips_tool_call_arguments() { + let items = vec![Item::tool_call_json( + "call-1", + "read_file", + serde_json::json!({ "path": "src/main.rs" }), + )]; + let prompt = build_summary_prompt(&items); + assert_eq!(prompt, "[ToolCall] read_file"); + assert!(!prompt.contains("src/main.rs")); + } + + #[test] + fn strips_tool_result_content() { + let items = vec![Item::tool_result_with_content( + "call-1", + "read 3 lines", + "fn main() { println!(\"hello\"); }", + )]; + let prompt = build_summary_prompt(&items); + assert_eq!(prompt, "[ToolResult] read 3 lines"); + assert!(!prompt.contains("println")); + } + + #[test] + fn drops_reasoning_entirely() { + let items = vec![ + Item::user_message("hi"), + Item::reasoning("internal deliberation"), + Item::assistant_message("hello"), + ]; + let prompt = build_summary_prompt(&items); + assert!(prompt.contains("[User] hi")); + assert!(prompt.contains("[Assistant] hello")); + assert!(!prompt.contains("Reasoning")); + assert!(!prompt.contains("deliberation")); + } + + #[test] + fn keeps_user_and_assistant_messages() { + let items = vec![ + Item::user_message("fix the bug"), + Item::assistant_message("done"), + ]; + let prompt = build_summary_prompt(&items); + assert_eq!(prompt, "[User] fix the bug\n\n[Assistant] done"); + } +} From db2dd8a3c0f37737ddb11ea58e24174e38899322 Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 19 Apr 2026 08:56:16 +0900 Subject: [PATCH 3/6] =?UTF-8?q?compact:=20retained=5Fturns=20=E3=82=92=20r?= =?UTF-8?q?etained=5Ftokens=20=E3=81=AB=E7=BD=AE=E6=8F=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 保護単位をターン数からトークン量に変更。compact 時のカット位置は Pod::split_for_retained() で UsageRecord を逆算ソースとして決定し、 
ターン境界ではなくアイテム単位で切る。デフォルトは 8000 トークン。 --- crates/manifest/src/config.rs | 14 ++++---- crates/manifest/src/defaults.rs | 8 +++-- crates/manifest/src/lib.rs | 16 +++++---- crates/pod/src/compact_state.rs | 16 ++++----- crates/pod/src/pod.rs | 33 +++++++------------ .../pod/tests/system_prompt_template_test.rs | 4 +-- 6 files changed, 43 insertions(+), 48 deletions(-) diff --git a/crates/manifest/src/config.rs b/crates/manifest/src/config.rs index 0d736e9f..ca447e19 100644 --- a/crates/manifest/src/config.rs +++ b/crates/manifest/src/config.rs @@ -86,7 +86,7 @@ pub struct CompactionConfigPartial { #[serde(default)] pub compact_request_threshold: Option, #[serde(default)] - pub compact_retained_turns: Option, + pub compact_retained_tokens: Option, #[serde(default)] pub provider: Option, } @@ -241,9 +241,9 @@ impl CompactionConfigPartial { compact_request_threshold: upper .compact_request_threshold .or(self.compact_request_threshold), - compact_retained_turns: upper - .compact_retained_turns - .or(self.compact_retained_turns), + compact_retained_tokens: upper + .compact_retained_tokens + .or(self.compact_retained_tokens), provider: merge_option(self.provider, upper.provider, ProviderConfigPartial::merge), } } @@ -371,9 +371,9 @@ impl TryFrom for PodManifest { .unwrap_or(defaults::PRUNE_MIN_SAVINGS), compact_threshold: c.compact_threshold, compact_request_threshold: c.compact_request_threshold, - compact_retained_turns: c - .compact_retained_turns - .unwrap_or(defaults::COMPACT_RETAINED_TURNS), + compact_retained_tokens: c + .compact_retained_tokens + .unwrap_or(defaults::COMPACT_RETAINED_TOKENS), provider: comp_provider, }) }) diff --git a/crates/manifest/src/defaults.rs b/crates/manifest/src/defaults.rs index 273b693d..cca50e1f 100644 --- a/crates/manifest/src/defaults.rs +++ b/crates/manifest/src/defaults.rs @@ -18,9 +18,11 @@ pub const PRUNE_PROTECTED_TURNS: usize = 3; /// [`crate::CompactionConfig::prune_min_savings`]. 
pub const PRUNE_MIN_SAVINGS: u64 = 4096; -/// Number of most-recent turns retained after a compact. See -/// [`crate::CompactionConfig::compact_retained_turns`]. -pub const COMPACT_RETAINED_TURNS: usize = 2; +/// Token budget retained (unchanged) at the tail of the history across +/// a compact. Items whose cumulative token count fits within this budget +/// starting from the end are kept verbatim; the rest are summarised. +/// See [`crate::CompactionConfig::compact_retained_tokens`]. +pub const COMPACT_RETAINED_TOKENS: u64 = 8000; /// Default instruction asset reference used when `worker.instruction` /// is omitted. See the `PromptLoader` prefix addressing scheme for the diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index 4c07f043..d387b619 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -195,9 +195,11 @@ pub struct CompactionConfig { #[serde(default)] pub compact_request_threshold: Option, - /// Number of recent turns retained after compaction. - #[serde(default = "default_compact_retained_turns")] - pub compact_retained_turns: usize, + /// Token budget retained verbatim at the tail of the history after + /// compaction. Measured against the occupancy estimate from + /// `UsageRecord` history; turn boundaries are ignored. + #[serde(default = "default_compact_retained_tokens")] + pub compact_retained_tokens: u64, /// Optional provider for the compactor (summary) LLM. /// If omitted, the main provider is cloned via `clone_boxed()`. 
@@ -211,8 +213,8 @@ fn default_prune_protected_turns() -> usize { fn default_prune_min_savings() -> u64 { defaults::PRUNE_MIN_SAVINGS } -fn default_compact_retained_turns() -> usize { - defaults::COMPACT_RETAINED_TURNS +fn default_compact_retained_tokens() -> u64 { + defaults::COMPACT_RETAINED_TOKENS } impl Default for CompactionConfig { @@ -222,7 +224,7 @@ impl Default for CompactionConfig { prune_min_savings: default_prune_min_savings(), compact_threshold: None, compact_request_threshold: None, - compact_retained_turns: default_compact_retained_turns(), + compact_retained_tokens: default_compact_retained_tokens(), provider: None, } } @@ -357,7 +359,7 @@ model = "claude-sonnet-4-20250514" assert_eq!(c.prune_min_savings, 4096); assert_eq!(c.compact_threshold, Some(80000)); assert_eq!(c.compact_request_threshold, None); - assert_eq!(c.compact_retained_turns, 2); + assert_eq!(c.compact_retained_tokens, 8000); } #[test] diff --git a/crates/pod/src/compact_state.rs b/crates/pod/src/compact_state.rs index a01fa5f4..4f6ff106 100644 --- a/crates/pod/src/compact_state.rs +++ b/crates/pod/src/compact_state.rs @@ -25,8 +25,8 @@ pub(crate) struct CompactState { /// Between-requests threshold (safety net). Checked inside a turn /// before each LLM request. `None` disables the request check. request_threshold: Option, - /// Number of recent turns to retain after compaction. - retained_turns: usize, + /// Token budget retained verbatim at the tail after compaction. + retained_tokens: u64, /// Consecutive compact failures. At `MAX_COMPACT_FAILURES`, compaction is disabled. consecutive_failures: AtomicUsize, /// `true` immediately after a successful compact, cleared on next normal completion. 
@@ -39,12 +39,12 @@ impl CompactState { pub(crate) fn new( post_run_threshold: Option, request_threshold: Option, - retained_turns: usize, + retained_tokens: u64, ) -> Self { Self { post_run_threshold, request_threshold, - retained_turns, + retained_tokens, consecutive_failures: AtomicUsize::new(0), just_compacted: AtomicBool::new(false), disabled: AtomicBool::new(false), @@ -56,9 +56,9 @@ impl CompactState { self.request_threshold } - /// Number of turns to retain after compaction. - pub(crate) fn retained_turns(&self) -> usize { - self.retained_turns + /// Token budget retained verbatim at the tail after compaction. + pub(crate) fn retained_tokens(&self) -> u64 { + self.retained_tokens } /// Whether compaction has been disabled by the circuit breaker. @@ -115,7 +115,7 @@ mod tests { fn both_thresholds_configured() { let state = CompactState::new(Some(80_000), Some(90_000), 2); assert_eq!(state.request_threshold(), Some(90_000)); - assert_eq!(state.retained_turns(), 2); + assert_eq!(state.retained_tokens(), 2); assert!(!state.exceeds_request(70_000)); assert!(!state.exceeds_post_run(70_000)); diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 13006a88..247ece40 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -415,10 +415,10 @@ impl Pod { ( c.compact_threshold, c.compact_request_threshold, - c.compact_retained_turns, + c.compact_retained_tokens, ) }) - .unwrap_or((None, None, 2)); + .unwrap_or((None, None, manifest::defaults::COMPACT_RETAINED_TOKENS)); let tracker_for_usage = self.usage_tracker.clone(); self.worker_mut().on_usage(move |event| { @@ -648,8 +648,8 @@ impl Pod { let retained = self .compact_state .as_ref() - .map(|s| s.retained_turns()) - .unwrap_or(2); + .map(|s| s.retained_tokens()) + .unwrap_or(manifest::defaults::COMPACT_RETAINED_TOKENS); match self.compact(retained).await { Ok(new_session_id) => { @@ -691,7 +691,7 @@ impl Pod { return Ok(()); } - let retained = state.retained_turns(); + let retained = 
state.retained_tokens(); match self.compact(retained).await { Ok(new_session_id) => { info!( @@ -791,24 +791,15 @@ impl Pod { /// - a clone of the main LlmClient via `clone_boxed()`. /// /// Returns the new session ID. - pub async fn compact(&mut self, retained_turns: usize) -> Result { + pub async fn compact(&mut self, retained_tokens: u64) -> Result { + // Decide the cut point by projecting the UsageRecord timeline onto + // the current history: keep the tail whose estimated token count is + // within `retained_tokens`. Item-granular, turn boundaries ignored. + let cut = self.split_for_retained(retained_tokens); + let worker = self.worker.as_ref().expect("worker taken during run"); let history = worker.history(); - - // Identify turn boundaries (user message positions). - let turn_starts: Vec = history - .iter() - .enumerate() - .filter(|(_, item)| item.is_user_message()) - .map(|(i, _)| i) - .collect(); - - // Items to retain: everything from `retained_turns` turns ago onward. - let retain_from = if turn_starts.len() > retained_turns { - turn_starts[turn_starts.len() - retained_turns] - } else { - 0 - }; + let retain_from = cut.index.min(history.len()); let retained_items = history[retain_from..].to_vec(); let items_to_summarise = &history[..retain_from]; diff --git a/crates/pod/tests/system_prompt_template_test.rs b/crates/pod/tests/system_prompt_template_test.rs index ab273e90..1fe3d1a1 100644 --- a/crates/pod/tests/system_prompt_template_test.rs +++ b/crates/pod/tests/system_prompt_template_test.rs @@ -250,7 +250,7 @@ async fn agents_md_not_reread_after_compact() { // Mutate the file after the first turn — must not affect the cached // system prompt either on a subsequent turn or across compaction. 
std::fs::write(&agents_path, "mutated").unwrap(); - pod.compact(1).await.unwrap(); + pod.compact(0).await.unwrap(); let after_compact = pod.worker().get_system_prompt().unwrap().to_string(); assert!(after_compact.contains("original")); assert!(!after_compact.contains("mutated")); @@ -277,7 +277,7 @@ async fn compact_preserves_system_prompt() { let before = pod.worker().get_system_prompt().unwrap().to_string(); pod.run("second").await.unwrap(); - pod.compact(1).await.unwrap(); + pod.compact(0).await.unwrap(); let after = pod.worker().get_system_prompt().unwrap().to_string(); assert_eq!(before, after); From da021103e442d82ccb396a8020930d45018bc073 Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 19 Apr 2026 09:26:55 +0900 Subject: [PATCH 4/6] =?UTF-8?q?compact:=20compact=20worker=20=E3=82=92?= =?UTF-8?q?=E3=83=84=E3=83=BC=E3=83=AB=E9=A7=86=E5=8B=95=E3=83=9E=E3=83=AB?= =?UTF-8?q?=E3=83=81=E3=82=BF=E3=83=BC=E3=83=B3=E3=81=AB=E5=86=8D=E8=A8=AD?= =?UTF-8?q?=E8=A8=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 段階 4〜9 を一括で実装: - mark_read_required / add_reference / write_summary + read_file の 4 ツールで compact worker を駆動。結果は CompactWorkerContext に集約 - 新セッションの先頭を [summary, ...auto-read, references, ...retained] で構築 - デフォルトリファレンスは tracker.recent_files(5) から - auto-read は compact_auto_read_budget で総量制限。超過は即エラー - compact worker 自身は compact_worker_max_input_tokens で累計入力を制限 - 5 セクション要約フォーマットに system prompt を更新 - write_summary 未呼び出し / auto-read 空のときは 1 回追加プロンプトで促す --- crates/manifest/src/config.rs | 16 + crates/manifest/src/defaults.rs | 17 + crates/manifest/src/lib.rs | 18 + crates/pod/src/compact_worker.rs | 384 ++++++++++++++++++ crates/pod/src/lib.rs | 1 + crates/pod/src/pod.rs | 262 ++++++++++-- .../pod/tests/system_prompt_template_test.rs | 31 +- 7 files changed, 688 insertions(+), 41 deletions(-) create mode 100644 crates/pod/src/compact_worker.rs diff --git a/crates/manifest/src/config.rs b/crates/manifest/src/config.rs index 
ca447e19..9b3c9e57 100644 --- a/crates/manifest/src/config.rs +++ b/crates/manifest/src/config.rs @@ -88,6 +88,10 @@ pub struct CompactionConfigPartial { #[serde(default)] pub compact_retained_tokens: Option, #[serde(default)] + pub compact_auto_read_budget: Option, + #[serde(default)] + pub compact_worker_max_input_tokens: Option, + #[serde(default)] pub provider: Option, } @@ -244,6 +248,12 @@ impl CompactionConfigPartial { compact_retained_tokens: upper .compact_retained_tokens .or(self.compact_retained_tokens), + compact_auto_read_budget: upper + .compact_auto_read_budget + .or(self.compact_auto_read_budget), + compact_worker_max_input_tokens: upper + .compact_worker_max_input_tokens + .or(self.compact_worker_max_input_tokens), provider: merge_option(self.provider, upper.provider, ProviderConfigPartial::merge), } } @@ -374,6 +384,12 @@ impl TryFrom for PodManifest { compact_retained_tokens: c .compact_retained_tokens .unwrap_or(defaults::COMPACT_RETAINED_TOKENS), + compact_auto_read_budget: c + .compact_auto_read_budget + .unwrap_or(defaults::COMPACT_AUTO_READ_BUDGET), + compact_worker_max_input_tokens: c + .compact_worker_max_input_tokens + .unwrap_or(defaults::COMPACT_WORKER_MAX_INPUT_TOKENS), provider: comp_provider, }) }) diff --git a/crates/manifest/src/defaults.rs b/crates/manifest/src/defaults.rs index cca50e1f..5317c269 100644 --- a/crates/manifest/src/defaults.rs +++ b/crates/manifest/src/defaults.rs @@ -28,3 +28,20 @@ pub const COMPACT_RETAINED_TOKENS: u64 = 8000; /// is omitted. See the `PromptLoader` prefix addressing scheme for the /// `$insomnia/` / `$user/` / `$workspace/` namespaces. pub const DEFAULT_INSTRUCTION: &str = "$insomnia/default"; + +/// Token budget for auto-read file contents injected into the new +/// session after compaction. Limits how much raw file text the +/// compact worker can pull into the compacted context via +/// `mark_read_required`. See +/// [`crate::CompactionConfig::compact_auto_read_budget`]. 
+pub const COMPACT_AUTO_READ_BUDGET: u64 = 8000; + +/// Cumulative input-token cap for the compact worker's own LLM +/// calls. Exceeding this aborts the compact run (circuit-breaker +/// path). See +/// [`crate::CompactionConfig::compact_worker_max_input_tokens`]. +pub const COMPACT_WORKER_MAX_INPUT_TOKENS: u64 = 50_000; + +/// Number of recently-touched files fed to the compact worker as +/// default references. +pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5; diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index d387b619..e50c9893 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -201,6 +201,16 @@ pub struct CompactionConfig { #[serde(default = "default_compact_retained_tokens")] pub compact_retained_tokens: u64, + /// Aggregate token budget for auto-read file contents injected into + /// the compacted session by the compact worker. + #[serde(default = "default_compact_auto_read_budget")] + pub compact_auto_read_budget: u64, + + /// Cumulative input-token cap for the compact worker's own LLM + /// calls. Exceeding this aborts the compact run. + #[serde(default = "default_compact_worker_max_input_tokens")] + pub compact_worker_max_input_tokens: u64, + /// Optional provider for the compactor (summary) LLM. /// If omitted, the main provider is cloned via `clone_boxed()`. 
#[serde(default)] @@ -216,6 +226,12 @@ fn default_prune_min_savings() -> u64 { fn default_compact_retained_tokens() -> u64 { defaults::COMPACT_RETAINED_TOKENS } +fn default_compact_auto_read_budget() -> u64 { + defaults::COMPACT_AUTO_READ_BUDGET +} +fn default_compact_worker_max_input_tokens() -> u64 { + defaults::COMPACT_WORKER_MAX_INPUT_TOKENS +} impl Default for CompactionConfig { fn default() -> Self { @@ -225,6 +241,8 @@ impl Default for CompactionConfig { compact_threshold: None, compact_request_threshold: None, compact_retained_tokens: default_compact_retained_tokens(), + compact_auto_read_budget: default_compact_auto_read_budget(), + compact_worker_max_input_tokens: default_compact_worker_max_input_tokens(), provider: None, } } diff --git a/crates/pod/src/compact_worker.rs b/crates/pod/src/compact_worker.rs new file mode 100644 index 00000000..ce9ed4c0 --- /dev/null +++ b/crates/pod/src/compact_worker.rs @@ -0,0 +1,384 @@ +//! Compact worker state and the four tools that drive it. +//! +//! The compact worker is a disposable `Worker` instance spun up by +//! [`Pod::compact`]. It receives the history to summarise plus a list of +//! default reference files (from the session-lifetime `Tracker`) and runs +//! a tool-driven LLM loop. The tools here let it: +//! +//! - `read_file` — inspect referenced files (reuses `tools::read_tool`) +//! - `mark_read_required(path, offset?, limit?)` — nominate a file whose +//! contents should be injected into the compacted context as an +//! auto-read system message +//! - `add_reference(path)` — nominate a file the next session should +//! know about by name only (contents not included) +//! - `write_summary(text)` — deliver (or overwrite) the structured summary +//! +//! Everything the worker decides ends up in [`CompactWorkerContext`], +//! which `Pod::compact` drains after the loop and turns into the +//! compacted session's opening system messages. 
+ +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use llm_worker::Item; +use llm_worker::interceptor::{Interceptor, PreRequestAction}; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use serde::Deserialize; +use tools::ScopedFs; + +/// A file the compact worker has marked for auto-read in the new session. +#[derive(Debug, Clone)] +pub(crate) struct ReadRequirement { + pub path: PathBuf, + /// 0-based line offset. `None` means from the start of the file. + pub offset: Option, + /// Maximum number of lines. `None` means to the end of the file. + pub limit: Option, +} + +/// Aggregated output of a compact worker run. +#[derive(Debug, Default, Clone)] +pub(crate) struct CompactWorkerContext { + pub read_required: Vec, + pub references: Vec, + pub summary: Option, + /// Tokens already consumed by `mark_read_required` calls. + pub auto_read_consumed: u64, + /// Aggregate cap. `0` treats the budget as disabled. + pub auto_read_budget: u64, +} + +impl CompactWorkerContext { + pub(crate) fn with_budget(auto_read_budget: u64) -> Self { + Self { + auto_read_budget, + ..Self::default() + } + } + + fn remaining_budget(&self) -> u64 { + self.auto_read_budget.saturating_sub(self.auto_read_consumed) + } +} + +/// Input to `mark_read_required`. +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct MarkParams { + /// Absolute path to the file. + pub file_path: PathBuf, + /// 0-based line offset. + #[serde(default)] + pub offset: Option, + /// Maximum number of lines to inject. + #[serde(default)] + pub limit: Option, +} + +/// Input to `add_reference`. +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct ReferenceParams { + /// Absolute path to the file. + pub file_path: PathBuf, +} + +/// Input to `write_summary`. 
+#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct SummaryParams { + /// Full structured summary text (overwrites any previous call). + pub text: String, +} + +const MARK_DESCRIPTION: &str = "Inject a file's contents into the compacted context so the \ +next session starts with it already read. Use this for files the next task needs in full. \ +Optionally specify `offset` (0-based line) and `limit` (line count) to inject only a slice. \ +Counts against `auto_read_budget`; overflow returns an error and the mark is not recorded. \ +Paths must be absolute."; + +const REFERENCE_DESCRIPTION: &str = "Record a file path as a named reference in the compacted \ +context without injecting its contents. Use for files that are contextually relevant but \ +whose current content the next session can fetch on demand."; + +const SUMMARY_DESCRIPTION: &str = "Provide the final structured summary text. Subsequent calls \ +replace the previous content; only the last call is used. Must be called before the compact run \ +ends or compaction fails."; + +struct MarkReadRequiredTool { + fs: ScopedFs, + ctx: Arc>, +} + +#[async_trait] +impl Tool for MarkReadRequiredTool { + async fn execute(&self, input_json: &str) -> Result { + let params: MarkParams = serde_json::from_str(input_json).map_err(|e| { + ToolError::InvalidArgument(format!("invalid mark_read_required input: {e}")) + })?; + + // Read the file through the shared ScopedFs so scope and I/O + // errors surface the same way the regular `read_file` tool does. 
+ let bytes = self + .fs + .read_bytes(&params.file_path) + .map_err(|e| ToolError::ExecutionFailed(format!("read failed: {e}")))?; + let text = String::from_utf8_lossy(&bytes); + let slice = slice_lines(&text, params.offset.unwrap_or(0), params.limit); + let estimated_tokens = estimate_tokens(slice.len()); + + let mut guard = self.ctx.lock().expect("compact worker context poisoned"); + let budget = guard.auto_read_budget; + let would_consume = guard.auto_read_consumed.saturating_add(estimated_tokens); + if budget > 0 && would_consume > budget { + return Err(ToolError::ExecutionFailed(format!( + "auto-read budget exhausted ({budget} tokens). Remove an existing mark or use \ + add_reference instead." + ))); + } + guard.read_required.push(ReadRequirement { + path: params.file_path.clone(), + offset: params.offset, + limit: params.limit, + }); + guard.auto_read_consumed = would_consume; + let remaining = guard.remaining_budget(); + drop(guard); + + let mut summary = format!( + "Marked {} for auto-read (≈{estimated_tokens} tokens). \ + Budget: {remaining}/{budget} tokens remaining.", + params.file_path.display() + ); + if budget > 0 && remaining * 2 <= budget { + summary.push_str( + "\nNote: auto-read budget is at least half consumed.
\ + Consider calling write_summary and finishing up soon.", + ); + } + Ok(ToolOutput { + summary, + content: None, + }) + } +} + +struct AddReferenceTool { + ctx: Arc>, +} + +#[async_trait] +impl Tool for AddReferenceTool { + async fn execute(&self, input_json: &str) -> Result { + let params: ReferenceParams = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid add_reference input: {e}")))?; + let mut guard = self.ctx.lock().expect("compact worker context poisoned"); + if !guard + .references + .iter() + .any(|p| p.as_path() == params.file_path.as_path()) + { + guard.references.push(params.file_path.clone()); + } + Ok(ToolOutput { + summary: format!("Added reference {}", params.file_path.display()), + content: None, + }) + } +} + +struct WriteSummaryTool { + ctx: Arc>, +} + +#[async_trait] +impl Tool for WriteSummaryTool { + async fn execute(&self, input_json: &str) -> Result { + let params: SummaryParams = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid write_summary input: {e}")))?; + let mut guard = self.ctx.lock().expect("compact worker context poisoned"); + let overwritten = guard.summary.is_some(); + guard.summary = Some(params.text); + drop(guard); + let note = if overwritten { + "Summary replaced." + } else { + "Summary recorded." 
+ }; + Ok(ToolOutput { + summary: note.to_string(), + content: None, + }) + } +} + +pub(crate) fn mark_read_required_tool( + fs: ScopedFs, + ctx: Arc>, +) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(MarkParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("mark_read_required") + .description(MARK_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(MarkReadRequiredTool { + fs: fs.clone(), + ctx: ctx.clone(), + }); + (meta, tool) + }) +} + +pub(crate) fn add_reference_tool(ctx: Arc>) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(ReferenceParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("add_reference") + .description(REFERENCE_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(AddReferenceTool { ctx: ctx.clone() }); + (meta, tool) + }) +} + +pub(crate) fn write_summary_tool(ctx: Arc>) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(SummaryParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("write_summary") + .description(SUMMARY_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(WriteSummaryTool { ctx: ctx.clone() }); + (meta, tool) + }) +} + +/// Interceptor that aborts the compact worker as soon as its cumulative +/// input-token count crosses `max_input_tokens`. Pairs with the +/// `on_usage` callback registered by `Pod::compact`, which is what +/// actually accumulates `input_so_far`. 
+pub(crate) struct CompactWorkerInterceptor { + pub input_so_far: Arc, + pub max_input_tokens: u64, +} + +#[async_trait] +impl Interceptor for CompactWorkerInterceptor { + async fn pre_llm_request(&self, _context: &mut Vec) -> PreRequestAction { + if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens { + return PreRequestAction::Cancel(format!( + "compact worker input exceeded {} tokens", + self.max_input_tokens + )); + } + PreRequestAction::Continue + } +} + +/// Crude bytes→tokens estimate; good enough for budget accounting. +fn estimate_tokens(bytes: usize) -> u64 { + (bytes as u64).div_ceil(4) +} + +/// Return the slice of `text` covered by `offset` (line index) and +/// optional `limit` (line count), preserving the original newline +/// separation. Returns the whole file when both defaults apply. +pub(crate) fn slice_lines(text: &str, offset: usize, limit: Option) -> String { + let lines: Vec<&str> = text.lines().collect(); + let start = offset.min(lines.len()); + let end = limit + .map(|n| start.saturating_add(n).min(lines.len())) + .unwrap_or(lines.len()); + lines[start..end].join("\n") +} + +#[cfg(test)] +mod tests { + use super::*; + use manifest::Scope; + + fn make_fs(tmp: &std::path::Path) -> ScopedFs { + let scope = Scope::writable(tmp.to_path_buf()).unwrap(); + ScopedFs::new(scope, tmp.to_path_buf()) + } + + #[tokio::test] + async fn mark_read_required_records_and_deducts_budget() { + let tmp = tempfile::TempDir::new().unwrap(); + let path = tmp.path().join("hello.txt"); + std::fs::write(&path, "hello world\n").unwrap(); + + let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(1_000))); + let tool: Arc = Arc::new(MarkReadRequiredTool { + fs: make_fs(tmp.path()), + ctx: ctx.clone(), + }); + let input = serde_json::json!({ "file_path": path.to_str().unwrap() }).to_string(); + let out = tool.execute(&input).await.unwrap(); + + assert!(out.summary.starts_with("Marked")); + let guard = ctx.lock().unwrap(); + 
assert_eq!(guard.read_required.len(), 1); + assert!(guard.auto_read_consumed > 0); + assert!(guard.auto_read_consumed <= 1_000); + } + + #[tokio::test] + async fn mark_read_required_rejects_over_budget() { + let tmp = tempfile::TempDir::new().unwrap(); + let path = tmp.path().join("big.txt"); + std::fs::write(&path, "x".repeat(4_096)).unwrap(); // ≈1024 tokens + + let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(100))); + let tool: Arc = Arc::new(MarkReadRequiredTool { + fs: make_fs(tmp.path()), + ctx: ctx.clone(), + }); + let input = serde_json::json!({ "file_path": path.to_str().unwrap() }).to_string(); + let res = tool.execute(&input).await; + + assert!(matches!(res, Err(ToolError::ExecutionFailed(_)))); + let guard = ctx.lock().unwrap(); + assert!(guard.read_required.is_empty()); + assert_eq!(guard.auto_read_consumed, 0); + } + + #[tokio::test] + async fn write_summary_overwrites_previous_call() { + let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(0))); + let tool: Arc = Arc::new(WriteSummaryTool { ctx: ctx.clone() }); + + let first = serde_json::json!({ "text": "first" }).to_string(); + let out1 = tool.execute(&first).await.unwrap(); + assert!(out1.summary.contains("recorded")); + + let second = serde_json::json!({ "text": "second" }).to_string(); + let out2 = tool.execute(&second).await.unwrap(); + assert!(out2.summary.contains("replaced")); + + assert_eq!(ctx.lock().unwrap().summary.as_deref(), Some("second")); + } + + #[tokio::test] + async fn add_reference_deduplicates() { + let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(0))); + let tool: Arc = Arc::new(AddReferenceTool { ctx: ctx.clone() }); + + let p = "/abs/path.rs"; + let input = serde_json::json!({ "file_path": p }).to_string(); + tool.execute(&input).await.unwrap(); + tool.execute(&input).await.unwrap(); + + let guard = ctx.lock().unwrap(); + assert_eq!(guard.references.len(), 1); + assert_eq!(guard.references[0], PathBuf::from(p)); + } + + #[test] + fn 
slice_lines_handles_offset_and_limit() { + let text = "a\nb\nc\nd"; + assert_eq!(slice_lines(text, 0, None), "a\nb\nc\nd"); + assert_eq!(slice_lines(text, 1, Some(2)), "b\nc"); + assert_eq!(slice_lines(text, 10, None), ""); + } +} diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 6897567a..46e89e24 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -12,6 +12,7 @@ pub mod spawned_pod_registry; mod agents_md; mod compact_state; +mod compact_worker; mod factory; mod notification_buffer; mod pod; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 247ece40..bded686c 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -47,18 +47,35 @@ impl Hook for UsageTrackingHook { } const SUMMARY_SYSTEM_PROMPT: &str = "\ -You are a context compaction assistant. \ -Summarise the conversation below into a structured summary. \ -Preserve concrete details: file paths, function names, error messages, decisions made. \ -Use the following format:\n\n\ -## Original Task\n\ -(the user's original request)\n\n\ -## Completed Work\n\ -- (what was done, with specifics)\n\n\ -## Key Discoveries\n\ -- (facts, constraints, errors found)\n\n\ -## Current State\n\ -- (files changed, remaining work)"; +You are a context compaction assistant. Your job is to hand the next session a \ +structured summary plus pointers to the files it actually needs.\n\n\ +Tools you can call:\n\ +- `read_file(file_path, offset?, limit?)` — inspect referenced files before deciding.\n\ +- `mark_read_required(file_path, offset?, limit?)` — inject a file's contents into the \ + next session as an auto-read system message. Counts against `auto_read_budget`.\n\ +- `add_reference(file_path)` — record a file path the next session should know about \ + without embedding its contents.\n\ +- `write_summary(text)` — deliver the final structured summary. May be called multiple \ + times; only the last call is kept.\n\n\ +Always finish by calling `write_summary`. 
Produce the summary in this exact format:\n\n\ +## Completed Tasks\n\ +### (task name)\n\ +- what was done (use concrete type / file / function names)\n\ +- gotchas or facts that came up\n\n\ +## Active Task\n\ +### (task name)\n\ +- goal\n\ +- current state (what is done / not done)\n\ +- next step\n\n\ +## Key Decisions\n\ +- (decision) — (reason)\n\n\ +## User Directives\n\ +- \"verbatim user line\" — only include directives whose wording the next session \ + should not lose.\n\n\ +## Current Work\n\ +(2–3 lines on what was happening just before compaction).\n\n\ +Keep code snippets and raw tool output OUT of the summary — that is what auto-read \ +and references are for. Target 1000–2000 tokens."; /// An independent agent execution unit. /// @@ -792,6 +809,13 @@ impl Pod { /// /// Returns the new session ID. pub async fn compact(&mut self, retained_tokens: u64) -> Result { + use std::sync::atomic::{AtomicU64, Ordering}; + + use crate::compact_worker::{ + CompactWorkerContext, CompactWorkerInterceptor, add_reference_tool, + mark_read_required_tool, slice_lines, write_summary_tool, + }; + // Decide the cut point by projecting the UsageRecord timeline onto // the current history: keep the tail whose estimated token count is // within `retained_tokens`. Item-granular, turn boundaries ignored. @@ -801,41 +825,179 @@ impl Pod { let history = worker.history(); let retain_from = cut.index.min(history.len()); let retained_items = history[retain_from..].to_vec(); - let items_to_summarise = &history[..retain_from]; + let items_to_summarise = history[..retain_from].to_vec(); - // Build summary prompt. - let summary_prompt = build_summary_prompt(items_to_summarise); + // Compaction-related knobs. Fall through to manifest defaults when + // `[compaction]` is omitted entirely. 
+ let (auto_read_budget, compact_worker_max_input_tokens) = self + .manifest + .compaction + .as_ref() + .map(|c| (c.compact_auto_read_budget, c.compact_worker_max_input_tokens)) + .unwrap_or(( + manifest::defaults::COMPACT_AUTO_READ_BUDGET, + manifest::defaults::COMPACT_WORKER_MAX_INPUT_TOKENS, + )); - // Create a disposable summary Worker. + // Default references: the N most-recently-touched files in the + // session, surfaced so the compact worker can inspect them and + // decide which (if any) the next session needs. + let default_refs: Vec = self + .tracker + .as_ref() + .map(|t| t.recent_files(manifest::defaults::COMPACT_DEFAULT_REFERENCE_COUNT)) + .unwrap_or_default(); + + // Input text fed to the compact worker. Includes the default + // references and the (pruned) conversation text. + let summary_input = build_summary_input(&items_to_summarise, &default_refs); + + // Worker-side state collected by the compact worker's tool calls. + let ctx = Arc::new(std::sync::Mutex::new(CompactWorkerContext::with_budget( + auto_read_budget, + ))); + + // Build an independent compact worker. Scope and pwd are shared + // with the main Pod (reads go through the same policy) but the + // Tracker is fresh — compact-time reads must not pollute the + // main session's recency list, which feeds `default_refs` above. + let scoped_fs = tools::ScopedFs::new(self.scope.clone(), self.pwd.clone()); + let summary_tracker = tools::Tracker::new(); let summary_client: Box = self.build_compactor_client()?; let mut summary_worker = Worker::new(summary_client) .system_prompt(SUMMARY_SYSTEM_PROMPT) .temperature(0.0); - summary_worker.set_max_tokens(2048); + summary_worker.set_max_tokens(4096); + + // Cumulative input-token meter + interceptor. The meter is bumped + // from the on_usage callback and read on every pre_llm_request. 
+ let input_so_far = Arc::new(AtomicU64::new(0)); + { + let acc = input_so_far.clone(); + summary_worker.on_usage(move |event| { + if let Some(tokens) = event.input_tokens { + acc.fetch_add(tokens, Ordering::Relaxed); + } + }); + } + summary_worker.set_interceptor(CompactWorkerInterceptor { + input_so_far: input_so_far.clone(), + max_input_tokens: compact_worker_max_input_tokens, + }); + + // Tools: read_file (shared scope, fresh tracker) + the three + // compact-specific tools that populate `ctx`. + summary_worker.register_tool(tools::read_tool(scoped_fs.clone(), summary_tracker)); + summary_worker + .register_tool(mark_read_required_tool(scoped_fs.clone(), ctx.clone())); + summary_worker.register_tool(add_reference_tool(ctx.clone())); + summary_worker.register_tool(write_summary_tool(ctx.clone())); let out = summary_worker - .run(summary_prompt) + .run(summary_input) .await .map_err(PodError::Worker)?; - let summary_text = out - .worker - .history() - .iter() - .filter_map(|item| { - if item.is_assistant_message() { - item.as_text().map(String::from) - } else { - None - } - }) - .collect::>() - .join("\n"); + let mut locked_worker = out.worker; - // Build new history: [summary as user message, ...retained]. - let mut new_history = Vec::with_capacity(retained_items.len() + 1); + // Guard: nudge the worker once more if the expected outputs + // (summary, and any auto-read nominations when default refs + // existed) were not produced on the first pass. `write_summary` + // is idempotent-by-overwrite so a second call is safe. + let nudge = { + let snapshot = ctx.lock().expect("compact ctx poisoned").clone(); + if snapshot.summary.is_none() { + Some( + "You have not called `write_summary` yet. Deliver the structured \ + summary now (Completed Tasks / Active Task / Key Decisions / \ + User Directives / Current Work) and nominate any files the next \ + session needs with `mark_read_required`." 
+ .to_string(), + ) + } else if snapshot.read_required.is_empty() && !default_refs.is_empty() { + Some( + "Summary received. If any of the referenced files are required \ + for the next session to continue the task, call \ + `mark_read_required` on them now. Otherwise reply briefly to \ + close out." + .to_string(), + ) + } else { + None + } + }; + if let Some(prompt) = nudge { + let _ = locked_worker + .run(prompt) + .await + .map_err(PodError::Worker)?; + } + + let final_ctx = ctx.lock().expect("compact ctx poisoned").clone(); + let summary_text = final_ctx + .summary + .clone() + .ok_or(PodError::CompactSummaryMissing)?; + + // Re-read each auto-read target through ScopedFs and render the + // requested slice. Errors are logged and skipped rather than + // aborting compaction — a missing / moved file should not fail + // the whole compact. + let mut auto_read_messages = Vec::new(); + for req in &final_ctx.read_required { + match scoped_fs.read_bytes(&req.path) { + Ok(bytes) => { + let text = String::from_utf8_lossy(&bytes).into_owned(); + let body = slice_lines(&text, req.offset.unwrap_or(0), req.limit); + let range = match (req.offset, req.limit) { + (None, None) => String::new(), + (Some(off), None) => format!(":{}-", off + 1), + (None, Some(lim)) => format!(":1-{lim}"), + (Some(off), Some(lim)) => { + format!(":{}-{}", off + 1, off.saturating_add(lim)) + } + }; + auto_read_messages.push(Item::system_message(format!( + "[Auto-read file: {}{range}]\n{body}", + req.path.display() + ))); + } + Err(e) => { + warn!( + path = %req.path.display(), + error = %e, + "auto-read target could not be read; skipping", + ); + } + } + } + + // Reference list as a single system message; omitted when empty. 
+ let reference_message = (!final_ctx.references.is_empty()).then(|| { + let list = final_ctx + .references + .iter() + .map(|p| format!("- {}", p.display())) + .collect::>() + .join("\n"); + Item::system_message(format!( + "[Referenced files — read before compaction, contents not included]\n\ + {list}\n\ + Use read_file to access current contents if needed." + )) + }); + + // Build new history: [summary, ...auto-read, references, ...retained]. + let mut new_history = Vec::with_capacity( + 1 + auto_read_messages.len() + reference_message.is_some() as usize + + retained_items.len(), + ); new_history.push(Item::system_message(format!( "[Compacted context summary]\n\n{summary_text}" ))); + new_history.extend(auto_read_messages); + if let Some(msg) = reference_message { + new_history.push(msg); + } new_history.extend(retained_items); // Persist as a new compacted session. @@ -1087,6 +1249,37 @@ impl From for PodRunResult { } } +/// Build the compact worker's input: default-reference instructions, +/// the list of recently-touched files, and the pruned conversation +/// produced by [`build_summary_prompt`]. +fn build_summary_input(items: &[Item], default_refs: &[PathBuf]) -> String { + let mut out = String::new(); + out.push_str( + "Summarise the conversation below into a structured summary and nominate \ + files the next session needs.\n\n", + ); + if !default_refs.is_empty() { + out.push_str( + "These files were touched recently in this session. 
Use `read_file` \ + on them as needed, then call `mark_read_required` for any whose \ + contents the next session must have, and `add_reference` for files \ + it should know about by name only.\n\n## Referenced files\n", + ); + for p in default_refs { + out.push_str("- "); + out.push_str(&p.display().to_string()); + out.push('\n'); + } + out.push('\n'); + } + out.push_str("## Conversation\n"); + out.push_str(&build_summary_prompt(items)); + out.push_str( + "\n\nWhen you are done, call `write_summary` with the final 5-section text.", + ); + out +} + /// Format conversation items into a text prompt for the summary Worker. /// /// The summary should capture decisions and user intent, not recreate code. @@ -1158,6 +1351,9 @@ pub enum PodError { #[error("compaction thrash: context still exceeds threshold immediately after compact")] CompactThrash, + #[error("compact worker did not produce a summary (write_summary was never called)")] + CompactSummaryMissing, + #[error("invalid system prompt template: {source}")] InvalidSystemPromptTemplate { #[source] diff --git a/crates/pod/tests/system_prompt_template_test.rs b/crates/pod/tests/system_prompt_template_test.rs index 1fe3d1a1..f7d7f3ac 100644 --- a/crates/pod/tests/system_prompt_template_test.rs +++ b/crates/pod/tests/system_prompt_template_test.rs @@ -61,6 +61,19 @@ fn single_text_events(text: &str) -> Vec { ] } +/// Emit a single `write_summary(text=...)` tool call as one LLM response. 
+fn write_summary_tool_use_events(call_id: &str, text: &str) -> Vec { + let input = serde_json::json!({ "text": text }).to_string(); + vec![ + LlmEvent::tool_use_start(0, call_id, "write_summary"), + LlmEvent::tool_input_delta(0, input), + LlmEvent::tool_use_stop(0), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ] +} + const MINIMAL_MANIFEST_TOML: &str = r#" [pod] name = "test-pod" @@ -233,10 +246,11 @@ async fn agents_md_absent_omits_trailing_section() { #[tokio::test] async fn agents_md_not_reread_after_compact() { let client = MockClient::new(vec![ - single_text_events("a"), - single_text_events("b"), - single_text_events("summary"), - single_text_events("c"), + single_text_events("a"), // pod.run("first") + single_text_events("b"), // pod.run("second") + write_summary_tool_use_events("call-1", "compacted summary"), // compact worker: tool_use + single_text_events("done"), // compact worker: close + single_text_events("c"), // pod.run("third") ]); let (mut pod, pwd) = make_pod_with_body("BODY", client).await.unwrap(); let agents_path = pwd.join("AGENTS.md"); @@ -264,10 +278,11 @@ async fn agents_md_not_reread_after_compact() { #[tokio::test] async fn compact_preserves_system_prompt() { let client = MockClient::new(vec![ - single_text_events("a"), - single_text_events("b"), - single_text_events("summary"), - single_text_events("c"), + single_text_events("a"), // pod.run("first") + single_text_events("b"), // pod.run("second") + write_summary_tool_use_events("call-1", "compacted summary"), // compact worker: tool_use + single_text_events("done"), // compact worker: close + single_text_events("c"), // pod.run("third") ]); let (mut pod, _pwd) = make_pod_with_body("SP cwd={{ cwd }}", client) .await From 1a3e9030bd3be628aa3187e704c6d37346acab19 Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 19 Apr 2026 12:02:11 +0900 Subject: [PATCH 5/6] =?UTF-8?q?compact:=20retained=5Ftokens=20=E3=83=86?= 
=?UTF-8?q?=E3=82=B9=E3=83=88=E5=80=A4=E3=82=92=E7=8F=BE=E5=AE=9F=E7=9A=84?= =?UTF-8?q?=E3=81=AA=E5=80=A4=E3=81=AB=E5=A4=89=E6=9B=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 2 を 8_000 に。retained_turns 時代の名残で 2 は "2 トークン保持" と読めてしまい意味不明だったため。 --- crates/pod/src/compact_state.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/pod/src/compact_state.rs b/crates/pod/src/compact_state.rs index 4f6ff106..54f94369 100644 --- a/crates/pod/src/compact_state.rs +++ b/crates/pod/src/compact_state.rs @@ -113,9 +113,9 @@ mod tests { #[test] fn both_thresholds_configured() { - let state = CompactState::new(Some(80_000), Some(90_000), 2); + let state = CompactState::new(Some(80_000), Some(90_000), 8_000); assert_eq!(state.request_threshold(), Some(90_000)); - assert_eq!(state.retained_tokens(), 2); + assert_eq!(state.retained_tokens(), 8_000); assert!(!state.exceeds_request(70_000)); assert!(!state.exceeds_post_run(70_000)); @@ -129,7 +129,7 @@ mod tests { #[test] fn post_run_only() { - let state = CompactState::new(Some(80_000), None, 2); + let state = CompactState::new(Some(80_000), None, 8_000); // request check always false when threshold is None. 
assert!(!state.exceeds_request(1_000_000)); assert!(state.exceeds_post_run(85_000)); @@ -137,21 +137,21 @@ mod tests { #[test] fn request_only() { - let state = CompactState::new(None, Some(90_000), 2); + let state = CompactState::new(None, Some(90_000), 8_000); assert!(!state.exceeds_post_run(1_000_000)); assert!(state.exceeds_request(95_000)); } #[test] fn both_none_disables_all_checks() { - let state = CompactState::new(None, None, 2); + let state = CompactState::new(None, None, 8_000); assert!(!state.exceeds_request(1_000_000)); assert!(!state.exceeds_post_run(1_000_000)); } #[test] fn circuit_breaker_trips_after_max_failures() { - let state = CompactState::new(Some(80_000), Some(90_000), 2); + let state = CompactState::new(Some(80_000), Some(90_000), 8_000); assert!(!state.is_disabled()); state.record_compact_failure(); @@ -164,7 +164,7 @@ mod tests { #[test] fn success_resets_failure_count() { - let state = CompactState::new(Some(80_000), Some(90_000), 2); + let state = CompactState::new(Some(80_000), Some(90_000), 8_000); state.record_compact_failure(); state.record_compact_failure(); assert!(!state.is_disabled()); @@ -179,7 +179,7 @@ mod tests { #[test] fn just_compacted_lifecycle() { - let state = CompactState::new(Some(80_000), Some(90_000), 2); + let state = CompactState::new(Some(80_000), Some(90_000), 8_000); assert!(!state.just_compacted()); state.record_compact_success(); From 18b7556e0a06c32103d75fc0faff70c41cc0d585 Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 19 Apr 2026 12:13:03 +0900 Subject: [PATCH 6/6] =?UTF-8?q?compact-improvements=20=E3=83=81=E3=82=B1?= =?UTF-8?q?=E3=83=83=E3=83=88=E5=AE=8C=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO.md | 1 - tickets/compact-improvements.md | 418 -------------------------------- 2 files changed, 419 deletions(-) delete mode 100644 tickets/compact-improvements.md diff --git a/TODO.md b/TODO.md index 155a9a8b..f7d30343 100644 --- a/TODO.md +++ 
b/TODO.md @@ -2,7 +2,6 @@ - [ ] 引数なし tool 呼び出しで `arguments = "null"` が記録される不具合 → [tickets/tool-call-empty-args-null.md](tickets/tool-call-empty-args-null.md) - [ ] ツール設計 - [ ] Bash ツール (Permission 層と統合) → [tickets/bash-tool.md](tickets/bash-tool.md) -- [ ] Compact の改善(要約品質 + 挙動詳細) → [tickets/compact-improvements.md](tickets/compact-improvements.md) - [ ] Protocol の設計 → [tickets/protocol-design.md](tickets/protocol-design.md) - [ ] パーミッション: パターンベースのツール実行制御 → [tickets/permission-extension-point.md](tickets/permission-extension-point.md) - [ ] Pod オーケストレーション diff --git a/tickets/compact-improvements.md b/tickets/compact-improvements.md deleted file mode 100644 index 5be9a8c6..00000000 --- a/tickets/compact-improvements.md +++ /dev/null @@ -1,418 +0,0 @@ -# Compact の改善 - -## 背景 - -`Pod::compact()` とその周辺機構は実装済み。 -要約品質、保護単位、compact 後のコンテキスト構築に改善が必要。 - -## 前提(完了済み) - -以下は完了済み。git 履歴参照。 - -- **usage-history** — session-store に `UsageRecord` を積む基盤(`101679d usageデータの永続化実装`) -- **token-counter** — Usage 履歴ベースのトークン会計。`Pod::total_tokens()` / `Pod::split_for_retained(n)` として公開済み(`a89c448 token-counter実装`) -- **tracker** — `tools::Tracker` に `recent_files(n)` 実装済み(`6f2362e ToolsのTracker実装`) - ---- - -## 要件 - -### R1: 一貫した振る舞い -- システムプロンプトは不変 -- compact 前後でユーザーが違和感を覚えない -- 「何を知っていて何を忘れたか」が自然であること - -### R2: 直近の記憶の確実性 -- 直近 N トークン分の会話をそのまま保持(Prune 済み = summary only の状態で計測) -- **トークン数ベース** で保護量を決める(ターン単位ではない) - - 自走エージェントは1ターン内で多数のリクエストを回す - - ターン単位だと保護量がターン長に依存してしまう - -### R3: Auto-Read + リファレンス -- compact 後の最初のターンで、タスク遂行に必要なファイルが既に読まれている -- 2段階: **Read**(全文/範囲をコンテキストに注入)と **Reference**(「読んだことがある」とだけ伝える) -- compact worker が「続行に必要なファイル」を判断して指定する - -### R4: マルチタスク対応 -- セッション中に一貫した課題に取り組んでいないものとする -- **完了タスク**: 簡潔に。注意点・発覚した事実だけ -- **進行中タスク**: サマリ + 現状 + 次のステップを十分に - ---- - -## 用語の定義(重要 — 混乱防止のため明記) - -- **run = turn**: 同じ概念を指す。1 ユーザープロンプト → 完了までの単位 -- **リクエスト**: 1 run/turn 内で投げる個別の LLM 呼び出し。ツール使用で 1 turn に複数リクエストが発生する -- **リクエストの合間** (between requests): 1 turn 内、次の LLM 
リクエストを投げる前の地点。`CompactInterceptor::pre_llm_request` で観測される -- **ターンの合間** (between turns): turn が完了して次の turn を待つ状態。`Controller::try_post_run_compact` で観測される - -この 2 つを区別することに意味がある: -- **ターンの合間**は自然なタスクの区切り。次の turn に入る前に **先を見越して早めに** compact すべき -- **リクエストの合間**は turn 内部の中継点。通常は proactive な必要はなく、暴走的な膨張を拾う **safety net** として **遅めに** 発動すれば十分 - ---- - -## 閾値の修正(重要) - -現状の実装は: -1. 閾値の大小関係が意図と逆 -2. `turn_threshold` が pre_llm_request 側で使われていて命名がミスリード -3. もう片方を `turn_threshold * 9 / 8` で導出しているが、9/8 に根拠がない - -これらをまとめて修正する。値入れ替え + リネーム + マニフェストで両閾値を個別指定。 - -### 正しい方針 - -| チェックポイント | 変数名 (コード) | マニフェスト | 役割 | -|----------------|---------------|------------|------| -| `Controller::try_post_run_compact` (ターンの合間) | `post_run_threshold` | `compact_threshold` | proactive (小) | -| `CompactInterceptor::pre_llm_request` (リクエストの合間) | `request_threshold` | `compact_request_threshold` | safety net (大) | - -両方とも manifest で個別指定する。導出はしない。 - -```toml -[compaction] -compact_threshold = 80000 # ターンの合間, proactive -compact_request_threshold = 90000 # リクエストの合間, safety net -``` - -想定: `compact_threshold < compact_request_threshold`。逆転していてもエラーにはしないが、 -warn を出す。両方 None なら compact 無効(今まで通り)。片方だけ None なら... 
- -**片方だけ指定されたときの挙動**: -- `compact_threshold` のみ設定 → `compact_request_threshold` は無効 (リクエスト間チェック無し) -- `compact_request_threshold` のみ設定 → `compact_threshold` は無効 (post_run チェック無し) -- 両方設定 → 両方有効 - -→ `CompactState` 内部では `Option` 2 本持ち。`exceeds_*` メソッドは `Option` が `None` なら常に `false`。 - -### 占有量ソースの統合(重要) - -現在 `CompactState::last_input_tokens: AtomicU64` が `on_usage` callback から -更新され、閾値判定に使われている。これは session-store の `UsageRecord` -履歴(usage-history で導入済み)と**情報源が二重化**している状態。 - -本チケットで両者を統合する。**`Pod::total_tokens()`(token-counter で導入済み)を -単一の情報源とし、`last_input_tokens` 経路を撤去する**: - -- `CompactState` から `last_input_tokens: AtomicU64` フィールドを削除 -- `CompactState::update_input_tokens` メソッドを削除 -- `Pod::ensure_interceptor_installed` の on_usage callback から - `state_for_usage.update_input_tokens(tokens)` の行を削除 - (`tracker_for_usage.record_usage(event)` だけが残る) -- 閾値判定 (`exceeds_request` / `exceeds_post_run`) は `Pod::total_tokens()` の - 戻り値を見る形に変える -- これにより「実測値の単一履歴 → `Pod::total_tokens()` → 閾値判定」と一直線になる - -Anthropic のキャッシュヒット時に占有量を取りこぼす旧バグも、このパスを -廃止することで自動的に解消する(`UsageRecord.input_total_tokens` は -scheme 層で占有量に正規化済み)。 - -### 影響箇所 - -- **`crates/manifest/src/lib.rs`** - - `CompactionConfig` に `compact_request_threshold: Option` フィールドを追加 - - デフォルトは `None` - - テスト更新 (両閾値が読めること) - -- **`crates/pod/src/compact_state.rs`** - - `last_input_tokens: AtomicU64` フィールドを **削除**(情報源を usage_history に一本化) - - `update_input_tokens` / `last_input_tokens` メソッドも削除 - - `turn_threshold` フィールドを `request_threshold: Option` にリネーム + `Option` 化 - - `post_run_threshold: u64` → `Option` に変更 - - コンストラクタシグネチャ変更: - ```rust - // Before - pub fn new(turn_threshold: u64, retained_turns: usize) -> Self - // After - pub fn new( - post_run_threshold: Option, - request_threshold: Option, - retained_turns: usize, - ) -> Self - ``` - - `exceeds_turn()` → `exceeds_request()` にリネーム。閾値超過判定は - 呼び出し側で現在の占有量を渡す形に変える(CompactState は閾値しか持たない): - ```rust - pub(crate) fn exceeds_request(&self, current_tokens: u64) -> bool { - 
self.request_threshold - .map(|t| current_tokens > t) - .unwrap_or(false) - } - ``` - 呼び出し元 (`compact_interceptor.rs` / `controller.rs`) は `Pod::total_tokens()` - から現在の占有量を取って渡す - - `exceeds_post_run()` も同様に Option 対応 - - `turn_threshold()` getter → `request_threshold()`、戻り値は `Option` - - ドックコメントを「proactive = post_run」「safety net = request」で書き直し - - テスト: 両方設定/片方だけ/両方 None の 3 ケース - -- **`crates/pod/src/pod.rs`** (上記の compact_state 変更に伴って) - - `ensure_interceptor_installed` の on_usage callback から - `state_for_usage.update_input_tokens(tokens)` の行を削除。 - `tracker_for_usage.record_usage(event)` だけが残る - -- **`crates/pod/src/compact_interceptor.rs`** - - `exceeds_turn()` 呼び出しを `exceeds_request()` に - - ログメッセージ "Between-turns ..." → "Between-requests ..." - - コメント "Step 2: Check between-turns compaction threshold" → "Step 2: Check between-requests compaction threshold (safety net)" - -- **`crates/pod/src/pod.rs`** - - `ensure_interceptor_installed` で `compact_threshold` + `compact_request_threshold` の両方を manifest から読み、`CompactState::new` に渡す - - wrap 条件: 両方 None なら CompactInterceptor を挟まない (+ Controller の post_run チェックも実質無効)。片方でも Some なら挟む - - Disjoint チェックで `post_run_threshold > request_threshold` の場合 warn ログ - -- **`docs/compaction.md`** - - TOML 例に `compact_request_threshold` を追加 - - トリガーセクションから「9/8 で導出」の記述を削除、個別指定である旨に修正 - ---- - -## compact 後の history 構造 - -全て system message(`Item::Message { role: System }`)として注入。 - -``` -[system prompt] ← 不変 (R1) -[system: 構造化要約] ← R4: compact worker の出力 -[system: auto-read ファイル群] ← R3: read_required の結果 -[system: リファレンス一覧] ← R3: reference の結果 -[直近 N トークン分の生の会話] ← R2: pruned 状態で保持 -``` - -system message で統一する理由: -- LLM に「システムから提供された前提情報」として認識させる -- fake ユーザーメッセージや fake ToolCall を作らない -- 要約もファイルも同じ role で自然に並ぶ - -### auto-read の system message 例 - -``` -[Auto-read file: src/main.rs:42-142] -fn main() { - let config = Config::load(); - ... 
-} -``` - -### リファレンスの system message 例 - -``` -[Referenced files — read before compaction, contents not included] -- src/config.rs (read during task setup) -- tests/integration_test.rs (read during test implementation) -Use read_file to access current contents if needed. -``` - ---- - -## R2: トークンベースの保護 - -現状の `retained_turns` を `retained_tokens` に変更。 - -``` -history (全て pruned 済み = summary only): - [...古い部分...] [...直近 N トークン分...] - ↓ ↓ - 要約対象 そのまま新 history に載せる -``` - -- Prune 済みの history に対して `Pod::split_for_retained(N)`(token-counter で - 導入済み)で cut 位置を求める -- 計算は session-store の `UsageRecord` 履歴 (実測値) を逆算ソースに使う -- ターン境界は無視。アイテム単位で切る - -```toml -[compaction] -compact_threshold = 80000 -retained_tokens = 8000 # ← retained_turns から変更 -``` - ---- - -## R3: Auto-Read + リファレンス - -### デフォルトリファレンスの抽出 - -`tools::Tracker`(実装済み)が Read/Write/Edit で触られたファイルを LRU で -保持している。Compact 時は `self.tracker.recent_files(5)` で先頭 5 件を -compact worker のデフォルトリファレンスとして渡す。 - -### compact worker のツール - -``` -read_file(path, offset?, limit?) — ファイルを読んで判断するため -mark_read_required(path, offset?, limit?) — auto-read 対象(内容をコンテキストに載せる) -add_reference(path) — リファレンス追加(内容は載せない) -write_summary(text) — 構造化要約を出力/上書き(上書き可) -``` - -`write_summary` は**上書き可**。マルチターンで「下書き → 追加 read → 書き直し」の順序が自然に動く。 -最終的に直近の呼び出しが採用される。ガードは「一度も呼ばれていない」時のみ。 - -### フロー - -1. Pod が `Tracker::recent_files(5)` で最近触られたファイルを抽出(デフォルトリファレンス) -2. compact worker のプロンプトに含める: - - ``` - 以下のファイルがリファレンスとして指定されています。 - 全て読んで、タスク続行に必要なものを mark_read_required で指定してください。 - リファレンスを追加したい場合は add_reference で追加できます。 - ``` - -3. compact worker が read_file で全ファイルを読み、判断: - - 必要なファイル → `mark_read_required(path, offset?, limit?)` - - 不要だがコンテキストとして有用 → リファレンスのまま残す - - 追加のリファレンス → `add_reference(path)` -4. `write_summary` で構造化要約を出力(最後のが採用される) -5. 
ターン終了時に summary が一度も書かれていない or read_required が空(かつファイル操作履歴がある場合)→ 追加プロンプトで促す - -### Auto-Read の Budget 管理 - -compact worker が `mark_read_required` を無制限に呼ぶとコンテキストが膨張する。 -共有 budget で制御: - -```toml -[compaction] -auto_read_budget = 8000 # 合計トークン上限 -``` - -- `mark_read_required` のツール結果で残量を返す: - `"Marked. Budget: 4200/8000 tokens remaining"` -- 50% 以下になったら次のツール結果に system reminder を append: - `"Budget half consumed. Consider calling write_summary soon."` -- 100% 超過で Err: - `"Error: auto-read budget exhausted (8000 tokens). Remove an existing mark or use add_reference instead."` -- compact worker が判断して自分で調整できる(Err は即中断ではない) - -### compact worker の暴走抑止 - -Turn/request 数ではなく、compact worker の累計入力トークンで上限を設ける: - -```toml -[compaction] -compact_worker_max_input_tokens = 50000 -``` - -超えたら compact worker を強制終了。`CompactState::record_compact_failure()` 経由で -サーキットブレーカーの自然な経路に乗る。 - ---- - -## R4: 要約の内容と品質 - -### 出力方法 - -compact worker が `write_summary(text)` ツールで出力する(上書き可)。 -最後のテキスト出力ではなくツールにする理由: -- マルチターンで read_file → 判断 → 要約の順序が自由 -- 要約を書いた後にさらにリファレンスを追加できる -- 「要約を書いていない」のガードが mark_read_required と同じパターンで検出可能 - -### 含めるべき内容 - -コードスニペットは auto-read に任せる。要約に求めるのは: -1. **何を、なぜやったか** — 意思決定の記録。具体的な型名・関数名で言及 -2. **ユーザーの指示・フィードバックの原文** — ニュアンス保持。重要なもののみ -3. **発生した問題と解決策** — 同じ轍を踏まない -4. **今どこにいて次に何をするか** — compact 前後の一貫性 (R1) - -含めないもの: -- コードの全文(auto-read が担う) -- 変更の diff(git がある) -- 中間のやりとりの詳細(最終結論だけ) - -### フォーマット(5セクション、1000-2000 トークン目安) - -``` -## Completed Tasks -### (タスク名) -- 完了した作業(具体的な型名・ファイル名で) -- 注意点 / 発覚した事実 - -### (タスク名) -- ... - -## Active Task -### (タスク名) -- 目標 -- 現状(何が済んで何が未着手か) -- 次のステップ - -## Key Decisions -- (判断内容) — (理由) -- ... - -## User Directives -- 「(ユーザー発言の原文)」 — 重要な指示・フィードバックのみ -- ... 
- -## Current Work -(直前に何をしていたか。2-3行) -``` - -各セクションの目安量: -- Completed Tasks: 各タスク 2-3 行 × タスク数 -- Active Task: 5-10 行 -- Key Decisions: 各 1-2 行 -- User Directives: 重要な発言のみ原文引用 -- Current Work: 2-3 行 - -### 要約の入力 - -pruned history から: -- ToolResult は summary のみ(content 除去) -- ToolCall は名前のみ(arguments 除去) -- Reasoning は除去 - ---- - -## 挙動の未決定事項 - -### Yield のタイミング精度 - -現状 `pre_llm_request`(リクエストの合間)でのみチェック。 -1 turn 内でツール呼び出しが多く途中でコンテキストが膨らむケースは次のリクエストまで待つ。 - -検討: `post_tool_call` でもチェックする? - -### 閾値の推奨値 - -- `compact_threshold` (post_run, proactive): モデルのコンテキスト上限の 70-80% あたりが目安 -- `compact_request_threshold` (request, safety net): `compact_threshold` より少し上、85-95% あたり - -両方 manifest で個別指定する(導出はしない)。要調整の余地あり。 - -### Prune と Compact の相互作用 - -Prune はリクエストコンテキストのみ操作。閾値判定は usage_history の最新 -測定値(前回の LLM レスポンス時点の占有量)を見るので、Prune の効果は -次回 LLM call まで反映されない。保守的(compact しすぎる方向)で実害は小さい。 - -### compact 中のクライアント通知 - -Protocol チケットの `CompactStart`/`CompactDone` で対応。 - -### 復元時の挙動 - -`Outcome::Yielded` で記録されたセッションは `last_run_interrupted = true` で復元。 -compact 後の新セッションが存在する場合、どちらを restore するかは呼び出し側の責任。 -`compacted_from` で辿れる。 - ---- - -## 実装順序 - -前提(usage-history / token-counter / tracker)は完了済み。 - -1. **閾値の修正 + リネーム + 個別指定化 + 占有量ソース統合** — manifest に `compact_request_threshold` 追加、`compact_state.rs` の 2 閾値を `Option` 化、`turn_threshold` → `request_threshold` リネーム、`exceeds_turn()` → `exceeds_request()`。`last_input_tokens` 撤去、閾値判定は `Pod::total_tokens()` 経由に切替。compact_state.rs / compact_interceptor.rs / pod.rs / manifest / テスト / docs 更新 -2. **要約入力の削減** — `build_summary_prompt` から content/arguments/reasoning を除去 -3. **retained_tokens 化** — retained_turns → retained_tokens に変更。マニフェスト設定追加 -4. **compact worker のツール化** — read_file + mark_read_required + add_reference + write_summary (上書き可) -5. **Auto-Read + リファレンス** — デフォルト5ファイル抽出 (`Tracker::recent_files` から)、compact worker による選定、system message での注入 -6. **Auto-Read Budget** — `mark_read_required` のトークン会計、残量通知、超過エラー -7. 
**compact worker の累計入力トークン制限** — `compact_worker_max_input_tokens` -8. **要約フォーマット** — タスク分類の要約プロンプト調整 -9. **ガード** — write_summary 未呼び出し or mark_read_required 空時の追加プロンプト