session-storeとして分離

This commit is contained in:
Keisuke Hirata 2026-04-12 06:31:34 +09:00
parent eb670bfba5
commit cdafd5d914
25 changed files with 910 additions and 657 deletions

236
Cargo.lock generated
View File

@ -214,6 +214,20 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "compact_str"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb1325a1cece981e8a296ab8f0f9b63ae357bd0784a9faaf548cc7b480707a"
dependencies = [
"castaway",
"cfg-if",
"itoa",
"rustversion",
"ryu",
"static_assertions",
]
[[package]]
name = "const-oid"
version = "0.10.2"
@ -322,6 +336,15 @@ dependencies = [
"syn",
]
[[package]]
name = "deranged"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c"
dependencies = [
"powerfmt",
]
[[package]]
name = "digest"
version = "0.11.2"
@ -413,6 +436,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foldhash"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
[[package]]
name = "foreign-types"
version = "0.3.2"
@ -582,7 +611,7 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash",
"foldhash 0.1.5",
]
[[package]]
@ -590,6 +619,11 @@ name = "hashbrown"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash 0.2.0",
]
[[package]]
name = "heck"
@ -907,6 +941,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.18"
@ -925,6 +968,16 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kasuari"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bde5057d6143cc94e861d90f591b9303d6716c6b9602309150bd068853c10899"
dependencies = [
"hashbrown 0.16.1",
"thiserror",
]
[[package]]
name = "lazy_static"
version = "1.5.0"
@ -943,6 +996,15 @@ version = "0.2.184"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af"
[[package]]
name = "line-clipping"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f50e8f47623268b5407192d26876c4d7f89d686ca130fdc53bced4814cd29f8"
dependencies = [
"bitflags",
]
[[package]]
name = "linux-raw-sys"
version = "0.4.15"
@ -993,23 +1055,6 @@ dependencies = [
"syn",
]
[[package]]
name = "llm-worker-persistence"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"hex",
"llm-worker",
"serde",
"serde_json",
"sha2",
"tempfile",
"thiserror",
"tokio",
"uuid",
]
[[package]]
name = "lock_api"
version = "0.4.14"
@ -1034,6 +1079,15 @@ dependencies = [
"hashbrown 0.15.5",
]
[[package]]
name = "lru"
version = "0.16.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593"
dependencies = [
"hashbrown 0.16.1",
]
[[package]]
name = "manifest"
version = "0.1.0"
@ -1112,6 +1166,12 @@ dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "num-conv"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967"
[[package]]
name = "once_cell"
version = "1.21.4"
@ -1224,12 +1284,12 @@ dependencies = [
"dotenv",
"futures",
"llm-worker",
"llm-worker-persistence",
"manifest",
"protocol",
"provider",
"serde",
"serde_json",
"session-store",
"tempfile",
"thiserror",
"tokio",
@ -1246,6 +1306,12 @@ dependencies = [
"zerovec",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "prettyplease"
version = "0.2.37"
@ -1308,16 +1374,55 @@ checksum = "eabd94c2f37801c20583fc49dd5cd6b0ba68c716787c2dd6ed18571e1e63117b"
dependencies = [
"bitflags",
"cassowary",
"compact_str",
"compact_str 0.8.1",
"crossterm",
"indoc",
"instability",
"itertools",
"lru",
"itertools 0.13.0",
"lru 0.12.5",
"paste",
"strum",
"strum 0.26.3",
"unicode-segmentation",
"unicode-truncate 1.1.0",
"unicode-width 0.2.0",
]
[[package]]
name = "ratatui-core"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ef8dea09a92caaf73bff7adb70b76162e5937524058a7e5bff37869cbbec293"
dependencies = [
"bitflags",
"compact_str 0.9.0",
"hashbrown 0.16.1",
"indoc",
"itertools 0.14.0",
"kasuari",
"lru 0.16.3",
"strum 0.27.2",
"thiserror",
"unicode-segmentation",
"unicode-truncate 2.0.1",
"unicode-width 0.2.0",
]
[[package]]
name = "ratatui-widgets"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7dbfa023cd4e604c2553483820c5fe8aa9d71a42eea5aa77c6e7f35756612db"
dependencies = [
"bitflags",
"hashbrown 0.16.1",
"indoc",
"instability",
"itertools 0.14.0",
"line-clipping",
"ratatui-core",
"strum 0.27.2",
"time",
"unicode-segmentation",
"unicode-truncate",
"unicode-width 0.2.0",
]
@ -1665,6 +1770,23 @@ dependencies = [
"syn",
]
[[package]]
name = "session-store"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"hex",
"llm-worker",
"serde",
"serde_json",
"sha2",
"tempfile",
"thiserror",
"tokio",
"uuid",
]
[[package]]
name = "sha2"
version = "0.11.0"
@ -1768,7 +1890,16 @@ version = "0.26.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
dependencies = [
"strum_macros",
"strum_macros 0.26.4",
]
[[package]]
name = "strum"
version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf"
dependencies = [
"strum_macros 0.27.2",
]
[[package]]
@ -1784,6 +1915,18 @@ dependencies = [
"syn",
]
[[package]]
name = "strum_macros"
version = "0.27.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "subtle"
version = "2.6.1"
@ -1878,6 +2021,24 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "time"
version = "0.3.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c"
dependencies = [
"deranged",
"num-conv",
"powerfmt",
"time-core",
]
[[package]]
name = "time-core"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca"
[[package]]
name = "tinystr"
version = "0.8.3"
@ -2124,6 +2285,18 @@ dependencies = [
"ratatui",
"serde_json",
"tokio",
"tui-scrollview",
]
[[package]]
name = "tui-scrollview"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94a94f467c7ac7c291039b0733e3b2d379c77884e34fc27d167921fc1ab4842f"
dependencies = [
"indoc",
"ratatui-core",
"ratatui-widgets",
]
[[package]]
@ -2150,11 +2323,22 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3644627a5af5fa321c95b9b235a72fd24cd29c648c2c379431e6628655627bf"
dependencies = [
"itertools",
"itertools 0.13.0",
"unicode-segmentation",
"unicode-width 0.1.14",
]
[[package]]
name = "unicode-truncate"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16b380a1238663e5f8a691f9039c73e1cdae598a30e9855f541d29b08b53e9a5"
dependencies = [
"itertools 0.14.0",
"unicode-segmentation",
"unicode-width 0.2.0",
]
[[package]]
name = "unicode-width"
version = "0.1.14"

View File

@ -4,7 +4,7 @@ members = [
"crates/daemon",
"crates/llm-worker",
"crates/llm-worker-macros",
"crates/llm-worker-persistence",
"crates/session-store",
"crates/manifest",
"crates/pod",
"crates/protocol",

View File

@ -20,5 +20,5 @@
- [ ] Compact (Step 5-8、session-store-extraction 後)
- [x] Protocol: request-response パターン (GetHistory等) → [tickets/request-response-protocol.md](tickets/request-response-protocol.md)
- [ ] パーミッション: パターンベースのツール実行制御 → [tickets/permission-extension-point.md](tickets/permission-extension-point.md)
- [ ] session-store: persistence クレートの再構成wrap廃止、リネーム → [tickets/session-store-extraction.md](tickets/session-store-extraction.md)
- [x] session-store: persistence クレートの再構成wrap廃止、リネーム → [tickets/session-store-extraction.md](tickets/session-store-extraction.md)
- [ ] UI用トークン情報の記録run stats の永続化、session-store 後)

View File

@ -1,400 +0,0 @@
//! Persistent session wrapper around [`Worker`].
//!
//! [`Session`] intercepts `Worker` operations and appends [`HashedEntry`] records
//! to a [`Store`]. It does not modify `Worker` internals — all persistence
//! happens by observing state before and after each operation.
//!
//! Each appended entry carries a hash that chains to the previous entry.
//! On append, the session checks whether the store's head still matches its
//! own `head_hash`; if not, it auto-forks into a new session.
use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, Outcome};
use crate::store::{Store, StoreError};
use crate::SessionId;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::state::Mutable;
use llm_worker::{Worker, WorkerError, WorkerResult};
/// Configuration for session persistence.
#[derive(Debug, Clone)]
pub struct SessionConfig {
/// Record raw stream events to a separate trace file.
/// Default: `false`.
pub record_event_trace: bool,
}
impl Default for SessionConfig {
fn default() -> Self {
Self {
record_event_trace: false,
}
}
}
/// Errors from session operations.
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
#[error(transparent)]
Worker(#[from] WorkerError),
#[error(transparent)]
Store(#[from] StoreError),
}
/// Persistent session wrapping a [`Worker`].
///
/// Use [`worker()`](Self::worker) / [`worker_mut()`](Self::worker_mut) to
/// access the underlying Worker for configuration (tool registration, etc.).
/// State-mutating operations (`run`, `resume`) should go through Session
/// methods to ensure proper logging.
pub struct Session<C: LlmClient, St: Store> {
/// Always `Some` outside of `run()` / `resume()`.
worker: Option<Worker<C, Mutable>>,
store: St,
session_id: SessionId,
head_hash: Option<EntryHash>,
_config: SessionConfig,
}
impl<C: LlmClient, St: Store> Session<C, St> {
/// Create a new session, writing the initial `SessionStart` entry.
pub async fn new(
worker: Worker<C, Mutable>,
store: St,
config: SessionConfig,
) -> Result<Self, StoreError> {
let session_id = crate::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: worker.get_system_prompt().map(String::from),
config: worker.request_config().clone(),
history: worker.history().to_vec(),
};
let hashed = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash: hashed.clone(),
prev_hash: None,
entry,
};
store.append(session_id, &hashed_entry).await?;
Ok(Self {
worker: Some(worker),
store,
session_id,
head_hash: Some(hashed),
_config: config,
})
}
/// Restore a session from a stored log.
pub async fn restore(
client: C,
store: St,
session_id: SessionId,
config: SessionConfig,
) -> Result<Self, SessionError> {
let entries = store.read_all(session_id).await?;
let state = session_log::collect_state(&entries);
let mut worker = Worker::new(client);
if let Some(ref prompt) = state.system_prompt {
worker.set_system_prompt(prompt);
}
worker.set_history(state.history);
worker.set_request_config(state.config);
worker.set_turn_count(state.turn_count);
worker.set_last_run_interrupted(state.last_run_interrupted);
Ok(Self {
worker: Some(worker),
store,
session_id,
head_hash: state.head_hash,
_config: config,
})
}
fn w(&self) -> &Worker<C, Mutable> {
self.worker.as_ref().expect("worker taken during run")
}
/// Reference to the underlying Worker.
pub fn worker(&self) -> &Worker<C, Mutable> {
self.w()
}
/// Mutable reference to the underlying Worker.
pub fn worker_mut(&mut self) -> &mut Worker<C, Mutable> {
self.worker.as_mut().expect("worker taken during run")
}
/// The session ID.
pub fn session_id(&self) -> SessionId {
self.session_id
}
/// The current head hash of the session log chain.
pub fn head_hash(&self) -> Option<&EntryHash> {
self.head_hash.as_ref()
}
/// Reference to the underlying store.
pub fn store(&self) -> &St {
&self.store
}
/// Run a user turn, logging all state changes.
///
/// Internally locks the Worker (flushing pending tools), runs the turn,
/// then unlocks back to Mutable state.
pub async fn run(
&mut self,
user_input: impl Into<String>,
) -> Result<WorkerResult, SessionError> {
let input = user_input.into();
self.ensure_head_or_fork().await?;
let history_before = self.w().history().len();
// lock → run → unlock (use lock() directly to keep worker on error)
let worker = self.worker.take().expect("worker taken during run");
let mut locked = worker.lock();
let result = locked.run(input).await;
self.worker = Some(locked.unlock());
self.log_history_delta(history_before).await?;
self.log_turn_end().await?;
self.log_outcome(&result).await?;
result.map_err(SessionError::Worker)
}
/// Resume from a paused state, logging all state changes.
pub async fn resume(&mut self) -> Result<WorkerResult, SessionError> {
self.ensure_head_or_fork().await?;
let history_before = self.w().history().len();
// lock → resume → unlock
let worker = self.worker.take().expect("worker taken during run");
let mut locked = worker.lock();
let result = locked.resume().await;
self.worker = Some(locked.unlock());
self.log_history_delta(history_before).await?;
self.log_turn_end().await?;
self.log_outcome(&result).await?;
result.map_err(SessionError::Worker)
}
/// Fork this session at its current state.
pub async fn fork(&self) -> Result<SessionId, StoreError> {
let fork_id = crate::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: self.w().get_system_prompt().map(String::from),
config: self.w().request_config().clone(),
history: self.w().history().to_vec(),
};
let hashed = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash: hashed,
prev_hash: None,
entry,
};
self.store
.create_session(fork_id, &[hashed_entry])
.await?;
Ok(fork_id)
}
/// Fork from an arbitrary point in a stored session's log.
pub async fn fork_at(
store: &St,
source_id: SessionId,
at_hash: &EntryHash,
) -> Result<SessionId, StoreError> {
let entries = store.read_all(source_id).await?;
let cut = entries
.iter()
.position(|e| &e.hash == at_hash)
.map(|i| i + 1)
.unwrap_or(entries.len());
let state = session_log::collect_state(&entries[..cut]);
let fork_id = crate::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: state.system_prompt,
config: state.config,
history: state.history,
};
let hashed = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash: hashed,
prev_hash: None,
entry,
};
store.create_session(fork_id, &[hashed_entry]).await?;
Ok(fork_id)
}
/// Log a `Locked` entry.
pub async fn log_cache_locked(
&mut self,
locked_prefix_len: usize,
) -> Result<(), StoreError> {
let entry = LogEntry::Locked {
ts: session_log::now_millis(),
locked_prefix_len,
};
self.append_entry(entry).await
}
/// Log a `CacheUnlocked` entry.
pub async fn log_cache_unlocked(&mut self) -> Result<(), StoreError> {
let entry = LogEntry::CacheUnlocked {
ts: session_log::now_millis(),
};
self.append_entry(entry).await
}
/// Log a `ConfigChanged` entry.
pub async fn log_config_changed(&mut self) -> Result<(), StoreError> {
let entry = LogEntry::ConfigChanged {
ts: session_log::now_millis(),
config: self.w().request_config().clone(),
};
self.append_entry(entry).await
}
// ── Private helpers ──────────────────────────────────────────────────
async fn append_entry(&mut self, entry: LogEntry) -> Result<(), StoreError> {
let hash = session_log::compute_hash(self.head_hash.as_ref(), &entry);
let hashed_entry = HashedEntry {
hash: hash.clone(),
prev_hash: self.head_hash.clone(),
entry,
};
self.store
.append(self.session_id, &hashed_entry)
.await?;
self.head_hash = Some(hash);
Ok(())
}
async fn ensure_head_or_fork(&mut self) -> Result<(), StoreError> {
let store_head = self.store.read_head_hash(self.session_id).await?;
if store_head == self.head_hash {
return Ok(());
}
let fork_id = crate::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: self.w().get_system_prompt().map(String::from),
config: self.w().request_config().clone(),
history: self.w().history().to_vec(),
};
let hash = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash: hash.clone(),
prev_hash: None,
entry,
};
self.store
.create_session(fork_id, &[hashed_entry])
.await?;
self.session_id = fork_id;
self.head_hash = Some(hash);
Ok(())
}
async fn log_history_delta(&mut self, before_len: usize) -> Result<(), StoreError> {
let history = self.w().history();
if history.len() <= before_len {
return Ok(());
}
let ts = session_log::now_millis();
let new_items = history[before_len..].to_vec();
let mut i = 0;
while i < new_items.len() {
let item = &new_items[i];
if item.is_user_message() {
self.append_entry(LogEntry::UserInput {
ts,
item: new_items[i].clone(),
})
.await?;
i += 1;
} else if item.is_tool_result() {
let start = i;
while i < new_items.len() && new_items[i].is_tool_result() {
i += 1;
}
self.append_entry(LogEntry::ToolResults {
ts,
items: new_items[start..i].to_vec(),
})
.await?;
} else if item.is_assistant_message()
|| item.is_tool_call()
|| item.is_reasoning()
{
let start = i;
while i < new_items.len()
&& (new_items[i].is_assistant_message()
|| new_items[i].is_tool_call()
|| new_items[i].is_reasoning())
{
i += 1;
}
self.append_entry(LogEntry::AssistantItems {
ts,
items: new_items[start..i].to_vec(),
})
.await?;
} else {
self.append_entry(LogEntry::HookInjectedItems {
ts,
items: vec![new_items[i].clone()],
})
.await?;
i += 1;
}
}
Ok(())
}
async fn log_turn_end(&mut self) -> Result<(), StoreError> {
self.append_entry(LogEntry::TurnEnd {
ts: session_log::now_millis(),
turn_count: self.w().turn_count(),
})
.await
}
async fn log_outcome(
&mut self,
result: &Result<WorkerResult, WorkerError>,
) -> Result<(), StoreError> {
let outcome = match result {
Ok(WorkerResult::Finished) => Outcome::Finished,
Ok(WorkerResult::Paused) => Outcome::Paused,
Ok(WorkerResult::LimitReached) => Outcome::LimitReached,
Err(e) => Outcome::Error {
message: e.to_string(),
},
};
self.append_entry(LogEntry::RunOutcome {
ts: session_log::now_millis(),
outcome,
interrupted: self.w().last_run_interrupted(),
})
.await
}
}

View File

@ -8,7 +8,7 @@ license.workspace = true
async-trait = "0.1.89"
clap = { version = "4.6.0", features = ["derive"] }
llm-worker = { version = "0.2.1", path = "../llm-worker" }
llm-worker-persistence = { version = "0.1.0", path = "../llm-worker-persistence" }
session-store = { version = "0.1.0", path = "../session-store" }
manifest = { version = "0.1.0", path = "../manifest" }
protocol = { version = "0.1.0", path = "../protocol" }
provider = { version = "0.1.0", path = "../provider" }
@ -23,6 +23,6 @@ tracing = "0.1.44"
async-trait = "0.1.89"
dotenv = "0.15.0"
futures = "0.3.32"
llm-worker-persistence = { path = "../llm-worker-persistence" }
session-store = { path = "../session-store" }
tempfile = "3.27.0"
tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "time"] }

View File

@ -12,7 +12,7 @@
//! ```
use pod::{Pod, PodManifest, PodRunResult};
use llm_worker_persistence::FsStore;
use session_store::FsStore;
const MANIFEST_TOML: &str = r#"
[pod]
@ -52,7 +52,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
// 5. Extract the assistant's reply from history
let history = pod.session_mut().worker().history();
let history = pod.worker().history();
if let Some(text) = history
.iter()
.rev()

View File

@ -6,7 +6,7 @@
//! ```
use pod::{Event, Method, PodController, PodManifest};
use llm_worker_persistence::FsStore;
use session_store::FsStore;
const MANIFEST_TOML: &str = r#"
[pod]

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::WorkerError;
use llm_worker_persistence::Store;
use session_store::Store;
use tokio::sync::{broadcast, mpsc};
use crate::pod::{Pod, PodRunResult, PodError};
@ -85,7 +85,7 @@ impl PodController {
// Register event bridge callbacks on the worker
{
let worker = pod.session_mut().worker_mut();
let worker = pod.worker_mut();
let tx = event_tx.clone();
worker.on_turn_start(move |turn| {
@ -158,7 +158,7 @@ impl PodController {
}
// Clone cancel sender before moving pod
let cancel_tx = pod.session_mut().worker_mut().cancel_sender();
let cancel_tx = pod.worker_mut().cancel_sender();
tokio::spawn(async move {
// Hold socket server alive for the lifetime of the controller task
@ -191,7 +191,7 @@ impl PodController {
)
.await;
let items = pod.session_mut().worker_mut().history().to_vec();
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
@ -218,7 +218,7 @@ impl PodController {
)
.await;
let items = pod.session_mut().worker_mut().history().to_vec();
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
@ -307,19 +307,12 @@ where
fn worker_error_code(e: &PodError) -> ErrorCode {
match e {
PodError::Session(se) => {
use llm_worker_persistence::SessionError;
match se {
SessionError::Worker(we) => match we {
PodError::Worker(we) => match we {
WorkerError::Tool(_) => ErrorCode::ToolError,
WorkerError::Client(_) => ErrorCode::ProviderError,
_ => ErrorCode::Internal,
},
_ => ErrorCode::Internal,
}
}
PodError::Provider(_) => ErrorCode::ProviderError,
_ => ErrorCode::Internal,
}
}

View File

@ -2,7 +2,7 @@ use std::path::{Path, PathBuf};
use std::process::ExitCode;
use clap::Parser;
use llm_worker_persistence::FsStore;
use session_store::FsStore;
use pod::{Pod, PodController};
#[derive(Parser)]

View File

@ -3,9 +3,10 @@ use std::sync::Arc;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::llm_client::RequestConfig;
use llm_worker::Worker;
use llm_worker_persistence::{
Session, SessionConfig, SessionError, SessionId, Store, StoreError,
use llm_worker::state::Mutable;
use llm_worker::{Worker, WorkerError, WorkerResult};
use session_store::{
EntryHash, Outcome, SessionId, SessionStartState, Store, StoreError,
};
use manifest::{PodManifest, Scope, WorkerManifest};
@ -18,11 +19,15 @@ use crate::hook_interceptor::HookInterceptor;
/// An independent agent execution unit.
///
/// Wraps a persistent [`Session`] with manifest metadata and an optional
/// directory scope. This is the primary abstraction in insomnia.
/// Holds a [`Worker`] directly and persists session state via
/// `session-store` functions after each turn.
pub struct Pod<C: LlmClient, St: Store> {
manifest: PodManifest,
session: Session<C, St>,
/// Always `Some` outside of `run()`/`resume()`.
worker: Option<Worker<C, Mutable>>,
store: St,
session_id: SessionId,
head_hash: Option<EntryHash>,
scope: Option<Scope>,
hook_builder: HookRegistryBuilder,
interceptor_installed: bool,
@ -30,20 +35,24 @@ pub struct Pod<C: LlmClient, St: Store> {
impl<C: LlmClient, St: Store> Pod<C, St> {
/// Create a new Pod from a pre-built Worker and store.
///
/// The caller is responsible for constructing the `LlmClient` from the
/// manifest's provider config. This keeps Pod free of provider-specific
/// dependencies.
pub async fn new(
manifest: PodManifest,
worker: Worker<C>,
store: St,
scope: Option<Scope>,
) -> Result<Self, PodError> {
let session = Session::new(worker, store, SessionConfig::default()).await?;
let state = SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
};
let (session_id, head_hash) = session_store::create_session(&store, state).await?;
Ok(Self {
manifest,
session,
worker: Some(worker),
store,
session_id,
head_hash: Some(head_hash),
scope,
hook_builder: HookRegistryBuilder::new(),
interceptor_installed: false,
@ -58,10 +67,22 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
store: St,
scope: Option<Scope>,
) -> Result<Self, PodError> {
let session = Session::restore(client, store, session_id, SessionConfig::default()).await?;
let state = session_store::restore(&store, session_id).await?;
let mut worker = Worker::new(client);
if let Some(ref prompt) = state.system_prompt {
worker.set_system_prompt(prompt);
}
worker.set_history(state.history);
worker.set_request_config(state.config);
worker.set_turn_count(state.turn_count);
worker.set_last_run_interrupted(state.last_run_interrupted);
Ok(Self {
manifest,
session,
worker: Some(worker),
store,
session_id,
head_hash: state.head_hash,
scope,
hook_builder: HookRegistryBuilder::new(),
interceptor_installed: false,
@ -70,7 +91,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// The session ID used for persistence.
pub fn session_id(&self) -> SessionId {
self.session.session_id()
self.session_id
}
/// The Pod's manifest.
@ -83,18 +104,25 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.scope.as_ref()
}
/// Direct access to the underlying session.
/// Direct access to the underlying Worker.
pub fn worker(&self) -> &Worker<C, Mutable> {
self.worker.as_ref().expect("worker taken during run")
}
/// Mutable access to the underlying Worker.
///
/// Use this to register tools, hooks, or subscribers on the worker
/// before calling [`run`](Self::run).
pub fn session_mut(&mut self) -> &mut Session<C, St> {
&mut self.session
/// Use this to register tools, hooks, or subscribers before calling
/// [`run`](Self::run).
pub fn worker_mut(&mut self) -> &mut Worker<C, Mutable> {
self.worker.as_mut().expect("worker taken during run")
}
/// Reference to the store.
pub fn store(&self) -> &St {
&self.store
}
// --- Hook registration ---
//
// Hooks must be registered before the first call to `run()` or `resume()`.
// Attempting to add a hook after execution has started will panic.
fn assert_hooks_open(&self) {
assert!(
@ -145,7 +173,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let builder = std::mem::take(&mut self.hook_builder);
let registry = Arc::new(builder.build());
let interceptor = HookInterceptor::new(registry);
self.session.worker_mut().set_interceptor(interceptor);
self.worker_mut().set_interceptor(interceptor);
self.interceptor_installed = true;
}
}
@ -153,23 +181,114 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// Send user input and run until the LLM turn completes.
pub async fn run(&mut self, input: impl Into<String>) -> Result<PodRunResult, PodError> {
self.ensure_interceptor_installed();
let result = self.session.run(input).await?;
Ok(result.into())
// Split borrow: access worker field directly to allow concurrent
// mutable borrows on session_id / head_hash.
let w = self.worker.as_ref().unwrap();
session_store::ensure_head_or_fork(
&self.store,
&mut self.session_id,
&mut self.head_hash,
SessionStartState {
system_prompt: w.get_system_prompt(),
config: w.request_config(),
history: w.history(),
},
)
.await?;
let history_before = self.worker.as_ref().unwrap().history().len();
// lock → run → unlock
let worker = self.worker.take().expect("worker taken during run");
let mut locked = worker.lock();
let result = locked.run(input).await;
self.worker = Some(locked.unlock());
self.persist_turn(history_before, &result).await?;
result.map(PodRunResult::from).map_err(PodError::Worker)
}
/// Resume from a paused state.
pub async fn resume(&mut self) -> Result<PodRunResult, PodError> {
self.ensure_interceptor_installed();
let result = self.session.resume().await?;
Ok(result.into())
let w = self.worker.as_ref().unwrap();
session_store::ensure_head_or_fork(
&self.store,
&mut self.session_id,
&mut self.head_hash,
SessionStartState {
system_prompt: w.get_system_prompt(),
config: w.request_config(),
history: w.history(),
},
)
.await?;
let history_before = self.worker.as_ref().unwrap().history().len();
// lock → resume → unlock
let worker = self.worker.take().expect("worker taken during run");
let mut locked = worker.lock();
let result = locked.resume().await;
self.worker = Some(locked.unlock());
self.persist_turn(history_before, &result).await?;
result.map(PodRunResult::from).map_err(PodError::Worker)
}
/// Persist delta + turn end + outcome after a run/resume.
async fn persist_turn(
&mut self,
history_before: usize,
result: &Result<WorkerResult, WorkerError>,
) -> Result<(), StoreError> {
// Use direct field access for split borrows (worker immutable,
// head_hash mutable).
let w = self.worker.as_ref().unwrap();
let new_items = &w.history()[history_before..];
session_store::save_delta(
&self.store,
self.session_id,
&mut self.head_hash,
new_items,
)
.await?;
let turn_count = self.worker.as_ref().unwrap().turn_count();
session_store::save_turn_end(
&self.store,
self.session_id,
&mut self.head_hash,
turn_count,
)
.await?;
let interrupted = self.worker.as_ref().unwrap().last_run_interrupted();
let outcome = match result {
Ok(WorkerResult::Finished) => Outcome::Finished,
Ok(WorkerResult::Paused) => Outcome::Paused,
Ok(WorkerResult::LimitReached) => Outcome::LimitReached,
Err(e) => Outcome::Error {
message: e.to_string(),
},
};
session_store::save_outcome(
&self.store,
self.session_id,
&mut self.head_hash,
outcome,
interrupted,
)
.await?;
Ok(())
}
}
impl<St: Store> Pod<Box<dyn LlmClient>, St> {
/// Create a Pod entirely from a manifest.
///
/// Builds the LLM client from the provider config, applies worker
/// settings, and creates a new persistent session.
pub async fn from_manifest(
manifest: PodManifest,
store: St,
@ -179,10 +298,19 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
let client = provider::build_client(&manifest.provider, manifest_dir.as_deref())?;
let mut worker = Worker::new(client);
apply_worker_manifest(&mut worker, &manifest.worker);
let session = Session::new(worker, store, SessionConfig::default()).await?;
let state = SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
};
let (session_id, head_hash) = session_store::create_session(&store, state).await?;
Ok(Self {
manifest,
session,
worker: Some(worker),
store,
session_id,
head_hash: Some(head_hash),
scope,
hook_builder: HookRegistryBuilder::new(),
interceptor_installed: false,
@ -217,12 +345,12 @@ pub enum PodRunResult {
LimitReached,
}
impl From<llm_worker::WorkerResult> for PodRunResult {
fn from(r: llm_worker::WorkerResult) -> Self {
impl From<WorkerResult> for PodRunResult {
fn from(r: WorkerResult) -> Self {
match r {
llm_worker::WorkerResult::Finished => PodRunResult::Finished,
llm_worker::WorkerResult::Paused => PodRunResult::Paused,
llm_worker::WorkerResult::LimitReached => PodRunResult::LimitReached,
WorkerResult::Finished => PodRunResult::Finished,
WorkerResult::Paused => PodRunResult::Paused,
WorkerResult::LimitReached => PodRunResult::LimitReached,
}
}
}
@ -231,7 +359,7 @@ impl From<llm_worker::WorkerResult> for PodRunResult {
#[derive(Debug, thiserror::Error)]
pub enum PodError {
#[error(transparent)]
Session(#[from] SessionError),
Worker(#[from] WorkerError),
#[error(transparent)]
Store(#[from] StoreError),

View File

@ -107,7 +107,7 @@ mod tests {
fn test_state() -> PodSharedState {
PodSharedState::new(
"test-pod".into(),
llm_worker_persistence::new_session_id(),
session_store::new_session_id(),
"[pod]\nname = \"test-pod\"".into(),
)
}

View File

@ -1,7 +1,7 @@
use std::sync::RwLock;
use llm_worker::llm_client::types::Item;
use llm_worker_persistence::SessionId;
use session_store::SessionId;
use serde::{Deserialize, Serialize};
/// Shared state between PodController and runtime directory.
@ -88,7 +88,7 @@ mod tests {
fn test_state() -> PodSharedState {
PodSharedState::new(
"test-pod".into(),
llm_worker_persistence::new_session_id(),
session_store::new_session_id(),
"[pod]\nname = \"test-pod\"".into(),
)
}

View File

@ -7,7 +7,7 @@ use futures::Stream;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::{ClientError, LlmClient, Request};
use llm_worker::Worker;
use llm_worker_persistence::FsStore;
use session_store::FsStore;
use pod::{
Event, Method, Pod, PodController, PodManifest, PodStatus,

View File

@ -1,6 +1,6 @@
[package]
name = "llm-worker-persistence"
description = "Session persistence for llm-worker via append-only JSONL logs"
name = "session-store"
description = "Session persistence via append-only JSONL logs"
version = "0.1.0"
edition.workspace = true
license.workspace = true

View File

@ -1,10 +1,14 @@
//! Session persistence for `llm-worker` via append-only JSONL logs.
//! Session persistence via append-only JSONL logs.
//!
//! # Architecture
//!
//! Sessions are recorded as a sequence of [`LogEntry`] values, one per line
//! in a `.jsonl` file. Reading the log and collecting entries reconstructs
//! the full [`Worker`] state — no separate snapshots or checkpoints needed.
//! the full Worker state — no separate snapshots or checkpoints needed.
//!
//! This crate provides free functions for persistence operations.
//! The caller (typically Pod) holds the Worker directly and calls these
//! functions after state-mutating operations.
//!
//! Debug-mode [`TraceEntry`] records capture raw stream events in a separate
//! `.trace.jsonl` file, independent of the session log.
@ -12,12 +16,14 @@
//! # Quick start
//!
//! ```ignore
//! use llm_worker_persistence::{Session, SessionConfig, FsStore};
//! use session_store::{create_session, restore, save_delta, FsStore, SessionStartState};
//!
//! let store = FsStore::new("./sessions").await?;
//! let worker = Worker::new(client);
//! let mut session = Session::new(worker, store, SessionConfig::default()).await?;
//! session.run("Hello!").await?;
//! let (session_id, head_hash) = create_session(&store, SessionStartState {
//! system_prompt: None,
//! config: &config,
//! history: &[],
//! }).await?;
//! ```
pub mod event_trace;
@ -28,7 +34,10 @@ pub mod store;
pub use event_trace::TraceEntry;
pub use fs_store::FsStore;
pub use session::{Session, SessionConfig, SessionError};
pub use session::{
SessionStartState, create_session, ensure_head_or_fork, fork, fork_at, restore, save_cache_locked,
save_cache_unlocked, save_config_changed, save_delta, save_outcome, save_turn_end,
};
pub use session_log::{
EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, build_chain, collect_state,
compute_hash,

View File

@ -0,0 +1,291 @@
//! Free functions for session persistence operations.
//!
//! These functions record and restore session state without owning a Worker.
//! The caller (typically Pod) holds the Worker directly and calls these
//! functions after state-mutating operations.
use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, Outcome};
use crate::store::{Store, StoreError};
use crate::SessionId;
use llm_worker::llm_client::types::Item;
use llm_worker::llm_client::RequestConfig;
/// State snapshot for creating a SessionStart entry.
pub struct SessionStartState<'a> {
pub system_prompt: Option<&'a str>,
pub config: &'a RequestConfig,
pub history: &'a [Item],
}
/// Create a new session, writing the initial `SessionStart` entry.
///
/// Returns the new session ID and head hash.
pub async fn create_session(
store: &impl Store,
state: SessionStartState<'_>,
) -> Result<(SessionId, EntryHash), StoreError> {
let session_id = crate::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: state.system_prompt.map(String::from),
config: state.config.clone(),
history: state.history.to_vec(),
};
let hash = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash: hash.clone(),
prev_hash: None,
entry,
};
store.append(session_id, &hashed_entry).await?;
Ok((session_id, hash))
}
/// Restore session state from a stored log.
///
/// Returns the reconstructed state. The caller is responsible for
/// applying it to a Worker.
pub async fn restore(
store: &impl Store,
session_id: SessionId,
) -> Result<crate::session_log::RestoredState, StoreError> {
let entries = store.read_all(session_id).await?;
Ok(session_log::collect_state(&entries))
}
/// Check if the store's head still matches the expected head hash.
/// If not, auto-fork into a new session.
///
/// Updates `session_id` and `head_hash` in place when a fork occurs.
pub async fn ensure_head_or_fork(
store: &impl Store,
session_id: &mut SessionId,
head_hash: &mut Option<EntryHash>,
state: SessionStartState<'_>,
) -> Result<(), StoreError> {
let store_head = store.read_head_hash(*session_id).await?;
if store_head == *head_hash {
return Ok(());
}
let fork_id = crate::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: state.system_prompt.map(String::from),
config: state.config.clone(),
history: state.history.to_vec(),
};
let hash = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash: hash.clone(),
prev_hash: None,
entry,
};
store.create_session(fork_id, &[hashed_entry]).await?;
*session_id = fork_id;
*head_hash = Some(hash);
Ok(())
}
/// Log the history delta — new items added since the previous snapshot.
///
/// Classifies items into UserInput, AssistantItems, ToolResults, and
/// HookInjectedItems entries automatically.
pub async fn save_delta(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
new_items: &[Item],
) -> Result<(), StoreError> {
if new_items.is_empty() {
return Ok(());
}
let ts = session_log::now_millis();
let mut i = 0;
while i < new_items.len() {
let item = &new_items[i];
if item.is_user_message() {
append_entry(store, session_id, head_hash, LogEntry::UserInput {
ts,
item: new_items[i].clone(),
})
.await?;
i += 1;
} else if item.is_tool_result() {
let start = i;
while i < new_items.len() && new_items[i].is_tool_result() {
i += 1;
}
append_entry(store, session_id, head_hash, LogEntry::ToolResults {
ts,
items: new_items[start..i].to_vec(),
})
.await?;
} else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() {
let start = i;
while i < new_items.len()
&& (new_items[i].is_assistant_message()
|| new_items[i].is_tool_call()
|| new_items[i].is_reasoning())
{
i += 1;
}
append_entry(store, session_id, head_hash, LogEntry::AssistantItems {
ts,
items: new_items[start..i].to_vec(),
})
.await?;
} else {
append_entry(store, session_id, head_hash, LogEntry::HookInjectedItems {
ts,
items: vec![new_items[i].clone()],
})
.await?;
i += 1;
}
}
Ok(())
}
/// Log a TurnEnd entry.
pub async fn save_turn_end(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
turn_count: usize,
) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::TurnEnd {
ts: session_log::now_millis(),
turn_count,
})
.await
}
/// Log a RunOutcome entry.
pub async fn save_outcome(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
outcome: Outcome,
interrupted: bool,
) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::RunOutcome {
ts: session_log::now_millis(),
outcome,
interrupted,
})
.await
}
/// Log a `Locked` entry (KV cache locked).
pub async fn save_cache_locked(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
locked_prefix_len: usize,
) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::Locked {
ts: session_log::now_millis(),
locked_prefix_len,
})
.await
}
/// Log a `CacheUnlocked` entry.
pub async fn save_cache_unlocked(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::CacheUnlocked {
ts: session_log::now_millis(),
})
.await
}
/// Log a `ConfigChanged` entry.
pub async fn save_config_changed(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
config: &RequestConfig,
) -> Result<(), StoreError> {
append_entry(store, session_id, head_hash, LogEntry::ConfigChanged {
ts: session_log::now_millis(),
config: config.clone(),
})
.await
}
/// Fork the current state into a new session.
pub async fn fork(
store: &impl Store,
state: SessionStartState<'_>,
) -> Result<SessionId, StoreError> {
let fork_id = crate::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: state.system_prompt.map(String::from),
config: state.config.clone(),
history: state.history.to_vec(),
};
let hash = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash,
prev_hash: None,
entry,
};
store.create_session(fork_id, &[hashed_entry]).await?;
Ok(fork_id)
}
/// Fork from an arbitrary point in a stored session's log.
pub async fn fork_at(
store: &impl Store,
source_id: SessionId,
at_hash: &EntryHash,
) -> Result<SessionId, StoreError> {
let entries = store.read_all(source_id).await?;
let cut = entries
.iter()
.position(|e| &e.hash == at_hash)
.map(|i| i + 1)
.unwrap_or(entries.len());
let state = session_log::collect_state(&entries[..cut]);
let fork_id = crate::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: state.system_prompt,
config: state.config,
history: state.history,
};
let hash = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash,
prev_hash: None,
entry,
};
store.create_session(fork_id, &[hashed_entry]).await?;
Ok(fork_id)
}
// ── Private helper ──────────────────────────────────────────────────────
async fn append_entry(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
entry: LogEntry,
) -> Result<(), StoreError> {
let hash = session_log::compute_hash(head_hash.as_ref(), &entry);
let hashed_entry = HashedEntry {
hash: hash.clone(),
prev_hash: head_hash.clone(),
entry,
};
store.append(session_id, &hashed_entry).await?;
*head_hash = Some(hash);
Ok(())
}

View File

@ -1,5 +1,5 @@
use llm_worker::llm_client::types::{Item, RequestConfig};
use llm_worker_persistence::{
use session_store::{
FsStore, LogEntry, Outcome, Store, TraceEntry, build_chain, collect_state, new_session_id,
};

View File

@ -9,8 +9,8 @@ use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent};
use llm_worker::llm_client::types::{Item, RequestConfig};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use llm_worker::Worker;
use llm_worker_persistence::{
FsStore, LogEntry, Outcome, Session, SessionConfig, Store, collect_state,
use session_store::{
EntryHash, FsStore, LogEntry, Outcome, SessionStartState, Store, collect_state,
};
// =============================================================================
@ -92,6 +92,45 @@ async fn make_store() -> (tempfile::TempDir, FsStore) {
(dir, store)
}
/// Run a worker turn and persist via session-store functions.
/// Takes ownership of the worker (needed for lock/unlock) and returns it.
async fn run_and_persist(
worker: Worker<MockLlmClient>,
store: &FsStore,
session_id: session_store::SessionId,
head_hash: &mut Option<EntryHash>,
input: &str,
) -> (Worker<MockLlmClient>, llm_worker::WorkerResult) {
let history_before = worker.history().len();
let mut locked = worker.lock();
let result = locked.run(input).await;
let worker = locked.unlock();
let new_items = &worker.history()[history_before..];
session_store::save_delta(store, session_id, head_hash, new_items)
.await
.unwrap();
session_store::save_turn_end(store, session_id, head_hash, worker.turn_count())
.await
.unwrap();
let outcome = match &result {
Ok(llm_worker::WorkerResult::Finished) => Outcome::Finished,
Ok(llm_worker::WorkerResult::Paused) => Outcome::Paused,
Ok(llm_worker::WorkerResult::LimitReached) => Outcome::LimitReached,
Err(e) => Outcome::Error {
message: e.to_string(),
},
};
session_store::save_outcome(store, session_id, head_hash, outcome, worker.last_run_interrupted())
.await
.unwrap();
let r = result.unwrap();
(worker, r)
}
// =============================================================================
// Tests
// =============================================================================
@ -102,12 +141,20 @@ async fn session_run_logs_entries() {
let client = MockLlmClient::new(simple_text_events());
let worker = Worker::new(client);
let mut session = Session::new(worker, store.clone(), SessionConfig::default())
let (sid, head_hash) = session_store::create_session(
&store,
SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
},
)
.await
.unwrap();
let sid = session.session_id();
session.run("Hi").await.unwrap();
let mut head_hash = Some(head_hash);
let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await;
let _ = &worker;
let entries = store.read_all(sid).await.unwrap();
@ -152,31 +199,30 @@ async fn session_restore_round_trip() {
let mut worker = Worker::new(client);
worker.set_system_prompt("You are helpful.");
let mut session = Session::new(worker, store.clone(), SessionConfig::default())
let (sid, head_hash) = session_store::create_session(
&store,
SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
},
)
.await
.unwrap();
let sid = session.session_id();
let mut head_hash = Some(head_hash);
session.run("Hi").await.unwrap();
let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await;
let original_history = session.worker().history().to_vec();
let original_turn_count = session.worker().turn_count();
let original_head_hash = session.head_hash().cloned();
let original_history_len = worker.history().len();
let original_turn_count = worker.turn_count();
// Restore
let restore_client = MockLlmClient::new(vec![]); // won't be called
let restored =
Session::restore(restore_client, store.clone(), sid, SessionConfig::default())
.await
.unwrap();
let state = session_store::restore(&store, sid).await.unwrap();
assert_eq!(restored.worker().history().len(), original_history.len());
assert_eq!(restored.worker().turn_count(), original_turn_count);
assert_eq!(
restored.worker().get_system_prompt().map(String::from),
Some("You are helpful.".to_string())
);
assert_eq!(restored.head_hash(), original_head_hash.as_ref());
assert_eq!(state.history.len(), original_history_len);
assert_eq!(state.turn_count, original_turn_count);
assert_eq!(state.system_prompt.as_deref(), Some("You are helpful."));
assert_eq!(state.head_hash, head_hash);
}
#[tokio::test]
@ -186,12 +232,19 @@ async fn session_run_with_tool_call() {
let mut worker = Worker::new(client);
worker.register_tool(weather_tool_definition());
let mut session = Session::new(worker, store.clone(), SessionConfig::default())
let (sid, head_hash) = session_store::create_session(
&store,
SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
},
)
.await
.unwrap();
let sid = session.session_id();
let mut head_hash = Some(head_hash);
session.run("What's the weather?").await.unwrap();
let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await;
let entries = store.read_all(sid).await.unwrap();
@ -210,18 +263,25 @@ async fn session_run_with_tool_call() {
async fn session_resume_after_pause() {
let (_dir, store) = make_store().await;
// First run: tool call with pause hook → Paused
// First run: tool call with pause policy → Paused
let client = MockLlmClient::with_responses(tool_call_events());
let mut worker = Worker::new(client);
worker.register_tool(weather_tool_definition());
worker.set_interceptor(PausePolicy);
let mut session = Session::new(worker, store.clone(), SessionConfig::default())
let (sid, head_hash) = session_store::create_session(
&store,
SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
},
)
.await
.unwrap();
let sid = session.session_id();
let mut head_hash = Some(head_hash);
let result = session.run("Weather?").await.unwrap();
let (_worker, result) = run_and_persist(worker, &store, sid, &mut head_hash, "Weather?").await;
assert!(matches!(result, llm_worker::WorkerResult::Paused));
// Check RunOutcome is Paused
@ -237,25 +297,9 @@ async fn session_resume_after_pause() {
});
assert!(has_paused, "should have Paused outcome");
// Restore and resume
let resume_client = MockLlmClient::with_responses(vec![vec![
Event::text_block_start(0),
Event::text_delta(0, "After resume"),
Event::text_block_stop(0, None),
Event::Status(StatusEvent {
status: ResponseStatus::Completed,
}),
]]);
let mut restored =
Session::restore(resume_client, store.clone(), sid, SessionConfig::default())
.await
.unwrap();
assert!(restored.worker().last_run_interrupted());
// resume may or may not succeed depending on Worker internal state,
// but the restore itself should work
let _ = restored.resume().await;
// Restore state and verify
let state = session_store::restore(&store, sid).await.unwrap();
assert!(state.last_run_interrupted);
}
#[tokio::test]
@ -265,14 +309,31 @@ async fn session_fork_preserves_state() {
let mut worker = Worker::new(client);
worker.set_system_prompt("System prompt");
let mut session = Session::new(worker, store.clone(), SessionConfig::default())
let (sid, head_hash) = session_store::create_session(
&store,
SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
},
)
.await
.unwrap();
let mut head_hash = Some(head_hash);
session.run("Hello").await.unwrap();
let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await;
let original_history_len = session.worker().history().len();
let fork_id = session.fork().await.unwrap();
let original_history_len = worker.history().len();
let fork_id = session_store::fork(
&store,
SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
},
)
.await
.unwrap();
// Fork should have a SessionStart with the current history
let fork_entries = store.read_all(fork_id).await.unwrap();
@ -293,21 +354,26 @@ async fn session_fork_at_truncates() {
let client = MockLlmClient::new(simple_text_events());
let worker = Worker::new(client);
let mut session = Session::new(worker, store.clone(), SessionConfig::default())
let (sid, head_hash) = session_store::create_session(
&store,
SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
},
)
.await
.unwrap();
let sid = session.session_id();
let mut head_hash = Some(head_hash);
session.run("Hello").await.unwrap();
let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await;
let all_entries = store.read_all(sid).await.unwrap();
assert!(all_entries.len() > 2);
// Fork at the hash of the 2nd entry (SessionStart + UserInput)
let at_hash = &all_entries[1].hash;
let fork_id = Session::<MockLlmClient, FsStore>::fork_at(&store, sid, at_hash)
.await
.unwrap();
let fork_id = session_store::fork_at(&store, sid, at_hash).await.unwrap();
let fork_entries = store.read_all(fork_id).await.unwrap();
assert_eq!(fork_entries.len(), 1); // Just the new SessionStart
@ -325,18 +391,26 @@ async fn session_fork_at_truncates() {
async fn session_config_changed_logged() {
let (_dir, store) = make_store().await;
let client = MockLlmClient::new(vec![]);
let worker = Worker::new(client);
let mut worker = Worker::new(client);
let mut session = Session::new(worker, store.clone(), SessionConfig::default())
let (sid, head_hash) = session_store::create_session(
&store,
SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
},
)
.await
.unwrap();
let sid = session.session_id();
let mut head_hash = Some(head_hash);
// Modify config via worker and log it
session
.worker_mut()
.set_request_config(RequestConfig::default().with_temperature(0.7));
session.log_config_changed().await.unwrap();
// Modify config and log it
let new_config = RequestConfig::default().with_temperature(0.7);
worker.set_request_config(new_config.clone());
session_store::save_config_changed(&store, sid, &mut head_hash, &new_config)
.await
.unwrap();
let entries = store.read_all(sid).await.unwrap();
let has_config_changed = entries.iter().any(|e| {
@ -354,13 +428,24 @@ async fn session_cache_lock_unlock_logged() {
let client = MockLlmClient::new(vec![]);
let worker = Worker::new(client);
let mut session = Session::new(worker, store.clone(), SessionConfig::default())
let (sid, head_hash) = session_store::create_session(
&store,
SessionStartState {
system_prompt: worker.get_system_prompt(),
config: worker.request_config(),
history: worker.history(),
},
)
.await
.unwrap();
let sid = session.session_id();
let mut head_hash = Some(head_hash);
session.log_cache_locked(5).await.unwrap();
session.log_cache_unlocked().await.unwrap();
session_store::save_cache_locked(&store, sid, &mut head_hash, 5)
.await
.unwrap();
session_store::save_cache_unlocked(&store, sid, &mut head_hash)
.await
.unwrap();
let entries = store.read_all(sid).await.unwrap();
@ -392,10 +477,19 @@ async fn session_auto_forks_on_conflict() {
// Create a session
let client_a = MockLlmClient::new(simple_text_events());
let worker_a = Worker::new(client_a);
let mut session_a = Session::new(worker_a, store.clone(), SessionConfig::default())
let (original_sid, head_hash) = session_store::create_session(
&store,
SessionStartState {
system_prompt: worker_a.get_system_prompt(),
config: worker_a.request_config(),
history: worker_a.history(),
},
)
.await
.unwrap();
let original_sid = session_a.session_id();
let mut session_id = original_sid;
let mut head_hash = Some(head_hash);
// Simulate another Pod writing to the same session behind our back
let extra_entry = LogEntry::UserInput {
@ -403,22 +497,33 @@ async fn session_auto_forks_on_conflict() {
item: Item::user_message("Interloper"),
};
let current_head = store.read_head_hash(original_sid).await.unwrap();
let hash = llm_worker_persistence::compute_hash(current_head.as_ref(), &extra_entry);
let hashed = llm_worker_persistence::HashedEntry {
let hash = session_store::compute_hash(current_head.as_ref(), &extra_entry);
let hashed = session_store::HashedEntry {
hash,
prev_hash: current_head,
entry: extra_entry,
};
store.append(original_sid, &hashed).await.unwrap();
// Now session_a's head_hash is stale — run should auto-fork
session_a.run("Hello").await.unwrap();
// Now head_hash is stale — ensure_head_or_fork should auto-fork
session_store::ensure_head_or_fork(
&store,
&mut session_id,
&mut head_hash,
SessionStartState {
system_prompt: worker_a.get_system_prompt(),
config: worker_a.request_config(),
history: worker_a.history(),
},
)
.await
.unwrap();
// session_a should now have a different session_id
assert_ne!(session_a.session_id(), original_sid);
// session_id should now be different
assert_ne!(session_id, original_sid);
// The fork session should exist and have entries
let fork_entries = store.read_all(session_a.session_id()).await.unwrap();
let fork_entries = store.read_all(session_id).await.unwrap();
assert!(!fork_entries.is_empty());
// Original session should still have the interloper entry

View File

@ -1,57 +0,0 @@
# session-store: persistence クレートの再構成
## 背景
`llm-worker-persistence` は名前・構造ともに llm-worker のサブクレートに見えるが、
実態はセッション管理という上位層の関心を持っている。
現状の `Session` は Worker を wrap して `run()`/`resume()` をインターセプトするが、
永続化のためにレイヤーとして呼び出しパスに噛む必要はない。
Worker からセッション状態を抜き出して保存する/復元するだけで十分。
## 方針
- クレート名を `llm-worker-persistence``session-store` に変更
- `Session` の Worker wrap を廃止し、save/restore の関数群にする
- Pod が Worker を直接保持し、run 後に session-store の関数を呼ぶ
- `llm-worker` への型依存(`Item`, `RequestConfig`)はそのまま残す(構造的に層にならなければ問題ない)
## 現状の構造
```
Controller → Pod → Session (wraps Worker) → Worker
↑ run()/resume() をインターセプト
```
`pod.session_mut().worker_mut()` と2段潜る必要がある。
## 変更後の構造
```
Controller → Pod → Worker (直接保持)
└─ run 後に session_store::save_delta(store, ...) を呼ぶ
restore 時に session_store::restore(store, id) → state を返す
```
## 変更内容
### session-store クレート(旧 llm-worker-persistence
- `Session` struct を廃止
- save 系関数を提供: history delta の記録、turn end、outcome 等
- restore 関数: ログ再生 → `RestoredState` を返すWorker は作らない)
- `Store` trait, `FsStore`, `LogEntry`, ハッシュチェーンはそのまま維持
- fork / fork_at も関数として残す
### pod クレート
- `Pod``Worker` を直接フィールドに持つ
- `Pod::run()` 内で Worker を呼び、その後 session-store の save 関数を呼ぶ
- `Pod::restore()` は session-store から `RestoredState` を受け取り、Worker に適用
- Controller は `pod.worker()` / `pod.worker_mut()` で直接アクセス
### 影響範囲
- `Session` を使っている箇所: `pod.rs`, `controller.rs`, テスト
- `SessionError` が消えるので、`PodError` から `SessionError` variant を除去し、`StoreError` に置換