diff --git a/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs b/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs index 99757552..6542a5c4 100644 --- a/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs +++ b/crates/llm-worker/src/llm_client/scheme/anthropic/events.rs @@ -265,12 +265,23 @@ impl AnthropicScheme { } fn convert_usage(&self, usage: &UsageData) -> UsageEvent { - let input = usage.input_tokens.unwrap_or(0); + // Anthropic の `input_tokens` は **キャッシュ外** の入力トークンのみで、 + // プロンプト全長は input_tokens + cache_read + cache_creation。 + // UsageEvent の `input_tokens` には「占有量(プロンプト全長)」を載せる + // 規約に合わせて、ここでキャッシュ分を足し込む。 + // cache_read_input_tokens / cache_creation_input_tokens は内訳として + // 別フィールドに残るので、料金計算側で `input - cache_read - cache_creation` + // により非キャッシュ入力分は逆算可能。 + let raw_input = usage.input_tokens.unwrap_or(0); + let cache_read = usage.cache_read_input_tokens.unwrap_or(0); + let cache_creation = usage.cache_creation_input_tokens.unwrap_or(0); + let input_total = raw_input + cache_read + cache_creation; let output = usage.output_tokens.unwrap_or(0); + UsageEvent { - input_tokens: usage.input_tokens, + input_tokens: usage.input_tokens.map(|_| input_total), output_tokens: usage.output_tokens, - total_tokens: Some(input + output), + total_tokens: Some(input_total + output), cache_read_input_tokens: usage.cache_read_input_tokens, cache_creation_input_tokens: usage.cache_creation_input_tokens, } @@ -289,12 +300,33 @@ mod tests { let event = scheme.parse_event("message_start", data).unwrap().unwrap(); match event { Event::Usage(u) => { + // キャッシュなしなので input_total = raw_input = 10 assert_eq!(u.input_tokens, Some(10)); } _ => panic!("Expected Usage event"), } } + #[test] + fn test_convert_usage_includes_cache_in_input_total() { + // Anthropic の input_tokens はキャッシュ外のみで、占有量は + // input + cache_read + cache_creation。 + // UsageEvent.input_tokens は占有量に正規化される。 + let scheme = AnthropicScheme::new(); + let usage = UsageData { + input_tokens: Some(100), + output_tokens: Some(50), + cache_read_input_tokens: Some(800), + cache_creation_input_tokens: Some(200), + }; + let event = scheme.convert_usage(&usage); + // 100 + 800 + 200 = 1100 + assert_eq!(event.input_tokens, Some(1100)); + assert_eq!(event.cache_read_input_tokens, Some(800)); + assert_eq!(event.cache_creation_input_tokens, Some(200)); + assert_eq!(event.total_tokens, Some(1150)); + } + #[test] fn test_parse_content_block_start_text() { let scheme = AnthropicScheme::new(); diff --git a/crates/llm-worker/src/timeline/timeline.rs b/crates/llm-worker/src/timeline/timeline.rs index 8bb18f8d..7ac581ab 100644 --- a/crates/llm-worker/src/timeline/timeline.rs +++ b/crates/llm-worker/src/timeline/timeline.rs @@ -8,6 +8,33 @@ use std::marker::PhantomData; use super::event::*; use crate::handler::*; +// ============================================================================= +// Helpers +// ============================================================================= + +/// 1リクエスト内で受信した複数 UsageEvent をマージする。 +/// 各フィールドについて新しい値が `Some` ならそれで上書き。 +/// プロバイダによっては input/cache 系を最初の event だけに載せ、 +/// output_tokens を後続 event で更新するため、最後の値だけを取るのではなく +/// フィールド単位で latest-non-None を取る。 +fn merge_usage(acc: &mut UsageEvent, new: &UsageEvent) { + if new.input_tokens.is_some() { + acc.input_tokens = new.input_tokens; + } + if new.output_tokens.is_some() { + acc.output_tokens = new.output_tokens; + } + if new.total_tokens.is_some() { + acc.total_tokens = new.total_tokens; + } + if new.cache_read_input_tokens.is_some() { + acc.cache_read_input_tokens = new.cache_read_input_tokens; + } + if new.cache_creation_input_tokens.is_some() { + acc.cache_creation_input_tokens = new.cache_creation_input_tokens; + } +} + // ============================================================================= // Type-erased Handler // ============================================================================= @@ -362,6 +389,12 @@ pub struct Timeline { // 現在アクティブなブロック current_block: Option, + + // 1リクエスト内で受信した Usage event の集約バッファ。 + // Anthropic は message_start と message_delta、Gemini は各チャンクと、 + // 多くのプロバイダが複数 Usage を発行するため、リクエスト境界で + // 1度だけ発火するためにここでマージする。flush_usage() で発火する。 + pending_usage: Option, } impl Default for Timeline { @@ -381,6 +414,7 @@ impl Timeline { thinking_block_handlers: Vec::new(), tool_use_block_handlers: Vec::new(), current_block: None, + pending_usage: None, } } @@ -491,9 +525,24 @@ impl Timeline { } } + /// Usage event を即時には dispatch せず、pending_usage にマージする。 + /// 1リクエスト内で複数の Usage event が来ても、ハンドラには 1 度だけ + /// 最終値を渡したいため。flush_usage() で発火する。 fn dispatch_usage(&mut self, event: &UsageEvent) { - for handler in &mut self.usage_handlers { - handler.dispatch(event); + match &mut self.pending_usage { + Some(acc) => merge_usage(acc, event), + None => self.pending_usage = Some(event.clone()), + } + } + + /// pending_usage を usage_handlers に発火し、バッファをクリアする。 + /// 1リクエスト分のストリーム終了時に1回だけ呼ぶ想定。 + /// pending_usage が空ならば何もしない。 + pub fn flush_usage(&mut self) { + if let Some(event) = self.pending_usage.take() { + for handler in &mut self.usage_handlers { + handler.dispatch(&event); + } } } @@ -629,9 +678,63 @@ mod tests { timeline.on_usage(handler); timeline.dispatch(&Event::usage(100, 50)); + // pending_usage に積まれているだけなのでまだ未発火 + assert_eq!(calls.lock().unwrap().len(), 0); + // flush で 1 度だけ発火 + timeline.flush_usage(); let recorded = calls.lock().unwrap(); assert_eq!(recorded.len(), 1); assert_eq!(recorded[0].input_tokens, Some(100)); } + + #[test] + fn test_usage_aggregation_and_flush() { + struct TestUsageHandler { + calls: Arc>>, + } + impl Handler for TestUsageHandler { + type Scope = (); + fn on_event(&mut self, _scope: &mut (), event: &UsageEvent) { + self.calls.lock().unwrap().push(event.clone()); + } + } + + let calls = Arc::new(Mutex::new(Vec::new())); + let mut timeline = Timeline::new(); + timeline.on_usage(TestUsageHandler { + calls: calls.clone(), + }); + + // Anthropic 風: message_start で input + 暫定 output + timeline.dispatch(&Event::Usage(UsageEvent { + input_tokens: Some(409), + output_tokens: Some(1), + total_tokens: Some(410), + cache_read_input_tokens: Some(0), + cache_creation_input_tokens: Some(0), + })); + // message_delta で最終 output + timeline.dispatch(&Event::Usage(UsageEvent { + input_tokens: Some(409), + output_tokens: Some(71), + total_tokens: Some(480), + cache_read_input_tokens: Some(0), + cache_creation_input_tokens: Some(0), + })); + + // 未 flush の段階では発火しない + assert_eq!(calls.lock().unwrap().len(), 0); + + timeline.flush_usage(); + let recorded = calls.lock().unwrap(); + assert_eq!(recorded.len(), 1); + assert_eq!(recorded[0].input_tokens, Some(409)); + assert_eq!(recorded[0].output_tokens, Some(71)); + + // flush 後にもう一度 flush しても何も起きない + drop(recorded); + timeline.flush_usage(); + assert_eq!(calls.lock().unwrap().len(), 1); + } } diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index e52f2e55..c62bed0f 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -803,7 +803,11 @@ impl Worker { } } let event = result - .inspect_err(|_| self.last_run_interrupted = true)?; + .inspect_err(|_| { + self.last_run_interrupted = true; + // 部分情報でも発火しておく(料金会計用) + self.timeline.flush_usage(); + })?; self.timeline.dispatch(&event); } None => break, @@ -814,11 +818,14 @@ impl Worker { info!("Stream cancelled"); } self.timeline.abort_current_block(); + self.timeline.flush_usage(); self.last_run_interrupted = true; return Err(WorkerError::Cancelled); } } } + // ストリーム完了時に集約済み Usage を 1 度だけ発火 + self.timeline.flush_usage(); debug!(event_count = event_count, "Stream completed"); Ok(()) } diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 5174b81e..48572344 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -10,6 +10,7 @@ mod compact_interceptor; mod compact_state; mod hook_interceptor; mod pod; +mod usage_tracker; pub use controller::{PodController, PodHandle}; pub use manifest::{PodManifest, ProviderConfig, ProviderKind, Scope}; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index f1794fca..69aeb28e 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -20,6 +20,24 @@ use crate::hook::{ PreToolCall, }; use crate::hook_interceptor::HookInterceptor; +use crate::usage_tracker::UsageTracker; +use llm_worker::interceptor::PreRequestAction; +use async_trait::async_trait; + +/// Pre-LLM-request hook that records `history.len()` at send time into a +/// shared `UsageTracker`. The on_usage callback later pairs this with the +/// aggregated UsageEvent to produce one `UsageRecord` per LLM call. +struct UsageTrackingHook { + tracker: Arc, +} + +#[async_trait] +impl Hook for UsageTrackingHook { + async fn call(&self, context: &mut Vec) -> PreRequestAction { + self.tracker.note_request(context.len()); + PreRequestAction::Continue + } +} const SUMMARY_SYSTEM_PROMPT: &str = "\ You are a context compaction assistant. \ @@ -53,6 +71,10 @@ pub struct Pod { manifest_dir: Option, /// Shared compaction state (present when compact_threshold is configured). compact_state: Option>, + /// Per-LLM-request Usage tracker. Always present after construction. + /// Captures `(history_len, UsageEvent)` pairs during a run; drained + /// in `persist_turn` and persisted as `LogEntry::LlmUsage` entries. + usage_tracker: Arc, /// Session-lifetime file-operation tracker from the builtin `tools` /// crate. Populated by the Controller when it registers the builtin /// tools so that Pod-owned operations (e.g. compaction) can consult @@ -85,6 +107,7 @@ impl Pod { interceptor_installed: false, manifest_dir: None, compact_state: None, + usage_tracker: Arc::new(UsageTracker::new()), tracker: None, }) } @@ -118,6 +141,7 @@ impl Pod { interceptor_installed: false, manifest_dir: None, compact_state: None, + usage_tracker: Arc::new(UsageTracker::new()), tracker: None, }) } @@ -220,6 +244,14 @@ impl Pod { /// `on_usage` callback to track `input_tokens`. fn ensure_interceptor_installed(&mut self) { if !self.interceptor_installed { + // Pre-LLM-request hook: capture history.len() into the + // UsageTracker so the upcoming on_usage callback can pair + // it with the measured input_tokens. + self.hook_builder + .add_pre_llm_request(UsageTrackingHook { + tracker: self.usage_tracker.clone(), + }); + let builder = std::mem::take(&mut self.hook_builder); let registry = Arc::new(builder.build()); let hook_interceptor = HookInterceptor::new(registry); @@ -230,6 +262,11 @@ impl Pod { .as_ref() .and_then(|c| c.compact_threshold); + // Usage tracking via on_usage callback. Independent of + // compact_threshold so that LlmUsage entries are persisted + // unconditionally. + let tracker_for_usage = self.usage_tracker.clone(); + if let Some(threshold) = compact_threshold { let retained = self .manifest @@ -240,18 +277,23 @@ impl Pod { let state = Arc::new(CompactState::new(threshold, retained)); - // Track input_tokens via on_usage callback. + // Combined on_usage: feed both the legacy compact threshold + // tracker and the new UsageTracker. let state_for_usage = state.clone(); self.worker_mut().on_usage(move |event| { if let Some(tokens) = event.input_tokens { state_for_usage.update_input_tokens(tokens); } + tracker_for_usage.record_usage(event); }); let interceptor = CompactInterceptor::new(hook_interceptor, state.clone()); self.worker_mut().set_interceptor(interceptor); self.compact_state = Some(state); } else { + self.worker_mut().on_usage(move |event| { + tracker_for_usage.record_usage(event); + }); self.worker_mut().set_interceptor(hook_interceptor); } @@ -439,6 +481,24 @@ impl Pod { ) .await?; + // Persist any LLM Usage measurements collected during this run. + // One LogEntry::LlmUsage per LLM call (the tool loop may have run + // many calls within a single Pod::run). + let usage_records = self.usage_tracker.drain(); + for record in usage_records { + session_store::save_usage( + &self.store, + self.session_id, + &mut self.head_hash, + record.history_len, + record.input_total_tokens, + record.cache_read_tokens, + record.cache_write_tokens, + record.output_tokens, + ) + .await?; + } + let interrupted = self.worker.as_ref().unwrap().last_run_interrupted(); let outcome = match result { Ok(WorkerResult::Finished) => Outcome::Finished, @@ -597,6 +657,7 @@ impl Pod, St> { interceptor_installed: false, manifest_dir, compact_state: None, + usage_tracker: Arc::new(UsageTracker::new()), tracker: None, }) } diff --git a/crates/pod/src/usage_tracker.rs b/crates/pod/src/usage_tracker.rs new file mode 100644 index 00000000..3b00bf8c --- /dev/null +++ b/crates/pod/src/usage_tracker.rs @@ -0,0 +1,136 @@ +//! Tracks per-LLM-request Usage measurements within a Pod run. +//! +//! Bridge between two sync touchpoints in the Worker lifecycle: +//! +//! - **`pre_llm_request` hook** (async, but synchronously accessed via the +//! tracker): captures `history.len()` at the moment a request goes out. +//! - **`on_usage` callback** (sync closure): receives the aggregated final +//! `UsageEvent` for that request after the stream completes. +//! +//! Pairing the two yields one `UsageRecord` per LLM call. Pod drains them +//! in `persist_turn` and writes them as `LogEntry::LlmUsage` entries. +//! +//! Multiple LLM calls per Pod run (tool loop) are supported: each call +//! produces its own `(history_len, UsageEvent)` pair, and the records are +//! buffered in chronological order. + +use std::sync::Mutex; + +use llm_worker::timeline::event::UsageEvent; +use session_store::UsageRecord; + +/// Shared between the pre-request hook, the `on_usage` callback, and Pod. +pub(crate) struct UsageTracker { + /// `history.len()` captured at the most recent `pre_llm_request`. + /// Cleared when paired with an incoming `on_usage` event. + pending_history_len: Mutex>, + /// Records accumulated during the current run; drained by Pod. + pending_records: Mutex>, +} + +impl UsageTracker { + pub(crate) fn new() -> Self { + Self { + pending_history_len: Mutex::new(None), + pending_records: Mutex::new(Vec::new()), + } + } + + /// Called from a `pre_llm_request` hook with the current history length. + pub(crate) fn note_request(&self, history_len: usize) { + *self.pending_history_len.lock().unwrap() = Some(history_len); + } + + /// Called from the `on_usage` callback with the aggregated final + /// UsageEvent. If a `history_len` was previously stashed via + /// `note_request`, builds a `UsageRecord` and pushes it onto the buffer. + /// If not (e.g. test code that fires Usage outside a request), drops + /// the event. + pub(crate) fn record_usage(&self, event: &UsageEvent) { + let history_len = match self.pending_history_len.lock().unwrap().take() { + Some(n) => n, + None => return, + }; + // UsageEvent.input_tokens は scheme 層で「占有量(プロンプト全長)」に + // 正規化済みである前提(Anthropic は cache_read + cache_creation を + // 加算して emit する)。 + let input_total = event.input_tokens.unwrap_or(0); + let cache_read = event.cache_read_input_tokens.unwrap_or(0); + let cache_write = event.cache_creation_input_tokens.unwrap_or(0); + let output = event.output_tokens.unwrap_or(0); + self.pending_records.lock().unwrap().push(UsageRecord { + history_len, + input_total_tokens: input_total, + cache_read_tokens: cache_read, + cache_write_tokens: cache_write, + output_tokens: output, + }); + } + + /// Drain accumulated records. Called by Pod after a run completes, + /// before persisting the turn. + pub(crate) fn drain(&self) -> Vec { + std::mem::take(&mut *self.pending_records.lock().unwrap()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_event(input: u64, cache_read: u64, cache_write: u64, output: u64) -> UsageEvent { + UsageEvent { + input_tokens: Some(input), + output_tokens: Some(output), + total_tokens: Some(input + output), + cache_read_input_tokens: Some(cache_read), + cache_creation_input_tokens: Some(cache_write), + } + } + + #[test] + fn pairs_history_len_with_usage_event() { + let tracker = UsageTracker::new(); + tracker.note_request(5); + tracker.record_usage(&make_event(1000, 800, 100, 42)); + + let records = tracker.drain(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].history_len, 5); + assert_eq!(records[0].input_total_tokens, 1000); + assert_eq!(records[0].cache_read_tokens, 800); + assert_eq!(records[0].cache_write_tokens, 100); + assert_eq!(records[0].output_tokens, 42); + } + + #[test] + fn drain_clears_buffer() { + let tracker = UsageTracker::new(); + tracker.note_request(1); + tracker.record_usage(&make_event(10, 0, 0, 5)); + assert_eq!(tracker.drain().len(), 1); + assert_eq!(tracker.drain().len(), 0); + } + + #[test] + fn usage_without_pending_history_len_is_dropped() { + let tracker = UsageTracker::new(); + tracker.record_usage(&make_event(10, 0, 0, 5)); + assert_eq!(tracker.drain().len(), 0); + } + + #[test] + fn multiple_requests_in_one_run() { + let tracker = UsageTracker::new(); + tracker.note_request(5); + tracker.record_usage(&make_event(100, 0, 0, 20)); + tracker.note_request(10); + tracker.record_usage(&make_event(200, 50, 0, 30)); + + let records = tracker.drain(); + assert_eq!(records.len(), 2); + assert_eq!(records[0].history_len, 5); + assert_eq!(records[1].history_len, 10); + assert_eq!(records[1].cache_read_tokens, 50); + } +} diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index 3775b50c..3d7d260c 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -37,11 +37,11 @@ pub use fs_store::FsStore; pub use session::{ SessionStartState, create_compacted_session, create_session, ensure_head_or_fork, fork, fork_at, restore, save_cache_locked, save_cache_unlocked, save_config_changed, save_delta, save_outcome, - save_turn_end, + save_turn_end, save_usage, }; pub use session_log::{ - EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, SessionOrigin, build_chain, - collect_state, compute_hash, + EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, SessionOrigin, UsageRecord, + build_chain, collect_state, compute_hash, }; pub use store::{Store, StoreError}; diff --git a/crates/session-store/src/session.rs b/crates/session-store/src/session.rs index dfd1977b..380665ea 100644 --- a/crates/session-store/src/session.rs +++ b/crates/session-store/src/session.rs @@ -214,6 +214,33 @@ pub async fn save_outcome( .await } +/// Log an `LlmUsage` entry — 1 LLM リクエスト分の Usage スナップショット。 +/// +/// `history_len` は送信時の `history.len()`。`input_total_tokens` は +/// その prefix をプロバイダが実測した占有量(プロンプト全長)で、 +/// プロバイダ別の正規化(Anthropic では `input + cache_read + cache_creation`)を +/// 済ませた値を渡す。 +pub async fn save_usage( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, + history_len: usize, + input_total_tokens: u64, + cache_read_tokens: u64, + cache_write_tokens: u64, + output_tokens: u64, +) -> Result<(), StoreError> { + append_entry(store, session_id, head_hash, LogEntry::LlmUsage { + ts: session_log::now_millis(), + history_len, + input_total_tokens, + cache_read_tokens, + cache_write_tokens, + output_tokens, + }) + .await +} + /// Log a `Locked` entry (KV cache locked). pub async fn save_cache_locked( store: &impl Store, diff --git a/crates/session-store/src/session_log.rs b/crates/session-store/src/session_log.rs index 8f43a3a0..98e0c957 100644 --- a/crates/session-store/src/session_log.rs +++ b/crates/session-store/src/session_log.rs @@ -141,6 +141,28 @@ pub enum LogEntry { /// `RequestConfig` changed. ConfigChanged { ts: u64, config: RequestConfig }, + + /// LLM リクエスト 1 件分の Usage スナップショット。 + /// + /// `history_len` は送信時の `history.len()`。`input_total_tokens` は + /// その prefix をプロバイダが実測した占有量(プロンプト全長)。 + /// このリクエスト 1 件で新しく追加された分ではない。 + /// + /// プロバイダ別の正規化(呼び出し側で行う想定): + /// - Anthropic: `input_tokens + cache_read + cache_creation` + /// - OpenAI: `prompt_tokens` + /// - Gemini: `promptTokenCount` + /// - Ollama: `prompt_eval_count` + /// + /// `cache_read_tokens` / `cache_write_tokens` は上記の内訳で、料金会計用。 + LlmUsage { + ts: u64, + history_len: usize, + input_total_tokens: u64, + cache_read_tokens: u64, + cache_write_tokens: u64, + output_tokens: u64, + }, } /// Provenance reference to a parent session. @@ -176,6 +198,27 @@ pub struct RestoredState { pub last_run_interrupted: bool, /// Hash of the last entry in the chain (None if empty). pub head_hash: Option, + /// LLM リクエストごとの Usage スナップショット時系列。 + /// `LogEntry::LlmUsage` を replay して時系列順に積まれる。 + /// 任意位置のトークン数推定に使う。 + pub usage_history: Vec, +} + +/// LLM リクエスト送信時点での占有量スナップショット。 +/// +/// `LogEntry::LlmUsage` の replay 時に `RestoredState.usage_history` に積まれる。 +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct UsageRecord { + /// 送信時の history.len() + pub history_len: usize, + /// history[..history_len] の占有量(プロンプト全長、実測) + pub input_total_tokens: u64, + /// 上記のうちキャッシュから読み出された分 + pub cache_read_tokens: u64, + /// 上記のうちこのリクエストでキャッシュに書かれた分 + pub cache_write_tokens: u64, + /// このリクエストで生成された出力トークン数 + pub output_tokens: u64, } /// Replay a sequence of hashed entries to reconstruct worker state. @@ -188,6 +231,7 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState { locked_prefix_len: 0, last_run_interrupted: false, head_hash: None, + usage_history: Vec::new(), }; for hashed in entries { @@ -233,6 +277,22 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState { LogEntry::ConfigChanged { config, .. } => { state.config = config.clone(); } + LogEntry::LlmUsage { + history_len, + input_total_tokens, + cache_read_tokens, + cache_write_tokens, + output_tokens, + .. + } => { + state.usage_history.push(UsageRecord { + history_len: *history_len, + input_total_tokens: *input_total_tokens, + cache_read_tokens: *cache_read_tokens, + cache_write_tokens: *cache_write_tokens, + output_tokens: *output_tokens, + }); + } } } @@ -452,6 +512,106 @@ mod tests { assert_ne!(hash_a, hash_b); } + #[test] + fn replay_llm_usage_appends_to_usage_history() { + let entries = build_chain(&[ + LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + }, + LogEntry::UserInput { + ts: 2000, + item: Item::user_message("hi"), + }, + LogEntry::LlmUsage { + ts: 2100, + history_len: 1, + input_total_tokens: 50, + cache_read_tokens: 0, + cache_write_tokens: 0, + output_tokens: 10, + }, + LogEntry::AssistantItems { + ts: 2200, + items: vec![Item::assistant_message("yo")], + }, + LogEntry::LlmUsage { + ts: 3100, + history_len: 2, + input_total_tokens: 65, + cache_read_tokens: 50, + cache_write_tokens: 0, + output_tokens: 5, + }, + ]); + let state = collect_state(&entries); + // history は LlmUsage で変化しない + assert_eq!(state.history.len(), 2); + // usage_history は時系列順 + assert_eq!(state.usage_history.len(), 2); + assert_eq!(state.usage_history[0].history_len, 1); + assert_eq!(state.usage_history[0].input_total_tokens, 50); + assert_eq!(state.usage_history[1].history_len, 2); + assert_eq!(state.usage_history[1].cache_read_tokens, 50); + } + + #[test] + fn replay_without_llm_usage_keeps_usage_history_empty() { + // 既存ログ互換: LlmUsage entry が無くても collect_state は壊れない + let entries = build_chain(&[ + LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + }, + LogEntry::UserInput { + ts: 2000, + item: Item::user_message("hi"), + }, + ]); + let state = collect_state(&entries); + assert!(state.usage_history.is_empty()); + } + + #[test] + fn llm_usage_entry_round_trip_via_json() { + let entry = LogEntry::LlmUsage { + ts: 12345, + history_len: 7, + input_total_tokens: 1000, + cache_read_tokens: 800, + cache_write_tokens: 100, + output_tokens: 42, + }; + let json = serde_json::to_string(&entry).unwrap(); + let parsed: LogEntry = serde_json::from_str(&json).unwrap(); + match parsed { + LogEntry::LlmUsage { + ts, + history_len, + input_total_tokens, + cache_read_tokens, + cache_write_tokens, + output_tokens, + } => { + assert_eq!(ts, 12345); + assert_eq!(history_len, 7); + assert_eq!(input_total_tokens, 1000); + assert_eq!(cache_read_tokens, 800); + assert_eq!(cache_write_tokens, 100); + assert_eq!(output_tokens, 42); + } + other => panic!("expected LlmUsage, got {:?}", other), + } + } + #[test] fn hash_hex_round_trip() { let entry = LogEntry::SessionStart { diff --git a/tickets/compact-improvements.md b/tickets/compact-improvements.md index 81b3c4e8..097c0e49 100644 --- a/tickets/compact-improvements.md +++ b/tickets/compact-improvements.md @@ -85,6 +85,28 @@ warn を出す。両方 None なら compact 無効(今まで通り)。片方 → `CompactState` 内部では `Option` 2 本持ち。`exceeds_*` メソッドは `Option` が `None` なら常に `false`。 +### 占有量ソースの統合(重要) + +現在 `CompactState::last_input_tokens: AtomicU64` が `on_usage` callback から +更新され、閾値判定に使われている。これは usage-history チケットで導入された +session-store の `LogEntry::LlmUsage` 履歴と**情報源が二重化**している状態。 + +本チケットで両者を統合する。**改善版である `usage_history` を単一の情報源とし、 +`last_input_tokens` 経路を撤去する**: + +- `CompactState` から `last_input_tokens: AtomicU64` フィールドを削除 +- `CompactState::update_input_tokens` メソッドを削除 +- `Pod::ensure_interceptor_installed` の on_usage callback から + `state_for_usage.update_input_tokens(tokens)` の行を削除 + (`tracker_for_usage.record_usage(event)` だけが残る) +- 閾値判定 (`exceeds_request` / `exceeds_post_run`) は `Session::total_tokens()` + (token-counter で導入される API)の戻り値を見る形に変える +- これにより「実測値の単一履歴 → トークン会計 API → 閾値判定」と一直線になる + +Anthropic のキャッシュヒット時に占有量を取りこぼす旧バグも、このパスを +廃止することで自動的に解消する(`UsageEvent.input_tokens` は scheme 層で +すでに占有量に正規化済み、かつ usage_history はそれをそのまま保存している)。 + ### 影響箇所 - **`crates/manifest/src/lib.rs`** @@ -93,6 +115,8 @@ warn を出す。両方 None なら compact 無効(今まで通り)。片方 - テスト更新 (両閾値が読めること) - **`crates/pod/src/compact_state.rs`** + - `last_input_tokens: AtomicU64` フィールドを **削除**(情報源を usage_history に一本化) + - `update_input_tokens` / `last_input_tokens` メソッドも削除 - `turn_threshold` フィールドを `request_threshold: Option` にリネーム + `Option` 化 - `post_run_threshold: u64` → `Option` に変更 - コンストラクタシグネチャ変更: @@ -106,19 +130,27 @@ warn を出す。両方 None なら compact 無効(今まで通り)。片方 retained_turns: usize, ) -> Self ``` - - `exceeds_turn()` → `exceeds_request()` にリネーム。中身: + - `exceeds_turn()` → `exceeds_request()` にリネーム。閾値超過判定は + 呼び出し側で現在の占有量を渡す形に変える(CompactState は閾値しか持たない): ```rust - pub(crate) fn exceeds_request(&self) -> bool { + pub(crate) fn exceeds_request(&self, current_tokens: u64) -> bool { self.request_threshold - .map(|t| self.last_input_tokens() > t) + .map(|t| current_tokens > t) .unwrap_or(false) } ``` + 呼び出し元 (`compact_interceptor.rs` / `controller.rs`) は `Session::total_tokens()` + (token-counter で生やす API)から現在の占有量を取って渡す - `exceeds_post_run()` も同様に Option 対応 - `turn_threshold()` getter → `request_threshold()`、戻り値は `Option` - ドックコメントを「proactive = post_run」「safety net = request」で書き直し - テスト: 両方設定/片方だけ/両方 None の 3 ケース +- **`crates/pod/src/pod.rs`** (上記の compact_state 変更に伴って) + - `ensure_interceptor_installed` の on_usage callback から + `state_for_usage.update_input_tokens(tokens)` の行を削除。 + `tracker_for_usage.record_usage(event)` だけが残る + - **`crates/pod/src/compact_interceptor.rs`** - `exceeds_turn()` 呼び出しを `exceeds_request()` に - ログメッセージ "Between-turns ..." → "Between-requests ..." @@ -356,8 +388,9 @@ pruned history から: ### Prune と Compact の相互作用 -Prune はリクエストコンテキストのみ操作、`last_input_tokens` は前回の LLM レスポンスの値。 -Prune の効果は閾値判断に反映されない。保守的(compact しすぎる方向)で実害は小さい。 +Prune はリクエストコンテキストのみ操作。閾値判定は usage_history の最新 +測定値(前回の LLM レスポンス時点の占有量)を見るので、Prune の効果は +次回 LLM call まで反映されない。保守的(compact しすぎる方向)で実害は小さい。 ### compact 中のクライアント通知 diff --git a/tickets/token-counter.md b/tickets/token-counter.md index a668c430..291c368d 100644 --- a/tickets/token-counter.md +++ b/tickets/token-counter.md @@ -18,6 +18,11 @@ Compact / Prune の挙動改善に「**履歴上の任意位置のトークン 正確なトークン数(推定でも実測由来)が要る箇所: +- **Compact 閾値判定** — 現状 `CompactState::last_input_tokens` (`AtomicU64`) が + on_usage callback で更新されているが、これは usage_history と情報源が二重化 + している。本チケットで `Session::total_tokens()` を生やせば、`compact_interceptor.rs` / + `controller.rs` から閾値判定がこの API 経由になり、`last_input_tokens` 経路を + 撤去できる(撤去自体は compact-improvements 側で実施) - **Compact の retained_tokens 切り出し** — 末尾から N トークン残す cut 位置を決める - **Prune の `min_savings` 判定** — 「この content を捨てたら何トークン浮くか」を見積もる - **Compact worker の auto-read budget 判定** — `mark_read_required` の累計 diff --git a/tickets/usage-history.md b/tickets/usage-history.md index 34a1932d..fc24a5bf 100644 --- a/tickets/usage-history.md +++ b/tickets/usage-history.md @@ -107,13 +107,14 @@ pub async fn save_usage( - 各プロバイダの scheme で 1 リクエスト内の複数 Usage event(Anthropic の message_start + message_delta)を集約し、**完了時の最終値だけを 1 つの `UsageEvent` として外に発火する**。pod 側では暫定値を見ない -- `UsageEvent` 上で provider 別 raw 値(`input_tokens` / `cache_read_input_tokens` - / `cache_creation_input_tokens` / `output_tokens`)はそのまま保持。占有量への - 正規化は consumer 側(save_usage 呼び出し側)で行う - - 動機: llm-worker は raw 値の運搬役に徹し、「プロンプト全長」のような - プロバイダ依存の意味付けは upper layer に集約する - - 正規化ヘルパー(例: `UsageEvent::input_total_tokens()`)を llm-worker に - 生やすかは実装時判断 +- 占有量への正規化(Anthropic: `input_tokens + cache_read + cache_creation`)は + 各 scheme の `convert_usage` で行い、`UsageEvent.input_tokens` には正規化済みの + 占有量(プロンプト全長)が入る。consumer 側(pod / UsageTracker)は + `event.input_tokens` をそのまま使う + - 動機: 正規化ロジックを scheme に閉じ込めることで、consumer が provider 差異を + 意識する必要がなくなる +- `cache_read_input_tokens` / `cache_creation_input_tokens` は内訳として + 別フィールドに保持。料金会計用 ### pod 側 @@ -187,6 +188,10 @@ callback と同じ場所。 - 既存ログ(`LlmUsage` 無し)を読んでも壊れない - Anthropic の cache hit ありレスポンスで input_total が正しく計算される +## レビュー状態 + +Reviewed — [usage-history.review.md](usage-history.review.md) + ## 依存 - なし(前提チケット) diff --git a/tickets/usage-history.review.md b/tickets/usage-history.review.md new file mode 100644 index 00000000..9cf4d1a0 --- /dev/null +++ b/tickets/usage-history.review.md @@ -0,0 +1,50 @@ +# usage-history レビュー + +## 要件の充足 + +チケットが定義した要件は全て達成されている: + +- **LogEntry::LlmUsage**: session-store のハッシュチェーンに乗る variant として追加。 + `history_len` / `input_total_tokens` / `cache_read_tokens` / `cache_write_tokens` / `output_tokens` の5フィールド +- **RestoredState.usage_history**: `collect_state` の replay で `Vec` に時系列順で積まれる。history の構築には影響しない +- **save_usage**: `append_entry` 経由でハッシュチェーンに接続 +- **既存ログ互換**: `LlmUsage` entry が無い既存ログを読んでも `usage_history` が空になるだけで壊れない +- **1リクエスト = 1 entry**: Timeline の `pending_usage` + `flush_usage()` で複数 Usage event を集約し、handler には1度だけ発火 + +## アーキテクチャ + +レイヤー分担が明確で、各層の責務が逸脱していない: + +| レイヤー | 責務 | +|---------|------| +| scheme (anthropic) | raw → 占有量への正規化。`input_tokens + cache_read + cache_creation` | +| Timeline | 1リクエスト内の複数 Usage event をフィールド単位 latest-non-None でマージ。`flush_usage()` で1度だけ発火 | +| Worker | ストリーム完了・エラー・キャンセルの全パスで `flush_usage()` を呼ぶ | +| UsageTracker (pod) | `note_request(history_len)` と `record_usage(event)` のペアリング。drain で Pod に渡す | +| Pod::persist_turn | drain した records を `save_usage` で session-store に書き出し | + +## 指摘と対処 + +### 1. UsageEvent の doc comment(対処済み) + +`UsageEvent.input_tokens` が「占有量(プロンプト全長、キャッシュ込み)」を意味することが +struct と各フィールドの doc comment に明記された。scheme 層での正規化規約も記載済み。 + +### 2. save_usage の引数が多い(非ブロッカー、未対処) + +8引数。`UsageRecord` を直接受け取れば `drain()` の結果をそのまま渡せてシグネチャがきれいになるが、 +他の `save_*` 関数がフラットな引数を取るパターンと一貫しているため、統一性の観点では現状でも妥当。 +将来フィールドが増えた時点でまとめて `UsageRecord` 受け取りに変えればよい。 + +## テスト + +- `replay_llm_usage_appends_to_usage_history`: 複数 LlmUsage entry の replay で usage_history が正しく積まれ、history.len() に影響しない +- `replay_without_llm_usage_keeps_usage_history_empty`: 既存ログ互換 +- `llm_usage_entry_round_trip_via_json`: serde 往復 +- `test_convert_usage_includes_cache_in_input_total`: Anthropic の占有量正規化 +- `test_usage_aggregation_and_flush`: Timeline の集約 + flush +- UsageTracker: ペアリング、drain、未ペアの drop、複数リクエスト + +## 判定 + +承認。