diff --git a/Cargo.lock b/Cargo.lock index 06e65ce4..8c622cb5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -784,6 +784,19 @@ dependencies = [ "syn", ] +[[package]] +name = "llm-worker-persistence" +version = "0.1.0" +dependencies = [ + "llm-worker", + "serde", + "serde_json", + "tempfile", + "thiserror", + "tokio", + "uuid", +] + [[package]] name = "log" version = "0.4.29" @@ -1664,6 +1677,18 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9" +dependencies = [ + "getrandom 0.4.2", + "js-sys", + "serde_core", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index 069dd8de..089a6887 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "crates/insomnia", "crates/llm-worker", "crates/llm-worker-macros", + "crates/llm-worker-persistence", ] [workspace.package] diff --git a/TODO.md b/TODO.md new file mode 100644 index 00000000..6d8fc6bc --- /dev/null +++ b/TODO.md @@ -0,0 +1 @@ +- [x] 永続化データ構造の制定 diff --git a/crates/llm-worker-persistence/Cargo.toml b/crates/llm-worker-persistence/Cargo.toml new file mode 100644 index 00000000..39a8d2cf --- /dev/null +++ b/crates/llm-worker-persistence/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "llm-worker-persistence" +description = "Session persistence for llm-worker via append-only JSONL logs" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +llm-worker = { path = "../llm-worker" } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +tokio = { version = "1.49", features = ["fs", "io-util"] } +uuid = { version = "1", features = ["v7", "serde"] } +thiserror = "2.0" + +[dev-dependencies] +tokio = { 
version = "1.49", features = ["macros", "rt-multi-thread", "fs", "io-util", "time"] } +tempfile = "3.24" diff --git a/crates/llm-worker-persistence/src/event_trace.rs b/crates/llm-worker-persistence/src/event_trace.rs new file mode 100644 index 00000000..0a7f7c9d --- /dev/null +++ b/crates/llm-worker-persistence/src/event_trace.rs @@ -0,0 +1,21 @@ +//! Debug-only raw stream event recording. +//! +//! [`TraceEntry`] captures every LLM stream event verbatim for debugging +//! and replay analysis. Written to a separate `.trace.jsonl` file, +//! completely independent of the session log used for state restoration. +//! +//! Disabled by default. Enable via `SessionConfig::record_event_trace`. + +use llm_worker::llm_client::event::Event; +use serde::{Deserialize, Serialize}; + +/// A single trace entry recording a raw stream event. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TraceEntry { + /// Timestamp in milliseconds since Unix epoch. + pub ts: u64, + /// Turn number at the time of recording. + pub turn: usize, + /// The raw stream event. + pub event: Event, +} diff --git a/crates/llm-worker-persistence/src/fs_store.rs b/crates/llm-worker-persistence/src/fs_store.rs new file mode 100644 index 00000000..b46de33b --- /dev/null +++ b/crates/llm-worker-persistence/src/fs_store.rs @@ -0,0 +1,132 @@ +//! Filesystem-backed JSONL store. +//! +//! Layout: +//! - Session log: `{root}/{session_id}.jsonl` +//! - Event trace: `{root}/{session_id}.trace.jsonl` + +use crate::event_trace::TraceEntry; +use crate::session_log::LogEntry; +use crate::store::{Store, StoreError}; +use crate::SessionId; +use std::path::{Path, PathBuf}; +use tokio::fs; +use tokio::io::AsyncWriteExt; + +/// Filesystem-backed JSONL store. +/// +/// Each session is stored as a single `.jsonl` file with one [`LogEntry`] +/// per line. Writes use append mode for crash safety. +pub struct FsStore { + root: PathBuf, +} + +impl FsStore { + /// Create a new `FsStore` rooted at the given directory.
+ /// Creates the directory if it does not exist. + pub async fn new(root: impl Into) -> Result { + let root = root.into(); + fs::create_dir_all(&root).await?; + Ok(Self { root }) + } + + fn log_path(&self, id: SessionId) -> PathBuf { + self.root.join(format!("{id}.jsonl")) + } + + fn trace_path(&self, id: SessionId) -> PathBuf { + self.root.join(format!("{id}.trace.jsonl")) + } + + async fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> { + let mut file = fs::OpenOptions::new() + .create(true) + .append(true) + .open(path) + .await?; + // Record and terminator go out in one `write_all`, so an interruption between two writes cannot leave an unterminated line. + file.write_all(format!("{line}\n").as_bytes()).await?; + file.flush().await?; + Ok(()) + } + + fn parse_jsonl( + content: &str, + ) -> Result, StoreError> { + let mut entries = Vec::new(); + for (i, line) in content.lines().enumerate() { + if line.trim().is_empty() { + continue; + } + let entry: T = + serde_json::from_str(line).map_err(|e| StoreError::Corrupt { + line: i + 1, + message: e.to_string(), + })?; + entries.push(entry); + } + Ok(entries) + } +} + +impl Store for FsStore { + async fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError> { + let line = serde_json::to_string(entry)?; + self.append_line(&self.log_path(id), &line).await + } + + async fn read_all(&self, id: SessionId) -> Result, StoreError> { + // Let the read itself report a missing log: no blocking `exists()` probe, no check-then-read race. + let content = fs::read_to_string(self.log_path(id)).await.map_err(|e| match e.kind() { + std::io::ErrorKind::NotFound => StoreError::NotFound(id), + _ => StoreError::Io(e), + })?; + Self::parse_jsonl(&content) + } + + async fn list_sessions(&self) -> Result, StoreError> { + let mut sessions = Vec::new(); + let mut dir = fs::read_dir(&self.root).await?; + while let Some(entry) = dir.next_entry().await?
{ + let path = entry.path(); + // Only match .jsonl files, not .trace.jsonl + let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); + if name.ends_with(".jsonl") && !name.ends_with(".trace.jsonl") { + let stem = name.strip_suffix(".jsonl").unwrap_or(name); // drop exactly one extension, never repeated ones + if let Ok(id) = stem.parse::() { + sessions.push(id); + } + } + } + // UUID v7: lexicographic sort = chronological sort, newest first + sessions.sort_by(|a, b| b.cmp(a)); + Ok(sessions) + } + + async fn create_session( + &self, + id: SessionId, + entries: &[LogEntry], + ) -> Result<(), StoreError> { + let path = self.log_path(id); + let mut content = String::new(); + for entry in entries { + content.push_str(&serde_json::to_string(entry)?); + content.push('\n'); + } + fs::write(&path, content.as_bytes()).await?; + Ok(()) + } + + async fn exists(&self, id: SessionId) -> Result { + Ok(fs::try_exists(self.log_path(id)).await?) // async probe; I/O failures surface as errors instead of `false` + } + + async fn append_trace( + &self, + id: SessionId, + entry: &TraceEntry, + ) -> Result<(), StoreError> { + let line = serde_json::to_string(entry)?; + self.append_line(&self.trace_path(id), &line).await + } +} diff --git a/crates/llm-worker-persistence/src/lib.rs b/crates/llm-worker-persistence/src/lib.rs new file mode 100644 index 00000000..9bc2b571 --- /dev/null +++ b/crates/llm-worker-persistence/src/lib.rs @@ -0,0 +1,41 @@ +//! Session persistence for `llm-worker` via append-only JSONL logs. +//! +//! # Architecture +//! +//! Sessions are recorded as a sequence of [`LogEntry`] values, one per line +//! in a `.jsonl` file. Replaying the log reconstructs the full [`Worker`] +//! state — no separate snapshots or checkpoints needed. +//! +//! Debug-mode [`TraceEntry`] records capture raw stream events in a separate +//! `.trace.jsonl` file, independent of the session log. +//! +//! # Quick start +//! +//! ```ignore +//! use llm_worker_persistence::{Session, SessionConfig, FsStore}; +//! +//! let store = FsStore::new("./sessions").await?; +//! let worker = Worker::new(client); +//!
let mut session = Session::new(worker, store, SessionConfig::default()).await?; +//! session.run("Hello!").await?; +//! ``` + +pub mod event_trace; +pub mod fs_store; +pub mod session; +pub mod session_log; +pub mod store; + +pub use event_trace::TraceEntry; +pub use fs_store::FsStore; +pub use session::{Session, SessionConfig, SessionError}; +pub use session_log::{LogEntry, Outcome, RestoredState, replay_entries}; +pub use store::{Store, StoreError}; + +/// Session identifier. UUID v7 (time-ordered, lexicographically sortable). +pub type SessionId = uuid::Uuid; + +/// Generate a new session ID (a UUID v7 taken from the current time). +pub fn new_session_id() -> SessionId { + uuid::Uuid::now_v7() +} diff --git a/crates/llm-worker-persistence/src/session.rs b/crates/llm-worker-persistence/src/session.rs new file mode 100644 index 00000000..79f3e46b --- /dev/null +++ b/crates/llm-worker-persistence/src/session.rs @@ -0,0 +1,347 @@ +//! Persistent session wrapper around [`Worker`]. +//! +//! [`Session`] intercepts `Worker` operations and appends [`LogEntry`] records +//! to a [`Store`]. It does not modify `Worker` internals — all persistence +//! happens by observing state before and after each operation. + +use crate::session_log::{self, LogEntry, Outcome}; +use crate::store::{Store, StoreError}; +use crate::SessionId; +use llm_worker::llm_client::client::LlmClient; +use llm_worker::llm_client::types::Item; +use llm_worker::state::Mutable; +use llm_worker::{Worker, WorkerError, WorkerResult}; + +/// Configuration for session persistence. +#[derive(Debug, Clone)] +pub struct SessionConfig { + /// Record raw stream events to a separate trace file. + /// Default: `false`. NOTE(review): `Session` keeps this config in `_config` and never reads the flag in this file — confirm where tracing is wired up. + pub record_event_trace: bool, +} + +impl Default for SessionConfig { + fn default() -> Self { + Self { + record_event_trace: false, + } + } +} + +/// Errors from session operations.
+#[derive(Debug, thiserror::Error)] +pub enum SessionError { + #[error(transparent)] + Worker(#[from] WorkerError), + + #[error(transparent)] + Store(#[from] StoreError), +} + +/// Persistent session wrapping a [`Worker`]. +/// +/// The `worker` field is public for direct access to Worker APIs +/// (tool registration, hook setup, subscriber management, etc.). +/// State-mutating operations (`run`, `resume`) should go through +/// Session methods to ensure proper logging. +pub struct Session { + pub worker: Worker, + store: St, + session_id: SessionId, + _config: SessionConfig, +} + +impl Session { + /// Create a new session, writing the initial `SessionStart` entry. + pub async fn new( + worker: Worker, + store: St, + config: SessionConfig, + ) -> Result { + let session_id = crate::new_session_id(); + let start = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: worker.get_system_prompt().map(String::from), + config: worker.request_config().clone(), + history: worker.history().to_vec(), + }; + store.append(session_id, &start).await?; + + Ok(Self { + worker, + store, + session_id, + _config: config, + }) + } + + /// Restore a session from a stored log. + /// + /// Reads all log entries, replays them to reconstruct state, + /// and returns a `Session` ready for `resume()`. + pub async fn restore( + client: C, + store: St, + session_id: SessionId, + config: SessionConfig, + ) -> Result { + let entries = store.read_all(session_id).await?; + let state = session_log::replay_entries(&entries); + + let mut worker = Worker::new(client); + if let Some(ref prompt) = state.system_prompt { + worker.set_system_prompt(prompt); + } + worker.set_history(state.history); + worker.set_request_config(state.config); + worker.set_turn_count(state.turn_count); + worker.set_last_run_interrupted(state.last_run_interrupted); + + Ok(Self { + worker, + store, + session_id, + _config: config, + }) + } + + /// The session ID. 
+ pub fn session_id(&self) -> SessionId { + self.session_id + } + + /// Reference to the underlying store. + pub fn store(&self) -> &St { + &self.store + } + + /// Run a user turn, logging all state changes. + pub async fn run( + &mut self, + user_input: impl Into, + ) -> Result { + let input = user_input.into(); + let user_item = Item::user_message(&input); + let history_before = self.worker.history().len(); + + let result = self.worker.run(input).await; + + self.log_history_delta(history_before, Some(&user_item)) + .await?; + self.log_turn_end().await?; + self.log_outcome(&result).await?; + + result.map_err(SessionError::Worker) + } + + /// Resume from a paused state, logging all state changes. + pub async fn resume(&mut self) -> Result { + let history_before = self.worker.history().len(); + + let result = self.worker.resume().await; + + self.log_history_delta(history_before, None).await?; + self.log_turn_end().await?; + self.log_outcome(&result).await?; + + result.map_err(SessionError::Worker) + } + + /// Fork this session at its current state. + /// Returns the new session ID. The new log contains a `SessionStart` + /// seeded with the current history. + pub async fn fork(&self) -> Result { + let fork_id = crate::new_session_id(); + let start = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: self.worker.get_system_prompt().map(String::from), + config: self.worker.request_config().clone(), + history: self.worker.history().to_vec(), + }; + self.store.create_session(fork_id, &[start]).await?; + Ok(fork_id) + } + + /// Fork from an arbitrary point in a stored session's log. + /// Replays entries up to `up_to_entry` and creates a new session + /// with that reconstructed state. 
+ pub async fn fork_at( + store: &St, + source_id: SessionId, + up_to_entry: usize, + ) -> Result { + let entries = store.read_all(source_id).await?; + let truncated = &entries[..up_to_entry.min(entries.len())]; + let state = session_log::replay_entries(truncated); + + let fork_id = crate::new_session_id(); + let start = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: state.system_prompt, + config: state.config, + history: state.history, + }; + store.create_session(fork_id, &[start]).await?; + Ok(fork_id) + } + + /// Log a `CacheLocked` entry. + pub async fn log_cache_locked( + &self, + locked_prefix_len: usize, + ) -> Result<(), StoreError> { + self.store + .append( + self.session_id, + &LogEntry::CacheLocked { + ts: session_log::now_millis(), + locked_prefix_len, + }, + ) + .await + } + + /// Log a `CacheUnlocked` entry. + pub async fn log_cache_unlocked(&self) -> Result<(), StoreError> { + self.store + .append( + self.session_id, + &LogEntry::CacheUnlocked { + ts: session_log::now_millis(), + }, + ) + .await + } + + /// Log a `ConfigChanged` entry. 
+ pub async fn log_config_changed(&self) -> Result<(), StoreError> { + self.store + .append( + self.session_id, + &LogEntry::ConfigChanged { + ts: session_log::now_millis(), + config: self.worker.request_config().clone(), + }, + ) + .await + } + + // ── Private helpers ────────────────────────────────────────────────── + + async fn log_history_delta( + &self, + before_len: usize, + user_item: Option<&Item>, + ) -> Result<(), StoreError> { + let history = self.worker.history(); + if history.len() <= before_len { + return Ok(()); + } + + let ts = session_log::now_millis(); + let new_items = &history[before_len..]; + let mut i = 0; + + // If we have a user_item, the first new item should be the user input + if let Some(item) = user_item { + self.store + .append( + self.session_id, + &LogEntry::UserInput { + ts, + item: item.clone(), + }, + ) + .await?; + i = 1; + } + + // Classify and group remaining items + while i < new_items.len() { + let item = &new_items[i]; + if item.is_tool_result() { + let start = i; + while i < new_items.len() && new_items[i].is_tool_result() { + i += 1; + } + self.store + .append( + self.session_id, + &LogEntry::ToolResults { + ts, + items: new_items[start..i].to_vec(), + }, + ) + .await?; + } else if item.is_assistant_message() + || item.is_tool_call() + || item.is_reasoning() + { + let start = i; + while i < new_items.len() + && (new_items[i].is_assistant_message() + || new_items[i].is_tool_call() + || new_items[i].is_reasoning()) + { + i += 1; + } + self.store + .append( + self.session_id, + &LogEntry::AssistantItems { + ts, + items: new_items[start..i].to_vec(), + }, + ) + .await?; + } else { + self.store + .append( + self.session_id, + &LogEntry::HookInjectedItems { + ts, + items: vec![new_items[i].clone()], + }, + ) + .await?; + i += 1; + } + } + Ok(()) + } + + async fn log_turn_end(&self) -> Result<(), StoreError> { + self.store + .append( + self.session_id, + &LogEntry::TurnEnd { + ts: session_log::now_millis(), + turn_count: 
self.worker.turn_count(), + }, + ) + .await + } + + async fn log_outcome( + &self, + result: &Result, + ) -> Result<(), StoreError> { + let outcome = match result { + Ok(WorkerResult::Finished) => Outcome::Finished, + Ok(WorkerResult::Paused) => Outcome::Paused, + Err(e) => Outcome::Error { + message: e.to_string(), + }, + }; + self.store + .append( + self.session_id, + &LogEntry::RunOutcome { + ts: session_log::now_millis(), + outcome, + interrupted: self.worker.last_run_interrupted(), + }, + ) + .await + } +} diff --git a/crates/llm-worker-persistence/src/session_log.rs b/crates/llm-worker-persistence/src/session_log.rs new file mode 100644 index 00000000..917ba143 --- /dev/null +++ b/crates/llm-worker-persistence/src/session_log.rs @@ -0,0 +1,285 @@ +//! Session log types for append-only JSONL persistence. +//! +//! Each [`LogEntry`] represents a single state transition in a session, +//! serialized as one line in a `.jsonl` file. Replaying the sequence of +//! entries reconstructs the full [`Worker`] state. + +use llm_worker::llm_client::types::{Item, RequestConfig}; +use serde::{Deserialize, Serialize}; + +/// A single session log entry, serialized as one JSONL line. +/// +/// Variants correspond to specific mutation points in `Worker`: +/// - `SessionStart` — always the first entry; captures initial state +/// - `UserInput` / `AssistantItems` / `ToolResults` / `HookInjectedItems` — history appends +/// - `TurnEnd` — turn boundary marker +/// - `CacheLocked` / `CacheUnlocked` — KV cache state transitions +/// - `RunOutcome` — marks end of a `run()` or `resume()` call +/// - `ConfigChanged` — `RequestConfig` mutation +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum LogEntry { + /// Session start. Always the first entry in a log. + /// For forked sessions, `history` contains the seed state from the parent. 
+ SessionStart { + ts: u64, + system_prompt: Option, + config: RequestConfig, + history: Vec, + }, + + /// User input pushed to history (worker.rs:229). + UserInput { ts: u64, item: Item }, + + /// Assistant response items added to history (worker.rs:1040-1041). + AssistantItems { ts: u64, items: Vec }, + + /// Tool execution results added to history (worker.rs:897-900, 1072-1076). + ToolResults { ts: u64, items: Vec }, + + /// Items injected by `on_turn_end` hook via `ContinueWithMessages` (worker.rs:1055). + HookInjectedItems { ts: u64, items: Vec }, + + /// Turn boundary. Records the turn count after increment. + TurnEnd { ts: u64, turn_count: usize }, + + /// KV cache locked. Records the history prefix length that is now immutable. + CacheLocked { ts: u64, locked_prefix_len: usize }, + + /// KV cache unlocked. + CacheUnlocked { ts: u64 }, + + /// Outcome of a `run()` or `resume()` call. + /// This is metadata for auditing; replay logic does not branch on the outcome. + RunOutcome { + ts: u64, + outcome: Outcome, + interrupted: bool, + }, + + /// `RequestConfig` changed. + ConfigChanged { ts: u64, config: RequestConfig }, +} + +/// Outcome of a run/resume call. Used for auditing, not for replay branching. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum Outcome { + Finished, + Paused, + Error { message: String }, +} + +/// State reconstructed by replaying log entries. +#[derive(Debug, Clone)] +pub struct RestoredState { + pub system_prompt: Option, + pub config: RequestConfig, + pub history: Vec, + pub turn_count: usize, + pub locked_prefix_len: usize, + pub last_run_interrupted: bool, +} + +/// Replay a sequence of log entries to reconstruct worker state. 
+pub fn replay_entries(entries: &[LogEntry]) -> RestoredState { + let mut state = RestoredState { + system_prompt: None, + config: RequestConfig::default(), + history: Vec::new(), + turn_count: 0, + locked_prefix_len: 0, + last_run_interrupted: false, + }; + + for entry in entries { + match entry { + LogEntry::SessionStart { + system_prompt, + config, + history, + .. + } => { + state.system_prompt = system_prompt.clone(); + state.config = config.clone(); + state.history = history.clone(); + } + LogEntry::UserInput { item, .. } => { + state.history.push(item.clone()); + } + LogEntry::AssistantItems { items, .. } => { + state.history.extend(items.iter().cloned()); + } + LogEntry::ToolResults { items, .. } => { + state.history.extend(items.iter().cloned()); + } + LogEntry::HookInjectedItems { items, .. } => { + state.history.extend(items.iter().cloned()); + } + LogEntry::TurnEnd { turn_count, .. } => { + state.turn_count = *turn_count; + } + LogEntry::CacheLocked { + locked_prefix_len, .. + } => { + state.locked_prefix_len = *locked_prefix_len; + } + LogEntry::CacheUnlocked { .. } => { + state.locked_prefix_len = 0; + } + LogEntry::RunOutcome { interrupted, .. } => { + state.last_run_interrupted = *interrupted; + } + LogEntry::ConfigChanged { config, .. } => { + state.config = config.clone(); + } + } + } + + state +} + +/// Get the current timestamp in milliseconds since Unix epoch. 
+pub fn now_millis() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock before Unix epoch") + .as_millis() as u64 +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn replay_empty() { + let state = replay_entries(&[]); + assert!(state.history.is_empty()); + assert_eq!(state.turn_count, 0); + assert_eq!(state.locked_prefix_len, 0); + } + + #[test] + fn replay_session_start_sets_initial_state() { + let entries = vec![LogEntry::SessionStart { + ts: 1000, + system_prompt: Some("You are helpful.".into()), + config: RequestConfig::default().with_max_tokens(1024), + history: vec![Item::user_message("seed")], + }]; + let state = replay_entries(&entries); + assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); + assert_eq!(state.config.max_tokens, Some(1024)); + assert_eq!(state.history.len(), 1); + } + + #[test] + fn replay_full_turn() { + let entries = vec![ + LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }, + LogEntry::UserInput { + ts: 2000, + item: Item::user_message("Hello"), + }, + LogEntry::AssistantItems { + ts: 3000, + items: vec![Item::assistant_message("Hi!")], + }, + LogEntry::TurnEnd { + ts: 3100, + turn_count: 1, + }, + LogEntry::RunOutcome { + ts: 3200, + outcome: Outcome::Finished, + interrupted: false, + }, + ]; + let state = replay_entries(&entries); + assert_eq!(state.history.len(), 2); + assert_eq!(state.turn_count, 1); + assert!(!state.last_run_interrupted); + } + + #[test] + fn replay_with_tool_calls() { + let entries = vec![ + LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }, + LogEntry::UserInput { + ts: 2000, + item: Item::user_message("Check weather"), + }, + LogEntry::AssistantItems { + ts: 3000, + items: vec![Item::tool_call("call_1", "get_weather", r#"{"city":"Tokyo"}"#)], + }, + LogEntry::ToolResults { + ts: 3500, + items: 
vec![Item::tool_result("call_1", "Sunny, 25C")], + }, + LogEntry::AssistantItems { + ts: 4000, + items: vec![Item::assistant_message("It's sunny in Tokyo!")], + }, + LogEntry::TurnEnd { + ts: 4100, + turn_count: 1, + }, + ]; + let state = replay_entries(&entries); + assert_eq!(state.history.len(), 4); + assert!(state.history[1].is_tool_call()); + assert!(state.history[2].is_tool_result()); + } + + #[test] + fn replay_cache_lock_unlock() { + let entries = vec![ + LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![Item::user_message("a"), Item::assistant_message("b")], + }, + LogEntry::CacheLocked { + ts: 2000, + locked_prefix_len: 2, + }, + LogEntry::CacheUnlocked { ts: 3000 }, + ]; + let state = replay_entries(&entries); + assert_eq!(state.locked_prefix_len, 0); + + // Check locked state before unlock + let state_locked = replay_entries(&entries[..2]); + assert_eq!(state_locked.locked_prefix_len, 2); + } + + #[test] + fn replay_config_changed() { + let entries = vec![ + LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }, + LogEntry::ConfigChanged { + ts: 2000, + config: RequestConfig::default().with_temperature(0.5), + }, + ]; + let state = replay_entries(&entries); + assert_eq!(state.config.temperature, Some(0.5)); + } +} diff --git a/crates/llm-worker-persistence/src/store.rs b/crates/llm-worker-persistence/src/store.rs new file mode 100644 index 00000000..feb7060a --- /dev/null +++ b/crates/llm-worker-persistence/src/store.rs @@ -0,0 +1,68 @@ +//! Persistence backend abstraction. +//! +//! [`Store`] defines the async interface for reading and writing session logs. +//! Implementations handle the physical storage (filesystem, database, etc.). + +use crate::event_trace::TraceEntry; +use crate::session_log::LogEntry; +use crate::SessionId; +use std::future::Future; + +/// Errors from the persistence store. 
+#[derive(Debug, thiserror::Error)] +pub enum StoreError { + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + + #[error("serialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("session not found: {0}")] + NotFound(SessionId), + + #[error("log corrupted at line {line}: {message}")] + Corrupt { line: usize, message: String }, +} + +/// Async persistence backend for session logs. +/// +/// All methods take `&self` — implementations should use interior mutability +/// (e.g., append-mode file handles) when needed. +pub trait Store: Send + Sync { + /// Append a single log entry to the session. + fn append( + &self, + id: SessionId, + entry: &LogEntry, + ) -> impl Future> + Send; + + /// Read all log entries for a session, in order. + fn read_all( + &self, + id: SessionId, + ) -> impl Future, StoreError>> + Send; + + /// List all session IDs, most recent first. + fn list_sessions(&self) + -> impl Future, StoreError>> + Send; + + /// Create a new session with initial entries. + fn create_session( + &self, + id: SessionId, + entries: &[LogEntry], + ) -> impl Future> + Send; + + /// Check if a session exists. + fn exists( + &self, + id: SessionId, + ) -> impl Future> + Send; + + /// Append a trace entry to the debug event trace file. 
+ fn append_trace( + &self, + id: SessionId, + entry: &TraceEntry, + ) -> impl Future> + Send; +} diff --git a/crates/llm-worker-persistence/tests/fs_store_test.rs b/crates/llm-worker-persistence/tests/fs_store_test.rs new file mode 100644 index 00000000..bf594549 --- /dev/null +++ b/crates/llm-worker-persistence/tests/fs_store_test.rs @@ -0,0 +1,176 @@ +use llm_worker::llm_client::types::{Item, RequestConfig}; +use llm_worker_persistence::{ + FsStore, LogEntry, Outcome, Store, TraceEntry, new_session_id, replay_entries, +}; + +#[tokio::test] +async fn round_trip_write_and_read() { + let dir = tempfile::tempdir().unwrap(); + let store = FsStore::new(dir.path()).await.unwrap(); + let id = new_session_id(); + + let entries = vec![ + LogEntry::SessionStart { + ts: 1000, + system_prompt: Some("You are helpful.".into()), + config: RequestConfig::default().with_max_tokens(1024), + history: vec![], + }, + LogEntry::UserInput { + ts: 2000, + item: Item::user_message("Hello"), + }, + LogEntry::AssistantItems { + ts: 3000, + items: vec![Item::assistant_message("Hi there!")], + }, + LogEntry::TurnEnd { + ts: 3100, + turn_count: 1, + }, + LogEntry::RunOutcome { + ts: 3200, + outcome: Outcome::Finished, + interrupted: false, + }, + ]; + + // Write entries one by one + for entry in &entries { + store.append(id, entry).await.unwrap(); + } + + // Read back + let read_back = store.read_all(id).await.unwrap(); + assert_eq!(read_back.len(), entries.len()); + + // Replay and verify state + let state = replay_entries(&read_back); + assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); + assert_eq!(state.config.max_tokens, Some(1024)); + assert_eq!(state.history.len(), 2); + assert_eq!(state.turn_count, 1); + assert!(!state.last_run_interrupted); +} + +#[tokio::test] +async fn create_session_writes_all_entries() { + let dir = tempfile::tempdir().unwrap(); + let store = FsStore::new(dir.path()).await.unwrap(); + let id = new_session_id(); + + let entries = vec![ + 
LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![Item::user_message("seed"), Item::assistant_message("ok")], + }, + ]; + + store.create_session(id, &entries).await.unwrap(); + let read_back = store.read_all(id).await.unwrap(); + assert_eq!(read_back.len(), 1); + + let state = replay_entries(&read_back); + assert_eq!(state.history.len(), 2); +} + +#[tokio::test] +async fn list_sessions_returns_newest_first() { + let dir = tempfile::tempdir().unwrap(); + let store = FsStore::new(dir.path()).await.unwrap(); + + let id1 = new_session_id(); + // Small delay to ensure different UUID v7 timestamps + tokio::time::sleep(std::time::Duration::from_millis(2)).await; + let id2 = new_session_id(); + + let start = LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }; + + store.append(id1, &start).await.unwrap(); + store.append(id2, &start).await.unwrap(); + + let sessions = store.list_sessions().await.unwrap(); + assert_eq!(sessions.len(), 2); + assert_eq!(sessions[0], id2); // newest first + assert_eq!(sessions[1], id1); +} + +#[tokio::test] +async fn exists_returns_correct_state() { + let dir = tempfile::tempdir().unwrap(); + let store = FsStore::new(dir.path()).await.unwrap(); + let id = new_session_id(); + + assert!(!store.exists(id).await.unwrap()); + + store + .append( + id, + &LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }, + ) + .await + .unwrap(); + + assert!(store.exists(id).await.unwrap()); +} + +#[tokio::test] +async fn not_found_error_for_missing_session() { + let dir = tempfile::tempdir().unwrap(); + let store = FsStore::new(dir.path()).await.unwrap(); + let id = new_session_id(); + + let result = store.read_all(id).await; + assert!(matches!(result, Err(llm_worker_persistence::StoreError::NotFound(missing)) if missing == id)); // pin the variant and id, not just "some error" +} + +#[tokio::test] +async fn trace_entries_in_separate_file() { + let dir = tempfile::tempdir().unwrap(); + let store
= FsStore::new(dir.path()).await.unwrap(); + let id = new_session_id(); + + // Write a log entry + store + .append( + id, + &LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }, + ) + .await + .unwrap(); + + // Write a trace entry + let trace = TraceEntry { + ts: 1500, + turn: 0, + event: llm_worker::llm_client::event::Event::Ping( + llm_worker::llm_client::event::PingEvent { timestamp: None }, + ), + }; + store.append_trace(id, &trace).await.unwrap(); + + // Log should have 1 entry, unaffected by trace + let log = store.read_all(id).await.unwrap(); + assert_eq!(log.len(), 1); + + // Trace file should exist separately + let trace_path = dir.path().join(format!("{id}.trace.jsonl")); + assert!(trace_path.exists()); +} diff --git a/crates/llm-worker/src/llm_client/types.rs b/crates/llm-worker/src/llm_client/types.rs index 900f8406..4fbb0c7f 100644 --- a/crates/llm-worker/src/llm_client/types.rs +++ b/crates/llm-worker/src/llm_client/types.rs @@ -501,7 +501,7 @@ impl ToolDefinition { // ============================================================================ /// Request configuration -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct RequestConfig { /// Maximum tokens to generate pub max_tokens: Option, diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index d77d595f..c7acb91a 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -1308,6 +1308,16 @@ impl Worker { self.history.clear(); } + /// Set the turn count (for session restoration) + pub fn set_turn_count(&mut self, count: usize) { + self.turn_count = count; + } + + /// Set the last_run_interrupted flag (for session restoration) + pub fn set_last_run_interrupted(&mut self, interrupted: bool) { + self.last_run_interrupted = interrupted; + } + /// Apply configuration (reserved for future extensions) #[allow(dead_code)] pub fn 
config(self, _config: WorkerConfig) -> Self { diff --git a/docs/persistence.md b/docs/persistence.md new file mode 100644 index 00000000..394c3164 --- /dev/null +++ b/docs/persistence.md @@ -0,0 +1,89 @@ +# 永続化設計 + +## 概要 + +`llm-worker-persistence` クレートは、`llm-worker` の `Worker` セッション状態を +JSONL append-only ログとして永続化する。ログを replay することで Worker 状態を完全に復元する。 + +## 設計方針 + +- **JSONL append-only ログ**: 1セッション = 1つの `.jsonl` ファイル。書き込みは末尾追記のみ。 +- **Pause/正常終了で構造に差異なし**: Worker の状態は Pause 時も正常終了時も同じ形 + (`history: Vec` + `turn_count` + `request_config`)。 + `resume()` は「ユーザー入力を追加せず `run_turn_loop()` に再入する」だけなので、 + 復元に必要なのは history の中身であり、前回の終了理由ではない。 + `RunOutcome` の `Finished`/`Paused` 区分は監査用メタデータであり、replay の分岐には使わない。 +- **クレート分離**: `llm-worker` は永続化を知らない。`Session` ラッパーが外から Worker を包む。 + +## 命名規約 + +| 名前 | 用途 | +|---|---| +| **SessionLog / LogEntry** | 状態復元用の構造化された記録(永続化の本体) | +| **EventTrace / TraceEntry** | デバッグ用の生ストリームイベント全録(オプション、デフォルト OFF) | + +## クレート構成 + +``` +llm-worker-persistence → llm-worker → llm-worker-macros +``` + +`llm-worker-persistence` は `llm-worker` に依存するが、逆方向の依存はない。 + +## ファイル配置 + +``` +{root}/{session_id}.jsonl -- セッションログ +{root}/{session_id}.trace.jsonl -- イベントトレース(デバッグ時のみ) +``` + +`SessionId` は UUID v7(`uuid` クレート)。タイムスタンプ埋め込みで辞書順 = 時系列順。 + +## LogEntry + +各エントリは Worker の特定の状態変更に対応する: + +| エントリ | Worker 上の対応箇所 | replay での効果 | +|---|---|---| +| `SessionStart` | セッション開始 / fork | system_prompt, config, history を初期化 | +| `UserInput` | `worker.rs:229` | history に追加 | +| `AssistantItems` | `worker.rs:1040-1041` | history に追加 | +| `ToolResults` | `worker.rs:897-900, 1072-1076` | history に追加 | +| `HookInjectedItems` | `worker.rs:1055` | history に追加 | +| `TurnEnd` | `worker.rs:1033` | turn_count を更新 | +| `CacheLocked` | `Worker::lock()` | locked_prefix_len を設定 | +| `CacheUnlocked` | `Worker::unlock()` | locked_prefix_len を 0 に | +| `RunOutcome` | `run()` / `resume()` 終了時 | interrupted フラグのみ(監査用) | +| `ConfigChanged` | `set_*` メソッド群 | config を更新 | + +## Session 
ラッパー + +```rust +pub struct Session { + pub worker: Worker, + store: St, + session_id: SessionId, +} +``` + +- `Session::new()` — SessionStart を書き込み +- `Session::run()` — Worker::run() の前後で history を比較、差分をログ記録 +- `Session::resume()` — 同上 +- `Session::restore()` — ログを replay して Worker を再構築 +- `Session::fork()` — 現在の history をシードにした新セッションを作成 +- `Session::fork_at()` — 任意のログ地点から分岐 + +## Store trait + +```rust +pub trait Store: Send + Sync { + fn append(&self, id: SessionId, entry: &LogEntry) -> impl Future<...> + Send; + fn read_all(&self, id: SessionId) -> impl Future<...> + Send; + fn list_sessions(&self) -> impl Future<...> + Send; + fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> impl Future<...> + Send; + fn exists(&self, id: SessionId) -> impl Future<...> + Send; + fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> impl Future<...> + Send; +} +``` + +初期実装は `FsStore`(ファイルシステム JSONL)。RPITIT 使用、`async_trait` 不要。 diff --git a/docs/plan/llm_presistence.md b/docs/plan/llm_presistence.md new file mode 100644 index 00000000..3e680d69 --- /dev/null +++ b/docs/plan/llm_presistence.md @@ -0,0 +1,235 @@ +# 永続化データ構造の設計・実装プラン + +## Context + +INSOMNIA の `llm-worker` クレートは現在すべてのセッション状態をインメモリで保持しており、 +プロセス終了時に会話履歴やターン状態が失われる。 +Coding Agent として Pause/Resume・Fork をまたいだセッション継続を実現するため、 +永続化レイヤーを追加する。 + +**設計方針**: Codex CLI / Claude Code と同様の **JSONL append-only ログ** 方式を採用。 +セッションログを replay することで Worker 状態を完全に復元する。 +Pause/正常終了で永続化データの構造に差異を設けない。 +理由: Worker の状態は Pause 時も正常終了時も同じ形(`history: Vec` + `turn_count` + `request_config`)であり、 +APIレスポンスのデータ構造上、両者に本質的な違いがない。 +`resume()` は「ユーザー入力を追加せず `run_turn_loop()` に再入する」だけなので、 +復元に必要なのは history の中身であり、前回の終了理由ではない。 +`RunOutcome` の `Finished`/`Paused` 区分は監査・デバッグ用メタデータであり、replay ロジックの分岐には使わない。 + +**命名規約**: +- **SessionLog / LogEntry** — 状態復元用の構造化された記録(永続化の本体) +- **EventTrace / TraceEntry** — デバッグ用の生ストリームイベント全録(オプション) + +## クレート構成 + +永続化は `llm-worker` とは別クレートに分離する。 +`llm-worker` は永続化を一切知らず、Session ラッパーが外から Worker を包む。 
+ +``` +crates/ + llm-worker/ ← 既存(LLM クライアント + Worker、変更なし以外は最小限) + llm-worker-macros/ ← 既存(変更なし) + llm-worker-persistence/ ← 新規クレート + Cargo.toml + src/ + lib.rs -- モジュールルート・re-exports・SessionId 型エイリアス + session_log.rs -- LogEntry enum(JSONL 1行 = 1エントリ) + event_trace.rs -- TraceEntry(デバッグ用生イベント記録) + store.rs -- Store trait(バックエンド抽象) + fs_store.rs -- JSONL ファイルシステム実装 + session.rs -- Session ラッパー + replay/restore + insomnia/ ← 将来のトップレベルアプリ + +docs/persistence.md -- 設計ドキュメント(このプランの清書版) +``` + +依存グラフ: +``` +llm-worker-persistence → llm-worker → llm-worker-macros +insomnia (将来) → llm-worker-persistence, llm-worker +``` + +### llm-worker-persistence/Cargo.toml 依存 + +```toml +[dependencies] +llm-worker = { path = "../llm-worker" } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tokio = { version = "1", features = ["fs", "io-util"] } +uuid = { version = "1", features = ["v7", "serde"] } +thiserror = "2" +``` + +## 既存コードへの変更(llm-worker 側) + +### 1. `RequestConfig` に Serialize/Deserialize 追加 +- **ファイル**: `crates/llm-worker/src/llm_client/types.rs:504` +- `#[derive(Debug, Clone, Default)]` → `#[derive(Debug, Clone, Default, Serialize, Deserialize)]` + +### 2. Worker に復元用セッター追加 +- **ファイル**: `crates/llm-worker/src/worker.rs` +- `impl Worker` ブロックに追加: + - `pub fn set_turn_count(&mut self, count: usize)` + - `pub fn set_last_run_interrupted(&mut self, interrupted: bool)` + +### 3. 
ワークスペース Cargo.toml にメンバー追加 +- **ファイル**: `Cargo.toml`(ワークスペースルート) +- `members` に `"crates/llm-worker-persistence"` を追加 + +## 新規コード: データ型 + +### LogEntry(session_log.rs) + +状態復元に必要な構造化記録。JSONL 1行 = 1エントリ。 + +```rust +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum LogEntry { + // セッション開始(ログ先頭、fork 時は history にシード状態を含む) + SessionStart { + ts: u64, + system_prompt: Option<String>, + config: RequestConfig, + history: Vec<Item>, + }, + + // ユーザー入力(worker.rs:229 に対応) + UserInput { ts: u64, item: Item }, + + // アシスタント応答(worker.rs:1040-1041 に対応) + AssistantItems { ts: u64, items: Vec<Item> }, + + // ツール実行結果(worker.rs:897-900, 1072-1076 に対応) + ToolResults { ts: u64, items: Vec<Item> }, + + // Hook 注入 Items(worker.rs:1055 ContinueWithMessages に対応) + HookInjectedItems { ts: u64, items: Vec<Item> }, + + // ターン境界 + TurnEnd { ts: u64, turn_count: usize }, + + // KV キャッシュロック/アンロック + CacheLocked { ts: u64, locked_prefix_len: usize }, + CacheUnlocked { ts: u64 }, + + // run/resume の終了結果 + RunOutcome { ts: u64, outcome: Outcome, interrupted: bool }, + + // RequestConfig 変更 + ConfigChanged { ts: u64, config: RequestConfig }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum Outcome { Finished, Paused, Error { message: String } } +``` + +**Replay ロジック**: 全エントリ種別を走査し、`*Items` / `UserInput` → history に append、 +`TurnEnd` → turn_count 更新、`CacheLocked` → locked_prefix_len 設定。 + +### TraceEntry(event_trace.rs) + +デバッグ用の生ストリームイベント全録。デフォルト OFF。 +セッションログとは別ファイル `{session_id}.trace.jsonl` に記録。 + +```rust +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TraceEntry { + pub ts: u64, + pub turn: usize, + pub event: Event, +} +``` + +replay 対象外。状態復元には使わない。 + +### SessionId + +`uuid` クレートの UUID v7 をそのまま使用。型エイリアスのみ。 + +```rust +pub type SessionId = uuid::Uuid; + +pub fn new_session_id() -> SessionId { + uuid::Uuid::now_v7() +} +``` + +UUID v7 はタイムスタンプ埋め込みで辞書順 = 時系列順。独自フォーマット不要。 + +### Store trait(store.rs) +
+```rust +pub trait Store: Send + Sync { + fn append(&self, id: SessionId, entry: &LogEntry) -> impl Future<Output = Result<(), StoreError>> + Send; + fn read_all(&self, id: SessionId) -> impl Future<Output = Result<Vec<LogEntry>, StoreError>> + Send; + fn list_sessions(&self) -> impl Future<Output = Result<Vec<SessionId>, StoreError>> + Send; + fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> impl Future<Output = Result<(), StoreError>> + Send; + fn exists(&self, id: SessionId) -> impl Future<Output = Result<bool, StoreError>> + Send; + + // EventTrace 用(デバッグモード時のみ使用) + fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> impl Future<Output = Result<(), StoreError>> + Send; +} +``` + +RPITIT (Rust 1.75+) 使用。`async_trait` 不要。 + +### FsStore(fs_store.rs) + +ファイル配置: +- セッションログ: `{root}/{session_id}.jsonl` +- イベントトレース: `{root}/{session_id}.trace.jsonl` + +append モードで書き込み。SQLite インデックスなし。 + +### Session ラッパー(session.rs) + +Worker を直接変更せず、**外部ラッパー** として実装: + +```rust +pub struct Session { + pub worker: Worker, // pub で直接アクセス可能 + store: St, + session_id: SessionId, + config: SessionConfig, +} +``` + +- `Session::new()` → SessionStart を append +- `Session::run()` → Worker::run() の前後で history.len() を比較、差分をログ記録 +- `Session::resume()` → 同上 +- `Session::fork()` → 現在の history をシードにした新 SessionStart を書き込み +- `Session::fork_at(store, source_id, entry_idx)` → 任意地点から分岐 + +**復元**: `restore_session(client, store, session_id)` → read_all → replay_entries → Worker 再構築 + +### EventTrace 記録(デバッグモード) + +`SessionConfig::record_event_trace: bool`(デフォルト `false`)が `true` の場合、 +Session が Worker に `OnStreamChunk` Hook を登録。 +Hook 内で `TraceEntry` を `{session_id}.trace.jsonl` に append。 +セッションログとは完全に分離。 + +## 実装順序 + +1. `RequestConfig` に Serialize/Deserialize 追加(llm-worker 側) +2. Worker に `set_turn_count` / `set_last_run_interrupted` 追加(llm-worker 側) +3. `crates/llm-worker-persistence/` クレート作成(Cargo.toml + ワークスペース登録) +4. `session_log.rs` 作成(LogEntry + Outcome + replay_entries) +5. `event_trace.rs` 作成(TraceEntry) +6. `store.rs` 作成(Store trait + StoreError) +7. `fs_store.rs` 作成(JSONL ファイルシステム実装) +8. `session.rs` 作成(Session ラッパー + restore_session) +9.
`lib.rs` 作成(re-exports・SessionId 型エイリアス・new_session_id) +10. テスト作成(replay round-trip, FsStore 読み書き, Session::run ログ記録) +11. `docs/persistence.md` 設計ドキュメント作成 + +## 検証方法 + +1. **ユニットテスト**: `replay_entries` に手動構築した LogEntry 列を渡し、復元状態を検証 +2. **統合テスト**: MockLlmClient + FsStore で Session::run → restore_session → history 一致を確認 +3. **Fork テスト**: fork → 新セッションの history が fork 時点と一致することを確認 +4. **cargo test**: 既存テストが壊れていないことを確認 +5. **cargo clippy / cargo check**: 警告なし