session-logリファクタのレビュー・修正

This commit is contained in:
Keisuke Hirata 2026-04-29 22:55:36 +09:00
parent de6b8faf55
commit 09d56272d8
7 changed files with 162 additions and 5 deletions

View File

@ -324,11 +324,9 @@ impl PodController {
// Broadcast the accepted user message so every
// subscriber (including the submitter) can
// render the turn header + user line from a
// single source of truth. Mirror the segments
// into shared_state so subsequent History fetches
// can re-attach them to the corresponding worker
// user_message item.
shared_state.push_user_segments(input.clone());
// single source of truth. shared_state's
// `user_segments` is re-synced from `pod` after
// the run completes, so we don't push here.
let _ = event_tx.send(Event::UserMessage {
segments: input.clone(),
});
@ -377,6 +375,7 @@ impl PodController {
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_user_segments(pod.user_segments().to_vec());
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
let _ = runtime_dir.write_history(&shared_state).await;
@ -435,6 +434,7 @@ impl PodController {
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_user_segments(pod.user_segments().to_vec());
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
let _ = runtime_dir.write_history(&shared_state).await;
@ -490,6 +490,7 @@ impl PodController {
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_user_segments(pod.user_segments().to_vec());
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
let _ = runtime_dir.write_history(&shared_state).await;
@ -587,6 +588,7 @@ impl PodController {
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_user_segments(pod.user_segments().to_vec());
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
let _ = runtime_dir.write_history(&shared_state).await;

View File

@ -1147,6 +1147,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
))
});
// Count surviving user_messages before consuming `retained_items`
// — needed to align `self.user_segments` after the swap below.
let retained_user_msgs = retained_items
.iter()
.filter(|i| i.is_user_message())
.count();
// Build new history: [summary, ...auto-read, references, ...retained].
let mut new_history = Vec::with_capacity(
1 + auto_read_messages.len()
@ -1197,6 +1204,19 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
if self.scope_allocation.is_some() {
pod_registry::update_session(&self.manifest.pod.name, new_session_id)?;
}
// Align user_segments with the post-compaction history. Items
// before `retain_from` (now folded into the summary) lose their
// segments; only the user_messages surviving in retained_items
// keep them. They are always the trailing K entries of
// `self.user_segments` because submissions are appended in order.
let drop_n = self
.user_segments
.len()
.saturating_sub(retained_user_msgs);
if drop_n > 0 {
self.user_segments.drain(..drop_n);
}
let worker = self.worker.as_mut().unwrap();
worker.set_history(new_history);
// Anchor the prompt cache at the summary item so that Anthropic

View File

@ -275,6 +275,37 @@ async fn agents_md_not_reread_after_compact() {
assert!(!after_third.contains("mutated"));
}
#[tokio::test]
async fn compact_aligns_user_segments_with_retained_history() {
// retained_tokens=0 folds the entire conversation into the summary,
// so retained_items has zero user_messages and self.user_segments
// must be drained to match. A subsequent run() then appends fresh
// segments cleanly without ghost entries from the pre-compaction era.
let client = MockClient::new(vec![
single_text_events("a"),
single_text_events("b"),
write_summary_tool_use_events("call-1", "compacted summary"),
single_text_events("done"),
single_text_events("c"),
]);
let (mut pod, _pwd) = make_pod_with_body("BODY", client).await.unwrap();
pod.run_text("first").await.unwrap();
pod.run_text("second").await.unwrap();
assert_eq!(pod.user_segments().len(), 2);
pod.compact(0).await.unwrap();
assert_eq!(
pod.user_segments().len(),
0,
"compact(0) folds every user_message into the summary, so segments \
must be drained to match retained_items"
);
pod.run_text("third").await.unwrap();
assert_eq!(pod.user_segments().len(), 1);
}
#[tokio::test]
async fn compact_preserves_system_prompt() {
let client = MockClient::new(vec![

View File

@ -80,3 +80,9 @@ worker → logged の変換で落ちる field が出るのは構わない(永
- 後続: `tickets/session-log-segments.md`
- 影響範囲: `crates/session-store/src/session_log.rs`, `crates/session-store/src/session.rs`, `crates/session-store/tests/*`
- 不変: `crates/llm-worker/src/llm_client/types.rs``Item` / `ContentPart` 等)、`crates/pod``save_delta` の呼び出し側)
## Review
- 状態: Approve
- レビュー詳細: [./session-log-decouple-item.review.md](./session-log-decouple-item.review.md)
- 日付: 2026-04-29

View File

@ -0,0 +1,38 @@
# Review: Session log の Item 依存を切り離す
## 前提・要件の確認
- **session-store crate が `LoggedItem` 系の独自型を export し、`Item` への依存が UserInput を除いて消える**
満たされている。`crates/session-store/src/logged_item.rs` に `LoggedItem` / `LoggedRole` / `LoggedContentPart` を新設し、`crates/session-store/src/lib.rs:31-43` で公開。`LogEntry` の `AssistantItems` / `ToolResults` / `HookInjectedItems` / `SessionStart.history``Vec<LoggedItem>` に置換済み(`session_log.rs:106,123,126,129`)。`UserInput` のみ後続チケットの責務として `Item` 参照が残るが、本チケットでは触らない方針通り。
- **`collect_state` で組まれた `RestoredState.history` が従来と同じ Item 列を返す**
満たされている。`session_log.rs:252,262,267,272` で `Item::from(LoggedItem)` を介して再構築。既存の worker / pod テスト(`cargo test --workspace` 全合格)と置換テスト群で確認済み。
- **`save_delta` の外側 API は変えず、内部で Item → LoggedItem 変換を通す**
満たされている。`session.rs:178` のシグネチャ `&[Item]` は不変、`to_logged()` 経由で永続化(`session.rs:202,221,232`)。
- **session-store の単体テストを新スキーマに合わせて書き換え**
満たされている。`fs_store_test.rs` / `session_test.rs``.into()` 経由に更新。
- **Round-trip テスト 1 本追加**
超過達成。`logged_item.rs` 内に 5 本の round-trip テストuser_message / tool_call / reasoning+ZDR / tool_result_with_content / kind タグ確認)を追加。
- **既存ビルド・全テストが新スキーマで合格**
満たされている。`cargo test --workspace` 全合格。
## アーキテクチャ・スコープ
- 永続スキーマと worker 内部型の分離を logged_item モジュールで完結させている。From/Into 変換は session-store の責務として閉じており、pod / worker から見れば save_delta の API は不変。レイヤ違反なし。
- LLM ZDR の制約Reasoning::encrypted_content の保持)が頭注(`logged_item.rs:11-13`)に明記され、テスト(`round_trip_reasoning_preserves_encrypted_content`でも担保されている。「replay に必要な field のみ持つ」という方針判断の妥当性を担保する記述として適切。
- `id: ItemId` / `status: ItemStatus` を意図的に落とし、replay 時に `None` で synthesize する方針はチケット記載と一致。output-side metadata と replay-side metadata の分離が綺麗に効いている。
- `LogEntry::Extension` などの既存ログ拡張点に手を入れていない点も適切(スコープ厳守)。
- session-store の `lib.rs``llm_worker::{Item, ContentPart, Role}` を再 export しているが、これは `RestoredState.history: Vec<Item>` を消費する側のために残している既存挙動の維持で、ticket の趣旨(`Item` 依存を「ログスキーマから」剥がす)には反しない。
## 指摘事項
### Non-blocking / Follow-up
- `lib.rs:45``pub use llm_worker::llm_client::types::{ContentPart, Item, Role};` は外部利用箇所が見当たらない(`grep` 結果。dead 再エクスポートは将来のクリーンアップ余地としてメモ。本チケットの範疇では削除不要。
### Nits
- `from_logged``pub` で公開されているが、現状 `collect_state` 経由の `iter().cloned().map(Into::into)` パターンしか使われていない。slice 版が必要になるまで public API として残す価値があるかは要観察。
## 判断
**Approve** — チケットの完了条件はすべて満たされ、round-trip テストも要件以上に厚く配置されている。スキーマ分離の方針通り Item 依存は UserInput を除いて消えた。後続 `session-log-segments` の前提が綺麗に整っている。

View File

@ -50,3 +50,9 @@ Pod が吐く `Event::History.items: Vec<serde_json::Value>` の user message
- `crates/pod/src/controller.rs``Event::UserMessage` broadcast 経路)
- `crates/tui/src/app.rs``restore_history` — 現状 segment を捨てている地点)
- `crates/protocol/src/lib.rs``Segment`, `Event::History`
## Review
- 状態: Request changes
- レビュー詳細: [./session-log-segments.review.md](./session-log-segments.review.md)
- 日付: 2026-04-29

View File

@ -0,0 +1,54 @@
# Review: セッションログの Segment 保持
## 前提・要件の確認
- **`LogEntry::UserInput` から `item: Item` を取り除き `segments: Vec<Segment>` のみ持つ**
満たされている。`session_log.rs:120` で置換済み。`collect_state` は `Segment::flatten_to_text` 経由で `Item::user_message(text)` を派生(`session_log.rs:254-258`)。
- **セッションログに Segment を persist し、`Pod` の入口で submit-time に直書き、`save_delta` は user_message を skip**
満たされている。`session.rs:148-164` に `save_user_input` 関数追加。`session.rs:188-190` で `is_user_message()` を skip。`pod.rs:608-615` で `Pod::run` の入口flatten 直前)に `save_user_input` を埋め込み、in-memory `user_segments` に push。
- **復元経路で client が segments を取り戻すTUI で paste チップが復元)**
実装済み。`ipc/server.rs:91-127` で `Method::GetHistory` 応答時に worker history JSON の user_message に `segments` フィールドを embed、`tui/src/app.rs:461-473` の `restore_history``segments` を読み出して `Block::UserMessage` を組み立てる。手動 UI 確認は未消化report 申告通り)。
- **worker / llm-client 層は変更しない**
守られている。`Segment::flatten_to_text` を `protocol` に追加した以外、worker / llm-client API に変更なし。
- **後方互換は持たない**
既存 jsonl の読み込みフォールバックは入れていない(適切)。
- **Round-trip テスト**
`replay_user_input_segments_round_trip` 追加(`session_log.rs:682-751`。Text/Paste/FileRef を含む混合 segments の JSON 往復+`flatten_to_text` 派生+`user_segments` 保持を一気に検証。テスト粒度は要件十分。
- **既存ビルド・テストが新スキーマで合格**
`cargo test --workspace` 全合格を確認。
## アーキテクチャ・スコープ
- `Segment::flatten_to_text``protocol` 側に純粋関数として抜き、Pod 側の `flatten_segments` を「アラート発火 + flatten_to_text 呼び出し」へ整理した分離(`pod.rs:636-682`は正しい。replay 経路がアラートを再発火させない設計が両立している。
- session-store が `protocol::Segment` に依存する構造は妥当。`Cargo.toml` には `cargo add` で追加されておりコミットの差分通り、依存方向session-store → protocolも既存の依存グラフ的に問題ない。
- `RestoredState.user_segments` を別フィールドとして並走させ、replay 経路では Item と Segment を二重管理する設計は、worker history の責務LLM への渡し物)と client 復元の責務typed atom 描画)の分離として理にかなっている。
- IPC server で `is_user_message()` 数と `user_segments` 数の差分から「seed history は最初に詰まれる」という性質を使った右寄せ整列(`ipc/server.rs:101-126`は、ticket 記載の前提("seed history は segments を持たない、live submission のみ持つ")の素直な実装。
## 指摘事項
### Blocking
- **`Pod::compact()` 後に `self.user_segments` がクリアされない**`pod.rs:1165-1224`
`compact()` は worker history を `[summary, ...retained_items]` に置換するが、`self.user_segments` は触られていない。プロセス継続中に compaction が走った場合、次の `Method::GetHistory``ipc/server.rs:103``total_user_msgs.saturating_sub(segments_per_user.len())` が想定外の値になり、segments と worker user_message が錯誤マッチする。
例: 5 user_msg を投げた後 compaction で 1 だけ retained のとき、`total=1, segs=5 → skip=0` となり、retained の user_msg に 1 番目の古い segments が貼られる。
対策案: `compact()` 内で `retained_items` 中の `is_user_message()` 件数 K を数え、`self.user_segments = self.user_segments.split_off(self.user_segments.len() - K)` 相当に切り詰めるK=0 なら clear。controller 側 `shared_state` も同期するため、compact 後に `shared_state.set_user_segments(pod.user_segments().to_vec())` を呼び直す経路が要る。
なお post-compaction 後にプロセスを再起動して restore する場合は、新セッションの jsonl に UserInput が無いので `state.user_segments` は空になり、自動的に整列が直る。問題は**プロセス継続中の compaction → 次の GetHistory** の窓に限定されるが、TUI の通常運用フローで踏みうる。
### Non-blocking / Follow-up
- **`save_user_input` 失敗時の shared_state ghost segment**`controller.rs:331` と `pod.rs:608-615` の順序)
Controller は `Method::Run` 受信直後に `shared_state.push_user_segments(input.clone())` してから run_future を await する。`save_user_input` が I/O エラーで失敗すると、`pod.user_segments` には push されないが `shared_state.user_segments` には残り、worker.history にも user_msg は積まれない(`save_user_input` の失敗で `pod.run` が早期 return するため)。次回 GetHistory で 1 段ずれる可能性。実害は store I/O 障害のレアケースに限定されるが、`run_future` 完了後に `shared_state.set_user_segments(pod.user_segments().to_vec())` で同期し直すのが安全。
- **seed history の segments 喪失**`pod.rs:294-295`, `session_log.rs:217-223` のコメント参照)
ticket の範囲外として明示済み・コメントでも記述されている設計判断。compaction を境に paste チップの典型 magenta 復元が単純テキストにフォールバックする挙動はユーザー視点で受容可能だが、UX として把握しておく価値あり。後続で必要なら `SessionStart.history` 側にも segments を持たせる拡張余地あり。
- **chain hash の確定性**
`Segment` の serde 実装は HashMap 不使用・field 順序固定で、`session_log.rs:466-499` の `hash_chain_is_deterministic` / `different_content_produces_different_hash` で確認済み。スキーマ変更で hash 値は別物になるが、新スキーマ内での安定性は壊れていない(要件通り)。
- 手動 UI 確認paste chip の magenta 復元の自動テスト化は未消化。session-store + flatten 経路は単体テストでカバーされているが、TUI の `restore_history` 経路は serde_json::Value からの復元アサーションが望ましい follow-up。
### Nits
- `pod.rs:615``self.user_segments.push(input.clone())` は直前で `save_user_input``input.clone()` を渡しているため `input` を 2 回 clone している。`Vec<Segment>` の clone はそこそこ重い(特に Paste の contentので、`save_user_input(... segments)` の引数を所有 `Vec<Segment>` のまま受けて push 後に消費するか、`save_user_input` を `&[Segment]` に変えてから push 一回にする選択肢。マイクロ最適化なので必須ではない。
- `ipc/server.rs:105-114` のコメント「seed user_messages always come first」は、`ensure_head_or_fork` の auto-fork 経路で発生しうる "seed + 一部 segments のあとさらに live segments" のケースまで明示してくれると将来の保守者に親切(実際には auto-fork 後も pod.user_segments は live 分のみ累積する形で右寄せが成立しているが、コメントを読むと一瞬不安になる)。
## 判断
**Request changes** — `Pod::compact()` 後に `self.user_segments` を切り詰めない件Blockingが、ticket の完了条件「現行の compaction / fork / restore のフローを壊さない」に直接抵触する。修正は数行shared_state 再同期で済む規模。それ以外の要件は満たされており、修正後は Approve 想定。