usageデータの永続化実装

This commit is contained in:
Keisuke Hirata 2026-04-13 07:09:05 +09:00
parent 22fe502d71
commit 17d0430a4d
13 changed files with 642 additions and 22 deletions

View File

@ -265,12 +265,23 @@ impl AnthropicScheme {
}
fn convert_usage(&self, usage: &UsageData) -> UsageEvent {
let input = usage.input_tokens.unwrap_or(0);
// Anthropic の `input_tokens` は **キャッシュ外** の入力トークンのみで、
// プロンプト全長は input_tokens + cache_read + cache_creation。
// UsageEvent の `input_tokens` には「占有量(プロンプト全長)」を載せる
// 規約に合わせて、ここでキャッシュ分を足し込む。
// cache_read_input_tokens / cache_creation_input_tokens は内訳として
// 別フィールドに残るので、料金計算側で `input - cache_read - cache_creation`
// により非キャッシュ入力分は逆算可能。
let raw_input = usage.input_tokens.unwrap_or(0);
let cache_read = usage.cache_read_input_tokens.unwrap_or(0);
let cache_creation = usage.cache_creation_input_tokens.unwrap_or(0);
let input_total = raw_input + cache_read + cache_creation;
let output = usage.output_tokens.unwrap_or(0);
UsageEvent {
input_tokens: usage.input_tokens,
input_tokens: usage.input_tokens.map(|_| input_total),
output_tokens: usage.output_tokens,
total_tokens: Some(input + output),
total_tokens: Some(input_total + output),
cache_read_input_tokens: usage.cache_read_input_tokens,
cache_creation_input_tokens: usage.cache_creation_input_tokens,
}
@ -289,12 +300,33 @@ mod tests {
let event = scheme.parse_event("message_start", data).unwrap().unwrap();
match event {
Event::Usage(u) => {
// キャッシュなしなので input_total = raw_input = 10
assert_eq!(u.input_tokens, Some(10));
}
_ => panic!("Expected Usage event"),
}
}
#[test]
fn test_convert_usage_includes_cache_in_input_total() {
// Anthropic の input_tokens はキャッシュ外のみで、占有量は
// input + cache_read + cache_creation。
// UsageEvent.input_tokens は占有量に正規化される。
let scheme = AnthropicScheme::new();
let usage = UsageData {
input_tokens: Some(100),
output_tokens: Some(50),
cache_read_input_tokens: Some(800),
cache_creation_input_tokens: Some(200),
};
let event = scheme.convert_usage(&usage);
// 100 + 800 + 200 = 1100
assert_eq!(event.input_tokens, Some(1100));
assert_eq!(event.cache_read_input_tokens, Some(800));
assert_eq!(event.cache_creation_input_tokens, Some(200));
assert_eq!(event.total_tokens, Some(1150));
}
#[test]
fn test_parse_content_block_start_text() {
let scheme = AnthropicScheme::new();

View File

@ -8,6 +8,33 @@ use std::marker::PhantomData;
use super::event::*;
use crate::handler::*;
// =============================================================================
// Helpers
// =============================================================================
/// 1リクエスト内で受信した複数 UsageEvent をマージする。
/// 各フィールドについて新しい値が `Some` ならそれで上書き。
/// プロバイダによっては input/cache 系を最初の event だけに載せ、
/// output_tokens を後続 event で更新するため、最後の値だけを取るのではなく
/// フィールド単位で latest-non-None を取る。
fn merge_usage(acc: &mut UsageEvent, new: &UsageEvent) {
if new.input_tokens.is_some() {
acc.input_tokens = new.input_tokens;
}
if new.output_tokens.is_some() {
acc.output_tokens = new.output_tokens;
}
if new.total_tokens.is_some() {
acc.total_tokens = new.total_tokens;
}
if new.cache_read_input_tokens.is_some() {
acc.cache_read_input_tokens = new.cache_read_input_tokens;
}
if new.cache_creation_input_tokens.is_some() {
acc.cache_creation_input_tokens = new.cache_creation_input_tokens;
}
}
// =============================================================================
// Type-erased Handler
// =============================================================================
@ -362,6 +389,12 @@ pub struct Timeline {
// 現在アクティブなブロック
current_block: Option<BlockType>,
// 1リクエスト内で受信した Usage event の集約バッファ。
// Anthropic は message_start と message_delta、Gemini は各チャンクと、
// 多くのプロバイダが複数 Usage を発行するため、リクエスト境界で
// 1度だけ発火するためにここでマージする。flush_usage() で発火する。
pending_usage: Option<UsageEvent>,
}
impl Default for Timeline {
@ -381,6 +414,7 @@ impl Timeline {
thinking_block_handlers: Vec::new(),
tool_use_block_handlers: Vec::new(),
current_block: None,
pending_usage: None,
}
}
@ -491,9 +525,24 @@ impl Timeline {
}
}
/// Usage event を即時には dispatch せず、pending_usage にマージする。
/// 1リクエスト内で複数の Usage event が来ても、ハンドラには 1 度だけ
/// 最終値を渡したいため。flush_usage() で発火する。
fn dispatch_usage(&mut self, event: &UsageEvent) {
for handler in &mut self.usage_handlers {
handler.dispatch(event);
match &mut self.pending_usage {
Some(acc) => merge_usage(acc, event),
None => self.pending_usage = Some(event.clone()),
}
}
/// pending_usage を usage_handlers に発火し、バッファをクリアする。
/// 1リクエスト分のストリーム終了時に1回だけ呼ぶ想定。
/// pending_usage が空ならば何もしない。
pub fn flush_usage(&mut self) {
if let Some(event) = self.pending_usage.take() {
for handler in &mut self.usage_handlers {
handler.dispatch(&event);
}
}
}
@ -629,9 +678,63 @@ mod tests {
timeline.on_usage(handler);
timeline.dispatch(&Event::usage(100, 50));
// pending_usage に積まれているだけなのでまだ未発火
assert_eq!(calls.lock().unwrap().len(), 0);
// flush で 1 度だけ発火
timeline.flush_usage();
let recorded = calls.lock().unwrap();
assert_eq!(recorded.len(), 1);
assert_eq!(recorded[0].input_tokens, Some(100));
}
#[test]
fn test_usage_aggregation_and_flush() {
struct TestUsageHandler {
calls: Arc<Mutex<Vec<UsageEvent>>>,
}
impl Handler<UsageKind> for TestUsageHandler {
type Scope = ();
fn on_event(&mut self, _scope: &mut (), event: &UsageEvent) {
self.calls.lock().unwrap().push(event.clone());
}
}
let calls = Arc::new(Mutex::new(Vec::new()));
let mut timeline = Timeline::new();
timeline.on_usage(TestUsageHandler {
calls: calls.clone(),
});
// Anthropic 風: message_start で input + 暫定 output
timeline.dispatch(&Event::Usage(UsageEvent {
input_tokens: Some(409),
output_tokens: Some(1),
total_tokens: Some(410),
cache_read_input_tokens: Some(0),
cache_creation_input_tokens: Some(0),
}));
// message_delta で最終 output
timeline.dispatch(&Event::Usage(UsageEvent {
input_tokens: Some(409),
output_tokens: Some(71),
total_tokens: Some(480),
cache_read_input_tokens: Some(0),
cache_creation_input_tokens: Some(0),
}));
// 未 flush の段階では発火しない
assert_eq!(calls.lock().unwrap().len(), 0);
timeline.flush_usage();
let recorded = calls.lock().unwrap();
assert_eq!(recorded.len(), 1);
assert_eq!(recorded[0].input_tokens, Some(409));
assert_eq!(recorded[0].output_tokens, Some(71));
// flush 後にもう一度 flush しても何も起きない
drop(recorded);
timeline.flush_usage();
assert_eq!(calls.lock().unwrap().len(), 1);
}
}

View File

@ -803,7 +803,11 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
}
let event = result
.inspect_err(|_| self.last_run_interrupted = true)?;
.inspect_err(|_| {
self.last_run_interrupted = true;
// 部分情報でも発火しておく(料金会計用)
self.timeline.flush_usage();
})?;
self.timeline.dispatch(&event);
}
None => break,
@ -814,11 +818,14 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
info!("Stream cancelled");
}
self.timeline.abort_current_block();
self.timeline.flush_usage();
self.last_run_interrupted = true;
return Err(WorkerError::Cancelled);
}
}
}
// ストリーム完了時に集約済み Usage を 1 度だけ発火
self.timeline.flush_usage();
debug!(event_count = event_count, "Stream completed");
Ok(())
}

View File

@ -10,6 +10,7 @@ mod compact_interceptor;
mod compact_state;
mod hook_interceptor;
mod pod;
mod usage_tracker;
pub use controller::{PodController, PodHandle};
pub use manifest::{PodManifest, ProviderConfig, ProviderKind, Scope};

View File

@ -20,6 +20,24 @@ use crate::hook::{
PreToolCall,
};
use crate::hook_interceptor::HookInterceptor;
use crate::usage_tracker::UsageTracker;
use llm_worker::interceptor::PreRequestAction;
use async_trait::async_trait;
/// Pre-LLM-request hook that records `history.len()` at send time into a
/// shared `UsageTracker`. The on_usage callback later pairs this with the
/// aggregated UsageEvent to produce one `UsageRecord` per LLM call.
struct UsageTrackingHook {
tracker: Arc<UsageTracker>,
}
#[async_trait]
impl Hook<PreLlmRequest> for UsageTrackingHook {
async fn call(&self, context: &mut Vec<Item>) -> PreRequestAction {
self.tracker.note_request(context.len());
PreRequestAction::Continue
}
}
const SUMMARY_SYSTEM_PROMPT: &str = "\
You are a context compaction assistant. \
@ -53,6 +71,10 @@ pub struct Pod<C: LlmClient, St: Store> {
manifest_dir: Option<PathBuf>,
/// Shared compaction state (present when compact_threshold is configured).
compact_state: Option<Arc<CompactState>>,
/// Per-LLM-request Usage tracker. Always present after construction.
/// Captures `(history_len, UsageEvent)` pairs during a run; drained
/// in `persist_turn` and persisted as `LogEntry::LlmUsage` entries.
usage_tracker: Arc<UsageTracker>,
/// Session-lifetime file-operation tracker from the builtin `tools`
/// crate. Populated by the Controller when it registers the builtin
/// tools so that Pod-owned operations (e.g. compaction) can consult
@ -85,6 +107,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
interceptor_installed: false,
manifest_dir: None,
compact_state: None,
usage_tracker: Arc::new(UsageTracker::new()),
tracker: None,
})
}
@ -118,6 +141,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
interceptor_installed: false,
manifest_dir: None,
compact_state: None,
usage_tracker: Arc::new(UsageTracker::new()),
tracker: None,
})
}
@ -220,6 +244,14 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// `on_usage` callback to track `input_tokens`.
fn ensure_interceptor_installed(&mut self) {
if !self.interceptor_installed {
// Pre-LLM-request hook: capture history.len() into the
// UsageTracker so the upcoming on_usage callback can pair
// it with the measured input_tokens.
self.hook_builder
.add_pre_llm_request(UsageTrackingHook {
tracker: self.usage_tracker.clone(),
});
let builder = std::mem::take(&mut self.hook_builder);
let registry = Arc::new(builder.build());
let hook_interceptor = HookInterceptor::new(registry);
@ -230,6 +262,11 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.as_ref()
.and_then(|c| c.compact_threshold);
// Usage tracking via on_usage callback. Independent of
// compact_threshold so that LlmUsage entries are persisted
// unconditionally.
let tracker_for_usage = self.usage_tracker.clone();
if let Some(threshold) = compact_threshold {
let retained = self
.manifest
@ -240,18 +277,23 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let state = Arc::new(CompactState::new(threshold, retained));
// Track input_tokens via on_usage callback.
// Combined on_usage: feed both the legacy compact threshold
// tracker and the new UsageTracker.
let state_for_usage = state.clone();
self.worker_mut().on_usage(move |event| {
if let Some(tokens) = event.input_tokens {
state_for_usage.update_input_tokens(tokens);
}
tracker_for_usage.record_usage(event);
});
let interceptor = CompactInterceptor::new(hook_interceptor, state.clone());
self.worker_mut().set_interceptor(interceptor);
self.compact_state = Some(state);
} else {
self.worker_mut().on_usage(move |event| {
tracker_for_usage.record_usage(event);
});
self.worker_mut().set_interceptor(hook_interceptor);
}
@ -439,6 +481,24 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
)
.await?;
// Persist any LLM Usage measurements collected during this run.
// One LogEntry::LlmUsage per LLM call (the tool loop may have run
// many calls within a single Pod::run).
let usage_records = self.usage_tracker.drain();
for record in usage_records {
session_store::save_usage(
&self.store,
self.session_id,
&mut self.head_hash,
record.history_len,
record.input_total_tokens,
record.cache_read_tokens,
record.cache_write_tokens,
record.output_tokens,
)
.await?;
}
let interrupted = self.worker.as_ref().unwrap().last_run_interrupted();
let outcome = match result {
Ok(WorkerResult::Finished) => Outcome::Finished,
@ -597,6 +657,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
interceptor_installed: false,
manifest_dir,
compact_state: None,
usage_tracker: Arc::new(UsageTracker::new()),
tracker: None,
})
}

View File

@ -0,0 +1,136 @@
//! Tracks per-LLM-request Usage measurements within a Pod run.
//!
//! Bridge between two sync touchpoints in the Worker lifecycle:
//!
//! - **`pre_llm_request` hook** (async, but synchronously accessed via the
//! tracker): captures `history.len()` at the moment a request goes out.
//! - **`on_usage` callback** (sync closure): receives the aggregated final
//! `UsageEvent` for that request after the stream completes.
//!
//! Pairing the two yields one `UsageRecord` per LLM call. Pod drains them
//! in `persist_turn` and writes them as `LogEntry::LlmUsage` entries.
//!
//! Multiple LLM calls per Pod run (tool loop) are supported: each call
//! produces its own `(history_len, UsageEvent)` pair, and the records are
//! buffered in chronological order.
use std::sync::Mutex;
use llm_worker::timeline::event::UsageEvent;
use session_store::UsageRecord;
/// Shared between the pre-request hook, the `on_usage` callback, and Pod.
pub(crate) struct UsageTracker {
/// `history.len()` captured at the most recent `pre_llm_request`.
/// Cleared when paired with an incoming `on_usage` event.
pending_history_len: Mutex<Option<usize>>,
/// Records accumulated during the current run; drained by Pod.
pending_records: Mutex<Vec<UsageRecord>>,
}
impl UsageTracker {
pub(crate) fn new() -> Self {
Self {
pending_history_len: Mutex::new(None),
pending_records: Mutex::new(Vec::new()),
}
}
/// Called from a `pre_llm_request` hook with the current history length.
pub(crate) fn note_request(&self, history_len: usize) {
*self.pending_history_len.lock().unwrap() = Some(history_len);
}
/// Called from the `on_usage` callback with the aggregated final
/// UsageEvent. If a `history_len` was previously stashed via
/// `note_request`, builds a `UsageRecord` and pushes it onto the buffer.
/// If not (e.g. test code that fires Usage outside a request), drops
/// the event.
pub(crate) fn record_usage(&self, event: &UsageEvent) {
let history_len = match self.pending_history_len.lock().unwrap().take() {
Some(n) => n,
None => return,
};
// UsageEvent.input_tokens は scheme 層で「占有量(プロンプト全長)」に
// 正規化済みである前提Anthropic は cache_read + cache_creation を
// 加算して emit する)。
let input_total = event.input_tokens.unwrap_or(0);
let cache_read = event.cache_read_input_tokens.unwrap_or(0);
let cache_write = event.cache_creation_input_tokens.unwrap_or(0);
let output = event.output_tokens.unwrap_or(0);
self.pending_records.lock().unwrap().push(UsageRecord {
history_len,
input_total_tokens: input_total,
cache_read_tokens: cache_read,
cache_write_tokens: cache_write,
output_tokens: output,
});
}
/// Drain accumulated records. Called by Pod after a run completes,
/// before persisting the turn.
pub(crate) fn drain(&self) -> Vec<UsageRecord> {
std::mem::take(&mut *self.pending_records.lock().unwrap())
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_event(input: u64, cache_read: u64, cache_write: u64, output: u64) -> UsageEvent {
UsageEvent {
input_tokens: Some(input),
output_tokens: Some(output),
total_tokens: Some(input + output),
cache_read_input_tokens: Some(cache_read),
cache_creation_input_tokens: Some(cache_write),
}
}
#[test]
fn pairs_history_len_with_usage_event() {
let tracker = UsageTracker::new();
tracker.note_request(5);
tracker.record_usage(&make_event(1000, 800, 100, 42));
let records = tracker.drain();
assert_eq!(records.len(), 1);
assert_eq!(records[0].history_len, 5);
assert_eq!(records[0].input_total_tokens, 1000);
assert_eq!(records[0].cache_read_tokens, 800);
assert_eq!(records[0].cache_write_tokens, 100);
assert_eq!(records[0].output_tokens, 42);
}
#[test]
fn drain_clears_buffer() {
let tracker = UsageTracker::new();
tracker.note_request(1);
tracker.record_usage(&make_event(10, 0, 0, 5));
assert_eq!(tracker.drain().len(), 1);
assert_eq!(tracker.drain().len(), 0);
}
#[test]
fn usage_without_pending_history_len_is_dropped() {
let tracker = UsageTracker::new();
tracker.record_usage(&make_event(10, 0, 0, 5));
assert_eq!(tracker.drain().len(), 0);
}
#[test]
fn multiple_requests_in_one_run() {
let tracker = UsageTracker::new();
tracker.note_request(5);
tracker.record_usage(&make_event(100, 0, 0, 20));
tracker.note_request(10);
tracker.record_usage(&make_event(200, 50, 0, 30));
let records = tracker.drain();
assert_eq!(records.len(), 2);
assert_eq!(records[0].history_len, 5);
assert_eq!(records[1].history_len, 10);
assert_eq!(records[1].cache_read_tokens, 50);
}
}

View File

@ -37,11 +37,11 @@ pub use fs_store::FsStore;
pub use session::{
SessionStartState, create_compacted_session, create_session, ensure_head_or_fork, fork, fork_at,
restore, save_cache_locked, save_cache_unlocked, save_config_changed, save_delta, save_outcome,
save_turn_end,
save_turn_end, save_usage,
};
pub use session_log::{
EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, SessionOrigin, build_chain,
collect_state, compute_hash,
EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, SessionOrigin, UsageRecord,
build_chain, collect_state, compute_hash,
};
pub use store::{Store, StoreError};

View File

@ -214,6 +214,33 @@ pub async fn save_outcome(
.await
}
/// Log an `LlmUsage` entry — 1 LLM リクエスト分の Usage スナップショット。
///
/// `history_len` は送信時の `history.len()`。`input_total_tokens` は
/// その prefix をプロバイダが実測した占有量(プロンプト全長)で、
/// プロバイダ別の正規化Anthropic では `input + cache_read + cache_creation`)を
/// 済ませた値を渡す。
pub async fn save_usage(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
history_len: usize,
input_total_tokens: u64,
cache_read_tokens: u64,
cache_write_tokens: u64,
output_tokens: u64,
) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::LlmUsage {
ts: session_log::now_millis(),
history_len,
input_total_tokens,
cache_read_tokens,
cache_write_tokens,
output_tokens,
})
.await
}
/// Log a `Locked` entry (KV cache locked).
pub async fn save_cache_locked(
store: &impl Store,

View File

@ -141,6 +141,28 @@ pub enum LogEntry {
/// `RequestConfig` changed.
ConfigChanged { ts: u64, config: RequestConfig },
/// LLM リクエスト 1 件分の Usage スナップショット。
///
/// `history_len` は送信時の `history.len()`。`input_total_tokens` は
/// その prefix をプロバイダが実測した占有量(プロンプト全長)。
/// このリクエスト 1 件で新しく追加された分ではない。
///
/// プロバイダ別の正規化(呼び出し側で行う想定):
/// - Anthropic: `input_tokens + cache_read + cache_creation`
/// - OpenAI: `prompt_tokens`
/// - Gemini: `promptTokenCount`
/// - Ollama: `prompt_eval_count`
///
/// `cache_read_tokens` / `cache_write_tokens` は上記の内訳で、料金会計用。
LlmUsage {
ts: u64,
history_len: usize,
input_total_tokens: u64,
cache_read_tokens: u64,
cache_write_tokens: u64,
output_tokens: u64,
},
}
/// Provenance reference to a parent session.
@ -176,6 +198,27 @@ pub struct RestoredState {
pub last_run_interrupted: bool,
/// Hash of the last entry in the chain (None if empty).
pub head_hash: Option<EntryHash>,
/// LLM リクエストごとの Usage スナップショット時系列。
/// `LogEntry::LlmUsage` を replay して時系列順に積まれる。
/// 任意位置のトークン数推定に使う。
pub usage_history: Vec<UsageRecord>,
}
/// LLM リクエスト送信時点での占有量スナップショット。
///
/// `LogEntry::LlmUsage` の replay 時に `RestoredState.usage_history` に積まれる。
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UsageRecord {
/// 送信時の history.len()
pub history_len: usize,
/// history[..history_len] の占有量(プロンプト全長、実測)
pub input_total_tokens: u64,
/// 上記のうちキャッシュから読み出された分
pub cache_read_tokens: u64,
/// 上記のうちこのリクエストでキャッシュに書かれた分
pub cache_write_tokens: u64,
/// このリクエストで生成された出力トークン数
pub output_tokens: u64,
}
/// Replay a sequence of hashed entries to reconstruct worker state.
@ -188,6 +231,7 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
locked_prefix_len: 0,
last_run_interrupted: false,
head_hash: None,
usage_history: Vec::new(),
};
for hashed in entries {
@ -233,6 +277,22 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
LogEntry::ConfigChanged { config, .. } => {
state.config = config.clone();
}
LogEntry::LlmUsage {
history_len,
input_total_tokens,
cache_read_tokens,
cache_write_tokens,
output_tokens,
..
} => {
state.usage_history.push(UsageRecord {
history_len: *history_len,
input_total_tokens: *input_total_tokens,
cache_read_tokens: *cache_read_tokens,
cache_write_tokens: *cache_write_tokens,
output_tokens: *output_tokens,
});
}
}
}
@ -452,6 +512,106 @@ mod tests {
assert_ne!(hash_a, hash_b);
}
#[test]
fn replay_llm_usage_appends_to_usage_history() {
let entries = build_chain(&[
LogEntry::SessionStart {
ts: 1000,
system_prompt: None,
config: RequestConfig::default(),
history: vec![],
forked_from: None,
compacted_from: None,
},
LogEntry::UserInput {
ts: 2000,
item: Item::user_message("hi"),
},
LogEntry::LlmUsage {
ts: 2100,
history_len: 1,
input_total_tokens: 50,
cache_read_tokens: 0,
cache_write_tokens: 0,
output_tokens: 10,
},
LogEntry::AssistantItems {
ts: 2200,
items: vec![Item::assistant_message("yo")],
},
LogEntry::LlmUsage {
ts: 3100,
history_len: 2,
input_total_tokens: 65,
cache_read_tokens: 50,
cache_write_tokens: 0,
output_tokens: 5,
},
]);
let state = collect_state(&entries);
// history は LlmUsage で変化しない
assert_eq!(state.history.len(), 2);
// usage_history は時系列順
assert_eq!(state.usage_history.len(), 2);
assert_eq!(state.usage_history[0].history_len, 1);
assert_eq!(state.usage_history[0].input_total_tokens, 50);
assert_eq!(state.usage_history[1].history_len, 2);
assert_eq!(state.usage_history[1].cache_read_tokens, 50);
}
#[test]
fn replay_without_llm_usage_keeps_usage_history_empty() {
// 既存ログ互換: LlmUsage entry が無くても collect_state は壊れない
let entries = build_chain(&[
LogEntry::SessionStart {
ts: 1000,
system_prompt: None,
config: RequestConfig::default(),
history: vec![],
forked_from: None,
compacted_from: None,
},
LogEntry::UserInput {
ts: 2000,
item: Item::user_message("hi"),
},
]);
let state = collect_state(&entries);
assert!(state.usage_history.is_empty());
}
#[test]
fn llm_usage_entry_round_trip_via_json() {
let entry = LogEntry::LlmUsage {
ts: 12345,
history_len: 7,
input_total_tokens: 1000,
cache_read_tokens: 800,
cache_write_tokens: 100,
output_tokens: 42,
};
let json = serde_json::to_string(&entry).unwrap();
let parsed: LogEntry = serde_json::from_str(&json).unwrap();
match parsed {
LogEntry::LlmUsage {
ts,
history_len,
input_total_tokens,
cache_read_tokens,
cache_write_tokens,
output_tokens,
} => {
assert_eq!(ts, 12345);
assert_eq!(history_len, 7);
assert_eq!(input_total_tokens, 1000);
assert_eq!(cache_read_tokens, 800);
assert_eq!(cache_write_tokens, 100);
assert_eq!(output_tokens, 42);
}
other => panic!("expected LlmUsage, got {:?}", other),
}
}
#[test]
fn hash_hex_round_trip() {
let entry = LogEntry::SessionStart {

View File

@ -85,6 +85,28 @@ warn を出す。両方 None なら compact 無効(今まで通り)。片方
`CompactState` 内部では `Option<u64>` 2 本持ち。`exceeds_*` メソッドは `Option``None` なら常に `false`
### 占有量ソースの統合(重要)
現在 `CompactState::last_input_tokens: AtomicU64``on_usage` callback から
更新され、閾値判定に使われている。これは usage-history チケットで導入された
session-store の `LogEntry::LlmUsage` 履歴と**情報源が二重化**している状態。
本チケットで両者を統合する。**改善版である `usage_history` を単一の情報源とし、
`last_input_tokens` 経路を撤去する**:
- `CompactState` から `last_input_tokens: AtomicU64` フィールドを削除
- `CompactState::update_input_tokens` メソッドを削除
- `Pod::ensure_interceptor_installed` の on_usage callback から
`state_for_usage.update_input_tokens(tokens)` の行を削除
`tracker_for_usage.record_usage(event)` だけが残る)
- 閾値判定 (`exceeds_request` / `exceeds_post_run`) は `Session::total_tokens()`
token-counter で導入される APIの戻り値を見る形に変える
- これにより「実測値の単一履歴 → トークン会計 API → 閾値判定」と一直線になる
Anthropic のキャッシュヒット時に占有量を取りこぼす旧バグも、このパスを
廃止することで自動的に解消する(`UsageEvent.input_tokens` は scheme 層で
すでに占有量に正規化済み、かつ usage_history はそれをそのまま保存している)。
### 影響箇所
- **`crates/manifest/src/lib.rs`**
@ -93,6 +115,8 @@ warn を出す。両方 None なら compact 無効(今まで通り)。片方
- テスト更新 (両閾値が読めること)
- **`crates/pod/src/compact_state.rs`**
- `last_input_tokens: AtomicU64` フィールドを **削除**(情報源を usage_history に一本化)
- `update_input_tokens` / `last_input_tokens` メソッドも削除
- `turn_threshold` フィールドを `request_threshold: Option<u64>` にリネーム + `Option`
- `post_run_threshold: u64``Option<u64>` に変更
- コンストラクタシグネチャ変更:
@ -106,19 +130,27 @@ warn を出す。両方 None なら compact 無効(今まで通り)。片方
retained_turns: usize,
) -> Self
```
- `exceeds_turn()``exceeds_request()` にリネーム。中身:
- `exceeds_turn()``exceeds_request()` にリネーム。閾値超過判定は
呼び出し側で現在の占有量を渡す形に変えるCompactState は閾値しか持たない):
```rust
pub(crate) fn exceeds_request(&self) -> bool {
pub(crate) fn exceeds_request(&self, current_tokens: u64) -> bool {
self.request_threshold
.map(|t| self.last_input_tokens() > t)
.map(|t| current_tokens > t)
.unwrap_or(false)
}
```
呼び出し元 (`compact_interceptor.rs` / `controller.rs`) は `Session::total_tokens()`
token-counter で生やす APIから現在の占有量を取って渡す
- `exceeds_post_run()` も同様に Option 対応
- `turn_threshold()` getter → `request_threshold()`、戻り値は `Option<u64>`
- ドックコメントを「proactive = post_run」「safety net = request」で書き直し
- テスト: 両方設定/片方だけ/両方 None の 3 ケース
- **`crates/pod/src/pod.rs`** (上記の compact_state 変更に伴って)
- `ensure_interceptor_installed` の on_usage callback から
`state_for_usage.update_input_tokens(tokens)` の行を削除。
`tracker_for_usage.record_usage(event)` だけが残る
- **`crates/pod/src/compact_interceptor.rs`**
- `exceeds_turn()` 呼び出しを `exceeds_request()`
- ログメッセージ "Between-turns ..." → "Between-requests ..."
@ -356,8 +388,9 @@ pruned history から:
### Prune と Compact の相互作用
Prune はリクエストコンテキストのみ操作、`last_input_tokens` は前回の LLM レスポンスの値。
Prune の効果は閾値判断に反映されない。保守的compact しすぎる方向)で実害は小さい。
Prune はリクエストコンテキストのみ操作。閾値判定は usage_history の最新
測定値(前回の LLM レスポンス時点の占有量を見るので、Prune の効果は
次回 LLM call まで反映されない。保守的compact しすぎる方向)で実害は小さい。
### compact 中のクライアント通知

View File

@ -18,6 +18,11 @@ Compact / Prune の挙動改善に「**履歴上の任意位置のトークン
正確なトークン数(推定でも実測由来)が要る箇所:
- **Compact 閾値判定** — 現状 `CompactState::last_input_tokens` (`AtomicU64`) が
on_usage callback で更新されているが、これは usage_history と情報源が二重化
している。本チケットで `Session::total_tokens()` を生やせば、`compact_interceptor.rs` /
`controller.rs` から閾値判定がこの API 経由になり、`last_input_tokens` 経路を
撤去できる(撤去自体は compact-improvements 側で実施)
- **Compact の retained_tokens 切り出し** — 末尾から N トークン残す cut 位置を決める
- **Prune の `min_savings` 判定** — 「この content を捨てたら何トークン浮くか」を見積もる
- **Compact worker の auto-read budget 判定**`mark_read_required` の累計

View File

@ -107,13 +107,14 @@ pub async fn save_usage(
- 各プロバイダの scheme で 1 リクエスト内の複数 Usage eventAnthropic の
message_start + message_deltaを集約し、**完了時の最終値だけを 1 つの
`UsageEvent` として外に発火する**。pod 側では暫定値を見ない
- `UsageEvent` 上で provider 別 raw 値(`input_tokens` / `cache_read_input_tokens`
/ `cache_creation_input_tokens` / `output_tokens`)はそのまま保持。占有量への
正規化は consumer 側save_usage 呼び出し側)で行う
- 動機: llm-worker は raw 値の運搬役に徹し、「プロンプト全長」のような
プロバイダ依存の意味付けは upper layer に集約する
- 正規化ヘルパー(例: `UsageEvent::input_total_tokens()`)を llm-worker に
生やすかは実装時判断
- 占有量への正規化Anthropic: `input_tokens + cache_read + cache_creation`)は
各 scheme の `convert_usage` で行い、`UsageEvent.input_tokens` には正規化済みの
占有量プロンプト全長が入る。consumer 側pod / UsageTracker
`event.input_tokens` をそのまま使う
- 動機: 正規化ロジックを scheme に閉じ込めることで、consumer が provider 差異を
意識する必要がなくなる
- `cache_read_input_tokens` / `cache_creation_input_tokens` は内訳として
別フィールドに保持。料金会計用
### pod 側
@ -187,6 +188,10 @@ callback と同じ場所。
- 既存ログ(`LlmUsage` 無し)を読んでも壊れない
- Anthropic の cache hit ありレスポンスで input_total が正しく計算される
## レビュー状態
Reviewed — [usage-history.review.md](usage-history.review.md)
## 依存
- なし(前提チケット)

View File

@ -0,0 +1,50 @@
# usage-history レビュー
## 要件の充足
チケットが定義した要件は全て達成されている:
- **LogEntry::LlmUsage**: session-store のハッシュチェーンに乗る variant として追加。
`history_len` / `input_total_tokens` / `cache_read_tokens` / `cache_write_tokens` / `output_tokens` の5フィールド
- **RestoredState.usage_history**: `collect_state` の replay で `Vec<UsageRecord>` に時系列順で積まれる。history の構築には影響しない
- **save_usage**: `append_entry` 経由でハッシュチェーンに接続
- **既存ログ互換**: `LlmUsage` entry が無い既存ログを読んでも `usage_history` が空になるだけで壊れない
- **1リクエスト = 1 entry**: Timeline の `pending_usage` + `flush_usage()` で複数 Usage event を集約し、handler には1度だけ発火
## アーキテクチャ
レイヤー分担が明確で、各層の責務が逸脱していない:
| レイヤー | 責務 |
|---------|------|
| scheme (anthropic) | raw → 占有量への正規化。`input_tokens + cache_read + cache_creation` |
| Timeline | 1リクエスト内の複数 Usage event をフィールド単位 latest-non-None でマージ。`flush_usage()` で1度だけ発火 |
| Worker | ストリーム完了・エラー・キャンセルの全パスで `flush_usage()` を呼ぶ |
| UsageTracker (pod) | `note_request(history_len)``record_usage(event)` のペアリング。drain で Pod に渡す |
| Pod::persist_turn | drain した records を `save_usage` で session-store に書き出し |
## 指摘と対処
### 1. UsageEvent の doc comment対処済み
`UsageEvent.input_tokens` が「占有量(プロンプト全長、キャッシュ込み)」を意味することが
struct と各フィールドの doc comment に明記された。scheme 層での正規化規約も記載済み。
### 2. save_usage の引数が多い(非ブロッカー、未対処)
8引数。`UsageRecord` を直接受け取れば `drain()` の結果をそのまま渡せてシグネチャがきれいになるが、
他の `save_*` 関数がフラットな引数を取るパターンと一貫しているため、統一性の観点では現状でも妥当。
将来フィールドが増えた時点でまとめて `UsageRecord` 受け取りに変えればよい。
## テスト
- `replay_llm_usage_appends_to_usage_history`: 複数 LlmUsage entry の replay で usage_history が正しく積まれ、history.len() に影響しない
- `replay_without_llm_usage_keeps_usage_history_empty`: 既存ログ互換
- `llm_usage_entry_round_trip_via_json`: serde 往復
- `test_convert_usage_includes_cache_in_input_total`: Anthropic の占有量正規化
- `test_usage_aggregation_and_flush`: Timeline の集約 + flush
- UsageTracker: ペアリング、drain、未ペアの drop、複数リクエスト
## 判定
承認。