ticket: 書き込みのsync化を計画
This commit is contained in:
parent
fe9cecb51a
commit
112ccb2365
1
TODO.md
1
TODO.md
|
|
@ -8,6 +8,7 @@
|
|||
- Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
|
||||
- Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md)
|
||||
- Pod: セッションログをバックエンドにした Pod 単位の永続化 → [tickets/pod-persistent-state.md](tickets/pod-persistent-state.md)
|
||||
- LogEntry を単数化 + interceptor を direct writer に → [tickets/log-entry-singular-and-direct-commit.md](tickets/log-entry-singular-and-direct-commit.md)
|
||||
- 永続化層のセマンティック整理 → [tickets/persistence-semantics.md](tickets/persistence-semantics.md)
|
||||
- Exchange / Turn / Call セマンティクス整理 → [tickets/exchange-turn-call-semantics.md](tickets/exchange-turn-call-semantics.md)
|
||||
- llm-worker のエラー耐性
|
||||
|
|
|
|||
103
tickets/log-entry-singular-and-direct-commit.md
Normal file
103
tickets/log-entry-singular-and-direct-commit.md
Normal file
|
|
@ -0,0 +1,103 @@
|
|||
# Log writer を sync 化 + LogEntry を単数化
|
||||
|
||||
## 背景
|
||||
|
||||
`tickets/system-item-unify.md` で `Event::SystemItem` / `LogEntry::SystemItems` を導入し、 Notify / PodEvent / 各 ref 解決を 1 経路に統合した。 ただし実装の過程で 2 つの設計上の歪みが残った:
|
||||
|
||||
**1. `LogEntry::SystemItems { items: Vec<SystemItem> }` の複数形は実需要が無い**
|
||||
|
||||
- 既存 `LogEntry::AssistantItems { items: Vec<LoggedItem> }` / `ToolResults { items: Vec<LoggedItem> }` の形に揃えて plural にしたが、 これは旧 `save_delta` の turn 末尾バッチ commit 時代の名残
|
||||
- 現在は worker の `on_history_append` callback が per-item で発火し、 drain task が 1 件ずつ classify+commit する形に移行済み。 `items: Vec<_>` の中身は常に 1 件
|
||||
- `SystemItems` も同様で、 interceptor が drain した N 件をまとめているだけで「グルーピング」 の意味はない (同 drain 機会に来ただけ)
|
||||
- wire 側では IPC server が `items.into_iter().map(...)` で per-item の `Event::SystemItem` に fan-out しており、 LogEntry 上のバッチと wire 上の単発の非対称が発生している
|
||||
|
||||
**2. `LogCommand::SystemItems` 追加が二重設計**
|
||||
|
||||
- `LogCommand` は元来「worker の sync callback (`on_history_append`) を async commit に橋渡しする」 ための小さな ferry。 `Item + Flush` の 2 variant だけだった
|
||||
- system items に typed kind 情報を運ぶために `LogCommand::SystemItems(Vec<SystemItem>)` を増やしたが、 interceptor は async コンテキストなので sync→async の橋渡しは要らない。 単に「SystemItem を直接 commit する経路」 として LogCommand を流用しているだけで、 `LogCommand` の元来の責務 (sync→async ferry) を曖昧にしている
|
||||
|
||||
**3. そもそも log 周りで async が必要だった理由はほぼ無い**
|
||||
|
||||
log 周りで async になっている箇所を棚卸しすると:
|
||||
|
||||
| 箇所 | 現在 async | 本当に async でないとダメか |
|
||||
|------|-----------|---------------------------|
|
||||
| `Store::append` (FsStore) | tokio::fs | ❌ `std::fs::OpenOptions` + `writeln!` で良い (1 行 append、 < 1KB、 < 1ms) |
|
||||
| `Store::read_all` (restore 時) | tokio::fs::read_to_string | ❌ Pod 起動時 1 回、 hot path じゃない |
|
||||
| `Store::read_head_hash` | tokio::fs | ❌ 同上 |
|
||||
| `session_head` mutex | `tokio::sync::AsyncMutex` | ❌ async は不要、 sync mutex で十分 |
|
||||
| `sink.publish` / `subscribe_with_snapshot` | sync (元から) | — |
|
||||
| `broadcast::send` | sync (元から) | — |
|
||||
|
||||
つまり log subsystem を async にしていた本当の理由は **tokio::fs を default 選択した点だけ**。 hash chain 維持のためでも性能のためでもない。 そして sync callback が async commit を呼べないという mismatch を解消するために `LogCommand` ferry を作る必要があった。
|
||||
|
||||
log writer を sync にすればこの mismatch そのものが消え、 `LogCommand` / drain task / Flush バリアは丸ごと不要になる。
|
||||
|
||||
## 方針
|
||||
|
||||
**A. log writer を sync に切り替える**
|
||||
|
||||
- `Store::append` / `read_all` / `read_head_hash` を `std::fs` / `std::io` ベースの sync API に変更 (or sync 版を併設して段階移行)
|
||||
- `session_head` を `tokio::sync::AsyncMutex` から `parking_lot::Mutex` (or `std::sync::Mutex`) に変更
|
||||
- `SessionLogWriter::append_entry()` を sync 関数に。 disk write の間 caller の thread が ms 未満ブロックする (local fs append、 < 1KB、 実害なし)
|
||||
- Pod の `commit_entry().await` は `commit_entry()` (await 無し) に
|
||||
- worker の `on_history_append` callback が直接 `writer.append_entry(classified_entry)` を呼ぶ
|
||||
- interceptor も `Arc<SessionLogWriter>` を持って直接呼ぶ
|
||||
|
||||
**B. `LogCommand` / drain task / Flush バリアの撤廃**
|
||||
|
||||
- `LogCommand` enum を削除
|
||||
- `run_log_drain` 関数を削除
|
||||
- mpsc channel (`log_cmd_tx` / `log_cmd_rx`) を削除
|
||||
- `persist_turn` の Flush バリアを削除 (sync 書き込みなので順序は call 順で自然に決まる)
|
||||
- worker callback で直接 sync 書き込みするので、 `Worker::on_history_append` 経由のフローはそのまま (sync closure)
|
||||
|
||||
**C. LogEntry のアシスタント / ツール / システム系を単数バリアントに揃える**
|
||||
|
||||
- 新名 (write side):
|
||||
- `LogEntry::AssistantItem { ts, item: LoggedItem }`
|
||||
- `LogEntry::ToolResult { ts, item: LoggedItem }`
|
||||
- `LogEntry::SystemItem { ts, item: SystemItem }`
|
||||
- 旧 plural variant (`AssistantItems` / `ToolResults` / `HookInjectedItems` / `SystemItems`) は **読み出し専用 (deserialize-only)** として残置。 既存セッションログを開けることは保証する
|
||||
- `collect_state` は新旧両方をフラットに `state.history` に展開
|
||||
- 1 entry = 1 item になることで commit 経路が `LogEntry::<Singular> { item }` に直接 1:1 写像する
|
||||
- wire 側の `Event::SystemItem` (per-item) と log 上の `LogEntry::SystemItem` (per-item) が完全対称になり、 IPC server の fan-out ロジックが消える
|
||||
|
||||
ハッシュチェーン長は entry 数に比例して長くなるが、 hash chain 自体は別チケットで廃止予定なので長さの悪化は受け入れる。
|
||||
|
||||
## 要件
|
||||
|
||||
- `Store` trait の主要メソッド (`append` / `read_all` / `read_head_hash` / `create_session` 等) を sync API に統一する。 async 版を残す必要があるかは内部判断 (例: 起動時に大きい log を読む `read_all` だけ async を残す案もあり、 必要に応じて選択)
|
||||
- `SessionLogWriter` (Pod 側ラッパ) を sync API に統一し、 hash chain 計算 + session_head 更新 + sink publish を 1 つの sync 関数内で行う
|
||||
- `session_head` を sync mutex に変更
|
||||
- `LogCommand` / `LogDrainHandle::run_log_drain` / `log_cmd_tx` / `log_cmd_rx` / Flush バリアを削除
|
||||
- `PodInterceptor::on_prompt_submit` / `pending_history_appends` は `Arc<SessionLogWriter>` を直接呼んで per-`SystemItem` に commit。 完了後に `Item::system_message` を worker に返す
|
||||
- worker の `on_history_append` callback が直接 `writer.append_entry(...)` を呼ぶ (sync)
|
||||
- `LogEntry` に `AssistantItem` / `ToolResult` / `SystemItem` の単数 variant を追加し、 新規書き込みはこれを使う
|
||||
- 旧 plural variant は read 専用として残し、 deserialize alias または独立 variant のどちらでも実装判断で良い。 `collect_state` で同じ形に reduce されること
|
||||
- IPC server の `LogEntry::SystemItem` 受信は 1 件の `Event::SystemItem` をそのまま送る単純な対応に
|
||||
- `cargo test --workspace` が pass する
|
||||
|
||||
## 完了条件
|
||||
|
||||
- `LogCommand` / drain task / Flush バリアがコードから消えている
|
||||
- worker の `on_history_append` callback が直接 sync writer を呼んでいる
|
||||
- `PodInterceptor` が writer を直接呼んで SystemItem を commit している (mpsc 経由ではない)
|
||||
- `Store` および `SessionLogWriter` の主要 commit API が sync 関数
|
||||
- `session_head` の mutex 型が `parking_lot::Mutex` または `std::sync::Mutex`
|
||||
- 新規セッションログに `assistant_item` / `tool_result` / `system_item` (snake_case wire tag) が単数 entry として書かれている
|
||||
- 旧 `assistant_items` / `tool_results` / `hook_injected_items` / `system_items` を含むセッションログが読めて view 復元できる
|
||||
- IPC server で `LogEntry::SystemItem` を受けたら 1 件の `Event::SystemItem` を fan-out 無しで送るシンプルなマッピングに戻っている
|
||||
|
||||
## 範囲外
|
||||
|
||||
- ハッシュチェーン自体の廃止 (entry hash, `prev_hash`, `HashedEntry`, `session_head` mutex の最終撤去) は `tickets/persistence-semantics.md` の責務領域。 本チケットはその直前段階として、 mutex を sync 化するところまで
|
||||
- Session / Segment 階層への rename (`tickets/persistence-semantics.md`)
|
||||
- 旧 plural entry の disk migration (ファイル書き換え)。 deserialize alias で読めるところまで
|
||||
- `Event::SystemItem` payload の変更 (引き続き `serde_json::Value` で `SystemItem` の JSON 形)
|
||||
|
||||
## 関連
|
||||
|
||||
- 前提 `tickets/system-item-unify.md` (本チケットで完成した SystemItem 経路を、 本チケットで単数化 + direct writer 化する後続)
|
||||
- 関連 `tickets/pod-state-from-session-log.md` (per-item commit の drain task 経路を確立した前段。 本チケットでその drain task 自体を撤去する)
|
||||
- 後続 `tickets/persistence-semantics.md` の「Entry hash の廃止」 セクション。 本チケットで sync 化したことが entry hash 廃止の足場になる
|
||||
|
|
@ -79,6 +79,23 @@ llm-worker は session 概念を持たない(`Worker` は `history` / `turn_co
|
|||
- `SessionOrigin.at_hash` → `at_turn_index` (TurnEnd 由来) に置換。
|
||||
- `ensure_head_or_fork` の検知ロジックは、Segment 末尾の terminal marker entry または末尾 seq 比較に置換(形式は実装時に決める)。
|
||||
|
||||
### 廃止前の足場 (前提)
|
||||
|
||||
本セクションを実装に移すタイミングでは、log writer が既に sync 化されていることを前提にする (`tickets/log-entry-singular-and-direct-commit.md`)。具体的には:
|
||||
|
||||
- `Store::append` / `read_all` 等が `std::fs` ベースの sync API
|
||||
- `SessionLogWriter::append_entry()` が sync 関数
|
||||
- `session_head` mutex は `parking_lot::Mutex` / `std::sync::Mutex`
|
||||
- `LogCommand` / drain task / Flush バリアは既に撤廃済み
|
||||
|
||||
この状態で hash chain を廃止すると追加で取れる単純化:
|
||||
|
||||
- **`session_head` mutex そのものを撤去できる**。 hash chain が無いので「`head_hash` を直前 entry から取得して次に渡す」 という serialize 必須の依存が消える。 1 行 < `PIPE_BUF` (Linux 4KB) の `O_APPEND` write は kernel 側で atomic に直列化されるので、 user space で mutex を持つ必要が無い
|
||||
- `session_head` が消えると Pod / interceptor / worker callback が writer ハンドルだけ持てば良くなる。 `Arc<SessionLogWriter>` は単に `Arc<Store> + sink` を抱えるだけの値で、 hot-path の競合がない
|
||||
- `compute_hash` 呼び出しが消える分、 append が serialize + open + write + close の 3 syscall まで詰まる
|
||||
|
||||
つまり「sync 化」 が先に来て、 「hash 廃止」 で mutex まで消える、 という 2 段階の単純化になる。
|
||||
|
||||
## Fork: 2 種類の書き込み方
|
||||
|
||||
Session 境界の話ではなく **元 Segment への marker 書き込みの有無**で 2 種類を分ける。Session はどちらの場合も同じで、新 Segment が同 Session 内に生える。
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user