feat: session-metrics完了

This commit is contained in:
Keisuke Hirata 2026-05-03 15:56:06 +09:00
parent 70c4f1930e
commit aca5bda1f2
5 changed files with 132 additions and 160 deletions

View File

@ -16,6 +16,5 @@
- メモリ機構
- 使用頻度メトリクス + Knowledge 化候補レポート → [tickets/memory-usage-metrics.md](tickets/memory-usage-metrics.md)
- セッション内 TODO ツール(注意機構付き) → [tickets/session-todo.md](tickets/session-todo.md)
- セッションメトリクス: Extension 経由の汎用計測レーン(最初の利用者は Prune → [tickets/session-metrics.md](tickets/session-metrics.md)
- ワークスペースのメモリーをLintするヘッドレスCLI
- system-reminder 注入機構の汎用化2件目の利用者が出た時に検討。タグ形式と「履歴を汚さない」原則は session-todo で先行確立)

View File

@ -454,6 +454,28 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
}
}
/// Append a metric, swallowing errors so observability writes never
/// fail the surrounding turn. On failure the head hash stays put
/// (the entry is dropped) and a `Warn` alert + `tracing::warn!` are
/// emitted so the failure isn't completely silent.
async fn try_record_metric(&mut self, metric: &session_metrics::Metric) {
if let Err(err) = session_metrics::record_metric(
&self.store,
self.session_id,
&mut self.head_hash,
metric,
)
.await
{
warn!(name = %metric.name, error = %err, "failed to record session metric; dropping");
self.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("failed to record metric `{}`: {}", metric.name, err),
);
}
}
/// Broadcast a typed `Event` to connected clients. No-op when no
/// `event_tx` is attached (tests / direct `Pod::new` usage) or when
/// no clients are currently subscribed.
@ -1099,15 +1121,16 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// Ordered before LlmUsage so that a `prune.fire` and the
// `prune.post_request` derived from the matching usage record
// appear in the log close together.
//
// Metric writes are intentionally non-fatal: a failure here
// surfaces as a `Warn` alert + `tracing::warn!` and the loop
// continues. Metrics are observability data, not load-bearing
// for run correctness, so a transient FS error must not poison
// the turn record (`save_delta` / `save_turn_end` already landed
// by this point, and `save_run_completed` still needs to land).
let pending_metrics = self.metrics_tracker.drain();
for metric in pending_metrics {
session_metrics::record_metric(
&self.store,
self.session_id,
&mut self.head_hash,
&metric,
)
.await?;
self.try_record_metric(&metric).await;
}
// Persist any LLM Usage measurements collected during this run.
@ -1141,13 +1164,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.with_value(record.cache_read_tokens as f64)
.with_dimension("cache_write_tokens", record.cache_write_tokens.to_string())
.with_dimension("history_len", record.history_len.to_string());
session_metrics::record_metric(
&self.store,
self.session_id,
&mut self.head_hash,
&metric,
)
.await?;
self.try_record_metric(&metric).await;
}
self.usage_history
.lock()

View File

@ -28,7 +28,9 @@ use llm_worker::llm_client::event::{
use llm_worker::llm_client::{ClientError, LlmClient, Request};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use session_metrics::{DOMAIN, Metric, metrics_from_extensions};
use session_store::FsStore;
use session_store::{
EntryHash, FsStore, HashedEntry, LogEntry, SessionId, Store, StoreError, TraceEntry,
};
use pod::{Pod, PodManifest};
@ -317,6 +319,104 @@ async fn prune_metrics_record_below_min_savings_skip() {
assert!(metrics.iter().all(|m| m.name != "prune.post_request"));
}
/// `Store` wrapper that delegates to an inner `FsStore` for everything
/// except `LogEntry::Extension { domain: "metrics", .. }` appends, which
/// it rejects with an `Io` error. Lets us drive the `try_record_metric`
/// failure path without affecting any other persistence write.
#[derive(Clone)]
struct MetricFailingStore {
inner: FsStore,
}
impl Store for MetricFailingStore {
async fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> {
if let LogEntry::Extension { domain, .. } = &entry.entry {
if domain == DOMAIN {
return Err(StoreError::Io(std::io::Error::other("synthetic failure")));
}
}
self.inner.append(id, entry).await
}
async fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError> {
self.inner.read_all(id).await
}
async fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError> {
self.inner.list_sessions().await
}
async fn create_session(
&self,
id: SessionId,
entries: &[HashedEntry],
) -> Result<(), StoreError> {
self.inner.create_session(id, entries).await
}
async fn exists(&self, id: SessionId) -> Result<bool, StoreError> {
self.inner.exists(id).await
}
async fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError> {
self.inner.read_head_hash(id).await
}
async fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> {
self.inner.append_trace(id, entry).await
}
}
/// Metric write failures are non-fatal: the run still completes, the
/// session log carries no metric entries (drops), but a `Warn` alert
/// fires on the alerter so the TUI surface picks it up.
#[tokio::test]
async fn metric_write_failure_emits_warn_alert_and_does_not_abort_run() {
use protocol::{AlertLevel, AlertSource, Event};
use tokio::sync::broadcast;
let manifest_toml = manifest_toml(1, 1);
let manifest = PodManifest::from_toml(&manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap();
let inner = FsStore::new(store_tmp.path()).await.unwrap();
let store = MetricFailingStore { inner };
let pwd_tmp = tempfile::tempdir().unwrap();
let pwd = pwd_tmp.path().to_path_buf();
let scope = pod::Scope::writable(&pwd).unwrap();
// Even with a tool registered, this run will only emit
// `prune.skip { reason: "no_candidates" }` (one user message,
// protected_turns=1 covers everything). That is enough to drive
// the failure path: at least one metric attempts to write.
let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]);
let worker = Worker::new(client);
let mut pod = Pod::new(manifest, worker, store.clone(), pwd, scope)
.await
.unwrap();
let (tx, mut rx) = broadcast::channel::<Event>(64);
let alerter = pod::Alerter::new(tx);
pod.attach_alerter(alerter);
let session_id = pod.session_id();
// Run completes successfully despite metric failure.
pod.run_text("hello").await.unwrap();
// No metrics ended up in the log (writes were rejected).
let state = session_store::restore(&store, session_id).await.unwrap();
let metrics = metrics_from_extensions(&state.extensions);
assert!(metrics.is_empty(), "metrics must drop on write failure");
// The alerter saw at least one Warn from AlertSource::Pod.
let mut saw_warn = false;
while let Ok(ev) = rx.try_recv() {
if let Event::Alert(a) = ev {
if a.level == AlertLevel::Warn
&& a.source == AlertSource::Pod
&& a.message.contains("metric")
{
saw_warn = true;
break;
}
}
}
assert!(saw_warn, "expected Warn/Pod alert about metric failure");
}
/// Sessions that have no metrics in the log restore cleanly: the
/// `RestoredState.extensions` simply contains no `metrics` domain
/// payloads, and `metrics_from_extensions` returns an empty Vec.

View File

@ -1,79 +0,0 @@
# セッションメトリクス: Extension 経由の汎用計測レーン
## 背景
セッション中の挙動を後から定量評価したい需要が増えている。直近の動機は Prune projection の効果測定どこでどれくらいの頻度で発火したか、KV キャッシュ無効化のコストに対して回収トークンがどれだけあったかだが、Compact 起動条件、Hook の実行時間、ツール呼び出しのリトライ回数など、同種の「セッション中に積み上げて後で引きたい」値は今後も発生する。
現状の session-log は `UserInput` / `AssistantItems` / `LlmUsage` といった状態遷移を典型化した variant のみで、ad hoc な計測値の置き場所がない。一方 `LogEntry::Extension { domain, payload }` という汎用エスケープハッチが既に用意されており、ここに乗せれば hash chain・replay 経路を流用できる。
## 方針
- `LogEntry::Extension``domain = "metrics"` 名前空間として実装する。session-store の型は触らない
- メトリクス型は `name + dimensions(sparse map) + value(Option) + correlation_id(Option)` 程度の最小スキーマ。値が測れない次元は `None` で明示し、Prometheus 的な厳格な label set は持たない
- 「後から埋まる値」(例: prune 発火直後の LLM 呼び出しで観測される `cache_read_tokens`)は前 entry に書き戻さず、`correlation_id` を共有する別 metric として流す。集計は読み手側で join
- 集計 / 可視化 API は本チケットでは作らない。session-log を読めば取り出せる、までを到達点とする
## 要件
### メトリクス型
- 専用 crateまたは既存の適切な配置に定義。`serde` で JSON ラウンドトリップ可能
- 必須: `name``namespace.metric` 形式の文字列、例: `prune.fire`)、`ts`u64 epoch ms
- 任意: `dimensions: BTreeMap<String, String>`、`value: Option<f64>`、`correlation_id: Option<String>`
- 「unknown」は対応フィールドを `None` にすることで表現。schema レベルで dimension の網羅性は要求しない
### 書き込み経路
- `Pod` から呼べる薄いヘルパー(例: `Pod::record_metric(&self, metric)`) を session-store に追加し、`LogEntry::Extension { domain: "metrics", payload: serde_json::to_value(metric) }` として append する
- 既存の `append_entry` フローを踏襲し、hash chain に乗る
- 書き込み失敗store IO エラー)はメトリクス側で握りつぶす。本体処理を阻害しない
### 読み出し経路
- replay 時、`RestoredState.extensions` に `("metrics", payload)` として既に積まれる(既存挙動)
- メトリクスドメイン側で payload を `Vec<Metric>` に fold するヘルパーを提供
- session-store のテストハーネスから「特定セッションの metric 列を取り出す」サンプルが書ける状態にする
### 最初の利用者: Prune projection
本チケットの完了は、最低 1 つの実利用者が乗っていることを条件とする。Prune を最初の利用者として組み込む:
- `pod::compact::prune``attach_prune` 経路で、projection 評価のたびに以下を発行
- 発火時: `name = "prune.fire"`, `dimensions = { border_turn, candidate_count }`, `value = estimated_savings`, `correlation_id = <次の LLM 呼び出しと紐付ける ID>`
- スキップ時: `name = "prune.skip"`, `dimensions = { reason }``below_min_savings` / `no_candidates` 等)
- 直後の LLM リクエストで `LlmUsage` が記録される際、同じ `correlation_id` を持つ補助 metric `prune.post_request` を併発し、`cache_read_tokens` / `cache_write_tokens` を value/dimension として記録
- `correlation_id` の生成・伝搬経路は実装側で決定。既存の request-id 系があれば再利用
### Resume 互換
- 旧セッションmetric entry を持たないログ)の replay は何も変えない。`extensions` に metrics domain が無いだけ
- payload schema が将来変わった場合、deserialize 失敗した metric は無視してよいfold ヘルパー側で `serde_json::from_value` の Err を skip
## 完了条件
- メトリクス型と書き込み / 読み出しヘルパーが定義され、unit test がある
- Prune projection から `prune.fire` / `prune.skip` / `prune.post_request` が session-log に乗る
- 既存セッションログの replay が壊れない(後方互換)
- セッションログから prune metric 列を取り出すテストが通る
- correlation_id で prune 発火と直後の LLM 呼び出しの cache 値が join できることを test で示す
## 範囲外
- 集計 / 可視化ツールCLI / TUI。後続で別途
- ワークスペースまたぎのメトリクス集約(複数セッション横断分析)
- リアルタイム購読 APIWatcher 経由の stream 配信)
- session-log 以外の sinkjsonl 別系統、外部時系列 DB 等)
- Prune 以外のメトリクス利用者の追加Compact / Hook 等は別チケット)
- メトリクス保存量の自動圧縮 / 退避
## 参照
- 設計指針: `CLAUDE.md`(最小の構造化 / 概念の追加は不在が問題になってから)
- `crates/session-store/src/session_log.rs``LogEntry::Extension` と `RestoredState.extensions` の既存仕様)
- `crates/llm-worker/src/usage_record.rs`、`crates/llm-worker/src/llm_client/event.rs`cache_read / cache_write の取得経路)
- `crates/pod/src/compact/prune.rs`、`crates/llm-worker/src/prune.rs`(最初の利用者の挿入点)
## Review
- 状態: Approve with follow-up
- レビュー詳細: [./session-metrics.review.md](./session-metrics.review.md)
- 日付: 2026-05-03

View File

@ -1,65 +0,0 @@
# Review: セッションメトリクス: Extension 経由の汎用計測レーン
## 前提・要件の確認
### メトリクス型
- 専用 crate に置き serde で round-trip できる: `crates/session-metrics/src/lib.rs:28-44`、`tests::metric_round_trip_via_json`。OK。
- 必須 `name` / `ts`、任意 `dimensions` / `value` / `correlation_id`、unknown は `None` で表現: 同 `lib.rs:28-44`、`metric_serializes_minimal_form_compactly` で省略形を確認。OK。
### 書き込み経路
- 薄いヘルパーがあり `LogEntry::Extension { domain: "metrics", payload }` で append: `crates/session-metrics/src/lib.rs:78-86``session_store::save_extension`。session-store 側は無改変(`git diff -- crates/session-store/` で空)。
- hash chain に乗る (`save_extension` 経由)、ticket 上の "session-store に追加" は専用 crate に置き換えており、ticket 文言の「専用 crateまたは既存の適切な配置」の許容範囲。OK。
- 書き込み失敗の握り潰し: **未対応(後述 Blocking。**
### 読み出し経路
- `metrics_from_extensions` で fold、deserialize 失敗 payload は skip: `crates/session-metrics/src/lib.rs:92-100`、`fold_skips_undeserializable_payloads` でカバー。OK。
- 「特定セッションの metric 列を取り出すサンプル」は `crates/pod/tests/session_metrics_test.rs:210-211``session_store::restore``metrics_from_extensions` の最小例として機能している。OK。
### 最初の利用者: Prune projection
- `attach_prune` から `prune.fire` / `prune.skip` を発行: `crates/pod/src/compact/prune.rs:52-80`
- `Fired` 時: `value=estimated_savings`, `correlation_id`(uuid v7), `candidate_count` + `border_turn` dim、`UsageTracker::note_correlation_id` で stash。
- `SkippedNoCandidates` / `SkippedBelowMinSavings` の 2 経路を分けている。
- `prune.post_request` が直後の `LlmUsage` と組で発行され同じ `correlation_id` を持つ: `crates/pod/src/pod.rs:1121-1156`
- `correlation_id` の生成は uuid v7既存 `uuid` workspace dep の v7 feature を再利用。OK。
### Resume 互換
- `[compaction]` 無し manifest で metrics が一切書かれない・replay も成功: `crates/pod/tests/session_metrics_test.rs:325-367`。OK。
### 完了条件
- 型 + 書き込み/読み出し + unit test: 4 件 (`crates/session-metrics/src/lib.rs:106-169`)。OK。
- prune.fire/skip/post_request が session-log に乗る: 統合テスト `prune_metrics_emit_skip_then_fire_with_post_request_join` で確認。
- 後方互換: `old_sessions_without_metrics_replay_cleanly` で確認。
- 「prune metric 列を取り出す」テスト: 同上の統合テストが兼ねる。
- correlation_id join: `prune_metrics_emit_skip_then_fire_with_post_request_join:258-276``fire.correlation_id == post.correlation_id`、`post.value == cache_read_input_tokens` を assert。OK。
## アーキテクチャ・スコープ
- session-store の型と公開 API は無改変(`git diff` 空)。前提を遵守している。
- `UsageTracker` 内部を `Vec<UsageRecord>``Vec<RecordedUsage>` に拡張したのは pod 内 `pub(crate)` のローカル拡張で、`llm_worker::UsageRecord` 型自体は触っていない。レイヤ上は問題なし(`session-store の型は触らない` の制約は守られている)。
- 新規 crate 名 `session-metrics` は memory ノートの「`insomnia-` プレフィックス不要、短い名前」に準拠。
- 依存追加は workspace.dependencies + `cargo add`手動編集の痕跡なし。OK。
- `Worker` への配線は `set_prune_observer` を追加する小規模な拡張で、prune 評価ロジック自体は `evaluate_candidates` の border_turn 返却を追加した以外はリファクタの域。`prunable_indices` を wrapper に薄く残しているのも既存呼び出し側を壊さない配慮で妥当。
- prune metric の dimension/value 振り分け(`prune.fire` の `value=estimated_savings`, `prune.post_request``value=cache_read_tokens`)は ticket の「value/dimension として記録」を素直に解釈したもので許容範囲。今後別 metric を生やすときに揃えやすいよう、value は「主スカラ(後で集計したい数)」/dimension は「軸filter したい文字列)」のポリシーに統一しておくとよい(コメントで明文化されているとなおよし)。
- LLM provider policy / ScopedFs scripting plan 等の他方針には抵触しない。
## 指摘事項
### Blocking
なし。
### Non-blocking / Follow-up
- **メトリクス書き込み失敗の握り潰し**: ticket 要件 `crates/.../tickets/session-metrics.md:29` に「書き込み失敗store IO エラー)はメトリクス側で握りつぶす。本体処理を阻害しない」とある一方、`crates/pod/src/pod.rs:1102-1110` および `1144-1150``record_metric``?``StoreError` を呼び出し元 (`persist_turn` → `Pod::run`) に伝播させている。ヘルパー自身のドキュメントも「書き込み失敗は呼び出し側に返す」(`crates/session-metrics/src/lib.rs:76-77`) で投げ直し前提なので、現状はメトリクス IO 失敗時に turn 永続化フロー全体が落ちる。挙動として「メトリクスのために本体処理を止めない」契約を満たしていない。
- 対応案: `persist_turn` 内で `record_metric(..).await` の戻り値を `if let Err(e) = ... { warn!(error=%e, "metric write failed; ignoring"); }` で握り、`LlmUsage` 永続化と直交させる。Helper の doc も「呼び出し側で握り潰すのが既定」に揃える。
- 重要度: ticket の明示要件であり本来 Blocking 候補だが、現状 store IO は LocalFsStore 一択で実害が出る経路が乏しく、後続の小修正で吸収可能と判断し follow-up に置く(ユーザーが Blocking 扱いに引き上げたい場合は了承)。
### Nits
- `crates/pod/src/compact/prune.rs:58-62``Fired` 時の `border_turn``if let Some(...)` で条件挿入しているが、`evaluate_candidates` の実装上 `Fired` / `SkippedBelowMinSavings` のときは必ず `Some` になる(候補が空でない=境界が決まる)。動作上問題はないが、不変条件を `expect("border_turn is Some when candidates exist")` で表に出すか、コメントで残すと意図が明確。
- `crates/pod/src/pod.rs:1102` 周辺のコメントにある「Ordered before LlmUsage so that a `prune.fire` and the `prune.post_request` derived from the matching usage record appear in the log close together.」は良い記述。同コメントを `metrics_tracker.rs``drain` 側にも一行張ると読み手が flush 順を把握しやすい。
- `Metric` の value/dimension のポリシー(主スカラ vs ラベル)について、`session-metrics/src/lib.rs` の crate doc に 1〜2 行追記しておくと、今後の利用者compact/hook 等)が振り分けで迷わない。
## 判断
**Approve with follow-up** — ticket 要件と完了条件の主要部分は概ね満たされている。session-store の型を一切触らず、新規 crate も命名・依存追加とも方針通り。特に correlation_id による fire ↔ post_request の join はテストで明示的に検証されており、一次目的prune 効果測定の最小レーン)は達成。一方で「メトリクス書き込み失敗を握り潰す」要件が `persist_turn` 経路で守られていない点は ticket の明文要件であり、後続コミットで吸収する想定で follow-up とするBlocking 扱いに昇格させたい場合はその判断に従う)。