feat: session-metrics実装
This commit is contained in:
parent
9010a920a4
commit
b9635c5002
10
Cargo.lock
generated
10
Cargo.lock
generated
|
|
@ -2144,6 +2144,7 @@ dependencies = [
|
|||
"schemars",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"session-metrics",
|
||||
"session-store",
|
||||
"tempfile",
|
||||
"thiserror 2.0.18",
|
||||
|
|
@ -2926,6 +2927,15 @@ dependencies = [
|
|||
"syn 2.0.117",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "session-metrics"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"session-store",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "session-store"
|
||||
version = "0.1.0"
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ members = [
|
|||
"crates/protocol",
|
||||
"crates/provider",
|
||||
"crates/pod-registry",
|
||||
"crates/session-metrics",
|
||||
"crates/tools",
|
||||
"crates/tui",
|
||||
"crates/memory",
|
||||
|
|
@ -28,6 +29,7 @@ memory = { path = "crates/memory" }
|
|||
pod-registry = { path = "crates/pod-registry" }
|
||||
protocol = { path = "crates/protocol" }
|
||||
provider = { path = "crates/provider" }
|
||||
session-metrics = { path = "crates/session-metrics" }
|
||||
session-store = { path = "crates/session-store" }
|
||||
tools = { path = "crates/tools" }
|
||||
|
||||
|
|
|
|||
|
|
@ -30,6 +30,43 @@ use crate::llm_client::types::Item;
|
|||
/// 実際の projection と一致する savings を返す必要がある。
|
||||
pub type SavingsEstimator = Box<dyn Fn(&[Item], &[usize]) -> u64 + Send + Sync>;
|
||||
|
||||
/// Result of one prune evaluation pass, surfaced to the optional
|
||||
/// [`PruneObserver`] for instrumentation.
|
||||
///
|
||||
/// Worker は LLM リクエストごとに 1 回 prune の評価をし、その結果を
|
||||
/// (observer が登録されていれば)この値で通知する。fire/skip の判定
|
||||
/// 結果と、判定材料になった候補数 / 推定 savings / 境界ターン位置を持つ。
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PruneEvaluation {
|
||||
/// `prunable_indices` の長さ。`Skipped::NoCandidates` の時は 0。
|
||||
pub candidate_count: usize,
|
||||
/// 推定された savings (tokens)。`NoCandidates` の時は 0。
|
||||
pub estimated_savings: u64,
|
||||
/// `protected_turns` 境界に当たる turn-start アイテムの index。
|
||||
/// turn 数が `protected_turns` 以下で境界が決まらない場合は `None`。
|
||||
pub border_turn: Option<usize>,
|
||||
/// 判定結果。
|
||||
pub decision: PruneDecision,
|
||||
}
|
||||
|
||||
/// Result of a single prune evaluation: one variant per branch of the
/// "fire vs skip" decision tree the Worker walks ahead of every LLM request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PruneDecision {
    /// `prunable_indices` was empty, so nothing was done.
    SkippedNoCandidates,
    /// Candidates existed, but the estimated savings were below
    /// `min_savings`, so nothing was done.
    SkippedBelowMinSavings,
    /// Candidates existed and savings >= `min_savings`, so the projection
    /// was applied. `pruned_count` is the number of items `project()`
    /// actually rewrote (candidates whose content was already `None`
    /// contribute 0).
    Fired { pruned_count: usize },
}
|
||||
|
||||
/// Optional observer invoked after each prune evaluation, regardless of
|
||||
/// branch. Pod 等の上位層が install して metrics を発行する。
|
||||
pub type PruneObserver = Box<dyn Fn(&PruneEvaluation) + Send + Sync>;
|
||||
|
||||
/// Configuration for the Prune algorithm.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct PruneConfig {
|
||||
|
|
@ -100,12 +137,20 @@ pub fn project(items: &mut [Item], indices: &[usize]) -> usize {
|
|||
/// Returns an empty vector when there are too few turns or no prunable
|
||||
/// candidates.
|
||||
pub fn prunable_indices(items: &[Item], protected_turns: usize) -> Vec<usize> {
|
||||
evaluate_candidates(items, protected_turns).0
|
||||
}
|
||||
|
||||
/// Same as [`prunable_indices`] but also returns the index of the
|
||||
/// `protected_turns` boundary (the turn-start item whose tail is
|
||||
/// protected). `None` when too few turns exist for a boundary to be
|
||||
/// defined.
|
||||
pub fn evaluate_candidates(items: &[Item], protected_turns: usize) -> (Vec<usize>, Option<usize>) {
|
||||
let turn_starts = find_turn_starts(items);
|
||||
if turn_starts.len() <= protected_turns {
|
||||
return Vec::new();
|
||||
return (Vec::new(), None);
|
||||
}
|
||||
let boundary = turn_starts[turn_starts.len() - protected_turns];
|
||||
items[..boundary]
|
||||
let candidates = items[..boundary]
|
||||
.iter()
|
||||
.enumerate()
|
||||
.filter_map(|(i, item)| match item {
|
||||
|
|
@ -114,7 +159,8 @@ pub fn prunable_indices(items: &[Item], protected_turns: usize) -> Vec<usize> {
|
|||
} => Some(i),
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
.collect();
|
||||
(candidates, Some(boundary))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
@ -239,6 +285,30 @@ mod tests {
|
|||
assert_eq!(project(&mut items, &candidates), 0);
|
||||
}
|
||||
|
||||
#[test]
fn evaluate_candidates_returns_boundary_index() {
    let payload = "x".repeat(64);
    let history = make_history(&[
        ("turn1", vec![("s1", Some(&payload))]),
        ("turn2", vec![("s2", Some(&payload))]),
        ("turn3", vec![("s3", Some("keep"))]),
        ("turn4", vec![("s4", Some("keep too"))]),
    ]);

    let (prunable, boundary) = evaluate_candidates(&history, 2);

    assert_eq!(prunable.len(), 2);
    // With protected_turns = 2 the boundary is turn3's user message:
    // turn1 u/a/c/r (4 items) + turn2 u/a/c/r (4 items) puts it at index 8.
    assert_eq!(boundary, Some(8));
}
|
||||
|
||||
#[test]
fn evaluate_candidates_no_boundary_when_too_few_turns() {
    // One turn against protected_turns = 2: nothing to prune, no boundary.
    let history = make_history(&[("only", vec![("s", Some("x"))])]);

    let (prunable, boundary) = evaluate_candidates(&history, 2);

    assert!(prunable.is_empty());
    assert!(boundary.is_none());
}
|
||||
|
||||
#[test]
|
||||
fn protected_turns_boundary_exact() {
|
||||
// 3 turns with protected_turns=2: only turn 1 is a candidate.
|
||||
|
|
|
|||
|
|
@ -184,6 +184,9 @@ pub struct Worker<C: LlmClient, S: WorkerState = Mutable> {
|
|||
/// by higher layers that own usage measurements. `None` disables
|
||||
/// the prune projection.
|
||||
savings_estimator: Option<crate::prune::SavingsEstimator>,
|
||||
/// Optional observer fired once per prune evaluation (regardless of
|
||||
/// whether projection actually fired). `None` disables instrumentation.
|
||||
prune_observer: Option<crate::prune::PruneObserver>,
|
||||
/// Index of the last stable cache prefix item, set by higher layers.
|
||||
/// Plumbed into [`Request::cache_anchor`] at request build time.
|
||||
cache_anchor: Option<usize>,
|
||||
|
|
@ -384,6 +387,16 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
self.savings_estimator = estimator;
|
||||
}
|
||||
|
||||
/// Install an observer notified after each prune evaluation pass.
|
||||
///
|
||||
/// Fires once per outgoing LLM request (the same point as the
|
||||
/// `prune_config` / `savings_estimator` pair), regardless of whether
|
||||
/// projection actually applied. Intended for upper layers that want
|
||||
/// to instrument fire/skip rates without owning the prune logic.
|
||||
pub fn set_prune_observer(&mut self, observer: Option<crate::prune::PruneObserver>) {
|
||||
self.prune_observer = observer;
|
||||
}
|
||||
|
||||
/// Mark an index into the current history as a stable, cacheable
|
||||
/// prefix boundary. The value is included in each outgoing
|
||||
/// [`Request`] via [`Request::cache_anchor`] — caching-aware
|
||||
|
|
@ -854,9 +867,16 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
// threshold. Worker does not own usage history itself; the
|
||||
// estimator is injected by the layer that does.
|
||||
if let (Some(config), Some(estimator)) = (&self.prune_config, &self.savings_estimator) {
|
||||
let candidates =
|
||||
crate::prune::prunable_indices(&request_context, config.protected_turns);
|
||||
if !candidates.is_empty() {
|
||||
let (candidates, border_turn) =
|
||||
crate::prune::evaluate_candidates(&request_context, config.protected_turns);
|
||||
let evaluation = if candidates.is_empty() {
|
||||
crate::prune::PruneEvaluation {
|
||||
candidate_count: 0,
|
||||
estimated_savings: 0,
|
||||
border_turn,
|
||||
decision: crate::prune::PruneDecision::SkippedNoCandidates,
|
||||
}
|
||||
} else {
|
||||
let savings = estimator(&request_context, &candidates);
|
||||
if savings >= config.min_savings {
|
||||
let pruned = crate::prune::project(&mut request_context, &candidates);
|
||||
|
|
@ -867,7 +887,25 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
"Projected old tool-result content out of request context"
|
||||
);
|
||||
}
|
||||
crate::prune::PruneEvaluation {
|
||||
candidate_count: candidates.len(),
|
||||
estimated_savings: savings,
|
||||
border_turn,
|
||||
decision: crate::prune::PruneDecision::Fired {
|
||||
pruned_count: pruned,
|
||||
},
|
||||
}
|
||||
} else {
|
||||
crate::prune::PruneEvaluation {
|
||||
candidate_count: candidates.len(),
|
||||
estimated_savings: savings,
|
||||
border_turn,
|
||||
decision: crate::prune::PruneDecision::SkippedBelowMinSavings,
|
||||
}
|
||||
}
|
||||
};
|
||||
if let Some(observer) = &self.prune_observer {
|
||||
observer(&evaluation);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1077,6 +1115,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
tool_output_limits: None,
|
||||
prune_config: None,
|
||||
savings_estimator: None,
|
||||
prune_observer: None,
|
||||
cache_anchor: None,
|
||||
cache_key: None,
|
||||
_state: PhantomData,
|
||||
|
|
@ -1334,6 +1373,7 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
tool_output_limits: self.tool_output_limits,
|
||||
prune_config: self.prune_config,
|
||||
savings_estimator: self.savings_estimator,
|
||||
prune_observer: self.prune_observer,
|
||||
cache_anchor: self.cache_anchor,
|
||||
cache_key: self.cache_key,
|
||||
_state: PhantomData,
|
||||
|
|
@ -1414,6 +1454,7 @@ impl<C: LlmClient> Worker<C, Locked> {
|
|||
tool_output_limits: self.tool_output_limits,
|
||||
prune_config: self.prune_config,
|
||||
savings_estimator: self.savings_estimator,
|
||||
prune_observer: self.prune_observer,
|
||||
cache_anchor: self.cache_anchor,
|
||||
cache_key: self.cache_key,
|
||||
_state: PhantomData,
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ libc = { workspace = true }
|
|||
schemars = { workspace = true }
|
||||
memory = { workspace = true }
|
||||
uuid = { workspace = true, features = ["v7"] }
|
||||
session-metrics = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
dotenv = "0.15.0"
|
||||
|
|
|
|||
50
crates/pod/src/compact/metrics_tracker.rs
Normal file
50
crates/pod/src/compact/metrics_tracker.rs
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
//! Sync buffer for `session_metrics::Metric` values queued from inside
|
||||
//! Worker callbacks (which run synchronously and cannot themselves
|
||||
//! perform `async` store writes).
|
||||
//!
|
||||
//! Pod drains this buffer in `persist_turn` and writes each metric via
|
||||
//! `session_metrics::record_metric`, alongside the regular `LlmUsage`
|
||||
//! entries.
|
||||
|
||||
use std::sync::Mutex;
|
||||
|
||||
use session_metrics::Metric;
|
||||
|
||||
pub(crate) struct MetricsTracker {
|
||||
pending: Mutex<Vec<Metric>>,
|
||||
}
|
||||
|
||||
impl MetricsTracker {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
pending: Mutex::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Queue a metric for the next `persist_turn` flush.
|
||||
pub(crate) fn push(&self, metric: Metric) {
|
||||
self.pending.lock().unwrap().push(metric);
|
||||
}
|
||||
|
||||
/// Drain all queued metrics. Called by Pod after a run completes.
|
||||
pub(crate) fn drain(&self) -> Vec<Metric> {
|
||||
std::mem::take(&mut *self.pending.lock().unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Metrics come back in push order, and a drain empties the queue.
    #[test]
    fn push_then_drain_returns_in_order_and_clears() {
        let tracker = MetricsTracker::new();
        tracker.push(Metric::now("a"));
        tracker.push(Metric::now("b"));

        let flushed = tracker.drain();
        assert_eq!(flushed.len(), 2);
        assert_eq!(flushed[0].name, "a");
        assert_eq!(flushed[1].name, "b");

        // A second drain finds nothing left behind.
        let leftover = tracker.drain();
        assert!(leftover.is_empty());
    }
}
|
||||
|
|
@ -1,3 +1,4 @@
|
|||
pub(crate) mod metrics_tracker;
|
||||
pub(crate) mod prune;
|
||||
pub(crate) mod state;
|
||||
pub(crate) mod token_counter;
|
||||
|
|
|
|||
|
|
@ -5,10 +5,16 @@
|
|||
//! 直後)。Worker は usage 履歴を知らないので、`min_savings` 判定に使う savings
|
||||
//! の見積もりはコールバックで外部から注入する。このモジュールはそのコールバック
|
||||
//! を組み立てて Worker に差し込むための `impl Pod` を提供する。
|
||||
//!
|
||||
//! 同じ経路で `PruneObserver` も install し、評価のたびに `prune.fire` /
|
||||
//! `prune.skip` metric を `MetricsTracker` に積む。`Fired` 時は uuid を
|
||||
//! `UsageTracker` にも stash しておき、後続の `LlmUsage` と組で
|
||||
//! `prune.post_request` を吐けるようにする。
|
||||
|
||||
use llm_worker::Item;
|
||||
use llm_worker::llm_client::client::LlmClient;
|
||||
use llm_worker::prune::{PruneConfig, SavingsEstimator};
|
||||
use llm_worker::prune::{PruneConfig, PruneDecision, PruneObserver, SavingsEstimator};
|
||||
use session_metrics::Metric;
|
||||
use session_store::Store;
|
||||
|
||||
use crate::Pod;
|
||||
|
|
@ -24,6 +30,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// Measurement-less estimates (before the first LLM call, or immediately
|
||||
/// after a compact) return `0` from the estimator, which naturally
|
||||
/// prevents the prune projection from firing until usage data exists.
|
||||
///
|
||||
/// Also installs a [`PruneObserver`] that pushes `prune.fire` /
|
||||
/// `prune.skip` metrics into the shared [`MetricsTracker`]. On `Fired`
|
||||
/// the observer additionally stashes a fresh correlation_id in
|
||||
/// [`UsageTracker`] so the next `LlmUsage` can be paired with a
|
||||
/// `prune.post_request` metric carrying the same id.
|
||||
pub fn attach_prune(&mut self, config: PruneConfig) {
|
||||
let usage = self.usage_history_handle();
|
||||
let estimator: SavingsEstimator = Box::new(move |history: &[Item], indices| {
|
||||
|
|
@ -34,9 +46,43 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
_ => est.tokens,
|
||||
}
|
||||
});
|
||||
|
||||
let metrics = self.metrics_tracker_handle();
|
||||
let usage_tracker = self.usage_tracker_handle();
|
||||
let observer: PruneObserver = Box::new(move |eval| {
|
||||
match &eval.decision {
|
||||
PruneDecision::Fired { .. } => {
|
||||
let correlation_id = uuid::Uuid::now_v7().to_string();
|
||||
let mut metric = Metric::now("prune.fire")
|
||||
.with_value(eval.estimated_savings as f64)
|
||||
.with_correlation_id(&correlation_id)
|
||||
.with_dimension("candidate_count", eval.candidate_count.to_string());
|
||||
if let Some(border) = eval.border_turn {
|
||||
metric = metric.with_dimension("border_turn", border.to_string());
|
||||
}
|
||||
metrics.push(metric);
|
||||
usage_tracker.note_correlation_id(correlation_id);
|
||||
}
|
||||
PruneDecision::SkippedNoCandidates => {
|
||||
metrics.push(
|
||||
Metric::now("prune.skip").with_dimension("reason", "no_candidates"),
|
||||
);
|
||||
}
|
||||
PruneDecision::SkippedBelowMinSavings => {
|
||||
metrics.push(
|
||||
Metric::now("prune.skip")
|
||||
.with_dimension("reason", "below_min_savings")
|
||||
.with_dimension("candidate_count", eval.candidate_count.to_string())
|
||||
.with_value(eval.estimated_savings as f64),
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let worker = self.worker_mut();
|
||||
worker.set_prune_config(Some(config));
|
||||
worker.set_savings_estimator(Some(estimator));
|
||||
worker.set_prune_observer(Some(observer));
|
||||
}
|
||||
|
||||
/// If the manifest has a `[compaction]` section, build a `PruneConfig`
|
||||
|
|
|
|||
|
|
@ -19,19 +19,35 @@ use std::sync::Mutex;
|
|||
use llm_worker::UsageRecord;
|
||||
use llm_worker::timeline::event::UsageEvent;
|
||||
|
||||
/// One drained measurement: the underlying `UsageRecord` plus an optional
|
||||
/// `correlation_id` stamped by the prune projection (or any other future
|
||||
/// upstream observer) so that downstream metrics emitted alongside this
|
||||
/// record can be joined to it after the fact.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct RecordedUsage {
|
||||
pub(crate) record: UsageRecord,
|
||||
pub(crate) correlation_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Shared between the pre-request hook, the `on_usage` callback, and Pod.
|
||||
pub(crate) struct UsageTracker {
|
||||
/// `history.len()` captured at the most recent `pre_llm_request`.
|
||||
/// Cleared when paired with an incoming `on_usage` event.
|
||||
pending_history_len: Mutex<Option<usize>>,
|
||||
/// Optional `correlation_id` set by an upstream observer (currently
|
||||
/// the prune projection on `Fired`). Paired into the next
|
||||
/// `RecordedUsage` and cleared. Skips that don't fire leave this
|
||||
/// `None`, so the resulting record carries no correlation.
|
||||
pending_correlation_id: Mutex<Option<String>>,
|
||||
/// Records accumulated during the current run; drained by Pod.
|
||||
pending_records: Mutex<Vec<UsageRecord>>,
|
||||
pending_records: Mutex<Vec<RecordedUsage>>,
|
||||
}
|
||||
|
||||
impl UsageTracker {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
pending_history_len: Mutex::new(None),
|
||||
pending_correlation_id: Mutex::new(None),
|
||||
pending_records: Mutex::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
|
|
@ -41,16 +57,29 @@ impl UsageTracker {
|
|||
*self.pending_history_len.lock().unwrap() = Some(history_len);
|
||||
}
|
||||
|
||||
/// Stash a `correlation_id` to be paired into the next `RecordedUsage`.
|
||||
/// Currently invoked by the prune observer on `Fired` so that the
|
||||
/// `prune.fire` metric and the `prune.post_request` metric (emitted
|
||||
/// alongside the resulting `LlmUsage`) carry the same join key.
|
||||
///
|
||||
/// Overwrites any previous unconsumed value — by construction the
|
||||
/// observer fires at most once per outgoing LLM request, immediately
|
||||
/// before the pre-request hook captures `history_len`.
|
||||
pub(crate) fn note_correlation_id(&self, id: String) {
|
||||
*self.pending_correlation_id.lock().unwrap() = Some(id);
|
||||
}
|
||||
|
||||
/// Called from the `on_usage` callback with the aggregated final
|
||||
/// UsageEvent. If a `history_len` was previously stashed via
|
||||
/// `note_request`, builds a `UsageRecord` and pushes it onto the buffer.
|
||||
/// If not (e.g. test code that fires Usage outside a request), drops
|
||||
/// the event.
|
||||
/// `note_request`, builds a `RecordedUsage` and pushes it onto the
|
||||
/// buffer. If not (e.g. test code that fires Usage outside a request),
|
||||
/// drops the event.
|
||||
pub(crate) fn record_usage(&self, event: &UsageEvent) {
|
||||
let history_len = match self.pending_history_len.lock().unwrap().take() {
|
||||
Some(n) => n,
|
||||
None => return,
|
||||
};
|
||||
let correlation_id = self.pending_correlation_id.lock().unwrap().take();
|
||||
// UsageEvent.input_tokens は scheme 層で「占有量(プロンプト全長)」に
|
||||
// 正規化済みである前提(Anthropic は cache_read + cache_creation を
|
||||
// 加算して emit する)。
|
||||
|
|
@ -58,18 +87,21 @@ impl UsageTracker {
|
|||
let cache_read = event.cache_read_input_tokens.unwrap_or(0);
|
||||
let cache_write = event.cache_creation_input_tokens.unwrap_or(0);
|
||||
let output = event.output_tokens.unwrap_or(0);
|
||||
self.pending_records.lock().unwrap().push(UsageRecord {
|
||||
history_len,
|
||||
input_total_tokens: input_total,
|
||||
cache_read_tokens: cache_read,
|
||||
cache_write_tokens: cache_write,
|
||||
output_tokens: output,
|
||||
self.pending_records.lock().unwrap().push(RecordedUsage {
|
||||
record: UsageRecord {
|
||||
history_len,
|
||||
input_total_tokens: input_total,
|
||||
cache_read_tokens: cache_read,
|
||||
cache_write_tokens: cache_write,
|
||||
output_tokens: output,
|
||||
},
|
||||
correlation_id,
|
||||
});
|
||||
}
|
||||
|
||||
/// Drain accumulated records. Called by Pod after a run completes,
|
||||
/// before persisting the turn.
|
||||
pub(crate) fn drain(&self) -> Vec<UsageRecord> {
|
||||
pub(crate) fn drain(&self) -> Vec<RecordedUsage> {
|
||||
std::mem::take(&mut *self.pending_records.lock().unwrap())
|
||||
}
|
||||
}
|
||||
|
|
@ -96,11 +128,12 @@ mod tests {
|
|||
|
||||
let records = tracker.drain();
|
||||
assert_eq!(records.len(), 1);
|
||||
assert_eq!(records[0].history_len, 5);
|
||||
assert_eq!(records[0].input_total_tokens, 1000);
|
||||
assert_eq!(records[0].cache_read_tokens, 800);
|
||||
assert_eq!(records[0].cache_write_tokens, 100);
|
||||
assert_eq!(records[0].output_tokens, 42);
|
||||
assert_eq!(records[0].record.history_len, 5);
|
||||
assert_eq!(records[0].record.input_total_tokens, 1000);
|
||||
assert_eq!(records[0].record.cache_read_tokens, 800);
|
||||
assert_eq!(records[0].record.cache_write_tokens, 100);
|
||||
assert_eq!(records[0].record.output_tokens, 42);
|
||||
assert!(records[0].correlation_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -129,8 +162,25 @@ mod tests {
|
|||
|
||||
let records = tracker.drain();
|
||||
assert_eq!(records.len(), 2);
|
||||
assert_eq!(records[0].history_len, 5);
|
||||
assert_eq!(records[1].history_len, 10);
|
||||
assert_eq!(records[1].cache_read_tokens, 50);
|
||||
assert_eq!(records[0].record.history_len, 5);
|
||||
assert_eq!(records[1].record.history_len, 10);
|
||||
assert_eq!(records[1].record.cache_read_tokens, 50);
|
||||
}
|
||||
|
||||
#[test]
fn correlation_id_pairs_with_next_record_only() {
    let tracker = UsageTracker::new();

    // First request: an id is stashed beforehand, so it lands on this record.
    tracker.note_correlation_id("abc".into());
    tracker.note_request(5);
    tracker.record_usage(&make_event(100, 0, 0, 20));

    // Second request: nothing stashed, so the record has no correlation_id.
    tracker.note_request(10);
    tracker.record_usage(&make_event(200, 50, 0, 30));

    let drained = tracker.drain();
    assert_eq!(drained.len(), 2);
    let (first, second) = (&drained[0], &drained[1]);
    assert_eq!(first.correlation_id.as_deref(), Some("abc"));
    assert!(second.correlation_id.is_none());
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -77,6 +77,11 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
/// Captures `(history_len, UsageEvent)` pairs during a run; drained
|
||||
/// in `persist_turn` and persisted as `LogEntry::LlmUsage` entries.
|
||||
usage_tracker: Arc<UsageTracker>,
|
||||
/// Sync-side buffer for `Metric` values queued from inside Worker
|
||||
/// callbacks (currently the prune observer). Drained in `persist_turn`
|
||||
/// and written via `session_metrics::record_metric` alongside
|
||||
/// `LogEntry::LlmUsage`. Always present after construction.
|
||||
metrics_tracker: Arc<crate::compact::metrics_tracker::MetricsTracker>,
|
||||
/// Cumulative Usage measurement timeline, one entry per LLM call.
|
||||
/// Restored from session log on `restore`, appended on each persist.
|
||||
/// Read by token-accounting APIs (`Pod::total_tokens`, etc.).
|
||||
|
|
@ -203,6 +208,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
interceptor_installed: false,
|
||||
compact_state: None,
|
||||
usage_tracker: Arc::new(UsageTracker::new()),
|
||||
metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
|
||||
usage_history: Arc::new(Mutex::new(Vec::<UsageRecord>::new())),
|
||||
tracker: None,
|
||||
system_prompt_template: None,
|
||||
|
|
@ -391,6 +397,26 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
self.usage_history.clone()
|
||||
}
|
||||
|
||||
/// Handle to the per-LLM-request `UsageTracker`.
|
||||
///
|
||||
/// Sibling modules (e.g. the prune observer) clone this `Arc` to stash
|
||||
/// per-request side state (e.g. a `correlation_id`) that pairs with
|
||||
/// the next `LlmUsage`.
|
||||
pub(crate) fn usage_tracker_handle(&self) -> Arc<UsageTracker> {
|
||||
self.usage_tracker.clone()
|
||||
}
|
||||
|
||||
/// Handle to the synchronous `MetricsTracker` buffer.
|
||||
///
|
||||
/// Worker callbacks (e.g. the prune observer) clone this `Arc` and
|
||||
/// `.push(metric)` into it; Pod drains it in `persist_turn` and
|
||||
/// writes each metric via `session_metrics::record_metric`.
|
||||
pub(crate) fn metrics_tracker_handle(
|
||||
&self,
|
||||
) -> Arc<crate::compact::metrics_tracker::MetricsTracker> {
|
||||
self.metrics_tracker.clone()
|
||||
}
|
||||
|
||||
/// Attach the session-scoped file-operation tracker from the builtin
|
||||
/// `tools` crate. Called by the Controller immediately after it
|
||||
/// registers the builtin tools on the Worker. Overwrites any
|
||||
|
|
@ -1068,13 +1094,36 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
)
|
||||
.await?;
|
||||
|
||||
// Flush any sync-buffered metrics from this run first
|
||||
// (currently `prune.fire` / `prune.skip` from the prune observer).
|
||||
// Ordered before LlmUsage so that a `prune.fire` and the
|
||||
// `prune.post_request` derived from the matching usage record
|
||||
// appear in the log close together.
|
||||
let pending_metrics = self.metrics_tracker.drain();
|
||||
for metric in pending_metrics {
|
||||
session_metrics::record_metric(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
&metric,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Persist any LLM Usage measurements collected during this run.
|
||||
// One LogEntry::LlmUsage per LLM call (the tool loop may have run
|
||||
// many calls within a single Pod::run). Each is also appended to
|
||||
// the in-memory `usage_history` so token-accounting APIs see it
|
||||
// before the next run.
|
||||
// before the next run. Records carrying a `correlation_id` (set
|
||||
// by an upstream observer such as the prune projection) also get
|
||||
// a paired `prune.post_request` metric so cache_read/write can be
|
||||
// joined back to the originating event.
|
||||
let usage_records = self.usage_tracker.drain();
|
||||
for record in usage_records {
|
||||
for recorded in usage_records {
|
||||
let crate::compact::usage_tracker::RecordedUsage {
|
||||
record,
|
||||
correlation_id,
|
||||
} = recorded;
|
||||
session_store::save_usage(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
|
|
@ -1086,6 +1135,20 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
record.output_tokens,
|
||||
)
|
||||
.await?;
|
||||
if let Some(id) = correlation_id {
|
||||
let metric = session_metrics::Metric::now("prune.post_request")
|
||||
.with_correlation_id(&id)
|
||||
.with_value(record.cache_read_tokens as f64)
|
||||
.with_dimension("cache_write_tokens", record.cache_write_tokens.to_string())
|
||||
.with_dimension("history_len", record.history_len.to_string());
|
||||
session_metrics::record_metric(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
&metric,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
self.usage_history
|
||||
.lock()
|
||||
.expect("usage_history poisoned")
|
||||
|
|
@ -1895,6 +1958,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
interceptor_installed: false,
|
||||
compact_state: None,
|
||||
usage_tracker: Arc::new(UsageTracker::new()),
|
||||
metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
|
||||
usage_history: Arc::new(Mutex::new(Vec::new())),
|
||||
tracker: None,
|
||||
system_prompt_template: common.system_prompt_template,
|
||||
|
|
@ -1956,6 +2020,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
interceptor_installed: false,
|
||||
compact_state: None,
|
||||
usage_tracker: Arc::new(UsageTracker::new()),
|
||||
metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
|
||||
usage_history: Arc::new(Mutex::new(Vec::new())),
|
||||
tracker: None,
|
||||
system_prompt_template: common.system_prompt_template,
|
||||
|
|
@ -2067,6 +2132,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
interceptor_installed: false,
|
||||
compact_state: None,
|
||||
usage_tracker: Arc::new(UsageTracker::new()),
|
||||
metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
|
||||
usage_history: Arc::new(Mutex::new(state.usage_history)),
|
||||
tracker: None,
|
||||
// Restore replays the saved system_prompt verbatim — no
|
||||
|
|
|
|||
367
crates/pod/tests/session_metrics_test.rs
Normal file
367
crates/pod/tests/session_metrics_test.rs
Normal file
|
|
@ -0,0 +1,367 @@
|
|||
//! End-to-end coverage for the prune-projection metrics path.
|
||||
//!
|
||||
//! Drives a Pod with a scripted mock LLM client and a custom tool that
|
||||
//! returns a long `ToolOutput.content`, then inspects the persisted
|
||||
//! session log to verify:
|
||||
//!
|
||||
//! - `prune.skip { reason: "no_candidates" }` lands when the protected-turn
|
||||
//! window covers the entire history.
|
||||
//! - `prune.fire` lands once enough turns + usage measurements exist for
|
||||
//! the projection to actually apply.
|
||||
//! - The fire metric and the immediately-following `prune.post_request`
|
||||
//! metric share the same `correlation_id`, so cache_read / cache_write
|
||||
//! from the LlmUsage that triggered the projection can be joined back
|
||||
//! to the originating event.
|
||||
//! - `prune.skip { reason: "below_min_savings" }` lands when candidates
|
||||
//! exist but their estimated savings are below the configured floor.
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::Stream;
|
||||
use llm_worker::Worker;
|
||||
use llm_worker::llm_client::event::{
|
||||
Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent,
|
||||
};
|
||||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
||||
use session_metrics::{DOMAIN, Metric, metrics_from_extensions};
|
||||
use session_store::FsStore;
|
||||
|
||||
use pod::{Pod, PodManifest};
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MockClient {
|
||||
responses: Arc<Vec<Vec<LlmEvent>>>,
|
||||
call_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl MockClient {
|
||||
fn new(responses: Vec<Vec<LlmEvent>>) -> Self {
|
||||
Self {
|
||||
responses: Arc::new(responses),
|
||||
call_count: Arc::new(AtomicUsize::new(0)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LlmClient for MockClient {
|
||||
fn clone_boxed(&self) -> Box<dyn LlmClient> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
|
||||
async fn stream(
|
||||
&self,
|
||||
_request: Request,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<LlmEvent, ClientError>> + Send>>, ClientError>
|
||||
{
|
||||
let count = self.call_count.fetch_add(1, Ordering::SeqCst);
|
||||
if count >= self.responses.len() {
|
||||
return Err(ClientError::Config("mock client exhausted".into()));
|
||||
}
|
||||
let events = self.responses[count].clone();
|
||||
let stream = futures::stream::iter(events.into_iter().map(Ok));
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
}
|
||||
|
||||
/// Test tool whose result is always the same
/// `ToolOutput { summary, content: Some(big) }`. The `content` is meant to
/// be long enough that prune savings comfortably clear small
/// `min_savings` thresholds.
struct BigContentTool {
    /// Short summary placed in every output.
    summary: &'static str,
    /// Long body placed in every output's `content`.
    content: String,
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for BigContentTool {
|
||||
async fn execute(&self, _input: &str) -> Result<ToolOutput, ToolError> {
|
||||
Ok(ToolOutput {
|
||||
summary: self.summary.into(),
|
||||
content: Some(self.content.clone()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn big_content_tool_definition(name: &'static str) -> ToolDefinition {
|
||||
Arc::new(move || {
|
||||
let summary = "tool result summary";
|
||||
let content = "x".repeat(2048);
|
||||
(
|
||||
ToolMeta::new(name)
|
||||
.description("test tool that returns a long content")
|
||||
.input_schema(serde_json::json!({"type": "object"})),
|
||||
Arc::new(BigContentTool { summary, content }) as Arc<dyn Tool>,
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn usage_event(input_total: u64, cache_read: u64, cache_write: u64, output: u64) -> LlmEvent {
|
||||
LlmEvent::Usage(UsageEvent {
|
||||
input_tokens: Some(input_total),
|
||||
output_tokens: Some(output),
|
||||
total_tokens: Some(input_total + output),
|
||||
cache_read_input_tokens: Some(cache_read),
|
||||
cache_creation_input_tokens: Some(cache_write),
|
||||
})
|
||||
}
|
||||
|
||||
/// Tool-call response from the assistant: emits a `tool_use` block then a
|
||||
/// usage event so usage_history gains a measurement on this turn.
|
||||
fn tool_use_response(call_id: &str, tool_name: &str) -> Vec<LlmEvent> {
|
||||
vec![
|
||||
LlmEvent::tool_use_start(0, call_id, tool_name),
|
||||
LlmEvent::tool_input_delta(0, "{}"),
|
||||
LlmEvent::tool_use_stop(0),
|
||||
usage_event(500, 0, 0, 10),
|
||||
LlmEvent::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}),
|
||||
]
|
||||
}
|
||||
|
||||
/// Plain text response with explicit cache_read/cache_write so that
|
||||
/// `prune.post_request` can carry meaningful values when this is the
|
||||
/// LLM call that follows a `prune.fire` event.
|
||||
fn text_response_with_cache(text: &str, cache_read: u64, cache_write: u64) -> Vec<LlmEvent> {
|
||||
vec![
|
||||
LlmEvent::text_block_start(0),
|
||||
LlmEvent::text_delta(0, text),
|
||||
LlmEvent::text_block_stop(0, None),
|
||||
usage_event(800, cache_read, cache_write, 5),
|
||||
LlmEvent::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}),
|
||||
]
|
||||
}
|
||||
|
||||
/// Renders the pod manifest TOML used by the prune-metric tests.
///
/// Everything is fixed except the two `[compaction]` knobs, which are
/// substituted from the arguments.
fn manifest_toml(prune_protected_turns: usize, prune_min_savings: u64) -> String {
    format!(
        r#"
[pod]
name = "test-pod"
pwd = "./"

[model]
scheme = "anthropic"
model_id = "test-model"

[worker]
max_tokens = 100

[compaction]
prune_protected_turns = {protected}
prune_min_savings = {min_savings}

[[scope.allow]]
target = "./"
permission = "write"
"#,
        protected = prune_protected_turns,
        min_savings = prune_min_savings,
    )
}
|
||||
|
||||
async fn make_pod(
|
||||
manifest_toml: String,
|
||||
client: MockClient,
|
||||
tool_name: &'static str,
|
||||
) -> (Pod<MockClient, FsStore>, tempfile::TempDir, tempfile::TempDir) {
|
||||
let manifest = PodManifest::from_toml(&manifest_toml).unwrap();
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(store_tmp.path()).await.unwrap();
|
||||
let pwd_tmp = tempfile::tempdir().unwrap();
|
||||
let pwd = pwd_tmp.path().to_path_buf();
|
||||
let scope = pod::Scope::writable(&pwd).unwrap();
|
||||
|
||||
let mut worker = Worker::new(client);
|
||||
worker.register_tool(big_content_tool_definition(tool_name));
|
||||
|
||||
let pod = Pod::new(manifest, worker, store, pwd, scope).await.unwrap();
|
||||
(pod, store_tmp, pwd_tmp)
|
||||
}
|
||||
|
||||
/// Drive Pod through enough runs to exercise both skip-no_candidates and
|
||||
/// fire branches, then read the session log back and assert the metric
|
||||
/// stream.
|
||||
#[tokio::test]
|
||||
async fn prune_metrics_emit_skip_then_fire_with_post_request_join() {
|
||||
// Run 1 (request 0): tool_use → triggers tool execution → request 1
|
||||
// on the second iteration to produce the assistant reply.
|
||||
// Run 2 (request 2): plain assistant text. Prune evaluation here
|
||||
// sees user1's tool_result outside the 1-protected-turn window and
|
||||
// should fire.
|
||||
let client = MockClient::new(vec![
|
||||
tool_use_response("call-1", "big_tool"),
|
||||
text_response_with_cache("ok", 0, 200),
|
||||
text_response_with_cache("done", 1234, 50),
|
||||
]);
|
||||
let (mut pod, _store_tmp, _pwd_tmp) =
|
||||
make_pod(manifest_toml(1, 1), client, "big_tool").await;
|
||||
let session_id = pod.session_id();
|
||||
// Cloning the store handle to read the session log back after the
|
||||
// runs complete — the Pod retains its own copy.
|
||||
let store = pod.store().clone();
|
||||
|
||||
pod.run_text("first").await.unwrap();
|
||||
pod.run_text("second").await.unwrap();
|
||||
|
||||
let state = session_store::restore(&store, session_id).await.unwrap();
|
||||
let metrics = metrics_from_extensions(&state.extensions);
|
||||
|
||||
// Run 1 has 2 LLM iterations (tool loop), each evaluates prune with
|
||||
// only one user-message turn → 2x skip{no_candidates}.
|
||||
// Run 2 has 1 LLM iteration with enough turns → 1x fire +
|
||||
// 1x post_request paired by correlation_id.
|
||||
let names: Vec<&str> = metrics.iter().map(|m| m.name.as_str()).collect();
|
||||
assert!(
|
||||
names.contains(&"prune.skip"),
|
||||
"expected prune.skip in {names:?}"
|
||||
);
|
||||
assert!(
|
||||
names.contains(&"prune.fire"),
|
||||
"expected prune.fire in {names:?}"
|
||||
);
|
||||
assert!(
|
||||
names.contains(&"prune.post_request"),
|
||||
"expected prune.post_request in {names:?}"
|
||||
);
|
||||
|
||||
// All skips in run 1 must record reason=no_candidates.
|
||||
for m in metrics.iter().filter(|m| m.name == "prune.skip") {
|
||||
assert_eq!(
|
||||
m.dimensions.get("reason").map(String::as_str),
|
||||
Some("no_candidates"),
|
||||
"skip metric should be no_candidates here, got {m:?}"
|
||||
);
|
||||
assert!(m.correlation_id.is_none());
|
||||
}
|
||||
|
||||
// The fire metric carries dimensions and correlation_id.
|
||||
let fire = metrics
|
||||
.iter()
|
||||
.find(|m| m.name == "prune.fire")
|
||||
.expect("prune.fire missing");
|
||||
assert!(
|
||||
fire.dimensions.contains_key("candidate_count"),
|
||||
"fire missing candidate_count: {fire:?}"
|
||||
);
|
||||
assert!(
|
||||
fire.dimensions.contains_key("border_turn"),
|
||||
"fire missing border_turn: {fire:?}"
|
||||
);
|
||||
assert!(
|
||||
fire.value.is_some(),
|
||||
"fire missing estimated_savings value"
|
||||
);
|
||||
let fire_id = fire
|
||||
.correlation_id
|
||||
.as_ref()
|
||||
.expect("fire metric missing correlation_id");
|
||||
|
||||
// Exactly one post_request metric should exist with the same id, and
|
||||
// its value/dimension should reflect the cache numbers from the
|
||||
// text_response_with_cache call (cache_read=1234, cache_write=50).
|
||||
let post = metrics
|
||||
.iter()
|
||||
.find(|m| m.name == "prune.post_request")
|
||||
.expect("prune.post_request missing");
|
||||
assert_eq!(post.correlation_id.as_ref(), Some(fire_id));
|
||||
assert_eq!(post.value, Some(1234.0));
|
||||
assert_eq!(
|
||||
post.dimensions.get("cache_write_tokens").map(String::as_str),
|
||||
Some("50")
|
||||
);
|
||||
assert!(post.dimensions.contains_key("history_len"));
|
||||
}
|
||||
|
||||
/// `min_savings` set high enough that candidates exist but the estimated
|
||||
/// savings always fall short → the second run should record
|
||||
/// `prune.skip { reason: "below_min_savings" }`.
|
||||
#[tokio::test]
|
||||
async fn prune_metrics_record_below_min_savings_skip() {
|
||||
let client = MockClient::new(vec![
|
||||
tool_use_response("call-1", "big_tool"),
|
||||
text_response_with_cache("ok", 0, 100),
|
||||
text_response_with_cache("done", 0, 0),
|
||||
]);
|
||||
let (mut pod, _store_tmp, _pwd_tmp) =
|
||||
make_pod(manifest_toml(1, u64::MAX), client, "big_tool").await;
|
||||
let session_id = pod.session_id();
|
||||
let store = pod.store().clone();
|
||||
|
||||
pod.run_text("first").await.unwrap();
|
||||
pod.run_text("second").await.unwrap();
|
||||
|
||||
let state = session_store::restore(&store, session_id).await.unwrap();
|
||||
let metrics = metrics_from_extensions(&state.extensions);
|
||||
let below = metrics
|
||||
.iter()
|
||||
.find(|m| {
|
||||
m.name == "prune.skip"
|
||||
&& m.dimensions.get("reason").map(String::as_str) == Some("below_min_savings")
|
||||
})
|
||||
.expect("expected prune.skip with reason=below_min_savings");
|
||||
assert!(
|
||||
below.dimensions.contains_key("candidate_count"),
|
||||
"below_min_savings skip should report candidate_count: {below:?}"
|
||||
);
|
||||
assert!(
|
||||
below.value.is_some(),
|
||||
"below_min_savings skip should report estimated savings as value: {below:?}"
|
||||
);
|
||||
// No prune.fire for this scenario.
|
||||
assert!(metrics.iter().all(|m| m.name != "prune.fire"));
|
||||
// No prune.post_request either (no fire to join with).
|
||||
assert!(metrics.iter().all(|m| m.name != "prune.post_request"));
|
||||
}
|
||||
|
||||
/// Sessions that have no metrics in the log restore cleanly: the
|
||||
/// `RestoredState.extensions` simply contains no `metrics` domain
|
||||
/// payloads, and `metrics_from_extensions` returns an empty Vec.
|
||||
/// Backward-compatibility check for old logs predating this feature.
|
||||
#[tokio::test]
|
||||
async fn old_sessions_without_metrics_replay_cleanly() {
|
||||
// Manifest without any `[compaction]` section → prune (and therefore
|
||||
// the prune observer) is never installed, so no metrics get written.
|
||||
let manifest_toml = r#"
|
||||
[pod]
|
||||
name = "test-pod"
|
||||
pwd = "./"
|
||||
|
||||
[model]
|
||||
scheme = "anthropic"
|
||||
model_id = "test-model"
|
||||
|
||||
[worker]
|
||||
max_tokens = 100
|
||||
|
||||
[[scope.allow]]
|
||||
target = "./"
|
||||
permission = "write"
|
||||
"#;
|
||||
let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]);
|
||||
let manifest = PodManifest::from_toml(manifest_toml).unwrap();
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(store_tmp.path()).await.unwrap();
|
||||
let pwd_tmp = tempfile::tempdir().unwrap();
|
||||
let pwd = pwd_tmp.path().to_path_buf();
|
||||
let scope = pod::Scope::writable(&pwd).unwrap();
|
||||
let worker = Worker::new(client);
|
||||
let mut pod = Pod::new(manifest, worker, store.clone(), pwd, scope)
|
||||
.await
|
||||
.unwrap();
|
||||
let session_id = pod.session_id();
|
||||
pod.run_text("hello").await.unwrap();
|
||||
|
||||
let state = session_store::restore(&store, session_id).await.unwrap();
|
||||
let metrics = metrics_from_extensions(&state.extensions);
|
||||
assert!(metrics.is_empty(), "no metrics should be recorded: {metrics:?}");
|
||||
// And no extension entries at all in the metrics domain.
|
||||
assert!(state.extensions.iter().all(|(d, _)| d != DOMAIN));
|
||||
|
||||
// Smoke check that fold helper is robust on a sentinel Metric value:
|
||||
let m = Metric::now("smoke");
|
||||
assert_eq!(m.name, "smoke");
|
||||
}
|
||||
10
crates/session-metrics/Cargo.toml
Normal file
10
crates/session-metrics/Cargo.toml
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
[package]
|
||||
name = "session-metrics"
|
||||
version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
session-store = { workspace = true }
|
||||
170
crates/session-metrics/src/lib.rs
Normal file
170
crates/session-metrics/src/lib.rs
Normal file
|
|
@ -0,0 +1,170 @@
|
|||
//! Session metrics — generic ad-hoc measurement lane on top of
|
||||
//! `LogEntry::Extension { domain: "metrics" }`.
|
||||
//!
|
||||
//! セッション中に積み上げて後で引きたい値(prune の発火頻度・Hook の実行
|
||||
//! 時間・ツールリトライ回数 等)を session-log に乗せるための薄い層。
|
||||
//! session-store は payload を不透明な `serde_json::Value` として扱うので、
|
||||
//! このクレートは型と読み書きヘルパーだけを提供する。
|
||||
//!
|
||||
//! # 設計
|
||||
//!
|
||||
//! - 厳格な label set は持たない。次元は sparse な `BTreeMap<String,String>`、
|
||||
//! 観測できない値は `None` で明示する
|
||||
//! - 「後から埋まる値」(例: prune 発火直後の `cache_read_tokens`)は前 entry に
|
||||
//! 書き戻さず、`correlation_id` を共有する別 metric として流す。集計は読み手で join
|
||||
//! - 集計 / 可視化 API はこのクレートには無い。session-log を読めば取り出せる、
|
||||
//! までが到達点
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session_store::{EntryHash, SessionId, Store, StoreError, save_extension, session_log};
|
||||
|
||||
/// Domain tag used in `LogEntry::Extension` for all metrics records.
|
||||
pub const DOMAIN: &str = "metrics";
|
||||
|
||||
/// 単発の計測値。`name` は `namespace.metric` 形式の自由文字列
|
||||
/// (例: `"prune.fire"`、`"hook.duration"`)。
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
pub struct Metric {
|
||||
/// `namespace.metric` 形式の名前。
|
||||
pub name: String,
|
||||
/// epoch ms。
|
||||
pub ts: u64,
|
||||
/// sparse な次元(label)。観測できないものはキー自体を入れない。
|
||||
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
|
||||
pub dimensions: BTreeMap<String, String>,
|
||||
/// 主スカラ値。dimension では表現したくない数値を載せる場所。
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub value: Option<f64>,
|
||||
/// 関連 metric を join するためのキー。同一 ID を持つ複数 metric は
|
||||
/// 「同じ事象を多面的に観測している」という意味付けで読まれる。
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub correlation_id: Option<String>,
|
||||
}
|
||||
|
||||
impl Metric {
|
||||
/// 最小コンストラクタ。`ts` は呼び出し時刻(epoch ms)で埋める。
|
||||
pub fn now(name: impl Into<String>) -> Self {
|
||||
Self {
|
||||
name: name.into(),
|
||||
ts: session_log::now_millis(),
|
||||
dimensions: BTreeMap::new(),
|
||||
value: None,
|
||||
correlation_id: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_dimension(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
self.dimensions.insert(key.into(), value.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_value(mut self, value: f64) -> Self {
|
||||
self.value = Some(value);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_correlation_id(mut self, id: impl Into<String>) -> Self {
|
||||
self.correlation_id = Some(id.into());
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// `LogEntry::Extension { domain: "metrics", payload: <metric> }` を append する。
|
||||
///
|
||||
/// `save_extension` の薄い wrapper。書き込み失敗は呼び出し側に返す
|
||||
/// (メトリクスのために本体処理を止めるかは呼び出し側の判断)。
|
||||
pub async fn record_metric(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
metric: &Metric,
|
||||
) -> Result<(), StoreError> {
|
||||
let payload = serde_json::to_value(metric).expect("Metric serialization cannot fail");
|
||||
save_extension(store, session_id, head_hash, DOMAIN, payload).await
|
||||
}
|
||||
|
||||
/// `RestoredState.extensions` から metrics domain の payload を順に取り出し、
|
||||
/// `Metric` 列に fold する。
|
||||
///
|
||||
/// schema 変更で deserialize できない payload は無視する(後方互換)。
|
||||
pub fn metrics_from_extensions(
|
||||
extensions: &[(String, serde_json::Value)],
|
||||
) -> Vec<Metric> {
|
||||
extensions
|
||||
.iter()
|
||||
.filter(|(domain, _)| domain == DOMAIN)
|
||||
.filter_map(|(_, payload)| serde_json::from_value::<Metric>(payload.clone()).ok())
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully populated metric survives a JSON string round trip unchanged.
    #[test]
    fn metric_round_trip_via_json() {
        let original = Metric::now("prune.fire")
            .with_dimension("border_turn", "3")
            .with_dimension("candidate_count", "2")
            .with_value(4096.0)
            .with_correlation_id("abc-123");
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: Metric = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, original);
    }

    /// Empty dimensions and `None` value / correlation_id are left out of
    /// the serialized form.
    #[test]
    fn metric_serializes_minimal_form_compactly() {
        let minimal = Metric {
            name: "x".into(),
            ts: 1,
            dimensions: BTreeMap::new(),
            value: None,
            correlation_id: None,
        };
        let json = serde_json::to_string(&minimal).unwrap();
        for omitted in ["dimensions", "value", "correlation_id"] {
            assert!(!json.contains(omitted));
        }
    }

    /// Only entries in the metrics domain are folded, in their original
    /// order.
    #[test]
    fn fold_skips_other_domains() {
        let extensions: Vec<(String, serde_json::Value)> = vec![
            (
                String::from("memory.extract"),
                serde_json::json!({ "processed_through_entry": 7 }),
            ),
            (
                String::from(DOMAIN),
                serde_json::to_value(Metric::now("a")).unwrap(),
            ),
            (
                String::from(DOMAIN),
                serde_json::to_value(Metric::now("b")).unwrap(),
            ),
        ];
        let folded = metrics_from_extensions(&extensions);
        let names: Vec<&str> = folded.iter().map(|m| m.name.as_str()).collect();
        assert_eq!(folded.len(), 2);
        assert_eq!(names[0], "a");
        assert_eq!(names[1], "b");
    }

    /// A payload that cannot be read as a `Metric` (e.g. after a future
    /// schema change) is skipped instead of failing the fold.
    #[test]
    fn fold_skips_undeserializable_payloads() {
        let extensions: Vec<(String, serde_json::Value)> = vec![
            (String::from(DOMAIN), serde_json::json!({ "garbage": true })),
            (
                String::from(DOMAIN),
                serde_json::to_value(Metric::now("ok")).unwrap(),
            ),
        ];
        let folded = metrics_from_extensions(&extensions);
        assert_eq!(folded.len(), 1);
        assert_eq!(folded[0].name, "ok");
    }
}
|
||||
|
|
@ -72,3 +72,8 @@
|
|||
- `crates/session-store/src/session_log.rs`(`LogEntry::Extension` と `RestoredState.extensions` の既存仕様)
|
||||
- `crates/llm-worker/src/usage_record.rs`、`crates/llm-worker/src/llm_client/event.rs`(cache_read / cache_write の取得経路)
|
||||
- `crates/pod/src/compact/prune.rs`、`crates/llm-worker/src/prune.rs`(最初の利用者の挿入点)
|
||||
|
||||
## Review
|
||||
- 状態: Approve with follow-up
|
||||
- レビュー詳細: [./session-metrics.review.md](./session-metrics.review.md)
|
||||
- 日付: 2026-05-03
|
||||
|
|
|
|||
65
tickets/session-metrics.review.md
Normal file
65
tickets/session-metrics.review.md
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
# Review: セッションメトリクス: Extension 経由の汎用計測レーン
|
||||
|
||||
## 前提・要件の確認
|
||||
|
||||
### メトリクス型
|
||||
- 専用 crate に置き serde で round-trip できる: `crates/session-metrics/src/lib.rs:28-44`、`tests::metric_round_trip_via_json`。OK。
|
||||
- 必須 `name` / `ts`、任意 `dimensions` / `value` / `correlation_id`、unknown は `None` で表現: 同 `lib.rs:28-44`、`metric_serializes_minimal_form_compactly` で省略形を確認。OK。
|
||||
|
||||
### 書き込み経路
|
||||
- 薄いヘルパーがあり `LogEntry::Extension { domain: "metrics", payload }` で append: `crates/session-metrics/src/lib.rs:78-86` → `session_store::save_extension`。session-store 側は無改変(`git diff -- crates/session-store/` で空)。
|
||||
- hash chain に乗る (`save_extension` 経由)、ticket 上の "session-store に追加" は専用 crate に置き換えており、ticket 文言の「専用 crate(または既存の適切な配置)」の許容範囲。OK。
|
||||
- 書き込み失敗の握り潰し: **未対応(後述 Non-blocking / Follow-up)。**
|
||||
|
||||
### 読み出し経路
|
||||
- `metrics_from_extensions` で fold、deserialize 失敗 payload は skip: `crates/session-metrics/src/lib.rs:92-100`、`fold_skips_undeserializable_payloads` でカバー。OK。
|
||||
- 「特定セッションの metric 列を取り出すサンプル」は `crates/pod/tests/session_metrics_test.rs:210-211` が `session_store::restore` → `metrics_from_extensions` の最小例として機能している。OK。
|
||||
|
||||
### 最初の利用者: Prune projection
|
||||
- `attach_prune` から `prune.fire` / `prune.skip` を発行: `crates/pod/src/compact/prune.rs:52-80`。
|
||||
- `Fired` 時: `value=estimated_savings`, `correlation_id`(uuid v7), `candidate_count` + `border_turn` dim、`UsageTracker::note_correlation_id` で stash。
|
||||
- `SkippedNoCandidates` / `SkippedBelowMinSavings` の 2 経路を分けている。
|
||||
- `prune.post_request` が直後の `LlmUsage` と組で発行され同じ `correlation_id` を持つ: `crates/pod/src/pod.rs:1121-1156`。
|
||||
- `correlation_id` の生成は uuid v7(既存 `uuid` workspace dep の v7 feature を再利用)。OK。
|
||||
|
||||
### Resume 互換
|
||||
- `[compaction]` 無し manifest で metrics が一切書かれない・replay も成功: `crates/pod/tests/session_metrics_test.rs:325-367`。OK。
|
||||
|
||||
### 完了条件
|
||||
- 型 + 書き込み/読み出し + unit test: 4 件 (`crates/session-metrics/src/lib.rs:106-169`)。OK。
|
||||
- prune.fire/skip/post_request が session-log に乗る: 統合テスト `prune_metrics_emit_skip_then_fire_with_post_request_join` で確認。
|
||||
- 後方互換: `old_sessions_without_metrics_replay_cleanly` で確認。
|
||||
- 「prune metric 列を取り出す」テスト: 同上の統合テストが兼ねる。
|
||||
- correlation_id join: `prune_metrics_emit_skip_then_fire_with_post_request_join:258-276` で `fire.correlation_id == post.correlation_id`、`post.value == cache_read_input_tokens` を assert。OK。
|
||||
|
||||
## アーキテクチャ・スコープ
|
||||
|
||||
- session-store の型と公開 API は無改変(`git diff` 空)。前提を遵守している。
|
||||
- `UsageTracker` 内部を `Vec<UsageRecord>` → `Vec<RecordedUsage>` に拡張したのは pod 内 `pub(crate)` のローカル拡張で、`llm_worker::UsageRecord` 型自体は触っていない。レイヤ上は問題なし(`session-store の型は触らない` の制約は守られている)。
|
||||
- 新規 crate 名 `session-metrics` は memory ノートの「`insomnia-` プレフィックス不要、短い名前」に準拠。
|
||||
- 依存追加は workspace.dependencies + `cargo add` 流(手動編集の痕跡なし)。OK。
|
||||
- `Worker` への配線は `set_prune_observer` を追加する小規模な拡張で、prune 評価ロジック自体は `evaluate_candidates` の border_turn 返却を追加した以外はリファクタの域。`prunable_indices` を wrapper に薄く残しているのも既存呼び出し側を壊さない配慮で妥当。
|
||||
- prune metric の dimension/value 振り分け(`prune.fire` の `value=estimated_savings`, `prune.post_request` の `value=cache_read_tokens`)は ticket の「value/dimension として記録」を素直に解釈したもので許容範囲。今後別 metric を生やすときに揃えやすいよう、value は「主スカラ(後で集計したい数)」/dimension は「軸(filter したい文字列)」のポリシーに統一しておくとよい(コメントで明文化されているとなおよし)。
|
||||
- LLM provider policy / ScopedFs scripting plan 等の他方針には抵触しない。
|
||||
|
||||
## 指摘事項
|
||||
|
||||
### Blocking
|
||||
|
||||
なし。
|
||||
|
||||
### Non-blocking / Follow-up
|
||||
|
||||
- **メトリクス書き込み失敗の握り潰し**: ticket 要件 `crates/.../tickets/session-metrics.md:29` に「書き込み失敗(store IO エラー)はメトリクス側で握りつぶす。本体処理を阻害しない」とある一方、`crates/pod/src/pod.rs:1102-1110` および `1144-1150` の `record_metric` は `?` で `StoreError` を呼び出し元 (`persist_turn` → `Pod::run`) に伝播させている。ヘルパー自身のドキュメントも「書き込み失敗は呼び出し側に返す」(`crates/session-metrics/src/lib.rs:76-77`) で投げ直し前提なので、現状はメトリクス IO 失敗時に turn 永続化フロー全体が落ちる。挙動として「メトリクスのために本体処理を止めない」契約を満たしていない。
|
||||
- 対応案: `persist_turn` 内で `record_metric(..).await` の戻り値を `if let Err(e) = ... { warn!(error=%e, "metric write failed; ignoring"); }` で握り、`LlmUsage` 永続化と直交させる。Helper の doc も「呼び出し側で握り潰すのが既定」に揃える。
|
||||
- 重要度: ticket の明示要件であり本来 Blocking 候補だが、現状 store IO は LocalFsStore 一択で実害が出る経路が乏しく、後続の小修正で吸収可能と判断し follow-up に置く(ユーザーが Blocking 扱いに引き上げたい場合は了承)。
|
||||
|
||||
### Nits
|
||||
|
||||
- `crates/pod/src/compact/prune.rs:58-62` で `Fired` 時の `border_turn` を `if let Some(...)` で条件挿入しているが、`evaluate_candidates` の実装上 `Fired` / `SkippedBelowMinSavings` のときは必ず `Some` になる(候補が空でない=境界が決まる)。動作上問題はないが、不変条件を `expect("border_turn is Some when candidates exist")` で表に出すか、コメントで残すと意図が明確。
|
||||
- `crates/pod/src/pod.rs:1102` 周辺のコメントにある「Ordered before LlmUsage so that a `prune.fire` and the `prune.post_request` derived from the matching usage record appear in the log close together.」は良い記述。同コメントを `metrics_tracker.rs` の `drain` 側にも一行張ると読み手が flush 順を把握しやすい。
|
||||
- `Metric` の value/dimension のポリシー(主スカラ vs ラベル)について、`session-metrics/src/lib.rs` の crate doc に 1〜2 行追記しておくと、今後の利用者(compact/hook 等)が振り分けで迷わない。
|
||||
|
||||
## 判断
|
||||
|
||||
**Approve with follow-up** — ticket 要件と完了条件の主要部分は概ね満たされている。session-store の型を一切触らず、新規 crate も命名・依存追加とも方針通り。特に correlation_id による fire ↔ post_request の join はテストで明示的に検証されており、一次目的(prune 効果測定の最小レーン)は達成。一方で「メトリクス書き込み失敗を握り潰す」要件が `persist_turn` 経路で守られていない点は ticket の明文要件であり、後続コミットで吸収する想定で follow-up とする(Blocking 扱いに昇格させたい場合はその判断に従う)。
|
||||
Loading…
Reference in New Issue
Block a user