feat: session-metrics完了
This commit is contained in:
parent
a86f69fd8d
commit
3f987e9885
1
TODO.md
1
TODO.md
|
|
@ -16,6 +16,5 @@
|
|||
- メモリ機構
|
||||
- 使用頻度メトリクス + Knowledge 化候補レポート → [tickets/memory-usage-metrics.md](tickets/memory-usage-metrics.md)
|
||||
- セッション内 TODO ツール(注意機構付き) → [tickets/session-todo.md](tickets/session-todo.md)
|
||||
- セッションメトリクス: Extension 経由の汎用計測レーン(最初の利用者は Prune) → [tickets/session-metrics.md](tickets/session-metrics.md)
|
||||
- ワークスペースのメモリーをLintするヘッドレスCLI
|
||||
- system-reminder 注入機構の汎用化(2件目の利用者が出た時に検討。タグ形式と「履歴を汚さない」原則は session-todo で先行確立)
|
||||
|
|
|
|||
|
|
@ -454,6 +454,28 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Append a metric, swallowing errors so observability writes never
|
||||
/// fail the surrounding turn. On failure the head hash stays put
|
||||
/// (the entry is dropped) and a `Warn` alert + `tracing::warn!` are
|
||||
/// emitted so the failure isn't completely silent.
|
||||
async fn try_record_metric(&mut self, metric: &session_metrics::Metric) {
|
||||
if let Err(err) = session_metrics::record_metric(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
metric,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(name = %metric.name, error = %err, "failed to record session metric; dropping");
|
||||
self.alert(
|
||||
AlertLevel::Warn,
|
||||
AlertSource::Pod,
|
||||
format!("failed to record metric `{}`: {}", metric.name, err),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Broadcast a typed `Event` to connected clients. No-op when no
|
||||
/// `event_tx` is attached (tests / direct `Pod::new` usage) or when
|
||||
/// no clients are currently subscribed.
|
||||
|
|
@ -1099,15 +1121,16 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
// Ordered before LlmUsage so that a `prune.fire` and the
|
||||
// `prune.post_request` derived from the matching usage record
|
||||
// appear in the log close together.
|
||||
//
|
||||
// Metric writes are intentionally non-fatal: a failure here
|
||||
// surfaces as a `Warn` alert + `tracing::warn!` and the loop
|
||||
// continues. Metrics are observability data, not load-bearing
|
||||
// for run correctness, so a transient FS error must not poison
|
||||
// the turn record (`save_delta` / `save_turn_end` already landed
|
||||
// by this point, and `save_run_completed` still needs to land).
|
||||
let pending_metrics = self.metrics_tracker.drain();
|
||||
for metric in pending_metrics {
|
||||
session_metrics::record_metric(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
&metric,
|
||||
)
|
||||
.await?;
|
||||
self.try_record_metric(&metric).await;
|
||||
}
|
||||
|
||||
// Persist any LLM Usage measurements collected during this run.
|
||||
|
|
@ -1141,13 +1164,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
.with_value(record.cache_read_tokens as f64)
|
||||
.with_dimension("cache_write_tokens", record.cache_write_tokens.to_string())
|
||||
.with_dimension("history_len", record.history_len.to_string());
|
||||
session_metrics::record_metric(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
&metric,
|
||||
)
|
||||
.await?;
|
||||
self.try_record_metric(&metric).await;
|
||||
}
|
||||
self.usage_history
|
||||
.lock()
|
||||
|
|
|
|||
|
|
@ -28,7 +28,9 @@ use llm_worker::llm_client::event::{
|
|||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
||||
use session_metrics::{DOMAIN, Metric, metrics_from_extensions};
|
||||
use session_store::FsStore;
|
||||
use session_store::{
|
||||
EntryHash, FsStore, HashedEntry, LogEntry, SessionId, Store, StoreError, TraceEntry,
|
||||
};
|
||||
|
||||
use pod::{Pod, PodManifest};
|
||||
|
||||
|
|
@ -317,6 +319,104 @@ async fn prune_metrics_record_below_min_savings_skip() {
|
|||
assert!(metrics.iter().all(|m| m.name != "prune.post_request"));
|
||||
}
|
||||
|
||||
/// `Store` wrapper that delegates to an inner `FsStore` for everything
|
||||
/// except `LogEntry::Extension { domain: "metrics", .. }` appends, which
|
||||
/// it rejects with an `Io` error. Lets us drive the `try_record_metric`
|
||||
/// failure path without affecting any other persistence write.
|
||||
#[derive(Clone)]
|
||||
struct MetricFailingStore {
|
||||
inner: FsStore,
|
||||
}
|
||||
|
||||
impl Store for MetricFailingStore {
|
||||
async fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> {
|
||||
if let LogEntry::Extension { domain, .. } = &entry.entry {
|
||||
if domain == DOMAIN {
|
||||
return Err(StoreError::Io(std::io::Error::other("synthetic failure")));
|
||||
}
|
||||
}
|
||||
self.inner.append(id, entry).await
|
||||
}
|
||||
async fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError> {
|
||||
self.inner.read_all(id).await
|
||||
}
|
||||
async fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError> {
|
||||
self.inner.list_sessions().await
|
||||
}
|
||||
async fn create_session(
|
||||
&self,
|
||||
id: SessionId,
|
||||
entries: &[HashedEntry],
|
||||
) -> Result<(), StoreError> {
|
||||
self.inner.create_session(id, entries).await
|
||||
}
|
||||
async fn exists(&self, id: SessionId) -> Result<bool, StoreError> {
|
||||
self.inner.exists(id).await
|
||||
}
|
||||
async fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError> {
|
||||
self.inner.read_head_hash(id).await
|
||||
}
|
||||
async fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> {
|
||||
self.inner.append_trace(id, entry).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Metric write failures are non-fatal: the run still completes, the
|
||||
/// session log carries no metric entries (drops), but a `Warn` alert
|
||||
/// fires on the alerter so the TUI surface picks it up.
|
||||
#[tokio::test]
|
||||
async fn metric_write_failure_emits_warn_alert_and_does_not_abort_run() {
|
||||
use protocol::{AlertLevel, AlertSource, Event};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
let manifest_toml = manifest_toml(1, 1);
|
||||
let manifest = PodManifest::from_toml(&manifest_toml).unwrap();
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
let inner = FsStore::new(store_tmp.path()).await.unwrap();
|
||||
let store = MetricFailingStore { inner };
|
||||
let pwd_tmp = tempfile::tempdir().unwrap();
|
||||
let pwd = pwd_tmp.path().to_path_buf();
|
||||
let scope = pod::Scope::writable(&pwd).unwrap();
|
||||
|
||||
// Even with a tool registered, this run will only emit
|
||||
// `prune.skip { reason: "no_candidates" }` (one user message,
|
||||
// protected_turns=1 covers everything). That is enough to drive
|
||||
// the failure path: at least one metric attempts to write.
|
||||
let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]);
|
||||
let worker = Worker::new(client);
|
||||
let mut pod = Pod::new(manifest, worker, store.clone(), pwd, scope)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (tx, mut rx) = broadcast::channel::<Event>(64);
|
||||
let alerter = pod::Alerter::new(tx);
|
||||
pod.attach_alerter(alerter);
|
||||
|
||||
let session_id = pod.session_id();
|
||||
// Run completes successfully despite metric failure.
|
||||
pod.run_text("hello").await.unwrap();
|
||||
|
||||
// No metrics ended up in the log (writes were rejected).
|
||||
let state = session_store::restore(&store, session_id).await.unwrap();
|
||||
let metrics = metrics_from_extensions(&state.extensions);
|
||||
assert!(metrics.is_empty(), "metrics must drop on write failure");
|
||||
|
||||
// The alerter saw at least one Warn from AlertSource::Pod.
|
||||
let mut saw_warn = false;
|
||||
while let Ok(ev) = rx.try_recv() {
|
||||
if let Event::Alert(a) = ev {
|
||||
if a.level == AlertLevel::Warn
|
||||
&& a.source == AlertSource::Pod
|
||||
&& a.message.contains("metric")
|
||||
{
|
||||
saw_warn = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
assert!(saw_warn, "expected Warn/Pod alert about metric failure");
|
||||
}
|
||||
|
||||
/// Sessions that have no metrics in the log restore cleanly: the
|
||||
/// `RestoredState.extensions` simply contains no `metrics` domain
|
||||
/// payloads, and `metrics_from_extensions` returns an empty Vec.
|
||||
|
|
|
|||
|
|
@ -1,79 +0,0 @@
|
|||
# セッションメトリクス: Extension 経由の汎用計測レーン
|
||||
|
||||
## 背景
|
||||
|
||||
セッション中の挙動を後から定量評価したい需要が増えている。直近の動機は Prune projection の効果測定(どこでどれくらいの頻度で発火したか、KV キャッシュ無効化のコストに対して回収トークンがどれだけあったか)だが、Compact 起動条件、Hook の実行時間、ツール呼び出しのリトライ回数など、同種の「セッション中に積み上げて後で引きたい」値は今後も発生する。
|
||||
|
||||
現状の session-log は `UserInput` / `AssistantItems` / `LlmUsage` といった状態遷移を典型化した variant のみで、ad hoc な計測値の置き場所がない。一方 `LogEntry::Extension { domain, payload }` という汎用エスケープハッチが既に用意されており、ここに乗せれば hash chain・replay 経路を流用できる。
|
||||
|
||||
## 方針
|
||||
|
||||
- `LogEntry::Extension` の `domain = "metrics"` 名前空間として実装する。session-store の型は触らない
|
||||
- メトリクス型は `name + dimensions(sparse map) + value(Option) + correlation_id(Option)` 程度の最小スキーマ。値が測れない次元は `None` で明示し、Prometheus 的な厳格な label set は持たない
|
||||
- 「後から埋まる値」(例: prune 発火直後の LLM 呼び出しで観測される `cache_read_tokens`)は前 entry に書き戻さず、`correlation_id` を共有する別 metric として流す。集計は読み手側で join
|
||||
- 集計 / 可視化 API は本チケットでは作らない。session-log を読めば取り出せる、までを到達点とする
|
||||
|
||||
## 要件
|
||||
|
||||
### メトリクス型
|
||||
|
||||
- 専用 crate(または既存の適切な配置)に定義。`serde` で JSON ラウンドトリップ可能
|
||||
- 必須: `name`(`namespace.metric` 形式の文字列、例: `prune.fire`)、`ts`(u64 epoch ms)
|
||||
- 任意: `dimensions: BTreeMap<String, String>`、`value: Option<f64>`、`correlation_id: Option<String>`
|
||||
- 「unknown」は対応フィールドを `None` にすることで表現。schema レベルで dimension の網羅性は要求しない
|
||||
|
||||
### 書き込み経路
|
||||
|
||||
- `Pod` から呼べる薄いヘルパー(例: `Pod::record_metric(&self, metric)`) を session-store に追加し、`LogEntry::Extension { domain: "metrics", payload: serde_json::to_value(metric) }` として append する
|
||||
- 既存の `append_entry` フローを踏襲し、hash chain に乗る
|
||||
- 書き込み失敗(store IO エラー)はメトリクス側で握りつぶす。本体処理を阻害しない
|
||||
|
||||
### 読み出し経路
|
||||
|
||||
- replay 時、`RestoredState.extensions` に `("metrics", payload)` として既に積まれる(既存挙動)
|
||||
- メトリクスドメイン側で payload を `Vec<Metric>` に fold するヘルパーを提供
|
||||
- session-store のテストハーネスから「特定セッションの metric 列を取り出す」サンプルが書ける状態にする
|
||||
|
||||
### 最初の利用者: Prune projection
|
||||
|
||||
本チケットの完了は、最低 1 つの実利用者が乗っていることを条件とする。Prune を最初の利用者として組み込む:
|
||||
|
||||
- `pod::compact::prune` の `attach_prune` 経路で、projection 評価のたびに以下を発行
|
||||
- 発火時: `name = "prune.fire"`, `dimensions = { border_turn, candidate_count }`, `value = estimated_savings`, `correlation_id = <次の LLM 呼び出しと紐付ける ID>`
|
||||
- スキップ時: `name = "prune.skip"`, `dimensions = { reason }`(`below_min_savings` / `no_candidates` 等)
|
||||
- 直後の LLM リクエストで `LlmUsage` が記録される際、同じ `correlation_id` を持つ補助 metric `prune.post_request` を併発し、`cache_read_tokens` / `cache_write_tokens` を value/dimension として記録
|
||||
- `correlation_id` の生成・伝搬経路は実装側で決定。既存の request-id 系があれば再利用
|
||||
|
||||
### Resume 互換
|
||||
|
||||
- 旧セッション(metric entry を持たないログ)の replay は何も変えない。`extensions` に metrics domain が無いだけ
|
||||
- payload schema が将来変わった場合、deserialize 失敗した metric は無視してよい(fold ヘルパー側で `serde_json::from_value` の Err を skip)
|
||||
|
||||
## 完了条件
|
||||
|
||||
- メトリクス型と書き込み / 読み出しヘルパーが定義され、unit test がある
|
||||
- Prune projection から `prune.fire` / `prune.skip` / `prune.post_request` が session-log に乗る
|
||||
- 既存セッションログの replay が壊れない(後方互換)
|
||||
- セッションログから prune metric 列を取り出すテストが通る
|
||||
- correlation_id で prune 発火と直後の LLM 呼び出しの cache 値が join できることを test で示す
|
||||
|
||||
## 範囲外
|
||||
|
||||
- 集計 / 可視化ツール(CLI / TUI)。後続で別途
|
||||
- ワークスペースまたぎのメトリクス集約(複数セッション横断分析)
|
||||
- リアルタイム購読 API(Watcher 経由の stream 配信)
|
||||
- session-log 以外の sink(jsonl 別系統、外部時系列 DB 等)
|
||||
- Prune 以外のメトリクス利用者の追加(Compact / Hook 等は別チケット)
|
||||
- メトリクス保存量の自動圧縮 / 退避
|
||||
|
||||
## 参照
|
||||
|
||||
- 設計指針: `CLAUDE.md`(最小の構造化 / 概念の追加は不在が問題になってから)
|
||||
- `crates/session-store/src/session_log.rs`(`LogEntry::Extension` と `RestoredState.extensions` の既存仕様)
|
||||
- `crates/llm-worker/src/usage_record.rs`、`crates/llm-worker/src/llm_client/event.rs`(cache_read / cache_write の取得経路)
|
||||
- `crates/pod/src/compact/prune.rs`、`crates/llm-worker/src/prune.rs`(最初の利用者の挿入点)
|
||||
|
||||
## Review
|
||||
- 状態: Approve with follow-up
|
||||
- レビュー詳細: [./session-metrics.review.md](./session-metrics.review.md)
|
||||
- 日付: 2026-05-03
|
||||
|
|
@ -1,65 +0,0 @@
|
|||
# Review: セッションメトリクス: Extension 経由の汎用計測レーン
|
||||
|
||||
## 前提・要件の確認
|
||||
|
||||
### メトリクス型
|
||||
- 専用 crate に置き serde で round-trip できる: `crates/session-metrics/src/lib.rs:28-44`、`tests::metric_round_trip_via_json`。OK。
|
||||
- 必須 `name` / `ts`、任意 `dimensions` / `value` / `correlation_id`、unknown は `None` で表現: 同 `lib.rs:28-44`、`metric_serializes_minimal_form_compactly` で省略形を確認。OK。
|
||||
|
||||
### 書き込み経路
|
||||
- 薄いヘルパーがあり `LogEntry::Extension { domain: "metrics", payload }` で append: `crates/session-metrics/src/lib.rs:78-86` → `session_store::save_extension`。session-store 側は無改変(`git diff -- crates/session-store/` で空)。
|
||||
- hash chain に乗る (`save_extension` 経由)、ticket 上の "session-store に追加" は専用 crate に置き換えており、ticket 文言の「専用 crate(または既存の適切な配置)」の許容範囲。OK。
|
||||
- 書き込み失敗の握り潰し: **未対応(後述 Blocking)。**
|
||||
|
||||
### 読み出し経路
|
||||
- `metrics_from_extensions` で fold、deserialize 失敗 payload は skip: `crates/session-metrics/src/lib.rs:92-100`、`fold_skips_undeserializable_payloads` でカバー。OK。
|
||||
- 「特定セッションの metric 列を取り出すサンプル」は `crates/pod/tests/session_metrics_test.rs:210-211` が `session_store::restore` → `metrics_from_extensions` の最小例として機能している。OK。
|
||||
|
||||
### 最初の利用者: Prune projection
|
||||
- `attach_prune` から `prune.fire` / `prune.skip` を発行: `crates/pod/src/compact/prune.rs:52-80`。
|
||||
- `Fired` 時: `value=estimated_savings`, `correlation_id`(uuid v7), `candidate_count` + `border_turn` dim、`UsageTracker::note_correlation_id` で stash。
|
||||
- `SkippedNoCandidates` / `SkippedBelowMinSavings` の 2 経路を分けている。
|
||||
- `prune.post_request` が直後の `LlmUsage` と組で発行され同じ `correlation_id` を持つ: `crates/pod/src/pod.rs:1121-1156`。
|
||||
- `correlation_id` の生成は uuid v7(既存 `uuid` workspace dep の v7 feature を再利用)。OK。
|
||||
|
||||
### Resume 互換
|
||||
- `[compaction]` 無し manifest で metrics が一切書かれない・replay も成功: `crates/pod/tests/session_metrics_test.rs:325-367`。OK。
|
||||
|
||||
### 完了条件
|
||||
- 型 + 書き込み/読み出し + unit test: 4 件 (`crates/session-metrics/src/lib.rs:106-169`)。OK。
|
||||
- prune.fire/skip/post_request が session-log に乗る: 統合テスト `prune_metrics_emit_skip_then_fire_with_post_request_join` で確認。
|
||||
- 後方互換: `old_sessions_without_metrics_replay_cleanly` で確認。
|
||||
- 「prune metric 列を取り出す」テスト: 同上の統合テストが兼ねる。
|
||||
- correlation_id join: `prune_metrics_emit_skip_then_fire_with_post_request_join:258-276` で `fire.correlation_id == post.correlation_id`、`post.value == cache_read_input_tokens` を assert。OK。
|
||||
|
||||
## アーキテクチャ・スコープ
|
||||
|
||||
- session-store の型と公開 API は無改変(`git diff` 空)。前提を遵守している。
|
||||
- `UsageTracker` 内部を `Vec<UsageRecord>` → `Vec<RecordedUsage>` に拡張したのは pod 内 `pub(crate)` のローカル拡張で、`llm_worker::UsageRecord` 型自体は触っていない。レイヤ上は問題なし(`session-store の型は触らない` の制約は守られている)。
|
||||
- 新規 crate 名 `session-metrics` は memory ノートの「`insomnia-` プレフィックス不要、短い名前」に準拠。
|
||||
- 依存追加は workspace.dependencies + `cargo add` 流(手動編集の痕跡なし)。OK。
|
||||
- `Worker` への配線は `set_prune_observer` を追加する小規模な拡張で、prune 評価ロジック自体は `evaluate_candidates` の border_turn 返却を追加した以外はリファクタの域。`prunable_indices` を wrapper に薄く残しているのも既存呼び出し側を壊さない配慮で妥当。
|
||||
- prune metric の dimension/value 振り分け(`prune.fire` の `value=estimated_savings`, `prune.post_request` の `value=cache_read_tokens`)は ticket の「value/dimension として記録」を素直に解釈したもので許容範囲。今後別 metric を生やすときに揃えやすいよう、value は「主スカラ(後で集計したい数)」/dimension は「軸(filter したい文字列)」のポリシーに統一しておくとよい(コメントで明文化されているとなおよし)。
|
||||
- LLM provider policy / ScopedFs scripting plan 等の他方針には抵触しない。
|
||||
|
||||
## 指摘事項
|
||||
|
||||
### Blocking
|
||||
|
||||
なし。
|
||||
|
||||
### Non-blocking / Follow-up
|
||||
|
||||
- **メトリクス書き込み失敗の握り潰し**: ticket 要件 `crates/.../tickets/session-metrics.md:29` に「書き込み失敗(store IO エラー)はメトリクス側で握りつぶす。本体処理を阻害しない」とある一方、`crates/pod/src/pod.rs:1102-1110` および `1144-1150` の `record_metric` は `?` で `StoreError` を呼び出し元 (`persist_turn` → `Pod::run`) に伝播させている。ヘルパー自身のドキュメントも「書き込み失敗は呼び出し側に返す」(`crates/session-metrics/src/lib.rs:76-77`) で投げ直し前提なので、現状はメトリクス IO 失敗時に turn 永続化フロー全体が落ちる。挙動として「メトリクスのために本体処理を止めない」契約を満たしていない。
|
||||
- 対応案: `persist_turn` 内で `record_metric(..).await` の戻り値を `if let Err(e) = ... { warn!(error=%e, "metric write failed; ignoring"); }` で握り、`LlmUsage` 永続化と直交させる。Helper の doc も「呼び出し側で握り潰すのが既定」に揃える。
|
||||
- 重要度: ticket の明示要件であり本来 Blocking 候補だが、現状 store IO は LocalFsStore 一択で実害が出る経路が乏しく、後続の小修正で吸収可能と判断し follow-up に置く(ユーザーが Blocking 扱いに引き上げたい場合は了承)。
|
||||
|
||||
### Nits
|
||||
|
||||
- `crates/pod/src/compact/prune.rs:58-62` で `Fired` 時の `border_turn` を `if let Some(...)` で条件挿入しているが、`evaluate_candidates` の実装上 `Fired` / `SkippedBelowMinSavings` のときは必ず `Some` になる(候補が空でない=境界が決まる)。動作上問題はないが、不変条件を `expect("border_turn is Some when candidates exist")` で表に出すか、コメントで残すと意図が明確。
|
||||
- `crates/pod/src/pod.rs:1102` 周辺のコメントにある「Ordered before LlmUsage so that a `prune.fire` and the `prune.post_request` derived from the matching usage record appear in the log close together.」は良い記述。同コメントを `metrics_tracker.rs` の `drain` 側にも一行張ると読み手が flush 順を把握しやすい。
|
||||
- `Metric` の value/dimension のポリシー(主スカラ vs ラベル)について、`session-metrics/src/lib.rs` の crate doc に 1〜2 行追記しておくと、今後の利用者(compact/hook 等)が振り分けで迷わない。
|
||||
|
||||
## 判断
|
||||
|
||||
**Approve with follow-up** — ticket 要件と完了条件の主要部分は概ね満たされている。session-store の型を一切触らず、新規 crate も命名・依存追加とも方針通り。特に correlation_id による fire ↔ post_request の join はテストで明示的に検証されており、一次目的(prune 効果測定の最小レーン)は達成。一方で「メトリクス書き込み失敗を握り潰す」要件が `persist_turn` 経路で守られていない点は ticket の明文要件であり、後続コミットで吸収する想定で follow-up とする(Blocking 扱いに昇格させたい場合はその判断に従う)。
|
||||
Loading…
Reference in New Issue
Block a user