diff --git a/TODO.md b/TODO.md index 025cd0a3..0b538aa6 100644 --- a/TODO.md +++ b/TODO.md @@ -8,7 +8,6 @@ - Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md) - Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md) - Pod: セッションログをバックエンドにした Pod 単位の永続化 → [tickets/pod-persistent-state.md](tickets/pod-persistent-state.md) -- LogEntry を単数化 + interceptor を direct writer に → [tickets/log-entry-singular-and-direct-commit.md](tickets/log-entry-singular-and-direct-commit.md) - 永続化層のセマンティック整理 → [tickets/persistence-semantics.md](tickets/persistence-semantics.md) - Exchange / Turn / Call セマンティクス整理 → [tickets/exchange-turn-call-semantics.md](tickets/exchange-turn-call-semantics.md) - llm-worker のエラー耐性 diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index d92753ca..ff748ea6 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -616,7 +616,7 @@ async fn controller_loop( Method::Notify { message } => { // Client-side live echo is delivered as `Event::SystemItem` // once the interceptor commits the corresponding - // `LogEntry::SystemItems` entry — drained out of the + // `LogEntry::SystemItem` entry — drained out of the // notify buffer + broadcast through the sink. No // separate echo here. pod.push_notify(message); @@ -674,8 +674,8 @@ async fn controller_loop( // Live echo travels through the SystemItem lane: once // the interceptor drains the notify buffer, the // typed `SystemItem::PodEvent` lands as a - // `LogEntry::SystemItems` entry and the sink fans it - // out to clients as `Event::SystemItem`. + // `LogEntry::SystemItem` entry and the sink forwards it + // to clients as `Event::SystemItem`. // // (1) system side effects — idempotent and tolerant of // out-of-order delivery (e.g. `TurnEnded` arriving diff --git a/crates/pod/src/ipc/notify_buffer.rs b/crates/pod/src/ipc/notify_buffer.rs index f5e50477..488afa6b 100644 --- a/crates/pod/src/ipc/notify_buffer.rs +++ b/crates/pod/src/ipc/notify_buffer.rs @@ -5,8 +5,8 @@ //! `PodInterceptor::pending_history_appends`, which the Worker calls //! at the head of each turn loop iteration. The drain renders each //! pending entry into a typed `SystemItem` (with the `notify_wrapper` -//! prompt applied), commits a `LogEntry::SystemItems` through the -//! session-log sink, and returns the corresponding +//! prompt applied), commits a `LogEntry::SystemItem` per entry through +//! the session-log sink, and returns the corresponding //! `Item::system_message`s for the worker to append to its //! persistent history. //! diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index a6a2dd64..97f40129 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -115,9 +115,20 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { .expect("SystemItem is Serialize"); Some(Event::SystemItem { item: value }) } - // Defensive: should never reach here per - // `SessionLogSink::is_live_relevant`. - _ => None, + other => { + // `SessionLogSink::is_live_relevant` keeps + // non-live-relevant variants off the + // broadcast lane; reaching here means the + // two are out of sync and we silently + // dropped a wire event. Log so a future + // regression surfaces instead of vanishing. + tracing::error!( + entry_kind = ?std::mem::discriminant(&other), + "session-log broadcast emitted a non-live-relevant entry; \ + sink filter and IPC dispatch are out of sync" + ); + None + } }; if let Some(event) = outbound { if writer.write(&event).await.is_err() { diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 17c44114..24c379a3 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -31,5 +31,5 @@ pub use prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTem pub use protocol::{ErrorCode, Event, Method, PodStatus, TurnResult}; pub use provider::{ProviderError, build_client}; pub use runtime::dir::RuntimeDir; -pub use session_log_sink::{SessionLogSink, SessionLogWriter}; +pub use session_log_sink::SessionLogSink; pub use shared_state::PodSharedState; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index ea576002..f6c7f2ee 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -55,22 +55,13 @@ pub struct SessionHead { /// All three fields are `Clone` (the latter two as `Arc` clones, the /// store per its `Clone` impl) so the handle itself is a flat triple of /// cheap copies. -pub struct LogWriterHandle { +#[derive(Clone)] +pub struct LogWriterHandle { pub store: St, pub session_head: Arc>, pub sink: SessionLogSink, } -impl Clone for LogWriterHandle { - fn clone(&self) -> Self { - Self { - store: self.store.clone(), - session_head: self.session_head.clone(), - sink: self.sink.clone(), - } - } -} - impl LogWriterHandle where St: Store + Clone, @@ -1125,7 +1116,7 @@ impl Pod { self.ensure_interceptor_installed(); self.ensure_system_prompt_materialized()?; self.cleanup_finished_memory_task(); - self.ensure_session_head().await?; + self.ensure_session_head()?; if self.should_pre_run_compact() { self.join_memory_task().await; } @@ -1520,7 +1511,7 @@ impl Pod { /// `ensure_system_prompt_materialized` has just rendered. Subsequent /// calls fall through to `ensure_head_or_fork`, which auto-forks when /// another writer has advanced the store head behind our back. - async fn ensure_session_head(&mut self) -> Result<(), PodError> { + fn ensure_session_head(&mut self) -> Result<(), PodError> { let w = self.worker.as_ref().unwrap(); let prev_session_id; let initial_state = { @@ -1545,7 +1536,6 @@ impl Pod { let store_head = self .store .read_head_hash(prev_session_id) - .map_err(PodError::from)?; let mut head = self.session_head.lock(); if store_head == head.head_hash { @@ -1571,7 +1561,6 @@ impl Pod { }; self.store .create_session(fork_id, &[hashed]) - .map_err(PodError::from)?; head.session_id = fork_id; head.head_hash = Some(hash); @@ -1715,22 +1704,12 @@ impl Pod { history_before: usize, result: &Result, ) -> Result<(), StoreError> { - // Per-item commits for AssistantItems / ToolResults / - // HookInjectedItems already landed mid-turn through the - // controller-spawned drain task, fed by - // `Worker::on_history_append`. Drain the queue here so every - // in-flight item has actually been committed before the - // trailing `TurnEnd` entry. When no drain is wired (low-level - // tests / direct `Pod::new` usage) we fall back to a synchronous - // pass that replicates the legacy `save_delta` classification — - // those code paths don't fire `on_history_append`, so the items - // would otherwise be lost. // Per-item commits for AssistantItem / ToolResult / SystemItem // entries are expected to have landed synchronously: the // worker `on_history_append` callback (wired by the controller // via `wire_history_persistence`) commits each appended item // directly through the writer, and the interceptor commits - // SystemItems up-front in `on_prompt_submit` / + // SystemItem entries up-front in `on_prompt_submit` / // `pending_history_appends` before returning the matching // `Item::system_message`s. // diff --git a/crates/pod/src/session_log_sink.rs b/crates/pod/src/session_log_sink.rs index 15c47666..9907fc81 100644 --- a/crates/pod/src/session_log_sink.rs +++ b/crates/pod/src/session_log_sink.rs @@ -24,10 +24,7 @@ use std::sync::{Arc, Mutex as StdMutex}; -use parking_lot::{Mutex, MutexGuard}; -use session_store::{ - EntryHash, HashedEntry, LogEntry, SessionId, SessionStartState, Store, StoreError, session_log, -}; +use session_store::LogEntry; use tokio::sync::broadcast; /// Broadcast capacity for the live receiver. Slow subscribers that @@ -194,197 +191,6 @@ impl Default for SessionLogSink { } } -/// Active session head for the Pod's persistent log: session id + -/// last-committed entry hash. Bundled with the store + sink in a -/// `SessionLogWriter` so the worker callback / interceptor can share -/// one cheap `Clone` handle for direct sync appends. -#[derive(Debug, Clone)] -pub struct SessionHeadState { - pub session_id: SessionId, - pub head_hash: Option, -} - -/// Pod-side session-log writer. -/// -/// Bundles the (1) persistent store, (2) the in-memory session-head -/// state (id + hash), and (3) the broadcast sink. `append_entry` -/// chains the hash on disk, advances the head, then publishes the -/// entry through the sink — under a single sync mutex so two writers -/// cannot interleave the chain. -/// -/// All append paths run synchronously: a local-fs `<1 KiB` JSONL line -/// completes well below a millisecond, and going through an async -/// `tokio::fs` ferry would re-introduce the `LogCommand` / drain task -/// we removed. `parking_lot::Mutex` is safe to hold across the disk -/// write since the lock is never crossed by an `.await`. -/// -/// `Clone` is a cheap `Arc` clone. The Pod keeps one writer for its -/// inline commits (UserInput, TurnEnd, Usage, RunCompleted/Errored, -/// scope snapshots, metrics) and hands clones to every other commit -/// site (worker callback, interceptor). -pub struct SessionLogWriter { - inner: Arc>, -} - -struct WriterInner { - store: St, - head: Mutex, - sink: SessionLogSink, -} - -impl Clone for SessionLogWriter { - fn clone(&self) -> Self { - Self { - inner: self.inner.clone(), - } - } -} - -impl SessionLogWriter -where - St: Store + Clone, -{ - /// Create a writer for a fresh Pod with no entries on disk yet. - /// `head_hash` is `None` until the first `append_entry` (typically - /// the deferred `SessionStart` written by `ensure_session_head`). - pub fn new(store: St, session_id: SessionId) -> Self { - Self { - inner: Arc::new(WriterInner { - store, - head: Mutex::new(SessionHeadState { - session_id, - head_hash: None, - }), - sink: SessionLogSink::new(), - }), - } - } - - /// Create a writer seeded with a session already on disk. The - /// mirror is populated with `mirror` (typically loaded via - /// `Store::read_all`), and `head_hash` should be the hash of the - /// last entry. - pub fn restored( - store: St, - session_id: SessionId, - head_hash: Option, - mirror: Vec, - ) -> Self { - Self { - inner: Arc::new(WriterInner { - store, - head: Mutex::new(SessionHeadState { - session_id, - head_hash, - }), - sink: SessionLogSink::with_initial(mirror), - }), - } - } - - /// Append `entry` to the log: disk write → in-memory mirror push → - /// broadcast — atomic w.r.t. `subscribe_with_snapshot` callers. - pub fn append_entry(&self, entry: LogEntry) -> Result { - let mut head = self.inner.head.lock(); - let hash = session_store::append_entry_with_hash( - &self.inner.store, - head.session_id, - &mut head.head_hash, - entry.clone(), - )?; - self.inner.sink.publish(entry); - Ok(hash) - } - - /// Atomically swap to a new compacted session. - /// - /// Creates the new session on disk with `initial` as its - /// `SessionStart`, advances the head, and resets the sink mirror - /// to `[initial]` while broadcasting the entry. Existing - /// subscribers observe the swap as a freshly broadcast - /// `SessionStart` (with `compacted_from` set), which is their - /// signal to reset their derived view. - pub fn swap_session( - &self, - new_session_id: SessionId, - initial: LogEntry, - ) -> Result { - let hash = session_log::compute_hash(None, &initial); - let hashed = HashedEntry { - hash: hash.clone(), - prev_hash: None, - entry: initial.clone(), - }; - self.inner.store.create_session(new_session_id, &[hashed])?; - let mut head = self.inner.head.lock(); - head.session_id = new_session_id; - head.head_hash = Some(hash.clone()); - self.inner.sink.reset_with_initial(initial); - Ok(hash) - } - - /// If the store's head no longer matches our cached head, mint a - /// fresh session that forks from the current state and switch to - /// it. Returns `true` when a fork happened. - pub fn ensure_head_or_fork(&self, state: SessionStartState<'_>) -> Result { - let mut head = self.inner.head.lock(); - let store_head = self.inner.store.read_head_hash(head.session_id)?; - if store_head == head.head_hash { - return Ok(false); - } - let fork_id = session_store::new_session_id(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), - system_prompt: state.system_prompt.map(String::from), - config: state.config.clone(), - history: session_store::to_logged(state.history), - forked_from: None, - compacted_from: None, - }; - let hash = session_log::compute_hash(None, &entry); - let hashed = HashedEntry { - hash: hash.clone(), - prev_hash: None, - entry: entry.clone(), - }; - self.inner.store.create_session(fork_id, &[hashed])?; - head.session_id = fork_id; - head.head_hash = Some(hash); - self.inner.sink.reset_with_initial(entry); - Ok(true) - } - - /// Cloneable handle to the broadcast sink. Used by the IPC layer - /// for `subscribe_with_snapshot` and by tests that just want the - /// non-write side. - pub fn sink(&self) -> SessionLogSink { - self.inner.sink.clone() - } - - /// Underlying store handle. Direct access is preserved for callers - /// that read state (`read_all`, `read_head_hash`) without going - /// through the writer's hash chain. - pub fn store(&self) -> &St { - &self.inner.store - } - - /// Cheap snapshot of the current session id. - pub fn current_session_id(&self) -> SessionId { - self.inner.head.lock().session_id - } - - /// Cheap snapshot of the current head hash. - pub fn current_head_hash(&self) -> Option { - self.inner.head.lock().head_hash.clone() - } - - /// Direct lock on the head. Used by paths that need to coordinate - /// custom writes with the hash chain. - pub fn lock_head(&self) -> MutexGuard<'_, SessionHeadState> { - self.inner.head.lock() - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index eb51e148..bdb1dd6b 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -224,10 +224,10 @@ pub enum Event { /// of parsing free-text prefixes like `[Notification] …` or /// `[File: …]`. /// - /// Fired per-item, even when the underlying - /// `LogEntry::SystemItems` entry batched several together — the - /// IPC layer fans the batch out at broadcast time so subscribers - /// observe one event per item. + /// One event per `LogEntry::SystemItem` commit. Disk-side and + /// wire-side are 1:1 (singular variant); legacy `SystemItems` + /// entries from older sessions are read-only and never emitted on + /// this lane. SystemItem { item: serde_json::Value, }, diff --git a/crates/session-store/Cargo.toml b/crates/session-store/Cargo.toml index da16aafb..42a93e2f 100644 --- a/crates/session-store/Cargo.toml +++ b/crates/session-store/Cargo.toml @@ -7,10 +7,8 @@ license.workspace = true [dependencies] llm-worker = { workspace = true } -async-trait = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -tokio = { workspace = true, features = ["fs", "io-util"] } uuid = { workspace = true, features = ["v7", "serde"] } thiserror = { workspace = true } sha2 = { workspace = true } @@ -19,6 +17,7 @@ protocol = { workspace = true } tracing.workspace = true [dev-dependencies] +async-trait = { workspace = true } tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } tempfile = { workspace = true } futures = { workspace = true } diff --git a/crates/session-store/src/system_item.rs b/crates/session-store/src/system_item.rs index 8849f2b8..749ca0ae 100644 --- a/crates/session-store/src/system_item.rs +++ b/crates/session-store/src/system_item.rs @@ -8,8 +8,9 @@ //! `kind` instead of parsing text prefixes like `[Notification] …` or //! `[File: …]`. //! -//! Persisted as the payload of [`crate::LogEntry::SystemItems`], and -//! broadcast live as the payload of `Event::SystemItem` on the wire. +//! Persisted as the payload of [`crate::LogEntry::SystemItem`] (one +//! entry per item), and broadcast live as the payload of +//! `Event::SystemItem` on the wire. //! //! For LLM context replay, each `SystemItem` reduces to an //! `Item::system_message(...)` whose body matches the legacy free-text diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 607ef82c..132117e5 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -967,6 +967,16 @@ impl App { self.blocks.push(Block::UserMessage { segments }); } } + session_store::LogEntry::AssistantItem { item, .. } + | session_store::LogEntry::ToolResult { item, .. } => { + let it: llm_worker::Item = item.into(); + let item_value = serde_json::to_value(&it).expect("Item is Serialize"); + self.push_history_item(&item_value); + } + session_store::LogEntry::SystemItem { item, .. } => { + let value = serde_json::to_value(&item).expect("SystemItem is Serialize"); + self.apply_system_item(&value); + } session_store::LogEntry::AssistantItems { items, .. } | session_store::LogEntry::ToolResults { items, .. } | session_store::LogEntry::HookInjectedItems { items, .. } => { diff --git a/tickets/log-entry-singular-and-direct-commit.md b/tickets/log-entry-singular-and-direct-commit.md deleted file mode 100644 index 00647a82..00000000 --- a/tickets/log-entry-singular-and-direct-commit.md +++ /dev/null @@ -1,103 +0,0 @@ -# Log writer を sync 化 + LogEntry を単数化 - -## 背景 - -`tickets/system-item-unify.md` で `Event::SystemItem` / `LogEntry::SystemItems` を導入し、 Notify / PodEvent / 各 ref 解決を 1 経路に統合した。 ただし実装の過程で 2 つの設計上の歪みが残った: - -**1. `LogEntry::SystemItems { items: Vec }` の複数形は実需要が無い** - -- 既存 `LogEntry::AssistantItems { items: Vec }` / `ToolResults { items: Vec }` の形に揃えて plural にしたが、 これは旧 `save_delta` の turn 末尾バッチ commit 時代の名残 -- 現在は worker の `on_history_append` callback が per-item で発火し、 drain task が 1 件ずつ classify+commit する形に移行済み。 `items: Vec<_>` の中身は常に 1 件 -- `SystemItems` も同様で、 interceptor が drain した N 件をまとめているだけで「グルーピング」 の意味はない (同 drain 機会に来ただけ) -- wire 側では IPC server が `items.into_iter().map(...)` で per-item の `Event::SystemItem` に fan-out しており、 LogEntry 上のバッチと wire 上の単発の非対称が発生している - -**2. `LogCommand::SystemItems` 追加が二重設計** - -- `LogCommand` は元来「worker の sync callback (`on_history_append`) を async commit に橋渡しする」 ための小さな ferry。 `Item + Flush` の 2 variant だけだった -- system items に typed kind 情報を運ぶために `LogCommand::SystemItems(Vec)` を増やしたが、 interceptor は async コンテキストなので sync→async の橋渡しは要らない。 単に「SystemItem を直接 commit する経路」 として LogCommand を流用しているだけで、 `LogCommand` の元来の責務 (sync→async ferry) を曖昧にしている - -**3. そもそも log 周りで async が必要だった理由はほぼ無い** - -log 周りで async になっている箇所を棚卸しすると: - -| 箇所 | 現在 async | 本当に async でないとダメか | -|------|-----------|---------------------------| -| `Store::append` (FsStore) | tokio::fs | ❌ `std::fs::OpenOptions` + `writeln!` で良い (1 行 append、 < 1KB、 < 1ms) | -| `Store::read_all` (restore 時) | tokio::fs::read_to_string | ❌ Pod 起動時 1 回、 hot path じゃない | -| `Store::read_head_hash` | tokio::fs | ❌ 同上 | -| `session_head` mutex | `tokio::sync::AsyncMutex` | ❌ async は不要、 sync mutex で十分 | -| `sink.publish` / `subscribe_with_snapshot` | sync (元から) | — | -| `broadcast::send` | sync (元から) | — | - -つまり log subsystem を async にしていた本当の理由は **tokio::fs を default 選択した点だけ**。 hash chain 維持のためでも性能のためでもない。 そして sync callback が async commit を呼べないという mismatch を解消するために `LogCommand` ferry を作る必要があった。 - -log writer を sync にすればこの mismatch そのものが消え、 `LogCommand` / drain task / Flush バリアは丸ごと不要になる。 - -## 方針 - -**A. log writer を sync に切り替える** - -- `Store::append` / `read_all` / `read_head_hash` を `std::fs` / `std::io` ベースの sync API に変更 (or sync 版を併設して段階移行) -- `session_head` を `tokio::sync::AsyncMutex` から `parking_lot::Mutex` (or `std::sync::Mutex`) に変更 -- `SessionLogWriter::append_entry()` を sync 関数に。 disk write の間 caller の thread が ms 未満ブロックする (local fs append、 < 1KB、 実害なし) -- Pod の `commit_entry().await` は `commit_entry()` (await 無し) に -- worker の `on_history_append` callback が直接 `writer.append_entry(classified_entry)` を呼ぶ -- interceptor も `Arc` を持って直接呼ぶ - -**B. `LogCommand` / drain task / Flush バリアの撤廃** - -- `LogCommand` enum を削除 -- `run_log_drain` 関数を削除 -- mpsc channel (`log_cmd_tx` / `log_cmd_rx`) を削除 -- `persist_turn` の Flush バリアを削除 (sync 書き込みなので順序は call 順で自然に決まる) -- worker callback で直接 sync 書き込みするので、 `Worker::on_history_append` 経由のフローはそのまま (sync closure) - -**C. LogEntry のアシスタント / ツール / システム系を単数バリアントに揃える** - -- 新名 (write side): - - `LogEntry::AssistantItem { ts, item: LoggedItem }` - - `LogEntry::ToolResult { ts, item: LoggedItem }` - - `LogEntry::SystemItem { ts, item: SystemItem }` -- 旧 plural variant (`AssistantItems` / `ToolResults` / `HookInjectedItems` / `SystemItems`) は **読み出し専用 (deserialize-only)** として残置。 既存セッションログを開けることは保証する -- `collect_state` は新旧両方をフラットに `state.history` に展開 -- 1 entry = 1 item になることで commit 経路が `LogEntry:: { item }` に直接 1:1 写像する -- wire 側の `Event::SystemItem` (per-item) と log 上の `LogEntry::SystemItem` (per-item) が完全対称になり、 IPC server の fan-out ロジックが消える - -ハッシュチェーン長は entry 数に比例して長くなるが、 hash chain 自体は別チケットで廃止予定なので長さの悪化は受け入れる。 - -## 要件 - -- `Store` trait の主要メソッド (`append` / `read_all` / `read_head_hash` / `create_session` 等) を sync API に統一する。 async 版を残す必要があるかは内部判断 (例: 起動時に大きい log を読む `read_all` だけ async を残す案もあり、 必要に応じて選択) -- `SessionLogWriter` (Pod 側ラッパ) を sync API に統一し、 hash chain 計算 + session_head 更新 + sink publish を 1 つの sync 関数内で行う -- `session_head` を sync mutex に変更 -- `LogCommand` / `LogDrainHandle::run_log_drain` / `log_cmd_tx` / `log_cmd_rx` / Flush バリアを削除 -- `PodInterceptor::on_prompt_submit` / `pending_history_appends` は `Arc` を直接呼んで per-`SystemItem` に commit。 完了後に `Item::system_message` を worker に返す -- worker の `on_history_append` callback が直接 `writer.append_entry(...)` を呼ぶ (sync) -- `LogEntry` に `AssistantItem` / `ToolResult` / `SystemItem` の単数 variant を追加し、 新規書き込みはこれを使う -- 旧 plural variant は read 専用として残し、 deserialize alias または独立 variant のどちらでも実装判断で良い。 `collect_state` で同じ形に reduce されること -- IPC server の `LogEntry::SystemItem` 受信は 1 件の `Event::SystemItem` をそのまま送る単純な対応に -- `cargo test --workspace` が pass する - -## 完了条件 - -- `LogCommand` / drain task / Flush バリアがコードから消えている -- worker の `on_history_append` callback が直接 sync writer を呼んでいる -- `PodInterceptor` が writer を直接呼んで SystemItem を commit している (mpsc 経由ではない) -- `Store` および `SessionLogWriter` の主要 commit API が sync 関数 -- `session_head` の mutex 型が `parking_lot::Mutex` または `std::sync::Mutex` -- 新規セッションログに `assistant_item` / `tool_result` / `system_item` (snake_case wire tag) が単数 entry として書かれている -- 旧 `assistant_items` / `tool_results` / `hook_injected_items` / `system_items` を含むセッションログが読めて view 復元できる -- IPC server で `LogEntry::SystemItem` を受けたら 1 件の `Event::SystemItem` を fan-out 無しで送るシンプルなマッピングに戻っている - -## 範囲外 - -- ハッシュチェーン自体の廃止 (entry hash, `prev_hash`, `HashedEntry`, `session_head` mutex の最終撤去) は `tickets/persistence-semantics.md` の責務領域。 本チケットはその直前段階として、 mutex を sync 化するところまで -- Session / Segment 階層への rename (`tickets/persistence-semantics.md`) -- 旧 plural entry の disk migration (ファイル書き換え)。 deserialize alias で読めるところまで -- `Event::SystemItem` payload の変更 (引き続き `serde_json::Value` で `SystemItem` の JSON 形) - -## 関連 - -- 前提 `tickets/system-item-unify.md` (本チケットで完成した SystemItem 経路を、 本チケットで単数化 + direct writer 化する後続) -- 関連 `tickets/pod-state-from-session-log.md` (per-item commit の drain task 経路を確立した前段。 本チケットでその drain task 自体を撤去する) -- 後続 `tickets/persistence-semantics.md` の「Entry hash の廃止」 セクション。 本チケットで sync 化したことが entry hash 廃止の足場になる