persistence

This commit is contained in:
Keisuke Hirata 2026-04-05 05:14:20 +09:00
parent 2d7e6bd5d6
commit 00e3ae1932
15 changed files with 1450 additions and 1 deletions

25
Cargo.lock generated
View File

@ -784,6 +784,19 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "llm-worker-persistence"
version = "0.1.0"
dependencies = [
"llm-worker",
"serde",
"serde_json",
"tempfile",
"thiserror",
"tokio",
"uuid",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.29" version = "0.4.29"
@ -1664,6 +1677,18 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9"
dependencies = [
"getrandom 0.4.2",
"js-sys",
"serde_core",
"wasm-bindgen",
]
[[package]] [[package]]
name = "valuable" name = "valuable"
version = "0.1.1" version = "0.1.1"

View File

@ -4,6 +4,7 @@ members = [
"crates/insomnia", "crates/insomnia",
"crates/llm-worker", "crates/llm-worker",
"crates/llm-worker-macros", "crates/llm-worker-macros",
"crates/llm-worker-persistence",
] ]
[workspace.package] [workspace.package]

1
TODO.md Normal file
View File

@ -0,0 +1 @@
- [x] 永続化データ構造の制定

View File

@ -0,0 +1,18 @@
[package]
name = "llm-worker-persistence"
description = "Session persistence for llm-worker via append-only JSONL logs"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
llm-worker = { path = "../llm-worker" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.49", features = ["fs", "io-util"] }
uuid = { version = "1", features = ["v7", "serde"] }
thiserror = "2.0"
[dev-dependencies]
tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "fs", "io-util"] }
tempfile = "3.24"

View File

@ -0,0 +1,21 @@
//! Debug-only raw stream event recording.
//!
//! [`TraceEntry`] captures every LLM stream event verbatim for debugging
//! and replay analysis. Written to a separate `.trace.jsonl` file,
//! completely independent of the session log used for state restoration.
//!
//! Disabled by default. Enable via `SessionConfig::record_event_trace`.
use llm_worker::llm_client::event::Event;
use serde::{Deserialize, Serialize};
/// A single trace entry recording a raw stream event.
///
/// This is the record type of the debug event trace; it is a separate type
/// from the session log's entries and carries no state-restoration data of
/// its own beyond the event, its timestamp and the turn number.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceEntry {
/// Timestamp in milliseconds since Unix epoch.
pub ts: u64,
/// Turn number at the time of recording.
pub turn: usize,
/// The raw stream event.
pub event: Event,
}

View File

@ -0,0 +1,132 @@
//! Filesystem-backed JSONL store.
//!
//! Layout:
//! - Session log: `{root}/{session_id}.jsonl`
//! - Event trace: `{root}/{session_id}.trace.jsonl`
use crate::event_trace::TraceEntry;
use crate::session_log::LogEntry;
use crate::store::{Store, StoreError};
use crate::SessionId;
use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::AsyncWriteExt;
/// Filesystem-backed JSONL store.
///
/// Each session is stored as a single `.jsonl` file with one [`LogEntry`]
/// per line. Writes use append mode for crash safety.
pub struct FsStore {
/// Directory under which every session's `.jsonl` and `.trace.jsonl` file is placed.
root: PathBuf,
}
impl FsStore {
/// Build a store that keeps its files under `root`.
///
/// The directory (including any missing parents) is created before the
/// store is handed back.
///
/// # Errors
/// Returns [`StoreError::Io`] if the directory cannot be created.
pub async fn new(root: impl Into<PathBuf>) -> Result<Self, StoreError> {
    let store = Self { root: root.into() };
    fs::create_dir_all(&store.root).await?;
    Ok(store)
}
/// Location of the session log for `id`: `{root}/{id}.jsonl`.
fn log_path(&self, id: SessionId) -> PathBuf {
    let mut path = self.root.clone();
    path.push(format!("{id}.jsonl"));
    path
}
/// Location of the debug event trace for `id`: `{root}/{id}.trace.jsonl`.
fn trace_path(&self, id: SessionId) -> PathBuf {
    let mut path = self.root.clone();
    path.push(format!("{id}.trace.jsonl"));
    path
}
/// Append `line` followed by a newline to the file at `path`, creating the
/// file if it does not exist yet.
///
/// The record and its `\n` terminator are assembled first and handed to the
/// file in a single `write_all` call rather than as two separate writes, so
/// the terminator is never issued as a write of its own after the payload.
///
/// # Errors
/// Returns [`StoreError::Io`] if the file cannot be opened, written or flushed.
async fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> {
    let mut record = String::with_capacity(line.len() + 1);
    record.push_str(line);
    record.push('\n');

    let mut file = fs::OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .await?;
    file.write_all(record.as_bytes()).await?;
    // Wait for the pending write to finish before the handle is dropped.
    file.flush().await?;
    Ok(())
}
/// Decode JSONL text into a vector of `T`, one value per non-blank line.
///
/// Lines that are empty after trimming are skipped. Decoding stops at the
/// first line that fails to parse.
///
/// # Errors
/// Returns [`StoreError::Corrupt`] carrying the 1-based line number
/// (blank lines included in the count) and the parser's message.
fn parse_jsonl<T: serde::de::DeserializeOwned>(
    content: &str,
) -> Result<Vec<T>, StoreError> {
    content
        .lines()
        .enumerate()
        .filter(|(_, raw)| !raw.trim().is_empty())
        .map(|(idx, raw)| {
            serde_json::from_str::<T>(raw).map_err(|err| StoreError::Corrupt {
                line: idx + 1,
                message: err.to_string(),
            })
        })
        .collect()
}
}
impl Store for FsStore {
/// Serialize `entry` as one JSON line and append it to the session log of `id`.
async fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError> {
    let json = serde_json::to_string(entry)?;
    let target = self.log_path(id);
    self.append_line(&target, &json).await
}
/// Read and decode every entry of the session log for `id`, in file order.
///
/// The file is read directly and a "not found" I/O error is translated,
/// instead of probing with the blocking `Path::exists` first. That avoids a
/// synchronous filesystem call on the async executor and the window between
/// the existence check and the read.
///
/// # Errors
/// - [`StoreError::NotFound`] if no log file exists for `id`.
/// - [`StoreError::Io`] for any other read failure.
/// - [`StoreError::Corrupt`] if a line cannot be decoded.
async fn read_all(&self, id: SessionId) -> Result<Vec<LogEntry>, StoreError> {
    match fs::read_to_string(self.log_path(id)).await {
        Ok(content) => Self::parse_jsonl(&content),
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            Err(StoreError::NotFound(id))
        }
        Err(err) => Err(StoreError::Io(err)),
    }
}
/// List the IDs of all session logs found directly under the root directory,
/// sorted in descending ID order.
///
/// A directory entry counts as a session log when its name ends in `.jsonl`
/// (but not `.trace.jsonl`) and the part before that suffix parses as a
/// [`SessionId`]. Everything else, including names that are not valid
/// UTF-8, is ignored.
///
/// # Errors
/// Returns [`StoreError::Io`] if the directory cannot be read.
async fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError> {
    let mut sessions = Vec::new();
    let mut dir = fs::read_dir(&self.root).await?;
    while let Some(entry) = dir.next_entry().await? {
        let file_name = entry.file_name();
        let Some(name) = file_name.to_str() else {
            continue;
        };
        // Trace files share the `.jsonl` extension but are not session logs.
        if name.ends_with(".trace.jsonl") {
            continue;
        }
        // `strip_suffix` removes exactly one `.jsonl`; `trim_end_matches`
        // would also accept names such as `<id>.jsonl.jsonl`.
        let Some(stem) = name.strip_suffix(".jsonl") else {
            continue;
        };
        if let Ok(id) = stem.parse::<SessionId>() {
            sessions.push(id);
        }
    }
    // UUID v7: lexicographic sort = chronological sort, newest first
    sessions.sort_unstable_by(|a, b| b.cmp(a));
    Ok(sessions)
}
/// Write `entries` as the complete session log for `id`, one JSON line each.
///
/// All entries are serialized before anything touches the filesystem, so a
/// serialization error leaves no file behind.
///
/// NOTE(review): the file is written with `fs::write`, which replaces any
/// existing content at that path — confirm callers never pass an existing ID.
async fn create_session(
    &self,
    id: SessionId,
    entries: &[LogEntry],
) -> Result<(), StoreError> {
    let lines = entries
        .iter()
        .map(serde_json::to_string)
        .collect::<Result<Vec<String>, _>>()?;
    let mut payload = lines.join("\n");
    if !lines.is_empty() {
        payload.push('\n');
    }
    let target = self.log_path(id);
    fs::write(&target, payload.as_bytes()).await?;
    Ok(())
}
async fn exists(&self, id: SessionId) -> Result<bool, StoreError> {
Ok(self.log_path(id).exists())
}
/// Serialize `entry` as one JSON line and append it to the trace file of `id`.
///
/// The trace file is distinct from the session log, so this never affects
/// what `read_all` returns.
async fn append_trace(
    &self,
    id: SessionId,
    entry: &TraceEntry,
) -> Result<(), StoreError> {
    let json = serde_json::to_string(entry)?;
    let target = self.trace_path(id);
    self.append_line(&target, &json).await
}
}

View File

@ -0,0 +1,41 @@
//! Session persistence for `llm-worker` via append-only JSONL logs.
//!
//! # Architecture
//!
//! Sessions are recorded as a sequence of [`LogEntry`] values, one per line
//! in a `.jsonl` file. Replaying the log reconstructs the full [`Worker`]
//! state — no separate snapshots or checkpoints needed.
//!
//! Debug-mode [`TraceEntry`] records capture raw stream events in a separate
//! `.trace.jsonl` file, independent of the session log.
//!
//! # Quick start
//!
//! ```ignore
//! use llm_worker_persistence::{Session, SessionConfig, FsStore};
//!
//! let store = FsStore::new("./sessions").await?;
//! let worker = Worker::new(client);
//! let mut session = Session::new(worker, store, SessionConfig::default()).await?;
//! session.run("Hello!").await?;
//! ```
pub mod event_trace;
pub mod fs_store;
pub mod session;
pub mod session_log;
pub mod store;
pub use event_trace::TraceEntry;
pub use fs_store::FsStore;
pub use session::{Session, SessionConfig, SessionError};
pub use session_log::{LogEntry, Outcome, RestoredState, replay_entries};
pub use store::{Store, StoreError};
/// Session identifier. UUID v7 (time-ordered, lexicographically sortable).
///
/// This is a plain alias for `uuid::Uuid`, so the type itself does not
/// enforce the version; IDs created through [`new_session_id`] are v7.
pub type SessionId = uuid::Uuid;
/// Generate a new session ID.
pub fn new_session_id() -> SessionId {
uuid::Uuid::now_v7()
}

View File

@ -0,0 +1,347 @@
//! Persistent session wrapper around [`Worker`].
//!
//! [`Session`] intercepts `Worker` operations and appends [`LogEntry`] records
//! to a [`Store`]. It does not modify `Worker` internals — all persistence
//! happens by observing state before and after each operation.
use crate::session_log::{self, LogEntry, Outcome};
use crate::store::{Store, StoreError};
use crate::SessionId;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::llm_client::types::Item;
use llm_worker::state::Mutable;
use llm_worker::{Worker, WorkerError, WorkerResult};
/// Configuration for session persistence.
///
/// `Default` is derived: every option starts switched off.
#[derive(Debug, Clone, Default)]
pub struct SessionConfig {
    /// Record raw stream events to a separate trace file.
    /// Default: `false`.
    pub record_event_trace: bool,
}
/// Errors from session operations.
///
/// Both variants are `transparent`: their `Display` output and source are
/// those of the wrapped error. `#[from]` lets `?` convert either error type.
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
/// The wrapped `Worker` call failed.
#[error(transparent)]
Worker(#[from] WorkerError),
/// Reading from or writing to the persistence store failed.
#[error(transparent)]
Store(#[from] StoreError),
}
/// Persistent session wrapping a [`Worker`].
///
/// The `worker` field is public for direct access to Worker APIs
/// (tool registration, hook setup, subscriber management, etc.).
/// State-mutating operations (`run`, `resume`) should go through
/// Session methods to ensure proper logging.
pub struct Session<C: LlmClient, St: Store> {
/// The wrapped worker. Changes made through this field directly are not logged.
pub worker: Worker<C, Mutable>,
/// Backend that receives every log entry written by this session.
store: St,
/// ID under which this session's entries are stored.
session_id: SessionId,
/// Persistence settings. Only stored here; the underscore prefix marks the
/// field as not yet read by the methods in this file.
_config: SessionConfig,
}
impl<C: LlmClient, St: Store> Session<C, St> {
/// Start a brand-new session around `worker`.
///
/// A fresh session ID is generated and a `SessionStart` entry capturing the
/// worker's current system prompt, request config and history is appended
/// to `store` before the session is returned.
///
/// # Errors
/// Returns the [`StoreError`] from the initial append.
pub async fn new(
    worker: Worker<C, Mutable>,
    store: St,
    config: SessionConfig,
) -> Result<Self, StoreError> {
    let session = Self {
        session_id: crate::new_session_id(),
        worker,
        store,
        _config: config,
    };
    let start = LogEntry::SessionStart {
        ts: session_log::now_millis(),
        system_prompt: session.worker.get_system_prompt().map(String::from),
        config: session.worker.request_config().clone(),
        history: session.worker.history().to_vec(),
    };
    session.store.append(session.session_id, &start).await?;
    Ok(session)
}
/// Rebuild a session from the log stored under `session_id`.
///
/// All entries are read and replayed, then a new `Worker` built from
/// `client` receives the replayed system prompt (when one was recorded),
/// history, request config, turn count and interrupted flag. The returned
/// session keeps appending to the same `session_id`.
///
/// NOTE(review): the replayed `locked_prefix_len` is not applied to the
/// worker here — confirm whether cache-lock state should be restored too.
///
/// # Errors
/// Returns [`SessionError::Store`] if the log cannot be read or decoded.
pub async fn restore(
    client: C,
    store: St,
    session_id: SessionId,
    config: SessionConfig,
) -> Result<Self, SessionError> {
    let entries = store.read_all(session_id).await?;
    let session_log::RestoredState {
        system_prompt,
        config: request_config,
        history,
        turn_count,
        last_run_interrupted,
        ..
    } = session_log::replay_entries(&entries);

    let mut worker = Worker::new(client);
    if let Some(prompt) = &system_prompt {
        worker.set_system_prompt(prompt);
    }
    worker.set_history(history);
    worker.set_request_config(request_config);
    worker.set_turn_count(turn_count);
    worker.set_last_run_interrupted(last_run_interrupted);

    Ok(Self {
        worker,
        store,
        session_id,
        _config: config,
    })
}
/// The session ID.
///
/// This is the ID every entry written by this session is appended under.
pub fn session_id(&self) -> SessionId {
self.session_id
}
/// Reference to the underlying store.
///
/// Gives shared access to the backend this session appends its entries to.
pub fn store(&self) -> &St {
&self.store
}
/// Feed `user_input` to the worker and record what changed.
///
/// After the worker returns (successfully or not), the items added to the
/// history are logged, followed by a `TurnEnd` and a `RunOutcome` entry.
///
/// # Errors
/// A failure to write any log entry is returned as [`SessionError::Store`]
/// and takes precedence over the worker's own result; otherwise a worker
/// failure is returned as [`SessionError::Worker`].
pub async fn run(
    &mut self,
    user_input: impl Into<String>,
) -> Result<WorkerResult, SessionError> {
    let text: String = user_input.into();
    let logged_input = Item::user_message(&text);
    let baseline = self.worker.history().len();

    let outcome = self.worker.run(text).await;

    self.log_history_delta(baseline, Some(&logged_input))
        .await?;
    self.log_turn_end().await?;
    self.log_outcome(&outcome).await?;
    Ok(outcome?)
}
/// Resume from a paused state, logging all state changes.
pub async fn resume(&mut self) -> Result<WorkerResult, SessionError> {
let history_before = self.worker.history().len();
let result = self.worker.resume().await;
self.log_history_delta(history_before, None).await?;
self.log_turn_end().await?;
self.log_outcome(&result).await?;
result.map_err(SessionError::Worker)
}
/// Branch off a new session from this session's current state.
///
/// A new ID is generated and a log consisting of a single `SessionStart`
/// entry (current system prompt, request config and full history) is
/// created for it. This session itself is left unchanged.
///
/// Returns the ID of the new session.
pub async fn fork(&self) -> Result<SessionId, StoreError> {
    let fork_id = crate::new_session_id();
    let worker = &self.worker;
    let seed = LogEntry::SessionStart {
        ts: session_log::now_millis(),
        system_prompt: worker.get_system_prompt().map(String::from),
        config: worker.request_config().clone(),
        history: worker.history().to_vec(),
    };
    self.store
        .create_session(fork_id, std::slice::from_ref(&seed))
        .await?;
    Ok(fork_id)
}
/// Branch off a new session from an earlier point of a stored log.
///
/// The first `up_to_entry` entries of `source_id` are replayed (all of them
/// if the log is shorter) and the resulting system prompt, config and
/// history become the `SessionStart` of a newly created session.
///
/// Returns the ID of the new session.
///
/// # Errors
/// Returns the [`StoreError`] from reading the source log or creating the fork.
pub async fn fork_at(
    store: &St,
    source_id: SessionId,
    up_to_entry: usize,
) -> Result<SessionId, StoreError> {
    let mut entries = store.read_all(source_id).await?;
    // `truncate` is a no-op when `up_to_entry` exceeds the log length.
    entries.truncate(up_to_entry);
    let snapshot = session_log::replay_entries(&entries);

    let fork_id = crate::new_session_id();
    let seed = LogEntry::SessionStart {
        ts: session_log::now_millis(),
        system_prompt: snapshot.system_prompt,
        config: snapshot.config,
        history: snapshot.history,
    };
    store
        .create_session(fork_id, std::slice::from_ref(&seed))
        .await?;
    Ok(fork_id)
}
/// Append a `CacheLocked` entry recording `locked_prefix_len`.
///
/// This only writes the log entry; it does not change the worker.
pub async fn log_cache_locked(
    &self,
    locked_prefix_len: usize,
) -> Result<(), StoreError> {
    let entry = LogEntry::CacheLocked {
        ts: session_log::now_millis(),
        locked_prefix_len,
    };
    self.store.append(self.session_id, &entry).await
}
/// Append a `CacheUnlocked` entry.
///
/// This only writes the log entry; it does not change the worker.
pub async fn log_cache_unlocked(&self) -> Result<(), StoreError> {
    let entry = LogEntry::CacheUnlocked {
        ts: session_log::now_millis(),
    };
    self.store.append(self.session_id, &entry).await
}
/// Append a `ConfigChanged` entry holding a copy of the worker's current
/// request config.
///
/// Call this after changing the config on `worker`; the entry records the
/// config as it is at the time of the call.
pub async fn log_config_changed(&self) -> Result<(), StoreError> {
    let entry = LogEntry::ConfigChanged {
        ts: session_log::now_millis(),
        config: self.worker.request_config().clone(),
    };
    self.store.append(self.session_id, &entry).await
}
// ── Private helpers ──────────────────────────────────────────────────
/// Log every history item added since the history had `before_len` items.
///
/// Nothing is written when the history did not grow. Otherwise:
/// - if `user_item` is given, it is logged as `UserInput` and the first new
///   history item is assumed to be that input and skipped;
/// - each run of consecutive tool results becomes one `ToolResults` entry;
/// - each run of consecutive assistant messages, tool calls and reasoning
///   items becomes one `AssistantItems` entry;
/// - any other item is logged on its own as `HookInjectedItems`.
///
/// All entries written by one call share the same timestamp.
async fn log_history_delta(
    &self,
    before_len: usize,
    user_item: Option<&Item>,
) -> Result<(), StoreError> {
    /// True for items that are logged under `AssistantItems`.
    fn is_model_output(item: &Item) -> bool {
        item.is_assistant_message() || item.is_tool_call() || item.is_reasoning()
    }

    let history = self.worker.history();
    if history.len() <= before_len {
        return Ok(());
    }
    let ts = session_log::now_millis();
    let session = self.session_id;
    let appended = &history[before_len..];

    // `appended` is non-empty here, so skipping its first element is safe.
    let mut pending = match user_item {
        Some(input) => {
            let entry = LogEntry::UserInput {
                ts,
                item: input.clone(),
            };
            self.store.append(session, &entry).await?;
            &appended[1..]
        }
        None => appended,
    };

    while let Some(head) = pending.first() {
        let (entry, taken) = if head.is_tool_result() {
            let run = pending
                .iter()
                .take_while(|it| it.is_tool_result())
                .count();
            let entry = LogEntry::ToolResults {
                ts,
                items: pending[..run].to_vec(),
            };
            (entry, run)
        } else if is_model_output(head) {
            let run = pending
                .iter()
                .take_while(|it| is_model_output(it))
                .count();
            let entry = LogEntry::AssistantItems {
                ts,
                items: pending[..run].to_vec(),
            };
            (entry, run)
        } else {
            let entry = LogEntry::HookInjectedItems {
                ts,
                items: vec![head.clone()],
            };
            (entry, 1)
        };
        self.store.append(session, &entry).await?;
        pending = &pending[taken..];
    }
    Ok(())
}
/// Append a `TurnEnd` entry carrying the worker's current turn count.
async fn log_turn_end(&self) -> Result<(), StoreError> {
    let entry = LogEntry::TurnEnd {
        ts: session_log::now_millis(),
        turn_count: self.worker.turn_count(),
    };
    self.store.append(self.session_id, &entry).await
}
async fn log_outcome(
&self,
result: &Result<WorkerResult, WorkerError>,
) -> Result<(), StoreError> {
let outcome = match result {
Ok(WorkerResult::Finished) => Outcome::Finished,
Ok(WorkerResult::Paused) => Outcome::Paused,
Err(e) => Outcome::Error {
message: e.to_string(),
},
};
self.store
.append(
self.session_id,
&LogEntry::RunOutcome {
ts: session_log::now_millis(),
outcome,
interrupted: self.worker.last_run_interrupted(),
},
)
.await
}
}

View File

@ -0,0 +1,285 @@
//! Session log types for append-only JSONL persistence.
//!
//! Each [`LogEntry`] represents a single state transition in a session,
//! serialized as one line in a `.jsonl` file. Replaying the sequence of
//! entries reconstructs the full [`Worker`] state.
use llm_worker::llm_client::types::{Item, RequestConfig};
use serde::{Deserialize, Serialize};
/// A single session log entry, serialized as one JSONL line.
///
/// Variants correspond to specific mutation points in `Worker`:
/// - `SessionStart` — always the first entry; captures initial state
/// - `UserInput` / `AssistantItems` / `ToolResults` / `HookInjectedItems` — history appends
/// - `TurnEnd` — turn boundary marker
/// - `CacheLocked` / `CacheUnlocked` — KV cache state transitions
/// - `RunOutcome` — marks end of a `run()` or `resume()` call
/// - `ConfigChanged` — `RequestConfig` mutation
///
/// Serialization is internally tagged: every JSON object carries a `"kind"`
/// field holding the variant name in snake_case (e.g. `"session_start"`).
/// Every variant has a `ts` field; the writers in this crate fill it with
/// milliseconds since the Unix epoch.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LogEntry {
/// Session start. Always the first entry in a log.
/// For forked sessions, `history` contains the seed state from the parent.
SessionStart {
ts: u64,
system_prompt: Option<String>,
config: RequestConfig,
history: Vec<Item>,
},
/// User input pushed to history (worker.rs:229).
UserInput { ts: u64, item: Item },
/// Assistant response items added to history (worker.rs:1040-1041).
AssistantItems { ts: u64, items: Vec<Item> },
/// Tool execution results added to history (worker.rs:897-900, 1072-1076).
ToolResults { ts: u64, items: Vec<Item> },
/// Items injected by `on_turn_end` hook via `ContinueWithMessages` (worker.rs:1055).
HookInjectedItems { ts: u64, items: Vec<Item> },
/// Turn boundary. Records the turn count after increment.
TurnEnd { ts: u64, turn_count: usize },
/// KV cache locked. Records the history prefix length that is now immutable.
CacheLocked { ts: u64, locked_prefix_len: usize },
/// KV cache unlocked.
CacheUnlocked { ts: u64 },
/// Outcome of a `run()` or `resume()` call.
/// This is metadata for auditing; replay logic does not branch on the outcome.
/// Replay does, however, copy `interrupted` into the restored state.
RunOutcome {
ts: u64,
outcome: Outcome,
interrupted: bool,
},
/// `RequestConfig` changed.
/// Holds the complete new config, which replaces the previous one on replay.
ConfigChanged { ts: u64, config: RequestConfig },
}
/// Outcome of a run/resume call. Used for auditing, not for replay branching.
///
/// Variant names are serialized in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum Outcome {
/// The call returned successfully in the finished state.
Finished,
/// The call returned successfully in the paused state.
Paused,
/// The call returned an error; `message` is the error rendered as text.
Error { message: String },
}
/// State reconstructed by replaying log entries.
#[derive(Debug, Clone)]
pub struct RestoredState {
/// System prompt from the most recent `SessionStart`, if one was recorded.
pub system_prompt: Option<String>,
/// Request config from the most recent `SessionStart` or `ConfigChanged`.
pub config: RequestConfig,
/// Seed history plus every item appended by later entries, in log order.
pub history: Vec<Item>,
/// Turn count from the most recent `TurnEnd` (0 if none was seen).
pub turn_count: usize,
/// Prefix length from the most recent `CacheLocked`; 0 after `CacheUnlocked`
/// or when no lock entry was seen.
pub locked_prefix_len: usize,
/// `interrupted` flag from the most recent `RunOutcome` (false if none).
pub last_run_interrupted: bool,
}
/// Fold a sequence of log entries into the state they describe.
///
/// Entries are applied in order, starting from an empty state (no prompt,
/// default config, empty history, zero counters, not interrupted):
/// - `SessionStart` replaces prompt, config and history wholesale;
/// - `UserInput`, `AssistantItems`, `ToolResults` and `HookInjectedItems`
///   append their items to the history;
/// - `TurnEnd`, `CacheLocked`, `CacheUnlocked`, `RunOutcome` and
///   `ConfigChanged` overwrite the matching field.
///
/// An empty slice yields the empty starting state.
pub fn replay_entries(entries: &[LogEntry]) -> RestoredState {
    let mut restored = RestoredState {
        system_prompt: None,
        config: RequestConfig::default(),
        history: Vec::new(),
        turn_count: 0,
        locked_prefix_len: 0,
        last_run_interrupted: false,
    };

    for entry in entries {
        match entry {
            LogEntry::SessionStart {
                system_prompt,
                config,
                history,
                ..
            } => {
                restored.system_prompt = system_prompt.clone();
                restored.config = config.clone();
                restored.history = history.clone();
            }
            LogEntry::UserInput { item, .. } => restored.history.push(item.clone()),
            LogEntry::AssistantItems { items, .. }
            | LogEntry::ToolResults { items, .. }
            | LogEntry::HookInjectedItems { items, .. } => {
                restored.history.extend_from_slice(items);
            }
            LogEntry::TurnEnd { turn_count, .. } => restored.turn_count = *turn_count,
            LogEntry::CacheLocked {
                locked_prefix_len, ..
            } => restored.locked_prefix_len = *locked_prefix_len,
            LogEntry::CacheUnlocked { .. } => restored.locked_prefix_len = 0,
            LogEntry::RunOutcome { interrupted, .. } => {
                restored.last_run_interrupted = *interrupted;
            }
            LogEntry::ConfigChanged { config, .. } => restored.config = config.clone(),
        }
    }

    restored
}
/// Get the current timestamp in milliseconds since Unix epoch.
///
/// This never panics: if the system clock reports a time before the epoch
/// the result is `0`, and a millisecond count that does not fit in `u64`
/// saturates to `u64::MAX` instead of being truncated.
pub fn now_millis() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `SessionStart` at ts 1000 with no prompt, default config and `history` as seed.
    fn bare_start(history: Vec<Item>) -> LogEntry {
        LogEntry::SessionStart {
            ts: 1000,
            system_prompt: None,
            config: RequestConfig::default(),
            history,
        }
    }

    #[test]
    fn replay_empty() {
        let restored = replay_entries(&[]);
        assert!(restored.history.is_empty());
        assert_eq!(restored.turn_count, 0);
        assert_eq!(restored.locked_prefix_len, 0);
    }

    #[test]
    fn replay_session_start_sets_initial_state() {
        let start = LogEntry::SessionStart {
            ts: 1000,
            system_prompt: Some("You are helpful.".into()),
            config: RequestConfig::default().with_max_tokens(1024),
            history: vec![Item::user_message("seed")],
        };
        let restored = replay_entries(std::slice::from_ref(&start));
        assert_eq!(restored.system_prompt.as_deref(), Some("You are helpful."));
        assert_eq!(restored.config.max_tokens, Some(1024));
        assert_eq!(restored.history.len(), 1);
    }

    #[test]
    fn replay_full_turn() {
        let log = [
            bare_start(vec![]),
            LogEntry::UserInput {
                ts: 2000,
                item: Item::user_message("Hello"),
            },
            LogEntry::AssistantItems {
                ts: 3000,
                items: vec![Item::assistant_message("Hi!")],
            },
            LogEntry::TurnEnd {
                ts: 3100,
                turn_count: 1,
            },
            LogEntry::RunOutcome {
                ts: 3200,
                outcome: Outcome::Finished,
                interrupted: false,
            },
        ];
        let restored = replay_entries(&log);
        assert_eq!(restored.history.len(), 2);
        assert_eq!(restored.turn_count, 1);
        assert!(!restored.last_run_interrupted);
    }

    #[test]
    fn replay_with_tool_calls() {
        let log = [
            bare_start(vec![]),
            LogEntry::UserInput {
                ts: 2000,
                item: Item::user_message("Check weather"),
            },
            LogEntry::AssistantItems {
                ts: 3000,
                items: vec![Item::tool_call("call_1", "get_weather", r#"{"city":"Tokyo"}"#)],
            },
            LogEntry::ToolResults {
                ts: 3500,
                items: vec![Item::tool_result("call_1", "Sunny, 25C")],
            },
            LogEntry::AssistantItems {
                ts: 4000,
                items: vec![Item::assistant_message("It's sunny in Tokyo!")],
            },
            LogEntry::TurnEnd {
                ts: 4100,
                turn_count: 1,
            },
        ];
        let restored = replay_entries(&log);
        assert_eq!(restored.history.len(), 4);
        assert!(restored.history[1].is_tool_call());
        assert!(restored.history[2].is_tool_result());
    }

    #[test]
    fn replay_cache_lock_unlock() {
        let log = [
            bare_start(vec![Item::user_message("a"), Item::assistant_message("b")]),
            LogEntry::CacheLocked {
                ts: 2000,
                locked_prefix_len: 2,
            },
            LogEntry::CacheUnlocked { ts: 3000 },
        ];
        // After the full log the lock has been released again.
        let unlocked = replay_entries(&log);
        assert_eq!(unlocked.locked_prefix_len, 0);
        // Stopping before the unlock entry shows the locked prefix.
        let locked = replay_entries(&log[..2]);
        assert_eq!(locked.locked_prefix_len, 2);
    }

    #[test]
    fn replay_config_changed() {
        let log = [
            bare_start(vec![]),
            LogEntry::ConfigChanged {
                ts: 2000,
                config: RequestConfig::default().with_temperature(0.5),
            },
        ];
        let restored = replay_entries(&log);
        assert_eq!(restored.config.temperature, Some(0.5));
    }
}

View File

@ -0,0 +1,68 @@
//! Persistence backend abstraction.
//!
//! [`Store`] defines the async interface for reading and writing session logs.
//! Implementations handle the physical storage (filesystem, database, etc.).
use crate::event_trace::TraceEntry;
use crate::session_log::LogEntry;
use crate::SessionId;
use std::future::Future;
/// Errors from the persistence store.
///
/// `Io` and `Serde` implement `From` for their source error, so `?` converts
/// `std::io::Error` and `serde_json::Error` automatically.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
/// An underlying I/O operation failed.
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// A value could not be converted to or from JSON.
#[error("serialization error: {0}")]
Serde(#[from] serde_json::Error),
/// No stored session exists for the given ID.
#[error("session not found: {0}")]
NotFound(SessionId),
/// A stored line could not be decoded; `line` identifies it and
/// `message` carries the decoder's description.
#[error("log corrupted at line {line}: {message}")]
Corrupt { line: usize, message: String },
}
/// Async persistence backend for session logs.
///
/// All methods take `&self` — implementations should use interior mutability
/// (e.g., append-mode file handles) when needed.
///
/// Methods are declared as returning `impl Future + Send` so that the
/// returned futures are required to be `Send`; implementations may write
/// them as plain `async fn`.
pub trait Store: Send + Sync {
/// Append a single log entry to the session.
fn append(
&self,
id: SessionId,
entry: &LogEntry,
) -> impl Future<Output = Result<(), StoreError>> + Send;
/// Read all log entries for a session, in order.
fn read_all(
&self,
id: SessionId,
) -> impl Future<Output = Result<Vec<LogEntry>, StoreError>> + Send;
/// List all session IDs, most recent first.
fn list_sessions(&self)
-> impl Future<Output = Result<Vec<SessionId>, StoreError>> + Send;
/// Create a new session with initial entries.
///
/// NOTE(review): the trait does not say what happens when `id` already
/// exists — confirm the intended contract with implementors.
fn create_session(
&self,
id: SessionId,
entries: &[LogEntry],
) -> impl Future<Output = Result<(), StoreError>> + Send;
/// Check if a session exists.
fn exists(
&self,
id: SessionId,
) -> impl Future<Output = Result<bool, StoreError>> + Send;
/// Append a trace entry to the debug event trace file.
fn append_trace(
&self,
id: SessionId,
entry: &TraceEntry,
) -> impl Future<Output = Result<(), StoreError>> + Send;
}

View File

@ -0,0 +1,176 @@
use llm_worker::llm_client::types::{Item, RequestConfig};
use llm_worker_persistence::{
FsStore, LogEntry, Outcome, Store, TraceEntry, new_session_id, replay_entries,
};
/// Entries appended one at a time come back in full and replay to the expected state.
#[tokio::test]
async fn round_trip_write_and_read() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FsStore::new(tmp.path()).await.unwrap();
    let session = new_session_id();

    let written = [
        LogEntry::SessionStart {
            ts: 1000,
            system_prompt: Some("You are helpful.".into()),
            config: RequestConfig::default().with_max_tokens(1024),
            history: vec![],
        },
        LogEntry::UserInput {
            ts: 2000,
            item: Item::user_message("Hello"),
        },
        LogEntry::AssistantItems {
            ts: 3000,
            items: vec![Item::assistant_message("Hi there!")],
        },
        LogEntry::TurnEnd {
            ts: 3100,
            turn_count: 1,
        },
        LogEntry::RunOutcome {
            ts: 3200,
            outcome: Outcome::Finished,
            interrupted: false,
        },
    ];

    // Append each entry separately, as a live session would.
    for entry in written.iter() {
        store.append(session, entry).await.unwrap();
    }

    let loaded = store.read_all(session).await.unwrap();
    assert_eq!(loaded.len(), written.len());

    let restored = replay_entries(&loaded);
    assert_eq!(restored.system_prompt.as_deref(), Some("You are helpful."));
    assert_eq!(restored.config.max_tokens, Some(1024));
    assert_eq!(restored.history.len(), 2);
    assert_eq!(restored.turn_count, 1);
    assert!(!restored.last_run_interrupted);
}
/// `create_session` stores the given entries so that they can be read back and replayed.
#[tokio::test]
async fn create_session_writes_all_entries() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FsStore::new(tmp.path()).await.unwrap();
    let session = new_session_id();

    let seed = LogEntry::SessionStart {
        ts: 1000,
        system_prompt: None,
        config: RequestConfig::default(),
        history: vec![Item::user_message("seed"), Item::assistant_message("ok")],
    };
    store
        .create_session(session, std::slice::from_ref(&seed))
        .await
        .unwrap();

    let loaded = store.read_all(session).await.unwrap();
    assert_eq!(loaded.len(), 1);

    let restored = replay_entries(&loaded);
    assert_eq!(restored.history.len(), 2);
}
/// Sessions are listed with the most recently created ID first.
#[tokio::test]
async fn list_sessions_returns_newest_first() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FsStore::new(tmp.path()).await.unwrap();

    let older = new_session_id();
    // Small delay to ensure different UUID v7 timestamps
    tokio::time::sleep(std::time::Duration::from_millis(2)).await;
    let newer = new_session_id();

    let start = LogEntry::SessionStart {
        ts: 1000,
        system_prompt: None,
        config: RequestConfig::default(),
        history: vec![],
    };
    for id in [older, newer] {
        store.append(id, &start).await.unwrap();
    }

    let listed = store.list_sessions().await.unwrap();
    assert_eq!(listed.len(), 2);
    assert_eq!(listed[0], newer); // newest first
    assert_eq!(listed[1], older);
}
/// `exists` is false before the first append and true afterwards.
#[tokio::test]
async fn exists_returns_correct_state() {
    let tmp = tempfile::tempdir().unwrap();
    let store = FsStore::new(tmp.path()).await.unwrap();
    let session = new_session_id();

    let before = store.exists(session).await.unwrap();
    assert!(!before);

    let start = LogEntry::SessionStart {
        ts: 1000,
        system_prompt: None,
        config: RequestConfig::default(),
        history: vec![],
    };
    store.append(session, &start).await.unwrap();

    let after = store.exists(session).await.unwrap();
    assert!(after);
}
#[tokio::test]
async fn not_found_error_for_missing_session() {
let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).await.unwrap();
let id = new_session_id();
let result = store.read_all(id).await;
assert!(result.is_err());
}
/// Trace entries go to `{id}.trace.jsonl` and leave the session log untouched.
#[tokio::test]
async fn trace_entries_in_separate_file() {
    use llm_worker::llm_client::event::{Event, PingEvent};

    let tmp = tempfile::tempdir().unwrap();
    let store = FsStore::new(tmp.path()).await.unwrap();
    let session = new_session_id();

    // One regular log entry...
    let start = LogEntry::SessionStart {
        ts: 1000,
        system_prompt: None,
        config: RequestConfig::default(),
        history: vec![],
    };
    store.append(session, &start).await.unwrap();

    // ...and one trace entry for the same session.
    let trace = TraceEntry {
        ts: 1500,
        turn: 0,
        event: Event::Ping(PingEvent { timestamp: None }),
    };
    store.append_trace(session, &trace).await.unwrap();

    // The session log still holds exactly the one entry.
    let log = store.read_all(session).await.unwrap();
    assert_eq!(log.len(), 1);

    // The trace lives in its own file next to the log.
    let trace_file = tmp.path().join(format!("{session}.trace.jsonl"));
    assert!(trace_file.exists());
}

View File

@ -501,7 +501,7 @@ impl ToolDefinition {
// ============================================================================ // ============================================================================
/// Request configuration /// Request configuration
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RequestConfig { pub struct RequestConfig {
/// Maximum tokens to generate /// Maximum tokens to generate
pub max_tokens: Option<u32>, pub max_tokens: Option<u32>,

View File

@ -1308,6 +1308,16 @@ impl<C: LlmClient> Worker<C, Mutable> {
self.history.clear(); self.history.clear();
} }
/// Set the turn count (for session restoration)
///
/// Overwrites the stored `turn_count` with `count`; no other field is touched.
pub fn set_turn_count(&mut self, count: usize) {
self.turn_count = count;
}
/// Set the last_run_interrupted flag (for session restoration)
///
/// Overwrites the stored flag with `interrupted`; no other field is touched.
pub fn set_last_run_interrupted(&mut self, interrupted: bool) {
self.last_run_interrupted = interrupted;
}
/// Apply configuration (reserved for future extensions) /// Apply configuration (reserved for future extensions)
#[allow(dead_code)] #[allow(dead_code)]
pub fn config(self, _config: WorkerConfig) -> Self { pub fn config(self, _config: WorkerConfig) -> Self {

89
docs/persistence.md Normal file
View File

@ -0,0 +1,89 @@
# 永続化設計
## 概要
`llm-worker-persistence` クレートは、`llm-worker` の `Worker` セッション状態を
JSONL append-only ログとして永続化する。ログを replay することで Worker 状態を完全に復元する。
## 設計方針
- **JSONL append-only ログ**: 1セッション = 1つの `.jsonl` ファイル。書き込みは末尾追記のみ。
- **Pause/正常終了で構造に差異なし**: Worker の状態は Pause 時も正常終了時も同じ形
(`history: Vec<Item>` + `turn_count` + `request_config`)。
`resume()` は「ユーザー入力を追加せず `run_turn_loop()` に再入する」だけなので、
復元に必要なのは history の中身であり、前回の終了理由ではない。
`RunOutcome` の `Finished`/`Paused` 区分は監査用メタデータであり、replay の分岐には使わない。
- **クレート分離**: `llm-worker` は永続化を知らない。`Session` ラッパーが外から Worker を包む。
## 命名規約
| 名前 | 用途 |
|---|---|
| **SessionLog / LogEntry** | 状態復元用の構造化された記録(永続化の本体) |
| **EventTrace / TraceEntry** | デバッグ用の生ストリームイベント全録(オプション、デフォルト OFF) |
## クレート構成
```
llm-worker-persistence → llm-worker → llm-worker-macros
```
`llm-worker-persistence` は `llm-worker` に依存するが、逆方向の依存はない。
## ファイル配置
```
{root}/{session_id}.jsonl -- セッションログ
{root}/{session_id}.trace.jsonl -- イベントトレース(デバッグ時のみ)
```
`SessionId` は UUID v7(`uuid` クレート)。タイムスタンプ埋め込みで辞書順 = 時系列順。
## LogEntry
各エントリは Worker の特定の状態変更に対応する:
| エントリ | Worker 上の対応箇所 | replay での効果 |
|---|---|---|
| `SessionStart` | セッション開始 / fork | system_prompt, config, history を初期化 |
| `UserInput` | `worker.rs:229` | history に追加 |
| `AssistantItems` | `worker.rs:1040-1041` | history に追加 |
| `ToolResults` | `worker.rs:897-900, 1072-1076` | history に追加 |
| `HookInjectedItems` | `worker.rs:1055` | history に追加 |
| `TurnEnd` | `worker.rs:1033` | turn_count を更新 |
| `CacheLocked` | `Worker::lock()` | locked_prefix_len を設定 |
| `CacheUnlocked` | `Worker::unlock()` | locked_prefix_len を 0 に |
| `RunOutcome` | `run()` / `resume()` 終了時 | interrupted フラグのみ(監査用) |
| `ConfigChanged` | `set_*` メソッド群 | config を更新 |
## Session ラッパー
```rust
pub struct Session<C: LlmClient, St: Store> {
pub worker: Worker<C, Mutable>,
store: St,
session_id: SessionId,
}
```
- `Session::new()` — SessionStart を書き込み
- `Session::run()` — Worker::run() の前後で history を比較、差分をログ記録
- `Session::resume()` — 同上
- `Session::restore()` — ログを replay して Worker を再構築
- `Session::fork()` — 現在の history をシードにした新セッションを作成
- `Session::fork_at()` — 任意のログ地点から分岐
## Store trait
```rust
pub trait Store: Send + Sync {
fn append(&self, id: SessionId, entry: &LogEntry) -> impl Future<...> + Send;
fn read_all(&self, id: SessionId) -> impl Future<...> + Send;
fn list_sessions(&self) -> impl Future<...> + Send;
fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> impl Future<...> + Send;
fn exists(&self, id: SessionId) -> impl Future<...> + Send;
fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> impl Future<...> + Send;
}
```
初期実装は `FsStore`(ファイルシステム JSONL)。RPITIT 使用、`async_trait` 不要。

View File

@ -0,0 +1,235 @@
# 永続化データ構造の設計・実装プラン
## Context
INSOMNIA の `llm-worker` クレートは現在すべてのセッション状態をインメモリで保持しており、
プロセス終了時に会話履歴やターン状態が失われる。
Coding Agent として Pause/Resume・Fork をまたいだセッション継続を実現するため、
永続化レイヤーを追加する。
**設計方針**: Codex CLI / Claude Code と同様の **JSONL append-only ログ** 方式を採用。
セッションログを replay することで Worker 状態を完全に復元する。
Pause/正常終了で永続化データの構造に差異を設けない。
理由: Worker の状態は Pause 時も正常終了時も同じ形(`history: Vec<Item>` + `turn_count` + `request_config`)であり、
APIレスポンスのデータ構造上、両者に本質的な違いがない。
`resume()` は「ユーザー入力を追加せず `run_turn_loop()` に再入する」だけなので、
復元に必要なのは history の中身であり、前回の終了理由ではない。
`RunOutcome` の `Finished`/`Paused` 区分は監査・デバッグ用メタデータであり、replay ロジックの分岐には使わない。
**命名規約**:
- **SessionLog / LogEntry** — 状態復元用の構造化された記録(永続化の本体)
- **EventTrace / TraceEntry** — デバッグ用の生ストリームイベント全録(オプション)
## クレート構成
永続化は `llm-worker` とは別クレートに分離する。
`llm-worker` は永続化を一切知らず、Session ラッパーが外から Worker を包む。
```
crates/
llm-worker/ ← 既存(LLM クライアント + Worker、変更は最小限)
llm-worker-macros/ ← 既存(変更なし)
llm-worker-persistence/ ← 新規クレート
Cargo.toml
src/
lib.rs -- モジュールルート・re-exports・SessionId 型エイリアス
session_log.rs -- LogEntry enum(JSONL 1行 = 1エントリ)
event_trace.rs -- TraceEntry(デバッグ用生イベント記録)
store.rs -- Store trait(バックエンド抽象)
fs_store.rs -- JSONL ファイルシステム実装
session.rs -- Session<C, St> ラッパー + replay/restore
insomnia/ ← 将来のトップレベルアプリ
docs/persistence.md -- 設計ドキュメント(このプランの清書版)
```
依存グラフ:
```
llm-worker-persistence → llm-worker → llm-worker-macros
insomnia (将来) → llm-worker-persistence, llm-worker
```
### llm-worker-persistence/Cargo.toml 依存
```toml
[dependencies]
llm-worker = { path = "../llm-worker" }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["fs", "io-util"] }
uuid = { version = "1", features = ["v7", "serde"] }
thiserror = "2"
```
## 既存コードへの変更(llm-worker 側)
### 1. `RequestConfig` に Serialize/Deserialize 追加
- **ファイル**: `crates/llm-worker/src/llm_client/types.rs:504`
- `#[derive(Debug, Clone, Default)]` → `#[derive(Debug, Clone, Default, Serialize, Deserialize)]`
### 2. Worker に復元用セッター追加
- **ファイル**: `crates/llm-worker/src/worker.rs`
- `impl<C: LlmClient> Worker<C, Mutable>` ブロックに追加:
- `pub fn set_turn_count(&mut self, count: usize)`
- `pub fn set_last_run_interrupted(&mut self, interrupted: bool)`
### 3. ワークスペース Cargo.toml にメンバー追加
- **ファイル**: `Cargo.toml`(ワークスペースルート)
- `members` に `"crates/llm-worker-persistence"` を追加
## 新規コード: データ型
### LogEntry(session_log.rs)
状態復元に必要な構造化記録。JSONL 1行 = 1エントリ。
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LogEntry {
// セッション開始(ログ先頭、fork 時は history にシード状態を含む)
SessionStart {
ts: u64,
system_prompt: Option<String>,
config: RequestConfig,
history: Vec<Item>,
},
// ユーザー入力(worker.rs:229 に対応)
UserInput { ts: u64, item: Item },
// アシスタント応答(worker.rs:1040-1041 に対応)
AssistantItems { ts: u64, items: Vec<Item> },
// ツール実行結果(worker.rs:897-900, 1072-1076 に対応)
ToolResults { ts: u64, items: Vec<Item> },
// Hook 注入 Items(worker.rs:1055 ContinueWithMessages に対応)
HookInjectedItems { ts: u64, items: Vec<Item> },
// ターン境界
TurnEnd { ts: u64, turn_count: usize },
// KV キャッシュロック/アンロック
CacheLocked { ts: u64, locked_prefix_len: usize },
CacheUnlocked { ts: u64 },
// run/resume の終了結果
RunOutcome { ts: u64, outcome: Outcome, interrupted: bool },
// RequestConfig 変更
ConfigChanged { ts: u64, config: RequestConfig },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Outcome { Finished, Paused, Error { message: String } }
```
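**Replay ロジック**: 全エントリを先頭から順に走査し、`SessionStart` → system_prompt / config / history を初期化、
`UserInput` / `*Items` / `ToolResults` → history に append、`TurnEnd` → turn_count 更新、`CacheLocked` → locked_prefix_len 設定、`CacheUnlocked` → locked_prefix_len を 0 に、`ConfigChanged` → config 更新。`RunOutcome` は interrupted フラグの復元のみに使い、replay の分岐には使わない。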
### TraceEntry(event_trace.rs)
デバッグ用の生ストリームイベント全録。デフォルト OFF。
セッションログとは別ファイル `{session_id}.trace.jsonl` に記録。
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceEntry {
pub ts: u64,
pub turn: usize,
pub event: Event,
}
```
replay 対象外。状態復元には使わない。
### SessionId
`uuid` クレートの UUID v7 をそのまま使用。型エイリアスのみ。
```rust
pub type SessionId = uuid::Uuid;
pub fn new_session_id() -> SessionId {
uuid::Uuid::now_v7()
}
```
UUID v7 はタイムスタンプ埋め込みで辞書順 = 時系列順。独自フォーマット不要。
### Store trait(store.rs)
```rust
pub trait Store: Send + Sync {
fn append(&self, id: SessionId, entry: &LogEntry) -> impl Future<Output = Result<(), StoreError>> + Send;
fn read_all(&self, id: SessionId) -> impl Future<Output = Result<Vec<LogEntry>, StoreError>> + Send;
fn list_sessions(&self) -> impl Future<Output = Result<Vec<SessionId>, StoreError>> + Send;
fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> impl Future<Output = Result<(), StoreError>> + Send;
fn exists(&self, id: SessionId) -> impl Future<Output = Result<bool, StoreError>> + Send;
// EventTrace 用(デバッグモード時のみ使用)
fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> impl Future<Output = Result<(), StoreError>> + Send;
}
```
RPITIT (Rust 1.75+) 使用。`async_trait` 不要。
### FsStore(fs_store.rs)
ファイル配置:
- セッションログ: `{root}/{session_id}.jsonl`
- イベントトレース: `{root}/{session_id}.trace.jsonl`
append モードで書き込み。SQLite インデックスなし。
### Session ラッパー(session.rs)
Worker を直接変更せず、**外部ラッパー** として実装:
```rust
pub struct Session<C: LlmClient, St: Store> {
pub worker: Worker<C, Mutable>, // pub で直接アクセス可能
store: St,
session_id: SessionId,
config: SessionConfig,
}
```
- `Session::new()` → SessionStart を append
- `Session::run()` → Worker::run() の前後で history.len() を比較、差分をログ記録
- `Session::resume()` → 同上
- `Session::fork()` → 現在の history をシードにした新 SessionStart を書き込み
- `Session::fork_at(store, source_id, entry_idx)` → 任意地点から分岐
**復元**: `restore_session(client, store, session_id)` → read_all → replay_entries → Worker 再構築
### EventTrace 記録(デバッグモード)
`SessionConfig::record_event_trace: bool`(デフォルト `false`)が `true` の場合、
Session が Worker に `OnStreamChunk` Hook を登録。
Hook 内で `TraceEntry` を `{session_id}.trace.jsonl` に append。
セッションログとは完全に分離。
## 実装順序
1. `RequestConfig` に Serialize/Deserialize 追加(llm-worker 側)
2. Worker に `set_turn_count` / `set_last_run_interrupted` 追加(llm-worker 側)
3. `crates/llm-worker-persistence/` クレート作成(Cargo.toml + ワークスペース登録)
4. `session_log.rs` 作成(LogEntry + Outcome + replay_entries)
5. `event_trace.rs` 作成(TraceEntry)
6. `store.rs` 作成(Store trait + StoreError)
7. `fs_store.rs` 作成(JSONL ファイルシステム実装)
8. `session.rs` 作成(Session ラッパー + restore_session)
9. `lib.rs` 作成(re-exports・SessionId 型エイリアス・new_session_id)
10. テスト作成(replay round-trip, FsStore 読み書き, Session::run ログ記録)
11. `docs/persistence.md` 設計ドキュメント作成
## 検証方法
1. **ユニットテスト**: `replay_entries` に手動構築した LogEntry 列を渡し、復元状態を検証
2. **統合テスト**: MockLlmClient + FsStore で Session::run → restore_session → history 一致を確認
3. **Fork テスト**: fork → 新セッションの history が fork 時点と一致することを確認
4. **cargo test**: 既存テストが壊れていないことを確認
5. **cargo clippy / cargo check**: 警告なし