diff --git a/Cargo.lock b/Cargo.lock index 3c3e9fb5..9c5e3c92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2144,6 +2144,7 @@ dependencies = [ "schemars", "serde", "serde_json", + "session-metrics", "session-store", "tempfile", "thiserror 2.0.18", @@ -2926,6 +2927,15 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "session-metrics" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", + "session-store", +] + [[package]] name = "session-store" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index de37754a..7751c6cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "crates/protocol", "crates/provider", "crates/pod-registry", + "crates/session-metrics", "crates/tools", "crates/tui", "crates/memory", @@ -28,6 +29,7 @@ memory = { path = "crates/memory" } pod-registry = { path = "crates/pod-registry" } protocol = { path = "crates/protocol" } provider = { path = "crates/provider" } +session-metrics = { path = "crates/session-metrics" } session-store = { path = "crates/session-store" } tools = { path = "crates/tools" } diff --git a/crates/llm-worker/src/prune.rs b/crates/llm-worker/src/prune.rs index 4acd6f69..e05001b5 100644 --- a/crates/llm-worker/src/prune.rs +++ b/crates/llm-worker/src/prune.rs @@ -30,6 +30,43 @@ use crate::llm_client::types::Item; /// 実際の projection と一致する savings を返す必要がある。 pub type SavingsEstimator = Box u64 + Send + Sync>; +/// Result of one prune evaluation pass, surfaced to the optional +/// [`PruneObserver`] for instrumentation. 
+/// +/// Worker は LLM リクエストごとに 1 回 prune の評価をし、その結果を +/// (observer が登録されていれば)この値で通知する。fire/skip の判定 +/// 結果と、判定材料になった候補数 / 推定 savings / 境界ターン位置を持つ。 +#[derive(Debug, Clone)] +pub struct PruneEvaluation { + /// `prunable_indices` の長さ。`Skipped::NoCandidates` の時は 0。 + pub candidate_count: usize, + /// 推定された savings (tokens)。`NoCandidates` の時は 0。 + pub estimated_savings: u64, + /// `protected_turns` 境界に当たる turn-start アイテムの index。 + /// turn 数が `protected_turns` 以下で境界が決まらない場合は `None`。 + pub border_turn: Option, + /// 判定結果。 + pub decision: PruneDecision, +} + +/// Outcome of one prune evaluation. Each variant is one branch of the +/// "fire vs skip" decision tree the Worker walks before each LLM request. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PruneDecision { + /// `prunable_indices` が空 → 何もしない。 + SkippedNoCandidates, + /// 候補はあったが推定 savings が `min_savings` 未満 → 何もしない。 + SkippedBelowMinSavings, + /// 候補があり savings >= min_savings → projection を適用した。 + /// `pruned_count` は `project()` が実際に書き換えた item 数 + /// (既に content=None だった候補は 0 計上)。 + Fired { pruned_count: usize }, +} + +/// Optional observer invoked after each prune evaluation, regardless of +/// branch. Pod 等の上位層が install して metrics を発行する。 +pub type PruneObserver = Box; + /// Configuration for the Prune algorithm. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PruneConfig { @@ -100,12 +137,20 @@ pub fn project(items: &mut [Item], indices: &[usize]) -> usize { /// Returns an empty vector when there are too few turns or no prunable /// candidates. pub fn prunable_indices(items: &[Item], protected_turns: usize) -> Vec { + evaluate_candidates(items, protected_turns).0 +} + +/// Same as [`prunable_indices`] but also returns the index of the +/// `protected_turns` boundary (the turn-start item whose tail is +/// protected). `None` when too few turns exist for a boundary to be +/// defined. 
+pub fn evaluate_candidates(items: &[Item], protected_turns: usize) -> (Vec, Option) { let turn_starts = find_turn_starts(items); if turn_starts.len() <= protected_turns { - return Vec::new(); + return (Vec::new(), None); } let boundary = turn_starts[turn_starts.len() - protected_turns]; - items[..boundary] + let candidates = items[..boundary] .iter() .enumerate() .filter_map(|(i, item)| match item { @@ -114,7 +159,8 @@ pub fn prunable_indices(items: &[Item], protected_turns: usize) -> Vec { } => Some(i), _ => None, }) - .collect() + .collect(); + (candidates, Some(boundary)) } #[cfg(test)] @@ -239,6 +285,30 @@ mod tests { assert_eq!(project(&mut items, &candidates), 0); } + #[test] + fn evaluate_candidates_returns_boundary_index() { + let big = "x".repeat(64); + let items = make_history(&[ + ("turn1", vec![("s1", Some(&big))]), + ("turn2", vec![("s2", Some(&big))]), + ("turn3", vec![("s3", Some("keep"))]), + ("turn4", vec![("s4", Some("keep too"))]), + ]); + let (candidates, border) = evaluate_candidates(&items, 2); + assert_eq!(candidates.len(), 2); + // protected_turns=2 → boundary は turn3 の user message 位置。 + // turn1: u/a/c/r (4) + turn2: u/a/c/r (4) = index 8 (turn3 の user)。 + assert_eq!(border, Some(8)); + } + + #[test] + fn evaluate_candidates_no_boundary_when_too_few_turns() { + let items = make_history(&[("only", vec![("s", Some("x"))])]); + let (candidates, border) = evaluate_candidates(&items, 2); + assert!(candidates.is_empty()); + assert!(border.is_none()); + } + #[test] fn protected_turns_boundary_exact() { // 3 turns with protected_turns=2: only turn 1 is a candidate. diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index b638d772..fd305e42 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -184,6 +184,9 @@ pub struct Worker { /// by higher layers that own usage measurements. `None` disables /// the prune projection. 
savings_estimator: Option, + /// Optional observer fired once per prune evaluation (regardless of + /// whether projection actually fired). `None` disables instrumentation. + prune_observer: Option, /// Index of the last stable cache prefix item, set by higher layers. /// Plumbed into [`Request::cache_anchor`] at request build time. cache_anchor: Option, @@ -384,6 +387,16 @@ impl Worker { self.savings_estimator = estimator; } + /// Install an observer notified after each prune evaluation pass. + /// + /// Fires once per outgoing LLM request (the same point as the + /// `prune_config` / `savings_estimator` pair), regardless of whether + /// projection actually applied. Intended for upper layers that want + /// to instrument fire/skip rates without owning the prune logic. + pub fn set_prune_observer(&mut self, observer: Option) { + self.prune_observer = observer; + } + /// Mark an index into the current history as a stable, cacheable /// prefix boundary. The value is included in each outgoing /// [`Request`] via [`Request::cache_anchor`] — caching-aware @@ -854,9 +867,16 @@ impl Worker { // threshold. Worker does not own usage history itself; the // estimator is injected by the layer that does. 
if let (Some(config), Some(estimator)) = (&self.prune_config, &self.savings_estimator) { - let candidates = - crate::prune::prunable_indices(&request_context, config.protected_turns); - if !candidates.is_empty() { + let (candidates, border_turn) = + crate::prune::evaluate_candidates(&request_context, config.protected_turns); + let evaluation = if candidates.is_empty() { + crate::prune::PruneEvaluation { + candidate_count: 0, + estimated_savings: 0, + border_turn, + decision: crate::prune::PruneDecision::SkippedNoCandidates, + } + } else { let savings = estimator(&request_context, &candidates); if savings >= config.min_savings { let pruned = crate::prune::project(&mut request_context, &candidates); @@ -867,7 +887,25 @@ impl Worker { "Projected old tool-result content out of request context" ); } + crate::prune::PruneEvaluation { + candidate_count: candidates.len(), + estimated_savings: savings, + border_turn, + decision: crate::prune::PruneDecision::Fired { + pruned_count: pruned, + }, + } + } else { + crate::prune::PruneEvaluation { + candidate_count: candidates.len(), + estimated_savings: savings, + border_turn, + decision: crate::prune::PruneDecision::SkippedBelowMinSavings, + } } + }; + if let Some(observer) = &self.prune_observer { + observer(&evaluation); } } @@ -1077,6 +1115,7 @@ impl Worker { tool_output_limits: None, prune_config: None, savings_estimator: None, + prune_observer: None, cache_anchor: None, cache_key: None, _state: PhantomData, @@ -1334,6 +1373,7 @@ impl Worker { tool_output_limits: self.tool_output_limits, prune_config: self.prune_config, savings_estimator: self.savings_estimator, + prune_observer: self.prune_observer, cache_anchor: self.cache_anchor, cache_key: self.cache_key, _state: PhantomData, @@ -1414,6 +1454,7 @@ impl Worker { tool_output_limits: self.tool_output_limits, prune_config: self.prune_config, savings_estimator: self.savings_estimator, + prune_observer: self.prune_observer, cache_anchor: self.cache_anchor, cache_key: 
self.cache_key, _state: PhantomData, diff --git a/crates/pod/Cargo.toml b/crates/pod/Cargo.toml index 597d8558..8b1610ab 100644 --- a/crates/pod/Cargo.toml +++ b/crates/pod/Cargo.toml @@ -28,6 +28,7 @@ libc = { workspace = true } schemars = { workspace = true } memory = { workspace = true } uuid = { workspace = true, features = ["v7"] } +session-metrics = { workspace = true } [dev-dependencies] dotenv = "0.15.0" diff --git a/crates/pod/src/compact/metrics_tracker.rs b/crates/pod/src/compact/metrics_tracker.rs new file mode 100644 index 00000000..10feaf1d --- /dev/null +++ b/crates/pod/src/compact/metrics_tracker.rs @@ -0,0 +1,50 @@ +//! Sync buffer for `session_metrics::Metric` values queued from inside +//! Worker callbacks (which run synchronously and cannot themselves +//! perform `async` store writes). +//! +//! Pod drains this buffer in `persist_turn` and writes each metric via +//! `session_metrics::record_metric`, alongside the regular `LlmUsage` +//! entries. + +use std::sync::Mutex; + +use session_metrics::Metric; + +pub(crate) struct MetricsTracker { + pending: Mutex>, +} + +impl MetricsTracker { + pub(crate) fn new() -> Self { + Self { + pending: Mutex::new(Vec::new()), + } + } + + /// Queue a metric for the next `persist_turn` flush. + pub(crate) fn push(&self, metric: Metric) { + self.pending.lock().unwrap().push(metric); + } + + /// Drain all queued metrics. Called by Pod after a run completes. 
+ pub(crate) fn drain(&self) -> Vec { + std::mem::take(&mut *self.pending.lock().unwrap()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn push_then_drain_returns_in_order_and_clears() { + let t = MetricsTracker::new(); + t.push(Metric::now("a")); + t.push(Metric::now("b")); + let drained = t.drain(); + assert_eq!(drained.len(), 2); + assert_eq!(drained[0].name, "a"); + assert_eq!(drained[1].name, "b"); + assert!(t.drain().is_empty()); + } +} diff --git a/crates/pod/src/compact/mod.rs b/crates/pod/src/compact/mod.rs index 79316add..47087718 100644 --- a/crates/pod/src/compact/mod.rs +++ b/crates/pod/src/compact/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod metrics_tracker; pub(crate) mod prune; pub(crate) mod state; pub(crate) mod token_counter; diff --git a/crates/pod/src/compact/prune.rs b/crates/pod/src/compact/prune.rs index 166a7a9c..3b238340 100644 --- a/crates/pod/src/compact/prune.rs +++ b/crates/pod/src/compact/prune.rs @@ -5,10 +5,16 @@ //! 直後)。Worker は usage 履歴を知らないので、`min_savings` 判定に使う savings //! の見積もりはコールバックで外部から注入する。このモジュールはそのコールバック //! を組み立てて Worker に差し込むための `impl Pod` を提供する。 +//! +//! 同じ経路で `PruneObserver` も install し、評価のたびに `prune.fire` / +//! `prune.skip` metric を `MetricsTracker` に積む。`Fired` 時は uuid を +//! `UsageTracker` にも stash しておき、後続の `LlmUsage` と組で +//! `prune.post_request` を吐けるようにする。 use llm_worker::Item; use llm_worker::llm_client::client::LlmClient; -use llm_worker::prune::{PruneConfig, SavingsEstimator}; +use llm_worker::prune::{PruneConfig, PruneDecision, PruneObserver, SavingsEstimator}; +use session_metrics::Metric; use session_store::Store; use crate::Pod; @@ -24,6 +30,12 @@ impl Pod { /// Measurement-less estimates (before the first LLM call, or immediately /// after a compact) return `0` from the estimator, which naturally /// prevents the prune projection from firing until usage data exists. 
+ /// + /// Also installs a [`PruneObserver`] that pushes `prune.fire` / + /// `prune.skip` metrics into the shared [`MetricsTracker`]. On `Fired` + /// the observer additionally stashes a fresh correlation_id in + /// [`UsageTracker`] so the next `LlmUsage` can be paired with a + /// `prune.post_request` metric carrying the same id. pub fn attach_prune(&mut self, config: PruneConfig) { let usage = self.usage_history_handle(); let estimator: SavingsEstimator = Box::new(move |history: &[Item], indices| { @@ -34,9 +46,43 @@ impl Pod { _ => est.tokens, } }); + + let metrics = self.metrics_tracker_handle(); + let usage_tracker = self.usage_tracker_handle(); + let observer: PruneObserver = Box::new(move |eval| { + match &eval.decision { + PruneDecision::Fired { .. } => { + let correlation_id = uuid::Uuid::now_v7().to_string(); + let mut metric = Metric::now("prune.fire") + .with_value(eval.estimated_savings as f64) + .with_correlation_id(&correlation_id) + .with_dimension("candidate_count", eval.candidate_count.to_string()); + if let Some(border) = eval.border_turn { + metric = metric.with_dimension("border_turn", border.to_string()); + } + metrics.push(metric); + usage_tracker.note_correlation_id(correlation_id); + } + PruneDecision::SkippedNoCandidates => { + metrics.push( + Metric::now("prune.skip").with_dimension("reason", "no_candidates"), + ); + } + PruneDecision::SkippedBelowMinSavings => { + metrics.push( + Metric::now("prune.skip") + .with_dimension("reason", "below_min_savings") + .with_dimension("candidate_count", eval.candidate_count.to_string()) + .with_value(eval.estimated_savings as f64), + ); + } + } + }); + let worker = self.worker_mut(); worker.set_prune_config(Some(config)); worker.set_savings_estimator(Some(estimator)); + worker.set_prune_observer(Some(observer)); } /// If the manifest has a `[compaction]` section, build a `PruneConfig` diff --git a/crates/pod/src/compact/usage_tracker.rs b/crates/pod/src/compact/usage_tracker.rs index 
02cbc8c4..34b87281 100644 --- a/crates/pod/src/compact/usage_tracker.rs +++ b/crates/pod/src/compact/usage_tracker.rs @@ -19,19 +19,35 @@ use std::sync::Mutex; use llm_worker::UsageRecord; use llm_worker::timeline::event::UsageEvent; +/// One drained measurement: the underlying `UsageRecord` plus an optional +/// `correlation_id` stamped by the prune projection (or any other future +/// upstream observer) so that downstream metrics emitted alongside this +/// record can be joined to it after the fact. +#[derive(Debug, Clone)] +pub(crate) struct RecordedUsage { + pub(crate) record: UsageRecord, + pub(crate) correlation_id: Option, +} + /// Shared between the pre-request hook, the `on_usage` callback, and Pod. pub(crate) struct UsageTracker { /// `history.len()` captured at the most recent `pre_llm_request`. /// Cleared when paired with an incoming `on_usage` event. pending_history_len: Mutex>, + /// Optional `correlation_id` set by an upstream observer (currently + /// the prune projection on `Fired`). Paired into the next + /// `RecordedUsage` and cleared. Skips that don't fire leave this + /// `None`, so the resulting record carries no correlation. + pending_correlation_id: Mutex>, /// Records accumulated during the current run; drained by Pod. - pending_records: Mutex>, + pending_records: Mutex>, } impl UsageTracker { pub(crate) fn new() -> Self { Self { pending_history_len: Mutex::new(None), + pending_correlation_id: Mutex::new(None), pending_records: Mutex::new(Vec::new()), } } @@ -41,16 +57,29 @@ impl UsageTracker { *self.pending_history_len.lock().unwrap() = Some(history_len); } + /// Stash a `correlation_id` to be paired into the next `RecordedUsage`. + /// Currently invoked by the prune observer on `Fired` so that the + /// `prune.fire` metric and the `prune.post_request` metric (emitted + /// alongside the resulting `LlmUsage`) carry the same join key. 
+ /// + /// Overwrites any previous unconsumed value — by construction the + /// observer fires at most once per outgoing LLM request, immediately + /// before the pre-request hook captures `history_len`. + pub(crate) fn note_correlation_id(&self, id: String) { + *self.pending_correlation_id.lock().unwrap() = Some(id); + } + /// Called from the `on_usage` callback with the aggregated final /// UsageEvent. If a `history_len` was previously stashed via - /// `note_request`, builds a `UsageRecord` and pushes it onto the buffer. - /// If not (e.g. test code that fires Usage outside a request), drops - /// the event. + /// `note_request`, builds a `RecordedUsage` and pushes it onto the + /// buffer. If not (e.g. test code that fires Usage outside a request), + /// drops the event. pub(crate) fn record_usage(&self, event: &UsageEvent) { let history_len = match self.pending_history_len.lock().unwrap().take() { Some(n) => n, None => return, }; + let correlation_id = self.pending_correlation_id.lock().unwrap().take(); // UsageEvent.input_tokens は scheme 層で「占有量(プロンプト全長)」に // 正規化済みである前提(Anthropic は cache_read + cache_creation を // 加算して emit する)。 @@ -58,18 +87,21 @@ impl UsageTracker { let cache_read = event.cache_read_input_tokens.unwrap_or(0); let cache_write = event.cache_creation_input_tokens.unwrap_or(0); let output = event.output_tokens.unwrap_or(0); - self.pending_records.lock().unwrap().push(UsageRecord { - history_len, - input_total_tokens: input_total, - cache_read_tokens: cache_read, - cache_write_tokens: cache_write, - output_tokens: output, + self.pending_records.lock().unwrap().push(RecordedUsage { + record: UsageRecord { + history_len, + input_total_tokens: input_total, + cache_read_tokens: cache_read, + cache_write_tokens: cache_write, + output_tokens: output, + }, + correlation_id, }); } /// Drain accumulated records. Called by Pod after a run completes, /// before persisting the turn. 
- pub(crate) fn drain(&self) -> Vec { + pub(crate) fn drain(&self) -> Vec { std::mem::take(&mut *self.pending_records.lock().unwrap()) } } @@ -96,11 +128,12 @@ mod tests { let records = tracker.drain(); assert_eq!(records.len(), 1); - assert_eq!(records[0].history_len, 5); - assert_eq!(records[0].input_total_tokens, 1000); - assert_eq!(records[0].cache_read_tokens, 800); - assert_eq!(records[0].cache_write_tokens, 100); - assert_eq!(records[0].output_tokens, 42); + assert_eq!(records[0].record.history_len, 5); + assert_eq!(records[0].record.input_total_tokens, 1000); + assert_eq!(records[0].record.cache_read_tokens, 800); + assert_eq!(records[0].record.cache_write_tokens, 100); + assert_eq!(records[0].record.output_tokens, 42); + assert!(records[0].correlation_id.is_none()); } #[test] @@ -129,8 +162,25 @@ mod tests { let records = tracker.drain(); assert_eq!(records.len(), 2); - assert_eq!(records[0].history_len, 5); - assert_eq!(records[1].history_len, 10); - assert_eq!(records[1].cache_read_tokens, 50); + assert_eq!(records[0].record.history_len, 5); + assert_eq!(records[1].record.history_len, 10); + assert_eq!(records[1].record.cache_read_tokens, 50); + } + + #[test] + fn correlation_id_pairs_with_next_record_only() { + let tracker = UsageTracker::new(); + // Stash an ID, then run a request → the ID should land on this record. + tracker.note_correlation_id("abc".into()); + tracker.note_request(5); + tracker.record_usage(&make_event(100, 0, 0, 20)); + // Next request without a fresh stash → no correlation_id. 
+ tracker.note_request(10); + tracker.record_usage(&make_event(200, 50, 0, 30)); + + let records = tracker.drain(); + assert_eq!(records.len(), 2); + assert_eq!(records[0].correlation_id.as_deref(), Some("abc")); + assert!(records[1].correlation_id.is_none()); } } diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index cdc4fd13..96c150eb 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -77,6 +77,11 @@ pub struct Pod { /// Captures `(history_len, UsageEvent)` pairs during a run; drained /// in `persist_turn` and persisted as `LogEntry::LlmUsage` entries. usage_tracker: Arc, + /// Sync-side buffer for `Metric` values queued from inside Worker + /// callbacks (currently the prune observer). Drained in `persist_turn` + /// and written via `session_metrics::record_metric` alongside + /// `LogEntry::LlmUsage`. Always present after construction. + metrics_tracker: Arc, /// Cumulative Usage measurement timeline, one entry per LLM call. /// Restored from session log on `restore`, appended on each persist. /// Read by token-accounting APIs (`Pod::total_tokens`, etc.). @@ -203,6 +208,7 @@ impl Pod { interceptor_installed: false, compact_state: None, usage_tracker: Arc::new(UsageTracker::new()), + metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), usage_history: Arc::new(Mutex::new(Vec::::new())), tracker: None, system_prompt_template: None, @@ -391,6 +397,26 @@ impl Pod { self.usage_history.clone() } + /// Handle to the per-LLM-request `UsageTracker`. + /// + /// Sibling modules (e.g. the prune observer) clone this `Arc` to stash + /// per-request side state (e.g. a `correlation_id`) that pairs with + /// the next `LlmUsage`. + pub(crate) fn usage_tracker_handle(&self) -> Arc { + self.usage_tracker.clone() + } + + /// Handle to the synchronous `MetricsTracker` buffer. + /// + /// Worker callbacks (e.g. 
the prune observer) clone this `Arc` and + /// `.push(metric)` into it; Pod drains it in `persist_turn` and + /// writes each metric via `session_metrics::record_metric`. + pub(crate) fn metrics_tracker_handle( + &self, + ) -> Arc { + self.metrics_tracker.clone() + } + /// Attach the session-scoped file-operation tracker from the builtin /// `tools` crate. Called by the Controller immediately after it /// registers the builtin tools on the Worker. Overwrites any @@ -1068,13 +1094,36 @@ impl Pod { ) .await?; + // Flush any sync-buffered metrics from this run first + // (currently `prune.fire` / `prune.skip` from the prune observer). + // Ordered before LlmUsage so that a `prune.fire` and the + // `prune.post_request` derived from the matching usage record + // appear in the log close together. + let pending_metrics = self.metrics_tracker.drain(); + for metric in pending_metrics { + session_metrics::record_metric( + &self.store, + self.session_id, + &mut self.head_hash, + &metric, + ) + .await?; + } + // Persist any LLM Usage measurements collected during this run. // One LogEntry::LlmUsage per LLM call (the tool loop may have run // many calls within a single Pod::run). Each is also appended to // the in-memory `usage_history` so token-accounting APIs see it - // before the next run. + // before the next run. Records carrying a `correlation_id` (set + // by an upstream observer such as the prune projection) also get + // a paired `prune.post_request` metric so cache_read/write can be + // joined back to the originating event. 
let usage_records = self.usage_tracker.drain(); - for record in usage_records { + for recorded in usage_records { + let crate::compact::usage_tracker::RecordedUsage { + record, + correlation_id, + } = recorded; session_store::save_usage( &self.store, self.session_id, @@ -1086,6 +1135,20 @@ impl Pod { record.output_tokens, ) .await?; + if let Some(id) = correlation_id { + let metric = session_metrics::Metric::now("prune.post_request") + .with_correlation_id(&id) + .with_value(record.cache_read_tokens as f64) + .with_dimension("cache_write_tokens", record.cache_write_tokens.to_string()) + .with_dimension("history_len", record.history_len.to_string()); + session_metrics::record_metric( + &self.store, + self.session_id, + &mut self.head_hash, + &metric, + ) + .await?; + } self.usage_history .lock() .expect("usage_history poisoned") @@ -1895,6 +1958,7 @@ impl Pod, St> { interceptor_installed: false, compact_state: None, usage_tracker: Arc::new(UsageTracker::new()), + metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, system_prompt_template: common.system_prompt_template, @@ -1956,6 +2020,7 @@ impl Pod, St> { interceptor_installed: false, compact_state: None, usage_tracker: Arc::new(UsageTracker::new()), + metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, system_prompt_template: common.system_prompt_template, @@ -2067,6 +2132,7 @@ impl Pod, St> { interceptor_installed: false, compact_state: None, usage_tracker: Arc::new(UsageTracker::new()), + metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), usage_history: Arc::new(Mutex::new(state.usage_history)), tracker: None, // Restore replays the saved system_prompt verbatim — no diff --git a/crates/pod/tests/session_metrics_test.rs b/crates/pod/tests/session_metrics_test.rs new file mode 100644 index 
00000000..6abed523 --- /dev/null +++ b/crates/pod/tests/session_metrics_test.rs @@ -0,0 +1,367 @@ +//! End-to-end coverage for the prune-projection metrics path. +//! +//! Drives a Pod with a scripted mock LLM client and a custom tool that +//! returns a long `ToolOutput.content`, then inspects the persisted +//! session log to verify: +//! +//! - `prune.skip { reason: "no_candidates" }` lands when the protected-turn +//! window covers the entire history. +//! - `prune.fire` lands once enough turns + usage measurements exist for +//! the projection to actually apply. +//! - The fire metric and the immediately-following `prune.post_request` +//! metric share the same `correlation_id`, so cache_read / cache_write +//! from the LlmUsage that triggered the projection can be joined back +//! to the originating event. +//! - `prune.skip { reason: "below_min_savings" }` lands when candidates +//! exist but their estimated savings are below the configured floor. + +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use async_trait::async_trait; +use futures::Stream; +use llm_worker::Worker; +use llm_worker::llm_client::event::{ + Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent, +}; +use llm_worker::llm_client::{ClientError, LlmClient, Request}; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use session_metrics::{DOMAIN, Metric, metrics_from_extensions}; +use session_store::FsStore; + +use pod::{Pod, PodManifest}; + +#[derive(Clone)] +struct MockClient { + responses: Arc>>, + call_count: Arc, +} + +impl MockClient { + fn new(responses: Vec>) -> Self { + Self { + responses: Arc::new(responses), + call_count: Arc::new(AtomicUsize::new(0)), + } + } +} + +#[async_trait] +impl LlmClient for MockClient { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + async fn stream( + &self, + _request: Request, + ) -> Result> + Send>>, ClientError> + { + let count = 
self.call_count.fetch_add(1, Ordering::SeqCst); + if count >= self.responses.len() { + return Err(ClientError::Config("mock client exhausted".into())); + } + let events = self.responses[count].clone(); + let stream = futures::stream::iter(events.into_iter().map(Ok)); + Ok(Box::pin(stream)) + } +} + +/// Tool that returns a fixed `ToolOutput { summary, content: Some(big) }`. +/// `content` is long enough for prune savings to comfortably clear small +/// `min_savings` thresholds. +struct BigContentTool { + summary: &'static str, + content: String, +} + +#[async_trait] +impl Tool for BigContentTool { + async fn execute(&self, _input: &str) -> Result { + Ok(ToolOutput { + summary: self.summary.into(), + content: Some(self.content.clone()), + }) + } +} + +fn big_content_tool_definition(name: &'static str) -> ToolDefinition { + Arc::new(move || { + let summary = "tool result summary"; + let content = "x".repeat(2048); + ( + ToolMeta::new(name) + .description("test tool that returns a long content") + .input_schema(serde_json::json!({"type": "object"})), + Arc::new(BigContentTool { summary, content }) as Arc, + ) + }) +} + +fn usage_event(input_total: u64, cache_read: u64, cache_write: u64, output: u64) -> LlmEvent { + LlmEvent::Usage(UsageEvent { + input_tokens: Some(input_total), + output_tokens: Some(output), + total_tokens: Some(input_total + output), + cache_read_input_tokens: Some(cache_read), + cache_creation_input_tokens: Some(cache_write), + }) +} + +/// Tool-call response from the assistant: emits a `tool_use` block then a +/// usage event so usage_history gains a measurement on this turn. 
+fn tool_use_response(call_id: &str, tool_name: &str) -> Vec { + vec![ + LlmEvent::tool_use_start(0, call_id, tool_name), + LlmEvent::tool_input_delta(0, "{}"), + LlmEvent::tool_use_stop(0), + usage_event(500, 0, 0, 10), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ] +} + +/// Plain text response with explicit cache_read/cache_write so that +/// `prune.post_request` can carry meaningful values when this is the +/// LLM call that follows a `prune.fire` event. +fn text_response_with_cache(text: &str, cache_read: u64, cache_write: u64) -> Vec { + vec![ + LlmEvent::text_block_start(0), + LlmEvent::text_delta(0, text), + LlmEvent::text_block_stop(0, None), + usage_event(800, cache_read, cache_write, 5), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ] +} + +fn manifest_toml(prune_protected_turns: usize, prune_min_savings: u64) -> String { + format!( + r#" +[pod] +name = "test-pod" +pwd = "./" + +[model] +scheme = "anthropic" +model_id = "test-model" + +[worker] +max_tokens = 100 + +[compaction] +prune_protected_turns = {prune_protected_turns} +prune_min_savings = {prune_min_savings} + +[[scope.allow]] +target = "./" +permission = "write" +"# + ) +} + +async fn make_pod( + manifest_toml: String, + client: MockClient, + tool_name: &'static str, +) -> (Pod, tempfile::TempDir, tempfile::TempDir) { + let manifest = PodManifest::from_toml(&manifest_toml).unwrap(); + let store_tmp = tempfile::tempdir().unwrap(); + let store = FsStore::new(store_tmp.path()).await.unwrap(); + let pwd_tmp = tempfile::tempdir().unwrap(); + let pwd = pwd_tmp.path().to_path_buf(); + let scope = pod::Scope::writable(&pwd).unwrap(); + + let mut worker = Worker::new(client); + worker.register_tool(big_content_tool_definition(tool_name)); + + let pod = Pod::new(manifest, worker, store, pwd, scope).await.unwrap(); + (pod, store_tmp, pwd_tmp) +} + +/// Drive Pod through enough runs to exercise both skip-no_candidates and +/// fire branches, 
then read the session log back and assert the metric +/// stream. +#[tokio::test] +async fn prune_metrics_emit_skip_then_fire_with_post_request_join() { + // Run 1 (request 0): tool_use → triggers tool execution → request 1 + // on the second iteration to produce the assistant reply. + // Run 2 (request 2): plain assistant text. Prune evaluation here + // sees user1's tool_result outside the 1-protected-turn window and + // should fire. + let client = MockClient::new(vec![ + tool_use_response("call-1", "big_tool"), + text_response_with_cache("ok", 0, 200), + text_response_with_cache("done", 1234, 50), + ]); + let (mut pod, _store_tmp, _pwd_tmp) = + make_pod(manifest_toml(1, 1), client, "big_tool").await; + let session_id = pod.session_id(); + // Cloning the store handle to read the session log back after the + // runs complete — the Pod retains its own copy. + let store = pod.store().clone(); + + pod.run_text("first").await.unwrap(); + pod.run_text("second").await.unwrap(); + + let state = session_store::restore(&store, session_id).await.unwrap(); + let metrics = metrics_from_extensions(&state.extensions); + + // Run 1 has 2 LLM iterations (tool loop), each evaluates prune with + // only one user-message turn → 2x skip{no_candidates}. + // Run 2 has 1 LLM iteration with enough turns → 1x fire + + // 1x post_request paired by correlation_id. + let names: Vec<&str> = metrics.iter().map(|m| m.name.as_str()).collect(); + assert!( + names.contains(&"prune.skip"), + "expected prune.skip in {names:?}" + ); + assert!( + names.contains(&"prune.fire"), + "expected prune.fire in {names:?}" + ); + assert!( + names.contains(&"prune.post_request"), + "expected prune.post_request in {names:?}" + ); + + // All skips in run 1 must record reason=no_candidates. 
+ for m in metrics.iter().filter(|m| m.name == "prune.skip") { + assert_eq!( + m.dimensions.get("reason").map(String::as_str), + Some("no_candidates"), + "skip metric should be no_candidates here, got {m:?}" + ); + assert!(m.correlation_id.is_none()); + } + + // The fire metric carries dimensions and correlation_id. + let fire = metrics + .iter() + .find(|m| m.name == "prune.fire") + .expect("prune.fire missing"); + assert!( + fire.dimensions.contains_key("candidate_count"), + "fire missing candidate_count: {fire:?}" + ); + assert!( + fire.dimensions.contains_key("border_turn"), + "fire missing border_turn: {fire:?}" + ); + assert!( + fire.value.is_some(), + "fire missing estimated_savings value" + ); + let fire_id = fire + .correlation_id + .as_ref() + .expect("fire metric missing correlation_id"); + + // Exactly one post_request metric should exist with the same id, and + // its value/dimension should reflect the cache numbers from the + // text_response_with_cache call (cache_read=1234, cache_write=50). + let post = metrics + .iter() + .find(|m| m.name == "prune.post_request") + .expect("prune.post_request missing"); + assert_eq!(post.correlation_id.as_ref(), Some(fire_id)); + assert_eq!(post.value, Some(1234.0)); + assert_eq!( + post.dimensions.get("cache_write_tokens").map(String::as_str), + Some("50") + ); + assert!(post.dimensions.contains_key("history_len")); +} + +/// `min_savings` set high enough that candidates exist but the estimated +/// savings always fall short → the second run should record +/// `prune.skip { reason: "below_min_savings" }`. 
+#[tokio::test] +async fn prune_metrics_record_below_min_savings_skip() { + let client = MockClient::new(vec![ + tool_use_response("call-1", "big_tool"), + text_response_with_cache("ok", 0, 100), + text_response_with_cache("done", 0, 0), + ]); + let (mut pod, _store_tmp, _pwd_tmp) = + make_pod(manifest_toml(1, u64::MAX), client, "big_tool").await; + let session_id = pod.session_id(); + let store = pod.store().clone(); + + pod.run_text("first").await.unwrap(); + pod.run_text("second").await.unwrap(); + + let state = session_store::restore(&store, session_id).await.unwrap(); + let metrics = metrics_from_extensions(&state.extensions); + let below = metrics + .iter() + .find(|m| { + m.name == "prune.skip" + && m.dimensions.get("reason").map(String::as_str) == Some("below_min_savings") + }) + .expect("expected prune.skip with reason=below_min_savings"); + assert!( + below.dimensions.contains_key("candidate_count"), + "below_min_savings skip should report candidate_count: {below:?}" + ); + assert!( + below.value.is_some(), + "below_min_savings skip should report estimated savings as value: {below:?}" + ); + // No prune.fire for this scenario. + assert!(metrics.iter().all(|m| m.name != "prune.fire")); + // No prune.post_request either (no fire to join with). + assert!(metrics.iter().all(|m| m.name != "prune.post_request")); +} + +/// Sessions that have no metrics in the log restore cleanly: the +/// `RestoredState.extensions` simply contains no `metrics` domain +/// payloads, and `metrics_from_extensions` returns an empty Vec. +/// Backward-compatibility check for old logs predating this feature. +#[tokio::test] +async fn old_sessions_without_metrics_replay_cleanly() { + // Manifest without any `[compaction]` section → prune (and therefore + // the prune observer) is never installed, so no metrics get written. 
+ let manifest_toml = r#" +[pod] +name = "test-pod" +pwd = "./" + +[model] +scheme = "anthropic" +model_id = "test-model" + +[worker] +max_tokens = 100 + +[[scope.allow]] +target = "./" +permission = "write" +"#; + let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]); + let manifest = PodManifest::from_toml(manifest_toml).unwrap(); + let store_tmp = tempfile::tempdir().unwrap(); + let store = FsStore::new(store_tmp.path()).await.unwrap(); + let pwd_tmp = tempfile::tempdir().unwrap(); + let pwd = pwd_tmp.path().to_path_buf(); + let scope = pod::Scope::writable(&pwd).unwrap(); + let worker = Worker::new(client); + let mut pod = Pod::new(manifest, worker, store.clone(), pwd, scope) + .await + .unwrap(); + let session_id = pod.session_id(); + pod.run_text("hello").await.unwrap(); + + let state = session_store::restore(&store, session_id).await.unwrap(); + let metrics = metrics_from_extensions(&state.extensions); + assert!(metrics.is_empty(), "no metrics should be recorded: {metrics:?}"); + // And no extension entries at all in the metrics domain. + assert!(state.extensions.iter().all(|(d, _)| d != DOMAIN)); + + // Smoke check that `Metric::now` keeps the name it was given (the fold helper is not exercised here): + let m = Metric::now("smoke"); + assert_eq!(m.name, "smoke"); +} diff --git a/crates/session-metrics/Cargo.toml b/crates/session-metrics/Cargo.toml new file mode 100644 index 00000000..6303ddae --- /dev/null +++ b/crates/session-metrics/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "session-metrics" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +session-store = { workspace = true } diff --git a/crates/session-metrics/src/lib.rs b/crates/session-metrics/src/lib.rs new file mode 100644 index 00000000..73037a73 --- /dev/null +++ b/crates/session-metrics/src/lib.rs @@ -0,0 +1,170 @@ +//!
Session metrics — generic ad-hoc measurement lane on top of +//! `LogEntry::Extension { domain: "metrics" }`. +//! +//! セッション中に積み上げて後で引きたい値(prune の発火頻度・Hook の実行 +//! 時間・ツールリトライ回数 等)を session-log に乗せるための薄い層。 +//! session-store は payload を不透明な `serde_json::Value` として扱うので、 +//! このクレートは型と読み書きヘルパーだけを提供する。 +//! +//! # 設計 +//! +//! - 厳格な label set は持たない。次元は sparse な `BTreeMap<String, String>`、 +//! 観測できない値は `None` で明示する +//! - 「後から埋まる値」(例: prune 発火直後の `cache_read_tokens`)は前 entry に +//! 書き戻さず、`correlation_id` を共有する別 metric として流す。集計は読み手で join +//! - 集計 / 可視化 API はこのクレートには無い。session-log を読めば取り出せる、 +//! までが到達点 + +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; +use session_store::{EntryHash, SessionId, Store, StoreError, save_extension, session_log}; + +/// Domain tag used in `LogEntry::Extension` for all metrics records. +pub const DOMAIN: &str = "metrics"; + +/// 単発の計測値。`name` は `namespace.metric` 形式の自由文字列 +/// (例: `"prune.fire"`、`"hook.duration"`)。 +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct Metric { + /// `namespace.metric` 形式の名前。 + pub name: String, + /// epoch ms。 + pub ts: u64, + /// sparse な次元(label)。観測できないものはキー自体を入れない。 + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub dimensions: BTreeMap<String, String>, + /// 主スカラ値。dimension では表現したくない数値を載せる場所。 + #[serde(default, skip_serializing_if = "Option::is_none")] + pub value: Option<f64>, + /// 関連 metric を join するためのキー。同一 ID を持つ複数 metric は + /// 「同じ事象を多面的に観測している」という意味付けで読まれる。 + #[serde(default, skip_serializing_if = "Option::is_none")] + pub correlation_id: Option<String>, +} + +impl Metric { + /// 最小コンストラクタ。`ts` は呼び出し時刻(epoch ms)で埋める。 + pub fn now(name: impl Into<String>) -> Self { + Self { + name: name.into(), + ts: session_log::now_millis(), + dimensions: BTreeMap::new(), + value: None, + correlation_id: None, + } + } + + pub fn with_dimension(mut self, key: impl Into<String>, value: impl Into<String>) -> Self { + self.dimensions.insert(key.into(), value.into()); + self + } + + pub fn with_value(mut self, value: f64)
-> Self { + self.value = Some(value); + self + } + + pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self { + self.correlation_id = Some(id.into()); + self + } +} + +/// `LogEntry::Extension { domain: "metrics", payload: }` を append する。 +/// +/// `save_extension` の薄い wrapper。書き込み失敗は呼び出し側に返す +/// (メトリクスのために本体処理を止めるかは呼び出し側の判断)。 +pub async fn record_metric( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option<EntryHash>, + metric: &Metric, +) -> Result<(), StoreError> { + let payload = serde_json::to_value(metric).expect("Metric serialization cannot fail"); + save_extension(store, session_id, head_hash, DOMAIN, payload).await +} + +/// `RestoredState.extensions` から metrics domain の payload を順に取り出し、 +/// `Metric` 列に fold する。 +/// +/// schema 変更で deserialize できない payload は無視する(後方互換)。 +pub fn metrics_from_extensions( + extensions: &[(String, serde_json::Value)], +) -> Vec<Metric> { + extensions + .iter() + .filter(|(domain, _)| domain == DOMAIN) + .filter_map(|(_, payload)| serde_json::from_value::<Metric>(payload.clone()).ok()) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn metric_round_trip_via_json() { + let metric = Metric::now("prune.fire") + .with_dimension("border_turn", "3") + .with_dimension("candidate_count", "2") + .with_value(4096.0) + .with_correlation_id("abc-123"); + let json = serde_json::to_string(&metric).unwrap(); + let parsed: Metric = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed, metric); + } + + #[test] + fn metric_serializes_minimal_form_compactly() { + // dimensions が空 / value/correlation_id が None の時は出力に含めない。 + let metric = Metric { + name: "x".into(), + ts: 1, + dimensions: BTreeMap::new(), + value: None, + correlation_id: None, + }; + let json = serde_json::to_string(&metric).unwrap(); + assert!(!json.contains("dimensions")); + assert!(!json.contains("value")); + assert!(!json.contains("correlation_id")); + } + + #[test] + fn fold_skips_other_domains() { + let extensions = vec![ + ( +
"memory.extract".into(), + serde_json::json!({ "processed_through_entry": 7 }), + ), + ( + DOMAIN.into(), + serde_json::to_value(Metric::now("a")).unwrap(), + ), + ( + DOMAIN.into(), + serde_json::to_value(Metric::now("b")).unwrap(), + ), + ]; + let metrics = metrics_from_extensions(&extensions); + assert_eq!(metrics.len(), 2); + assert_eq!(metrics[0].name, "a"); + assert_eq!(metrics[1].name, "b"); + } + + #[test] + fn fold_skips_undeserializable_payloads() { + // 将来 schema が変わって読めない payload も skip して落ちない。 + let extensions = vec![ + (DOMAIN.into(), serde_json::json!({ "garbage": true })), + ( + DOMAIN.into(), + serde_json::to_value(Metric::now("ok")).unwrap(), + ), + ]; + let metrics = metrics_from_extensions(&extensions); + assert_eq!(metrics.len(), 1); + assert_eq!(metrics[0].name, "ok"); + } +} diff --git a/tickets/session-metrics.md b/tickets/session-metrics.md index f412c875..ec0d2914 100644 --- a/tickets/session-metrics.md +++ b/tickets/session-metrics.md @@ -72,3 +72,8 @@ - `crates/session-store/src/session_log.rs`(`LogEntry::Extension` と `RestoredState.extensions` の既存仕様) - `crates/llm-worker/src/usage_record.rs`、`crates/llm-worker/src/llm_client/event.rs`(cache_read / cache_write の取得経路) - `crates/pod/src/compact/prune.rs`、`crates/llm-worker/src/prune.rs`(最初の利用者の挿入点) + +## Review +- 状態: Approve with follow-up +- レビュー詳細: [./session-metrics.review.md](./session-metrics.review.md) +- 日付: 2026-05-03 diff --git a/tickets/session-metrics.review.md b/tickets/session-metrics.review.md new file mode 100644 index 00000000..9e5ce510 --- /dev/null +++ b/tickets/session-metrics.review.md @@ -0,0 +1,65 @@ +# Review: セッションメトリクス: Extension 経由の汎用計測レーン + +## 前提・要件の確認 + +### メトリクス型 +- 専用 crate に置き serde で round-trip できる: `crates/session-metrics/src/lib.rs:28-44`、`tests::metric_round_trip_via_json`。OK。 +- 必須 `name` / `ts`、任意 `dimensions` / `value` / `correlation_id`、unknown は `None` で表現: 同 `lib.rs:28-44`、`metric_serializes_minimal_form_compactly` で省略形を確認。OK。 + +### 書き込み経路 +- 
薄いヘルパーがあり `LogEntry::Extension { domain: "metrics", payload }` で append: `crates/session-metrics/src/lib.rs:78-86` → `session_store::save_extension`。session-store 側は無改変(`git diff -- crates/session-store/` で空)。 +- hash chain に乗る (`save_extension` 経由)、ticket 上の "session-store に追加" は専用 crate に置き換えており、ticket 文言の「専用 crate(または既存の適切な配置)」の許容範囲。OK。 +- 書き込み失敗の握り潰し: **未対応(後述 Non-blocking / Follow-up)。** + +### 読み出し経路 +- `metrics_from_extensions` で fold、deserialize 失敗 payload は skip: `crates/session-metrics/src/lib.rs:92-100`、`fold_skips_undeserializable_payloads` でカバー。OK。 +- 「特定セッションの metric 列を取り出すサンプル」は `crates/pod/tests/session_metrics_test.rs:210-211` が `session_store::restore` → `metrics_from_extensions` の最小例として機能している。OK。 + +### 最初の利用者: Prune projection +- `attach_prune` から `prune.fire` / `prune.skip` を発行: `crates/pod/src/compact/prune.rs:52-80`。 + - `Fired` 時: `value=estimated_savings`, `correlation_id`(uuid v7), `candidate_count` + `border_turn` dim、`UsageTracker::note_correlation_id` で stash。 + - `SkippedNoCandidates` / `SkippedBelowMinSavings` の 2 経路を分けている。 +- `prune.post_request` が直後の `LlmUsage` と組で発行され同じ `correlation_id` を持つ: `crates/pod/src/pod.rs:1121-1156`。 +- `correlation_id` の生成は uuid v7(既存 `uuid` workspace dep の v7 feature を再利用)。OK。 + +### Resume 互換 +- `[compaction]` 無し manifest で metrics が一切書かれない・replay も成功: `crates/pod/tests/session_metrics_test.rs:325-367`。OK。 + +### 完了条件 +- 型 + 書き込み/読み出し + unit test: 4 件 (`crates/session-metrics/src/lib.rs:106-169`)。OK。 +- prune.fire/skip/post_request が session-log に乗る: 統合テスト `prune_metrics_emit_skip_then_fire_with_post_request_join` で確認。 +- 後方互換: `old_sessions_without_metrics_replay_cleanly` で確認。 +- 「prune metric 列を取り出す」テスト: 同上の統合テストが兼ねる。 +- correlation_id join: `prune_metrics_emit_skip_then_fire_with_post_request_join:258-276` で `fire.correlation_id == post.correlation_id`、`post.value == cache_read_input_tokens` を assert。OK。 + +## アーキテクチャ・スコープ + +- session-store の型と公開 API は無改変(`git diff` 空)。前提を遵守している。 +- `UsageTracker` 内部を `Vec` → `Vec`
に拡張したのは pod 内 `pub(crate)` のローカル拡張で、`llm_worker::UsageRecord` 型自体は触っていない。レイヤ上は問題なし(`session-store の型は触らない` の制約は守られている)。 +- 新規 crate 名 `session-metrics` は memory ノートの「`insomnia-` プレフィックス不要、短い名前」に準拠。 +- 依存追加は workspace.dependencies + `cargo add` 流(手動編集の痕跡なし)。OK。 +- `Worker` への配線は `set_prune_observer` を追加する小規模な拡張で、prune 評価ロジック自体は `evaluate_candidates` の border_turn 返却を追加した以外はリファクタの域。`prunable_indices` を wrapper に薄く残しているのも既存呼び出し側を壊さない配慮で妥当。 +- prune metric の dimension/value 振り分け(`prune.fire` の `value=estimated_savings`, `prune.post_request` の `value=cache_read_tokens`)は ticket の「value/dimension として記録」を素直に解釈したもので許容範囲。今後別 metric を生やすときに揃えやすいよう、value は「主スカラ(後で集計したい数)」/dimension は「軸(filter したい文字列)」のポリシーに統一しておくとよい(コメントで明文化されているとなおよし)。 +- LLM provider policy / ScopedFs scripting plan 等の他方針には抵触しない。 + +## 指摘事項 + +### Blocking + +なし。 + +### Non-blocking / Follow-up + +- **メトリクス書き込み失敗の握り潰し**: ticket 要件 `crates/.../tickets/session-metrics.md:29` に「書き込み失敗(store IO エラー)はメトリクス側で握りつぶす。本体処理を阻害しない」とある一方、`crates/pod/src/pod.rs:1102-1110` および `1144-1150` の `record_metric` は `?` で `StoreError` を呼び出し元 (`persist_turn` → `Pod::run`) に伝播させている。ヘルパー自身のドキュメントも「書き込み失敗は呼び出し側に返す」(`crates/session-metrics/src/lib.rs:76-77`) で投げ直し前提なので、現状はメトリクス IO 失敗時に turn 永続化フロー全体が落ちる。挙動として「メトリクスのために本体処理を止めない」契約を満たしていない。 + - 対応案: `persist_turn` 内で `record_metric(..).await` の戻り値を `if let Err(e) = ... 
{ warn!(error=%e, "metric write failed; ignoring"); }` で握り、`LlmUsage` 永続化と直交させる。Helper の doc も「呼び出し側で握り潰すのが既定」に揃える。 + - 重要度: ticket の明示要件であり本来 Blocking 候補だが、現状 store IO は LocalFsStore 一択で実害が出る経路が乏しく、後続の小修正で吸収可能と判断し follow-up に置く(ユーザーが Blocking 扱いに引き上げたい場合は了承)。 + +### Nits + +- `crates/pod/src/compact/prune.rs:58-62` で `Fired` 時の `border_turn` を `if let Some(...)` で条件挿入しているが、`evaluate_candidates` の実装上 `Fired` / `SkippedBelowMinSavings` のときは必ず `Some` になる(候補が空でない=境界が決まる)。動作上問題はないが、不変条件を `expect("border_turn is Some when candidates exist")` で表に出すか、コメントで残すと意図が明確。 +- `crates/pod/src/pod.rs:1102` 周辺のコメントにある「Ordered before LlmUsage so that a `prune.fire` and the `prune.post_request` derived from the matching usage record appear in the log close together.」は良い記述。同コメントを `metrics_tracker.rs` の `drain` 側にも一行張ると読み手が flush 順を把握しやすい。 +- `Metric` の value/dimension のポリシー(主スカラ vs ラベル)について、`session-metrics/src/lib.rs` の crate doc に 1〜2 行追記しておくと、今後の利用者(compact/hook 等)が振り分けで迷わない。 + +## 判断 + +**Approve with follow-up** — ticket 要件と完了条件の主要部分は概ね満たされている。session-store の型を一切触らず、新規 crate も命名・依存追加とも方針通り。特に correlation_id による fire ↔ post_request の join はテストで明示的に検証されており、一次目的(prune 効果測定の最小レーン)は達成。一方で「メトリクス書き込み失敗を握り潰す」要件が `persist_turn` 経路で守られていない点は ticket の明文要件であり、後続コミットで吸収する想定で follow-up とする(Blocking 扱いに昇格させたい場合はその判断に従う)。