"replay"という説明が不適切だったため修正

This commit is contained in:
Keisuke Hirata 2026-04-05 05:20:23 +09:00
parent 00e3ae1932
commit f66fb29f5c
6 changed files with 27 additions and 27 deletions

View File

@ -1,7 +1,7 @@
//! Debug-only raw stream event recording. //! Debug-only raw stream event recording.
//! //!
//! [`TraceEntry`] captures every LLM stream event verbatim for debugging //! [`TraceEntry`] captures every LLM stream event verbatim for debugging
//! and replay analysis. Written to a separate `.trace.jsonl` file, //! and post-hoc analysis. Written to a separate `.trace.jsonl` file,
//! completely independent of the session log used for state restoration. //! completely independent of the session log used for state restoration.
//! //!
//! Disabled by default. Enable via `SessionConfig::record_event_trace`. //! Disabled by default. Enable via `SessionConfig::record_event_trace`.

View File

@ -3,8 +3,8 @@
//! # Architecture //! # Architecture
//! //!
//! Sessions are recorded as a sequence of [`LogEntry`] values, one per line //! Sessions are recorded as a sequence of [`LogEntry`] values, one per line
//! in a `.jsonl` file. Replaying the log reconstructs the full [`Worker`] //! in a `.jsonl` file. Reading the log and collecting entries reconstructs
//! state — no separate snapshots or checkpoints needed. //! the full [`Worker`] state — no separate snapshots or checkpoints needed.
//! //!
//! Debug-mode [`TraceEntry`] records capture raw stream events in a separate //! Debug-mode [`TraceEntry`] records capture raw stream events in a separate
//! `.trace.jsonl` file, independent of the session log. //! `.trace.jsonl` file, independent of the session log.
@ -29,7 +29,7 @@ pub mod store;
pub use event_trace::TraceEntry; pub use event_trace::TraceEntry;
pub use fs_store::FsStore; pub use fs_store::FsStore;
pub use session::{Session, SessionConfig, SessionError}; pub use session::{Session, SessionConfig, SessionError};
pub use session_log::{LogEntry, Outcome, RestoredState, replay_entries}; pub use session_log::{LogEntry, Outcome, RestoredState, collect_state};
pub use store::{Store, StoreError}; pub use store::{Store, StoreError};
/// Session identifier. UUID v7 (time-ordered, lexicographically sortable). /// Session identifier. UUID v7 (time-ordered, lexicographically sortable).

View File

@ -77,7 +77,7 @@ impl<C: LlmClient, St: Store> Session<C, St> {
/// Restore a session from a stored log. /// Restore a session from a stored log.
/// ///
/// Reads all log entries, replays them to reconstruct state, /// Reads all log entries, collects state from them,
/// and returns a `Session` ready for `resume()`. /// and returns a `Session` ready for `resume()`.
pub async fn restore( pub async fn restore(
client: C, client: C,
@ -86,7 +86,7 @@ impl<C: LlmClient, St: Store> Session<C, St> {
config: SessionConfig, config: SessionConfig,
) -> Result<Self, SessionError> { ) -> Result<Self, SessionError> {
let entries = store.read_all(session_id).await?; let entries = store.read_all(session_id).await?;
let state = session_log::replay_entries(&entries); let state = session_log::collect_state(&entries);
let mut worker = Worker::new(client); let mut worker = Worker::new(client);
if let Some(ref prompt) = state.system_prompt { if let Some(ref prompt) = state.system_prompt {
@ -172,7 +172,7 @@ impl<C: LlmClient, St: Store> Session<C, St> {
) -> Result<SessionId, StoreError> { ) -> Result<SessionId, StoreError> {
let entries = store.read_all(source_id).await?; let entries = store.read_all(source_id).await?;
let truncated = &entries[..up_to_entry.min(entries.len())]; let truncated = &entries[..up_to_entry.min(entries.len())];
let state = session_log::replay_entries(truncated); let state = session_log::collect_state(truncated);
let fork_id = crate::new_session_id(); let fork_id = crate::new_session_id();
let start = LogEntry::SessionStart { let start = LogEntry::SessionStart {

View File

@ -1,8 +1,8 @@
//! Session log types for append-only JSONL persistence. //! Session log types for append-only JSONL persistence.
//! //!
//! Each [`LogEntry`] represents a single state transition in a session, //! Each [`LogEntry`] represents a single state transition in a session,
//! serialized as one line in a `.jsonl` file. Replaying the sequence of //! serialized as one line in a `.jsonl` file. Reading all entries and
//! entries reconstructs the full [`Worker`] state. //! collecting them via [`collect_state`] reconstructs the full [`Worker`] state.
use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::llm_client::types::{Item, RequestConfig};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -50,7 +50,7 @@ pub enum LogEntry {
CacheUnlocked { ts: u64 }, CacheUnlocked { ts: u64 },
/// Outcome of a `run()` or `resume()` call. /// Outcome of a `run()` or `resume()` call.
/// This is metadata for auditing; replay logic does not branch on the outcome. /// This is metadata for auditing; state collection does not branch on the outcome.
RunOutcome { RunOutcome {
ts: u64, ts: u64,
outcome: Outcome, outcome: Outcome,
@ -61,7 +61,7 @@ pub enum LogEntry {
ConfigChanged { ts: u64, config: RequestConfig }, ConfigChanged { ts: u64, config: RequestConfig },
} }
/// Outcome of a run/resume call. Used for auditing, not for replay branching. /// Outcome of a run/resume call. Metadata for auditing only.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum Outcome { pub enum Outcome {
@ -70,7 +70,7 @@ pub enum Outcome {
Error { message: String }, Error { message: String },
} }
/// State reconstructed by replaying log entries. /// State collected from log entries.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RestoredState { pub struct RestoredState {
pub system_prompt: Option<String>, pub system_prompt: Option<String>,
@ -82,7 +82,7 @@ pub struct RestoredState {
} }
/// Replay a sequence of log entries to reconstruct worker state. /// Replay a sequence of log entries to reconstruct worker state.
pub fn replay_entries(entries: &[LogEntry]) -> RestoredState { pub fn collect_state(entries: &[LogEntry]) -> RestoredState {
let mut state = RestoredState { let mut state = RestoredState {
system_prompt: None, system_prompt: None,
config: RequestConfig::default(), config: RequestConfig::default(),
@ -153,7 +153,7 @@ mod tests {
#[test] #[test]
fn replay_empty() { fn replay_empty() {
let state = replay_entries(&[]); let state = collect_state(&[]);
assert!(state.history.is_empty()); assert!(state.history.is_empty());
assert_eq!(state.turn_count, 0); assert_eq!(state.turn_count, 0);
assert_eq!(state.locked_prefix_len, 0); assert_eq!(state.locked_prefix_len, 0);
@ -167,7 +167,7 @@ mod tests {
config: RequestConfig::default().with_max_tokens(1024), config: RequestConfig::default().with_max_tokens(1024),
history: vec![Item::user_message("seed")], history: vec![Item::user_message("seed")],
}]; }];
let state = replay_entries(&entries); let state = collect_state(&entries);
assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful."));
assert_eq!(state.config.max_tokens, Some(1024)); assert_eq!(state.config.max_tokens, Some(1024));
assert_eq!(state.history.len(), 1); assert_eq!(state.history.len(), 1);
@ -200,7 +200,7 @@ mod tests {
interrupted: false, interrupted: false,
}, },
]; ];
let state = replay_entries(&entries); let state = collect_state(&entries);
assert_eq!(state.history.len(), 2); assert_eq!(state.history.len(), 2);
assert_eq!(state.turn_count, 1); assert_eq!(state.turn_count, 1);
assert!(!state.last_run_interrupted); assert!(!state.last_run_interrupted);
@ -236,7 +236,7 @@ mod tests {
turn_count: 1, turn_count: 1,
}, },
]; ];
let state = replay_entries(&entries); let state = collect_state(&entries);
assert_eq!(state.history.len(), 4); assert_eq!(state.history.len(), 4);
assert!(state.history[1].is_tool_call()); assert!(state.history[1].is_tool_call());
assert!(state.history[2].is_tool_result()); assert!(state.history[2].is_tool_result());
@ -257,11 +257,11 @@ mod tests {
}, },
LogEntry::CacheUnlocked { ts: 3000 }, LogEntry::CacheUnlocked { ts: 3000 },
]; ];
let state = replay_entries(&entries); let state = collect_state(&entries);
assert_eq!(state.locked_prefix_len, 0); assert_eq!(state.locked_prefix_len, 0);
// Check locked state before unlock // Check locked state before unlock
let state_locked = replay_entries(&entries[..2]); let state_locked = collect_state(&entries[..2]);
assert_eq!(state_locked.locked_prefix_len, 2); assert_eq!(state_locked.locked_prefix_len, 2);
} }
@ -279,7 +279,7 @@ mod tests {
config: RequestConfig::default().with_temperature(0.5), config: RequestConfig::default().with_temperature(0.5),
}, },
]; ];
let state = replay_entries(&entries); let state = collect_state(&entries);
assert_eq!(state.config.temperature, Some(0.5)); assert_eq!(state.config.temperature, Some(0.5));
} }
} }

View File

@ -1,6 +1,6 @@
use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::llm_client::types::{Item, RequestConfig};
use llm_worker_persistence::{ use llm_worker_persistence::{
FsStore, LogEntry, Outcome, Store, TraceEntry, new_session_id, replay_entries, FsStore, LogEntry, Outcome, Store, TraceEntry, new_session_id, collect_state,
}; };
#[tokio::test] #[tokio::test]
@ -45,7 +45,7 @@ async fn round_trip_write_and_read() {
assert_eq!(read_back.len(), entries.len()); assert_eq!(read_back.len(), entries.len());
// Replay and verify state // Replay and verify state
let state = replay_entries(&read_back); let state = collect_state(&read_back);
assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful."));
assert_eq!(state.config.max_tokens, Some(1024)); assert_eq!(state.config.max_tokens, Some(1024));
assert_eq!(state.history.len(), 2); assert_eq!(state.history.len(), 2);
@ -72,7 +72,7 @@ async fn create_session_writes_all_entries() {
let read_back = store.read_all(id).await.unwrap(); let read_back = store.read_all(id).await.unwrap();
assert_eq!(read_back.len(), 1); assert_eq!(read_back.len(), 1);
let state = replay_entries(&read_back); let state = collect_state(&read_back);
assert_eq!(state.history.len(), 2); assert_eq!(state.history.len(), 2);
} }

View File

@ -3,7 +3,7 @@
## 概要 ## 概要
`llm-worker-persistence` クレートは、`llm-worker` の `Worker` セッション状態を `llm-worker-persistence` クレートは、`llm-worker` の `Worker` セッション状態を
JSONL append-only ログとして永続化する。ログを replay することで Worker 状態を完全に復元する。 JSONL append-only ログとして永続化する。ログを読み込んで集約することで Worker 状態を復元する。
## 設計方針 ## 設計方針
@ -12,7 +12,7 @@ JSONL append-only ログとして永続化する。ログを replay すること
(`history: Vec<Item>` + `turn_count` + `request_config`)。 (`history: Vec<Item>` + `turn_count` + `request_config`)。
`resume()` は「ユーザー入力を追加せず `run_turn_loop()` に再入する」だけなので、 `resume()` は「ユーザー入力を追加せず `run_turn_loop()` に再入する」だけなので、
復元に必要なのは history の中身であり、前回の終了理由ではない。 復元に必要なのは history の中身であり、前回の終了理由ではない。
`RunOutcome``Finished`/`Paused` 区分は監査用メタデータであり、replay の分岐には使わない。 `RunOutcome``Finished`/`Paused` 区分は監査用メタデータであり、状態復元の分岐には使わない。
- **クレート分離**: `llm-worker` は永続化を知らない。`Session` ラッパーが外から Worker を包む。 - **クレート分離**: `llm-worker` は永続化を知らない。`Session` ラッパーが外から Worker を包む。
## 命名規約 ## 命名規約
@ -43,7 +43,7 @@ llm-worker-persistence → llm-worker → llm-worker-macros
各エントリは Worker の特定の状態変更に対応する: 各エントリは Worker の特定の状態変更に対応する:
| エントリ | Worker 上の対応箇所 | replay での効果 | | エントリ | Worker 上の対応箇所 | collect_state での効果 |
|---|---|---| |---|---|---|
| `SessionStart` | セッション開始 / fork | system_prompt, config, history を初期化 | | `SessionStart` | セッション開始 / fork | system_prompt, config, history を初期化 |
| `UserInput` | `worker.rs:229` | history に追加 | | `UserInput` | `worker.rs:229` | history に追加 |
@ -69,7 +69,7 @@ pub struct Session<C: LlmClient, St: Store> {
- `Session::new()` — SessionStart を書き込み - `Session::new()` — SessionStart を書き込み
- `Session::run()` — Worker::run() の前後で history を比較、差分をログ記録 - `Session::run()` — Worker::run() の前後で history を比較、差分をログ記録
- `Session::resume()` — 同上 - `Session::resume()` — 同上
- `Session::restore()` — ログを replay して Worker を再構築 - `Session::restore()` — ログを読み込み、状態を集約して Worker を再構築
- `Session::fork()` — 現在の history をシードにした新セッションを作成 - `Session::fork()` — 現在の history をシードにした新セッションを作成
- `Session::fork_at()` — 任意のログ地点から分岐 - `Session::fork_at()` — 任意のログ地点から分岐