diff --git a/TODO.md b/TODO.md index faff19b1..13c380e7 100644 --- a/TODO.md +++ b/TODO.md @@ -14,10 +14,9 @@ - [x] JSONL ストリーム変換ユーティリティ (protocol::stream) - [x] Hook モジュールの llm-worker からの除去 → [tickets/remove-hook-module.md](tickets/remove-hook-module.md) - [x] api_key_file: ファイルパスによるAPIキー解決 → [tickets/api-key-file.md](tickets/api-key-file.md) -- [ ] コンテキスト圧縮 (Prune + Compact) → [tickets/context-compaction.md](tickets/context-compaction.md) - - [x] ToolOutput 再設計 + 旧モジュール削除 (Step 1-2) - - [x] prune.rs + PruneHook (Step 3-4) - - [ ] Compact (Step 5-8、session-store-extraction 後) +- [x] コンテキスト圧縮 (Prune + Compact) → [tickets/context-compaction.md](tickets/context-compaction.md) +- [ ] Compact の改善(要約品質 + 挙動詳細) → [tickets/compact-improvements.md](tickets/compact-improvements.md) +- [ ] Protocol の設計 → [tickets/protocol-design.md](tickets/protocol-design.md) - [x] Protocol: request-response パターン (GetHistory等) → [tickets/request-response-protocol.md](tickets/request-response-protocol.md) - [ ] パーミッション: パターンベースのツール実行制御 → [tickets/permission-extension-point.md](tickets/permission-extension-point.md) - [x] session-store: persistence クレートの再構成(wrap廃止、リネーム) → [tickets/session-store-extraction.md](tickets/session-store-extraction.md) diff --git a/crates/llm-worker/examples/worker_cancel_demo.rs b/crates/llm-worker/examples/worker_cancel_demo.rs index 44dc9850..56c69e13 100644 --- a/crates/llm-worker/examples/worker_cancel_demo.rs +++ b/crates/llm-worker/examples/worker_cancel_demo.rs @@ -45,6 +45,7 @@ async fn main() -> Result<(), Box> { WorkerResult::Finished => println!("✅ Task completed normally"), WorkerResult::Paused => println!("⏸️ Task paused"), WorkerResult::LimitReached => println!("🔒 Turn limit reached"), + WorkerResult::Yielded => println!("↩️ Task yielded"), }, Err(e) => { println!("❌ Task error: {}", e); diff --git a/crates/llm-worker/src/interceptor.rs b/crates/llm-worker/src/interceptor.rs index 725fd93d..2b1d7078 100644 --- a/crates/llm-worker/src/interceptor.rs 
+++ b/crates/llm-worker/src/interceptor.rs @@ -30,8 +30,13 @@ pub enum PromptAction { pub enum PreRequestAction { /// Proceed normally. Continue, - /// Cancel with a reason. + /// Cancel with a reason (treated as an error). Cancel(String), + /// Yield control to the caller for external processing. + /// + /// The Worker exits the turn loop cleanly with `WorkerResult::Yielded`. + /// The caller is expected to resume execution later. + Yield, } /// Action before a tool call. diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 1bb0a52e..e52f2e55 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -71,6 +71,12 @@ pub enum WorkerResult { Paused, /// Turn limit reached (max_turns exceeded) LimitReached, + /// Yielded to caller for external processing (e.g. context compaction). + /// + /// Distinct from `Paused`: internal machinery, not user-facing. The + /// caller is expected to perform some side work and then call `resume()` + /// to continue the turn loop. + Yielded, } /// Result of [`Worker::run()`] / [`Worker::resume()`]. @@ -702,6 +708,14 @@ impl Worker { self.last_run_interrupted = true; return Err(WorkerError::Aborted(reason)); } + PreRequestAction::Yield => { + info!("Yielded by interceptor"); + for cb in &self.turn_end_cbs { + cb(current_turn); + } + self.last_run_interrupted = true; + return Ok(WorkerResult::Yielded); + } PreRequestAction::Continue => {} } diff --git a/crates/pod/src/compact_interceptor.rs b/crates/pod/src/compact_interceptor.rs new file mode 100644 index 00000000..32238d83 --- /dev/null +++ b/crates/pod/src/compact_interceptor.rs @@ -0,0 +1,76 @@ +//! CompactInterceptor — wraps HookInterceptor with urgent compaction check. +//! +//! Decorator that delegates all [`Interceptor`] methods to the inner +//! `HookInterceptor`, then adds a token-count check in `pre_llm_request`. +//! When `last_input_tokens` exceeds the turn threshold, returns +//! 
`PreRequestAction::Yield` so the Worker exits the turn loop cleanly +//! with `WorkerResult::Yielded` and Pod can perform compaction. + +use std::sync::Arc; + +use async_trait::async_trait; +use llm_worker::interceptor::{ + Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, + ToolResultInfo, TurnEndAction, +}; +use llm_worker::Item; +use tracing::info; + +use crate::compact_state::CompactState; +use crate::hook_interceptor::HookInterceptor; + +/// Interceptor that wraps HookInterceptor and adds between-turns +/// compaction threshold check. +pub(crate) struct CompactInterceptor { + inner: HookInterceptor, + state: Arc, +} + +impl CompactInterceptor { + pub(crate) fn new(inner: HookInterceptor, state: Arc) -> Self { + Self { inner, state } + } +} + +#[async_trait] +impl Interceptor for CompactInterceptor { + async fn on_prompt_submit(&self, item: &mut Item) -> PromptAction { + self.inner.on_prompt_submit(item).await + } + + async fn pre_llm_request(&self, context: &mut Vec) -> PreRequestAction { + // Step 1: Delegate to inner (PruneHook and other hooks run first). + let inner_action = self.inner.pre_llm_request(context).await; + if !matches!(inner_action, PreRequestAction::Continue) { + return inner_action; + } + + // Step 2: Check between-turns compaction threshold. 
+ if !self.state.is_disabled() && self.state.exceeds_turn() { + info!( + input_tokens = self.state.last_input_tokens(), + threshold = self.state.turn_threshold(), + "Between-turns compaction threshold exceeded, yielding" + ); + return PreRequestAction::Yield; + } + + PreRequestAction::Continue + } + + async fn pre_tool_call(&self, info: &mut ToolCallInfo) -> PreToolAction { + self.inner.pre_tool_call(info).await + } + + async fn post_tool_call(&self, info: &mut ToolResultInfo) -> PostToolAction { + self.inner.post_tool_call(info).await + } + + async fn on_turn_end(&self, history: &[Item]) -> TurnEndAction { + self.inner.on_turn_end(history).await + } + + async fn on_abort(&self, reason: &str) { + self.inner.on_abort(reason).await; + } +} diff --git a/crates/pod/src/compact_state.rs b/crates/pod/src/compact_state.rs new file mode 100644 index 00000000..30e0da37 --- /dev/null +++ b/crates/pod/src/compact_state.rs @@ -0,0 +1,174 @@ +//! Shared state for compaction decisions. +//! +//! Holds atomic counters shared between: +//! - `on_usage` callback (writes `last_input_tokens`) +//! - `CompactInterceptor` (reads token count, checks thresholds) +//! - `Pod::run()`/`resume()` (circuit breaker, thrash detection) + +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; + +const MAX_COMPACT_FAILURES: usize = 3; + +/// Shared mutable state for compaction decisions. +pub(crate) struct CompactState { + /// Last observed input_tokens from `on_usage` callback. + last_input_tokens: AtomicU64, + /// Proactive threshold — checked in `pre_llm_request` (between turns). + turn_threshold: u64, + /// Post-run threshold — checked by Controller after run completes. + post_run_threshold: u64, + /// Number of recent turns to retain after compaction. + retained_turns: usize, + /// Consecutive compact failures. At `MAX_COMPACT_FAILURES`, compaction is disabled. 
+ consecutive_failures: AtomicUsize, + /// `true` immediately after a successful compact, cleared on next normal completion. + just_compacted: AtomicBool, + /// `true` when circuit breaker has tripped. + disabled: AtomicBool, +} + +impl CompactState { + /// Create a new CompactState. + /// + /// `turn_threshold` is the proactive (80%) threshold from the manifest. + /// `post_run_threshold` is derived as `turn_threshold * 9 / 8` (≈90%). + pub(crate) fn new(turn_threshold: u64, retained_turns: usize) -> Self { + Self { + last_input_tokens: AtomicU64::new(0), + turn_threshold, + post_run_threshold: turn_threshold * 9 / 8, + retained_turns, + consecutive_failures: AtomicUsize::new(0), + just_compacted: AtomicBool::new(false), + disabled: AtomicBool::new(false), + } + } + + /// Update the last observed input_tokens (called from `on_usage`). + pub(crate) fn update_input_tokens(&self, tokens: u64) { + self.last_input_tokens.store(tokens, Ordering::Relaxed); + } + + /// Read the last observed input_tokens. + pub(crate) fn last_input_tokens(&self) -> u64 { + self.last_input_tokens.load(Ordering::Relaxed) + } + + /// The between-turns threshold value. + pub(crate) fn turn_threshold(&self) -> u64 { + self.turn_threshold + } + + /// Number of turns to retain after compaction. + pub(crate) fn retained_turns(&self) -> usize { + self.retained_turns + } + + /// Whether compaction has been disabled by the circuit breaker. + pub(crate) fn is_disabled(&self) -> bool { + self.disabled.load(Ordering::Relaxed) + } + + /// Whether `last_input_tokens` exceeds the between-turns threshold. + pub(crate) fn exceeds_turn(&self) -> bool { + self.last_input_tokens() > self.turn_threshold + } + + /// Whether `last_input_tokens` exceeds the post-run threshold. + pub(crate) fn exceeds_post_run(&self) -> bool { + self.last_input_tokens() > self.post_run_threshold + } + + /// Whether a compact just completed (for thrash detection). 
+ pub(crate) fn just_compacted(&self) -> bool { + self.just_compacted.load(Ordering::Relaxed) + } + + /// Set or clear the just_compacted flag. + pub(crate) fn set_just_compacted(&self, val: bool) { + self.just_compacted.store(val, Ordering::Relaxed); + } + + /// Record a successful compaction: reset failure counter, set just_compacted, + /// and clear the stale `last_input_tokens` reading. + /// + /// The stored token count was measured against the pre-compaction history. + /// If it were left in place, the next `exceeds_turn()` check would fire again + /// before any new LLM request could refresh it, which would be reported as + /// compaction thrash even though the compacted history was never measured. + pub(crate) fn record_compact_success(&self) { + self.consecutive_failures.store(0, Ordering::Relaxed); + self.last_input_tokens.store(0, Ordering::Relaxed); + self.just_compacted.store(true, Ordering::Relaxed); + } + + /// Record a compaction failure. Disables compaction after MAX_COMPACT_FAILURES. + pub(crate) fn record_compact_failure(&self) { + let prev = self.consecutive_failures.fetch_add(1, Ordering::Relaxed); + if prev + 1 >= MAX_COMPACT_FAILURES { + self.disabled.store(true, Ordering::Relaxed); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn threshold_derivation() { + let state = CompactState::new(80_000, 2); + assert_eq!(state.turn_threshold, 80_000); + assert_eq!(state.post_run_threshold, 90_000); + assert_eq!(state.retained_turns(), 2); + } + + #[test] + fn exceeds_checks() { + let state = CompactState::new(80_000, 2); + assert!(!state.exceeds_turn()); + assert!(!state.exceeds_post_run()); + + state.update_input_tokens(85_000); + assert!(state.exceeds_turn()); + assert!(!state.exceeds_post_run()); + + state.update_input_tokens(95_000); + assert!(state.exceeds_turn()); + assert!(state.exceeds_post_run()); + } + + #[test] + fn circuit_breaker_trips_after_max_failures() { + let state = CompactState::new(80_000, 2); + assert!(!state.is_disabled()); + + state.record_compact_failure(); + assert!(!state.is_disabled()); + state.record_compact_failure(); + assert!(!state.is_disabled()); + state.record_compact_failure(); + assert!(state.is_disabled()); + } + + #[test] + fn success_resets_failure_count() { + let state = CompactState::new(80_000, 2); + state.record_compact_failure(); + state.record_compact_failure(); + assert!(!state.is_disabled()); + + 
state.record_compact_success(); + assert!(state.just_compacted()); + + // After success + 2 more failures, still not disabled (count was reset). + state.record_compact_failure(); + state.record_compact_failure(); + assert!(!state.is_disabled()); + } + + #[test] + fn just_compacted_lifecycle() { + let state = CompactState::new(80_000, 2); + assert!(!state.just_compacted()); + + state.record_compact_success(); + assert!(state.just_compacted()); + + state.set_just_compacted(false); + assert!(!state.just_compacted()); + } +} diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index a80465b9..bbce7262 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -191,6 +191,13 @@ impl PodController { ) .await; + // Proactive post-run compaction (best-effort). + if new_status == PodStatus::Idle { + if let Err(e) = pod.try_post_run_compact().await { + tracing::warn!(error = %e, "Post-run compaction error"); + } + } + let items = pod.worker().history().to_vec(); shared_state.update_history(items); shared_state.set_status(new_status); @@ -218,6 +225,13 @@ impl PodController { ) .await; + // Proactive post-run compaction (best-effort). 
+ if new_status == PodStatus::Idle { + if let Err(e) = pod.try_post_run_compact().await { + tracing::warn!(error = %e, "Post-run compaction error"); + } + } + let items = pod.worker().history().to_vec(); shared_state.update_history(items); shared_state.set_status(new_status); diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 4e4113fc..5174b81e 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -6,6 +6,8 @@ pub mod socket_server; pub mod prune_hook; +mod compact_interceptor; +mod compact_state; mod hook_interceptor; mod pod; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index d28cff62..ecd70eb9 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -9,9 +9,12 @@ use llm_worker::{Worker, WorkerError, WorkerResult}; use session_store::{ EntryHash, Outcome, SessionId, SessionStartState, Store, StoreError, }; +use tracing::{info, warn}; use manifest::{PodManifest, Scope, WorkerManifest}; +use crate::compact_interceptor::CompactInterceptor; +use crate::compact_state::CompactState; use crate::hook::{ Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest, PreToolCall, @@ -48,6 +51,8 @@ pub struct Pod { interceptor_installed: bool, /// Directory containing the manifest file (needed for api_key_file resolution). manifest_dir: Option, + /// Shared compaction state (present when compact_threshold is configured). + compact_state: Option>, } impl Pod { @@ -74,6 +79,7 @@ impl Pod { hook_builder: HookRegistryBuilder::new(), interceptor_installed: false, manifest_dir: None, + compact_state: None, }) } @@ -105,6 +111,7 @@ impl Pod { hook_builder: HookRegistryBuilder::new(), interceptor_installed: false, manifest_dir: None, + compact_state: None, }) } @@ -187,34 +194,59 @@ impl Pod { } /// Install the hook-based interceptor on the Worker if not already done. 
+ /// + /// When `compact_threshold` is configured in the manifest, wraps the + /// `HookInterceptor` in a [`CompactInterceptor`] and registers an + /// `on_usage` callback to track `input_tokens`. fn ensure_interceptor_installed(&mut self) { if !self.interceptor_installed { let builder = std::mem::take(&mut self.hook_builder); let registry = Arc::new(builder.build()); - let interceptor = HookInterceptor::new(registry); - self.worker_mut().set_interceptor(interceptor); + let hook_interceptor = HookInterceptor::new(registry); + + let compact_threshold = self + .manifest + .compaction + .as_ref() + .and_then(|c| c.compact_threshold); + + if let Some(threshold) = compact_threshold { + let retained = self + .manifest + .compaction + .as_ref() + .map(|c| c.compact_retained_turns) + .unwrap_or(2); + + let state = Arc::new(CompactState::new(threshold, retained)); + + // Track input_tokens via on_usage callback. + let state_for_usage = state.clone(); + self.worker_mut().on_usage(move |event| { + if let Some(tokens) = event.input_tokens { + state_for_usage.update_input_tokens(tokens); + } + }); + + let interceptor = CompactInterceptor::new(hook_interceptor, state.clone()); + self.worker_mut().set_interceptor(interceptor); + self.compact_state = Some(state); + } else { + self.worker_mut().set_interceptor(hook_interceptor); + } + self.interceptor_installed = true; } } /// Send user input and run until the LLM turn completes. + /// + /// If the between-turns compaction threshold is exceeded mid-run, + /// the Worker yields (`WorkerResult::Yielded`), history is compacted, and execution resumes + /// automatically. pub async fn run(&mut self, input: impl Into) -> Result { self.ensure_interceptor_installed(); - - // Split borrow: access worker field directly to allow concurrent - // mutable borrows on session_id / head_hash. 
- let w = self.worker.as_ref().unwrap(); - session_store::ensure_head_or_fork( - &self.store, - &mut self.session_id, - &mut self.head_hash, - SessionStartState { - system_prompt: w.get_system_prompt(), - config: w.request_config(), - history: w.history(), - }, - ) - .await?; + self.ensure_session_head().await?; let history_before = self.worker.as_ref().unwrap().history().len(); @@ -224,14 +256,27 @@ impl Pod { let result = locked.run(input).await; self.worker = Some(locked.unlock()); - self.persist_turn(history_before, &result).await?; - result.map(PodRunResult::from).map_err(PodError::Worker) + self.handle_worker_result(result, history_before).await } /// Resume from a paused state. pub async fn resume(&mut self) -> Result { self.ensure_interceptor_installed(); + self.ensure_session_head().await?; + let history_before = self.worker.as_ref().unwrap().history().len(); + + // lock → resume → unlock + let worker = self.worker.take().expect("worker taken during run"); + let mut locked = worker.lock(); + let result = locked.resume().await; + self.worker = Some(locked.unlock()); + + self.handle_worker_result(result, history_before).await + } + + /// Ensure session head exists (fork if needed). + async fn ensure_session_head(&mut self) -> Result<(), PodError> { let w = self.worker.as_ref().unwrap(); session_store::ensure_head_or_fork( &self.store, @@ -244,19 +289,109 @@ impl Pod { }, ) .await?; + Ok(()) + } - let history_before = self.worker.as_ref().unwrap().history().len(); - - // lock → resume → unlock - let worker = self.worker.take().expect("worker taken during run"); - let mut locked = worker.lock(); - let result = locked.resume().await; - self.worker = Some(locked.unlock()); - + /// Handle Worker result: always persist the turn first, then if + /// `Yielded`, perform compaction and resume. 
+ /// + /// Persisting before compaction ensures that if compact fails, the + /// turn is fully recorded in the old session (interrupted, outcome + /// `Yielded`), so restore remains consistent. + async fn handle_worker_result( + &mut self, + result: Result, + history_before: usize, + ) -> Result { self.persist_turn(history_before, &result).await?; + + if matches!(result, Ok(WorkerResult::Yielded)) { + return self.do_compact_and_resume().await; + } + + if result.is_ok() { + if let Some(ref state) = self.compact_state { + state.set_just_compacted(false); + } + } result.map(PodRunResult::from).map_err(PodError::Worker) } + /// Perform compaction after the Worker returned `WorkerResult::Yielded` and resume execution. + /// + /// Uses `Box::pin` for the recursive `resume()` call to break the + /// async layout cycle (`run → handle_worker_result → do_compact_and_resume → resume`). + fn do_compact_and_resume( + &mut self, + ) -> std::pin::Pin> + Send + '_>> + { + Box::pin(async move { + // Thrash detection: if we just compacted and hit the threshold again, + // something is wrong. + if let Some(ref state) = self.compact_state { + if state.just_compacted() { + state.set_just_compacted(false); + return Err(PodError::CompactThrash); + } + } + + let retained = self + .compact_state + .as_ref() + .map(|s| s.retained_turns()) + .unwrap_or(2); + + match self.compact(retained).await { + Ok(new_session_id) => { + info!( + new_session_id = %new_session_id, + "Compaction succeeded, resuming execution" + ); + if let Some(ref state) = self.compact_state { + state.record_compact_success(); + } + self.resume().await + } + Err(e) => { + warn!(error = %e, "Compaction failed during run"); + if let Some(ref state) = self.compact_state { + state.record_compact_failure(); + } + Err(e) + } + } + }) + } + + /// Attempt proactive compaction (called by Controller after run). + /// + /// Best-effort: failures are logged but do not propagate. 
+ pub async fn try_post_run_compact(&mut self) -> Result<(), PodError> { + let state = match self.compact_state.as_ref() { + Some(s) if !s.is_disabled() && s.exceeds_post_run() && !s.just_compacted() => { + s.clone() + } + _ => return Ok(()), + }; + + let retained = state.retained_turns(); + match self.compact(retained).await { + Ok(new_session_id) => { + info!( + new_session_id = %new_session_id, + "Proactive post-run compaction succeeded" + ); + state.record_compact_success(); + Ok(()) + } + Err(e) => { + warn!(error = %e, "Proactive post-run compaction failed"); + state.record_compact_failure(); + Ok(()) + } + } + } + /// Persist delta + turn end + outcome after a run/resume. async fn persist_turn( &mut self, @@ -289,6 +424,7 @@ impl Pod { Ok(WorkerResult::Finished) => Outcome::Finished, Ok(WorkerResult::Paused) => Outcome::Paused, Ok(WorkerResult::LimitReached) => Outcome::LimitReached, + Ok(WorkerResult::Yielded) => Outcome::Yielded, Err(e) => Outcome::Error { message: e.to_string(), }, @@ -440,6 +576,7 @@ impl Pod, St> { hook_builder: HookRegistryBuilder::new(), interceptor_installed: false, manifest_dir, + compact_state: None, }) } @@ -478,6 +615,9 @@ impl From for PodRunResult { WorkerResult::Finished => PodRunResult::Finished, WorkerResult::Paused => PodRunResult::Paused, WorkerResult::LimitReached => PodRunResult::LimitReached, + // Yielded is internal to Pod: it's always caught by + // handle_worker_result and never converted to PodRunResult. 
+ WorkerResult::Yielded => unreachable!("Yielded never converts to PodRunResult"), } } } @@ -527,4 +667,7 @@ pub enum PodError { #[error(transparent)] Provider(#[from] provider::ProviderError), + + #[error("compaction thrash: context still exceeds threshold immediately after compact")] + CompactThrash, } diff --git a/crates/session-store/src/session_log.rs b/crates/session-store/src/session_log.rs index f1f9fe89..8f43a3a0 100644 --- a/crates/session-store/src/session_log.rs +++ b/crates/session-store/src/session_log.rs @@ -159,6 +159,9 @@ pub enum Outcome { Finished, Paused, LimitReached, + /// Worker yielded control to the caller for external processing. + /// Distinct from `Paused`: caller handles internally and resumes. + Yielded, Error { message: String }, } diff --git a/crates/session-store/tests/session_test.rs b/crates/session-store/tests/session_test.rs index 01157cc4..e1980b3f 100644 --- a/crates/session-store/tests/session_test.rs +++ b/crates/session-store/tests/session_test.rs @@ -119,6 +119,7 @@ async fn run_and_persist( Ok(llm_worker::WorkerResult::Finished) => Outcome::Finished, Ok(llm_worker::WorkerResult::Paused) => Outcome::Paused, Ok(llm_worker::WorkerResult::LimitReached) => Outcome::LimitReached, + Ok(llm_worker::WorkerResult::Yielded) => Outcome::Yielded, Err(e) => Outcome::Error { message: e.to_string(), }, diff --git a/tickets/compact-improvements.md b/tickets/compact-improvements.md new file mode 100644 index 00000000..62ea79d0 --- /dev/null +++ b/tickets/compact-improvements.md @@ -0,0 +1,142 @@ +# Compact の改善 + +## 背景 + +`Pod::compact()` とその周辺機構(CompactInterceptor, CompactState, Controller 統合)は +実装済み。挙動の詳細に未決定事項が残っており、要約品質にも改善余地がある。 + +--- + +## 1. 要約入力の改善 + +### 現状の問題 + +`build_summary_prompt()` が全 Item をフラットにテキスト化して LLM に投げている。 + +1. **データが多すぎる**: ToolResult の content(ファイル内容、grep 結果等)を含めている +2. 
**単一関心事の前提**: "Original Task" が1つだけ。タスク切り替わりに対応できない + +### Phase 1: 入力データの削減 + +`build_summary_prompt` で渡すデータを絞る: + +```rust +fn build_summary_prompt(items: &[Item]) -> String { + for item in items { + match item { + Item::ToolResult { summary, .. } => { + // content を含めない。summary だけ + lines.push(format!("[ToolResult] {summary}")); + } + Item::ToolCall { name, .. } => { + // arguments を含めない。ツール名だけ + lines.push(format!("[ToolCall] {name}")); + } + Item::Reasoning { .. } => { + // skip(内部思考は要約に不要) + } + // User/Assistant のテキストはそのまま + } + } +} +``` + +### Phase 2: 要約フォーマットの改善 + +タスク切り替わりを反映する: + +``` +## Tasks +### Task 1: (最初のユーザー指示) +- 完了した作業 +- 判明した事実 + +### Task 2: (次のユーザー指示) +- 完了した作業 +- 判明した事実 + +## Current State +- (変更されたファイル、残タスク) +``` + +### Phase 3: マルチターン要約 Worker + +1リクエストで全部読ませるのではなく、要約 Worker にツールを持たせて自律的に要約させる。 + +``` +要約 Worker: + system: 「セッションログを読んで構造化要約を生成せよ」 + ツール: read_session_segment(offset, limit) +``` + +利点: +- 巨大セッションでもコンテキストに収まる +- Worker が自分で「重要/不要」を判断できる +- タスク切り替わりを検出し、関心事ごとに要約できる + +builtin-tools チケットとの依存あり。 + +--- + +## 2. 挙動の未決定事項 + +### 現在の挙動 + +**トリガー(2段階):** +1. ターン間 (CompactInterceptor): `input_tokens > turn_threshold` → Yield → compact + resume +2. run 後 (Controller): `input_tokens > post_run_threshold (×9/8)` → best-effort compact + +**安全機構:** +- サーキットブレーカー: 3回連続失敗で無効化 +- Thrash 検出: compact 直後に再び閾値超過 → CompactThrash エラー +- Yield 前の永続化: persist_turn を compact の前に実行 + +### 2-1. Yield のタイミング精度 + +現状: `pre_llm_request` でチェック = ターンの切れ目でしか発火しない。 +1ターン内でツール呼び出しが多く、途中でコンテキストが膨らむケースは次のターンまで待つ。 + +検討: +- ツール実行後にもチェックする?(`post_tool_call` で Yield 相当の処理) +- 現状の「ターン切れ目のみ」で十分か? + +### 2-2. 閾値の導出 + +- `turn_threshold` = マニフェストの `compact_threshold` そのまま +- `post_run_threshold` = `turn_threshold * 9 / 8`(≈ 112.5%) + +9/8 の根拠はない(安全マージン)。マニフェストで個別指定可能にする? + +### 2-3. Prune と Compact の相互作用 + +``` +pre_llm_request: + 1. PruneHook(content を除去) + 2. 
CompactInterceptor(トークン数チェック) +``` + +Prune はリクエストコンテキストのみ操作し、`last_input_tokens` は前回の LLM レスポンスの値。 +Prune の効果は `last_input_tokens` に反映されず、Compact の判断には影響しない。 +→ Prune で十分に縮んでも Compact が走る可能性がある。保守的で実害は小さい。 + +### 2-4. compact 中のクライアント通知 + +compact は LLM 呼び出しを伴う。この間 Controller は Pod を占有。 +`AlreadyRunning` エラーで弾かれる。Protocol チケットの `CompactStart`/`CompactDone` で対応。 + +### 2-5. 復元時の挙動 + +`Outcome::Yielded` で記録されたセッションを restore した場合: +- `last_run_interrupted = true` で復元 +- Pod は resume 可能(通常の interrupted セッションと同じ) +- compact 後の新セッションが存在する場合、どちらを restore するかは呼び出し側の責任 +- `compacted_from` で辿れる + +--- + +## 実装順序 + +1. Phase 1(content/arguments/reasoning 除去)→ `build_summary_prompt` の変更のみ +2. 挙動の未決定事項 → 実運用でのフィードバックを元に判断 +3. Phase 2(フォーマット改善)→ チューニング +4. Phase 3(マルチターン要約)→ builtin-tools 後 diff --git a/tickets/context-compaction.md b/tickets/context-compaction.md deleted file mode 100644 index 75d2f232..00000000 --- a/tickets/context-compaction.md +++ /dev/null @@ -1,344 +0,0 @@ -# コンテキスト圧縮: Prune + Compact - -## 背景 - -長時間実行エージェントにとって、コンテキストウィンドウの管理はコア要件。 -現状の Worker は history をそのまま保持し、オーバーフロー時の対策がない。 - -Claude Code の3層構造(MicroCompaction / AutoCompact / Full Compact)を参考に、 -Insomnia では2層(条件付き Prune + Compact)で対処する。 - -参考: [docs/ref/claude-code-compaction.md](../docs/ref/claude-code-compaction.md) - ---- - -## 前提: ToolOutput の再設計 - -Prune の設計は ToolOutput の構造に依存する。 -現行の Inline/Stored enum を **summary + content** の2フィールド構造に改める。 - -詳細: ~~[tool-output-design.md](tool-output-design.md)~~ — **実装済み** - -### 構造 - -```rust -pub struct ToolOutput { - pub summary: String, // 1-2行。常に残る - pub content: Option, // 詳細。Prune で消える -} -``` - -```rust -Item::ToolResult { - call_id: CallId, - summary: String, - content: Option, -} -``` - -### Prune との関係 - -- summary: Prune 後も残る。「何をしたか」の最低限の情報 -- content: Prune 対象。`None` に置換するだけ -- 巨大出力はツール側がファイルに退避し、content に見取り図を置く - -### 削除対象 - -ToolOutput 再設計に伴い、以下を削除: - -- `ToolOutput` enum(Inline/Stored)→ struct に置換 -- `Content` enum, `auto_summarize`, 
`ToolOutputProcessor` trait -- `BlobStore` trait, `FsBlobStore`, `BlobOutputProcessor` -- `inspect_tool.rs`(汎用の read_file/grep で代替) -- Worker の `output_processor` フィールド - ---- - -## Phase 1: 条件付き Prune - -### 概要 - -Claude Code の `clear_at_least` パターンに倣い、**削れるトークン量が閾値を超える場合にのみ** Prune を実行する。キャッシュを無駄に壊さない。 - -### キャッシュの制約 - -全主要プロバイダ(Anthropic / OpenAI / Gemini)で KV キャッシュはプレフィクスベース。 -プレフィクス中のアイテムを変更すると、**変更地点以降が全て再計算**になる。 - -``` -キャッシュ済み: [A, B, C, D, E] -Prune: [A', B, C, D, E] ← A の content を消した -再計算: [A', B, C, D, E] ← A' 以降すべて -``` - -Prune で得られるトークン節約 vs キャッシュ再計算コスト。 -`min_savings` 閾値で「削る価値がある場合だけ」実行する。 - -### コード配置 - -| 場所 | 内容 | -|------|------| -| `crates/llm-worker/src/prune.rs` | Prune アルゴリズム(集計 + 置換) | -| `crates/pod/src/prune_hook.rs` | `PruneHook`(`Hook` 実装) | - -### アルゴリズム - -```rust -pub struct PruneConfig { - /// Prune 対象外とする直近ターン数 - pub protected_turns: usize, - /// この推定トークン数以上削れる場合にのみ Prune を実行 - pub min_savings: usize, -} - -pub fn prune(items: &mut Vec, config: &PruneConfig) -> bool { - // 1. ターン境界の特定(UserMessage 出現位置) - let turn_starts = find_turn_starts(items); - if turn_starts.len() <= config.protected_turns { - return false; - } - let boundary = turn_starts[turn_starts.len() - config.protected_turns]; - - // 2. Prune 可能なトークン数を集計 - let mut total_savings: usize = 0; - let mut prunable: Vec = Vec::new(); - - for (i, item) in items[..boundary].iter().enumerate() { - if let Item::ToolResult { content: Some(c), .. } = item { - total_savings += c.len() / 4; // 粗い推定 - prunable.push(i); - } - } - - // 3. 閾値チェック - if total_savings < config.min_savings { - return false; - } - - // 4. Prune: content を None にするだけ - for &i in &prunable { - if let Item::ToolResult { content, .. 
} = &mut items[i] { - *content = None; - } - } - true -} -``` - -### PruneHook - -```rust -pub struct PruneHook { - config: PruneConfig, -} - -#[async_trait] -impl Hook for PruneHook { - async fn call(&self, context: &mut Vec) -> PreRequestAction { - prune(context, &self.config); - PreRequestAction::Continue - } -} -``` - -### 特性 - -- **条件付き**: 集計して閾値を超えた場合のみ実行 -- **冪等**: `content: None` のアイテムはスキップ -- **非破壊**: history 本体は変更しない。Prune 状態(どこまで刈ったか)を Pod が保持し、LLM リクエスト構築時に反映する -- **単純**: Prune = `content = None`。blob 参照の解析やサマリ生成は不要 - ---- - -## Phase 2: Compact - -### 概要 - -history 全体を要約で置き換える。 -別の Worker(要約専用・ツールなし)で要約を生成する。 - -### トリガー - -Controller が `input_tokens` を追跡し、run 完了後に閾値と比較。 - -```rust -let last_input_tokens = Arc::new(AtomicU64::new(0)); -{ - let tracker = last_input_tokens.clone(); - worker.on_usage(move |event| { - if let Some(tokens) = event.input_tokens { - tracker.store(tokens, Ordering::Relaxed); - } - }); -} -``` - -### サーキットブレーカー - -```rust -const MAX_COMPACT_FAILURES: usize = 3; -// 3回連続失敗で compaction を無効化 -``` - -### Compaction フロー - -session-store-extraction 後の構造を前提とする。 -Pod が Worker を直接保持し、session-store は save/restore の関数群。 - -``` -Run 完了 → input_tokens > threshold - ↓ -Pod: worker.history() + worker.request_config() を読み出す - ↓ -Pod: build_client(&manifest.provider) で要約用 Worker を生成(ツールなし、temperature=0) - ↓ -要約 Worker: history を要約プロンプトとして受け取り、構造化要約を生成 - ↓ -Pod: [要約 Item, 直近 N ターン] で新 history を構築 - ↓ -Pod: worker.set_history(新 history) - ↓ -Pod: session_store::save_compacted(store, new_id, compacted_from, ...) で新セッション開始 - ↓ -旧セッション JSONL はそのまま保全(append-only 原則を維持) -``` - -``` -旧セッション (abc-123): - [entry0] → [entry1] → ... → [entryN] ← そのまま残る - -新セッション (def-456): - [SessionStart { history: [要約 + 直近N], compacted_from: (abc-123, entryN.hash) }] → ... 
-``` - -### SessionStart の出自フィールド - -```rust -LogEntry::SessionStart { - ts: u64, - system_prompt: Option, - config: RequestConfig, - history: Vec, - /// fork 由来の場合、元セッションと分岐点 - forked_from: Option<(SessionId, EntryHash)>, - /// compact 由来の場合、元セッションと圧縮時点 - compacted_from: Option<(SessionId, EntryHash)>, -} -``` - -- 通常の新規セッション: 両方 `None` -- fork: `forked_from = Some(...)` -- compact: `compacted_from = Some(...)` -- EntryHash で元セッションのどの時点からの操作かを追跡可能 - -### 要約用 Worker - -- `build_client(&manifest.provider, manifest_dir)` で新しい LlmClient を作る - - reqwest::Client は内部 Arc。1回きりのリクエストなので新規プールで問題なし -- Pod が `manifest_dir` を保持する必要がある(現状 `from_manifest` では受け取るが保持していない) - -### 要約プロンプト - -TODO: system prompt の文面、history を文字列化する方法を詰める。 - -出力フォーマット: - -``` -## Original Task -(元のユーザー指示) - -## Completed Work -- (完了した作業。ファイルパス・関数名等の具体情報) - -## Key Discoveries -- (判明した事実・制約・エラー) - -## Current State -- (変更されたファイル、残タスク) -``` - -### エラーハンドリング - -- 要約 Worker エラー → 警告ログ、スキップ、consecutive_failures++ -- 3回連続失敗 → セッション残りで compaction 無効化 -- Thrash loop(compaction 直後に再び閾値超過)→ エラーで停止 - ---- - -## 設定 - -### マニフェスト - -```toml -[compaction] -# Prune: 直近何ターンを保護するか(デフォルト: 3) -prune_protected_turns = 3 - -# Prune: この推定トークン数以上削れる場合にのみ実行(デフォルト: 4096) -prune_min_savings = 4096 - -# Compact: input_tokens がこの値を超えたら要約を実行(省略 = 無効) -compact_threshold = 80000 - -# Compact: 圧縮後に保持するターン数(デフォルト: 2) -compact_retained_turns = 2 -``` - -```rust -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct CompactionConfig { - #[serde(default = "default_prune_protected_turns")] - pub prune_protected_turns: usize, // default: 3 - #[serde(default = "default_prune_min_savings")] - pub prune_min_savings: usize, // default: 4096 - pub compact_threshold: Option, - #[serde(default = "default_compact_retained_turns")] - pub compact_retained_turns: usize, // default: 2 -} -``` - -### デフォルト動作 - -- `[compaction]` 省略: Prune も Compact も無効 -- `[compaction]` あり・`compact_threshold` 省略: Prune のみ有効 - ---- - -## 設計判断 - -| 判断 | 理由 | 
-|------|------| -| ToolOutput を summary + content に | Prune が `content = None` で済む。blob/inspect の複雑さが消える | -| BlobStore / inspect を削除 | 巨大出力はツール側の責務。フレームワークは summary/content を受け取るだけ | -| Prune は条件付き(`min_savings`) | KV キャッシュ無効化コスト vs 節約量。Claude Code の `clear_at_least` に倣う | -| Prune は request context を操作 | history 本体を保全。session log の完全性を維持 | -| Compact は run 間で実行 | 要約は LLM 呼び出しを伴う。ターンループ内では Prune が対処 | -| サーキットブレーカー | 連続失敗の無限ループ防止。Claude Code の知見 | -| 新しい trait は不要 | 設計原則3: Hook + Controller 制御 + set_history() で完結 | - ---- - -## 実装順序 - -1. ~~**ToolOutput 再設計**~~ — 実装済み -2. ~~**旧モジュール削除**~~ — 実装済み -3. ~~**`prune.rs`**~~ — 実装済み(`crates/llm-worker/src/prune.rs`) -4. ~~**`PruneHook`**~~ — 実装済み(`crates/pod/src/prune_hook.rs`) -5. ~~**`CompactionConfig`**~~ — 実装済み(`manifest::CompactionConfig`) -6. ~~**`LogEntry` に provenance フィールド追加**~~ — 実装済み(`SessionOrigin`, `forked_from`, `compacted_from`) -7. ~~**`compact()` 関数**~~ — 実装済み(`Pod::compact()`)。サーキットブレーカーは Controller 統合時に追加 -8. **Protocol イベント** — 保留(Controller 統合時に必要に応じて追加) - -### 残作業 - -- Controller への統合: run 完了後に `input_tokens > threshold` をチェックし `pod.compact()` を呼ぶ -- サーキットブレーカー(consecutive failures カウンタ) -- Thrash loop 検出(compact 直後に再び閾値超過 → エラー停止) -- 要約プロンプトの調整(実運用でのチューニング) - ---- - -## 依存チケット - -- ~~[remove-hook-module.md](remove-hook-module.md)~~ — 完了 -- ~~[session-store-extraction.md](session-store-extraction.md)~~ — 完了 diff --git a/tickets/protocol-design.md b/tickets/protocol-design.md new file mode 100644 index 00000000..a6f6f3f4 --- /dev/null +++ b/tickets/protocol-design.md @@ -0,0 +1,84 @@ +# Protocol の設計 + +## 背景 + +現状の Protocol (`Method` / `Event`) は最低限のストリーミングイベントのみ。 +機能が増えるにつれ、以下が不足している: + +- Compact 発生時のクライアント通知 +- Permission の ask/reply フロー +- セッション切り替え(compact 後の新 session_id 通知) +- クライアント→Pod の制御拡張(設定変更等) + +## 現状の Protocol + +### Method (Client → Pod) + +```rust +Method::Run { input } +Method::Resume +Method::Cancel +Method::GetHistory // request-response(socket 層で直接応答) +``` + +### Event (Pod → 
Client) + +``` +TurnStart, TurnEnd, TextDelta, TextDone, +ToolCallStart, ToolCallArgsDelta, ToolCallDone, ToolResult, +Usage, RunEnd, Error, History +``` + +## 設計課題 + +### 1. Broadcast vs Request-Response の区別 + +現状: +- Event は全て broadcast channel 経由 +- `GetHistory` だけ socket 層で直接応答(暗黙の request-response) + +課題: +- request-response が増えると socket_server の分岐が膨らむ +- クライアント側で「この Event は自分のリクエストへの応答」と判別できない + +選択肢: +- **A. 現状維持**: request-response は socket 層で個別に処理。シンプルだがスケールしない +- **B. request_id パターン**: Method に optional `id` を持たせ、応答 Event に同じ `id` を含める +- **C. 型で分ける**: `Response` enum を Event とは別に定義 + +### 2. Compact イベント + +TUI が compact の進行を表示するために必要: + +```rust +Event::CompactStart // compact 開始 +Event::CompactDone { new_session_id: String } // 成功 +Event::CompactFailed { error: String } // 失敗 +``` + +compact は Pod 内部で自律的に発火するので、broadcast で全クライアントに通知が自然。 +CompactDone 後、クライアントは GetHistory で新しい history を取得できる。 + +### 3. セッション情報の通知 + +compact で session_id が変わる。クライアントに通知する方法: + +- **CompactDone に含める**: `Event::CompactDone { new_session_id }` +- **汎用 SessionChanged イベント**: compact 以外でも session_id が変わるケース(fork 等)に対応 + +### 4. Permission の ask/reply(将来) + +permission-extension-point チケットで扱う。ここでは Protocol の拡張パターンだけ意識: + +``` +Pod → Client: Event::PermissionRequest { id, tool, args } +Client → Pod: Method::PermissionReply { id, allow } +``` + +これは request-response の逆方向(Pod が要求元)。同じソケット上で双方向に使える。 + +## 検討事項 + +- Event の肥大化をどう管理するか(カテゴリ分け?ネスト?) +- Protocol のバージョニング(クライアント互換性) +- イベントの順序保証(broadcast channel の特性)