diff --git a/Cargo.lock b/Cargo.lock index a54d0d61..32c1c125 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -214,6 +214,20 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "compact_str" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb1325a1cece981e8a296ab8f0f9b63ae357bd0784a9faaf548cc7b480707a" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "rustversion", + "ryu", + "static_assertions", +] + [[package]] name = "const-oid" version = "0.10.2" @@ -322,6 +336,15 @@ dependencies = [ "syn", ] +[[package]] +name = "deranged" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" +dependencies = [ + "powerfmt", +] + [[package]] name = "digest" version = "0.11.2" @@ -413,6 +436,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -582,7 +611,7 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", ] [[package]] @@ -590,6 +619,11 @@ name = "hashbrown" version = "0.16.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", +] [[package]] name = "heck" @@ -907,6 +941,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.18" @@ -925,6 +968,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kasuari" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bde5057d6143cc94e861d90f591b9303d6716c6b9602309150bd068853c10899" +dependencies = [ + "hashbrown 0.16.1", + "thiserror", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -943,6 +996,15 @@ version = "0.2.184" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" +[[package]] +name = "line-clipping" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f50e8f47623268b5407192d26876c4d7f89d686ca130fdc53bced4814cd29f8" +dependencies = [ + "bitflags", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -993,23 +1055,6 @@ dependencies = [ "syn", ] -[[package]] -name = "llm-worker-persistence" -version = "0.1.0" -dependencies = [ - "async-trait", - "futures", - "hex", - "llm-worker", - "serde", - "serde_json", - "sha2", - "tempfile", - "thiserror", - "tokio", - "uuid", -] - [[package]] name = "lock_api" version = "0.4.14" @@ -1034,6 +1079,15 @@ dependencies = [ "hashbrown 0.15.5", ] +[[package]] +name = "lru" +version = "0.16.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1dc47f592c06f33f8e3aea9591776ec7c9f9e4124778ff8a3c3b87159f7e593" +dependencies = [ + "hashbrown 0.16.1", +] + [[package]] name = "manifest" version = "0.1.0" @@ -1112,6 +1166,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-conv" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" + [[package]] name = "once_cell" version = "1.21.4" @@ -1224,12 +1284,12 @@ dependencies = [ "dotenv", "futures", "llm-worker", - "llm-worker-persistence", "manifest", "protocol", "provider", "serde", "serde_json", + "session-store", "tempfile", "thiserror", "tokio", @@ -1246,6 +1306,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "prettyplease" version = "0.2.37" @@ -1308,16 +1374,55 @@ checksum = "eabd94c2f37801c20583fc49dd5cd6b0ba68c716787c2dd6ed18571e1e63117b" dependencies = [ "bitflags", "cassowary", - "compact_str", + "compact_str 0.8.1", "crossterm", "indoc", "instability", - "itertools", - "lru", + "itertools 0.13.0", + "lru 0.12.5", "paste", - "strum", + "strum 0.26.3", + "unicode-segmentation", + "unicode-truncate 1.1.0", + "unicode-width 0.2.0", +] + +[[package]] +name = "ratatui-core" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ef8dea09a92caaf73bff7adb70b76162e5937524058a7e5bff37869cbbec293" +dependencies = [ + "bitflags", + "compact_str 0.9.0", + "hashbrown 0.16.1", + "indoc", + "itertools 0.14.0", + "kasuari", + "lru 0.16.3", + "strum 0.27.2", + "thiserror", + "unicode-segmentation", + "unicode-truncate 2.0.1", + "unicode-width 0.2.0", +] + +[[package]] +name = "ratatui-widgets" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7dbfa023cd4e604c2553483820c5fe8aa9d71a42eea5aa77c6e7f35756612db" +dependencies = [ + "bitflags", + "hashbrown 0.16.1", + "indoc", + "instability", + "itertools 0.14.0", + "line-clipping", + "ratatui-core", + "strum 0.27.2", + "time", "unicode-segmentation", - "unicode-truncate", "unicode-width 0.2.0", ] @@ -1665,6 +1770,23 @@ dependencies = [ "syn", ] +[[package]] +name = "session-store" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "hex", + "llm-worker", + "serde", + "serde_json", + "sha2", + "tempfile", + "thiserror", + "tokio", + "uuid", +] + [[package]] name = "sha2" version = "0.11.0" @@ -1768,7 +1890,16 @@ version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" dependencies = [ - "strum_macros", + "strum_macros 0.26.4", +] + +[[package]] +name = "strum" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af23d6f6c1a224baef9d3f61e287d2761385a5b88fdab4eb4c6f11aeb54c4bcf" +dependencies = [ + "strum_macros 0.27.2", ] [[package]] @@ -1784,6 +1915,18 @@ dependencies = [ "syn", ] +[[package]] +name = "strum_macros" +version = "0.27.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7695ce3845ea4b33927c055a39dc438a45b059f7c1b3d91d38d10355fb8cbca7" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "subtle" version = "2.6.1" @@ -1878,6 +2021,24 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" +dependencies = [ + "deranged", + "num-conv", + "powerfmt", + "time-core", +] + +[[package]] +name = "time-core" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" + [[package]] name = "tinystr" version = "0.8.3" @@ -2124,6 +2285,18 @@ dependencies = [ "ratatui", "serde_json", "tokio", + "tui-scrollview", +] + +[[package]] +name = "tui-scrollview" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94a94f467c7ac7c291039b0733e3b2d379c77884e34fc27d167921fc1ab4842f" +dependencies = [ + "indoc", + "ratatui-core", + "ratatui-widgets", ] [[package]] @@ -2150,11 +2323,22 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3644627a5af5fa321c95b9b235a72fd24cd29c648c2c379431e6628655627bf" dependencies = [ - "itertools", + "itertools 0.13.0", "unicode-segmentation", "unicode-width 0.1.14", ] +[[package]] +name = "unicode-truncate" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b380a1238663e5f8a691f9039c73e1cdae598a30e9855f541d29b08b53e9a5" +dependencies = [ + "itertools 0.14.0", + "unicode-segmentation", + "unicode-width 0.2.0", +] + [[package]] name = "unicode-width" version = "0.1.14" diff --git a/Cargo.toml b/Cargo.toml index 38ca9700..bf267a00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [ "crates/daemon", "crates/llm-worker", "crates/llm-worker-macros", - "crates/llm-worker-persistence", + "crates/session-store", "crates/manifest", "crates/pod", "crates/protocol", diff --git a/TODO.md b/TODO.md index fefe9e1f..faff19b1 100644 --- a/TODO.md +++ b/TODO.md @@ -20,5 +20,5 @@ - [ ] Compact (Step 5-8、session-store-extraction 後) - [x] Protocol: request-response パターン (GetHistory等) → [tickets/request-response-protocol.md](tickets/request-response-protocol.md) - [ ] パーミッション: パターンベースのツール実行制御 → [tickets/permission-extension-point.md](tickets/permission-extension-point.md) -- [ ] session-store: persistence クレートの再構成(wrap廃止、リネーム) → [tickets/session-store-extraction.md](tickets/session-store-extraction.md) +- [x] session-store: persistence クレートの再構成(wrap廃止、リネーム) → [tickets/session-store-extraction.md](tickets/session-store-extraction.md) - [ ] UI用トークン情報の記録(run stats の永続化、session-store 後) diff --git a/crates/llm-worker-persistence/src/session.rs b/crates/llm-worker-persistence/src/session.rs deleted file mode 100644 index e7516ae4..00000000 --- a/crates/llm-worker-persistence/src/session.rs +++ /dev/null @@ -1,400 +0,0 @@ -//! Persistent session wrapper around [`Worker`]. -//! -//! [`Session`] intercepts `Worker` operations and appends [`HashedEntry`] records -//! to a [`Store`]. It does not modify `Worker` internals — all persistence -//! happens by observing state before and after each operation. -//! -//! Each appended entry carries a hash that chains to the previous entry. -//! On append, the session checks whether the store's head still matches its -//! own `head_hash`; if not, it auto-forks into a new session. - -use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, Outcome}; -use crate::store::{Store, StoreError}; -use crate::SessionId; -use llm_worker::llm_client::client::LlmClient; -use llm_worker::state::Mutable; -use llm_worker::{Worker, WorkerError, WorkerResult}; - -/// Configuration for session persistence. -#[derive(Debug, Clone)] -pub struct SessionConfig { - /// Record raw stream events to a separate trace file. - /// Default: `false`. - pub record_event_trace: bool, -} - -impl Default for SessionConfig { - fn default() -> Self { - Self { - record_event_trace: false, - } - } -} - -/// Errors from session operations. -#[derive(Debug, thiserror::Error)] -pub enum SessionError { - #[error(transparent)] - Worker(#[from] WorkerError), - - #[error(transparent)] - Store(#[from] StoreError), -} - -/// Persistent session wrapping a [`Worker`]. -/// -/// Use [`worker()`](Self::worker) / [`worker_mut()`](Self::worker_mut) to -/// access the underlying Worker for configuration (tool registration, etc.). -/// State-mutating operations (`run`, `resume`) should go through Session -/// methods to ensure proper logging. -pub struct Session { - /// Always `Some` outside of `run()` / `resume()`. - worker: Option>, - store: St, - session_id: SessionId, - head_hash: Option, - _config: SessionConfig, -} - -impl Session { - /// Create a new session, writing the initial `SessionStart` entry. - pub async fn new( - worker: Worker, - store: St, - config: SessionConfig, - ) -> Result { - let session_id = crate::new_session_id(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), - system_prompt: worker.get_system_prompt().map(String::from), - config: worker.request_config().clone(), - history: worker.history().to_vec(), - }; - let hashed = session_log::compute_hash(None, &entry); - let hashed_entry = HashedEntry { - hash: hashed.clone(), - prev_hash: None, - entry, - }; - store.append(session_id, &hashed_entry).await?; - - Ok(Self { - worker: Some(worker), - store, - session_id, - head_hash: Some(hashed), - _config: config, - }) - } - - /// Restore a session from a stored log. - pub async fn restore( - client: C, - store: St, - session_id: SessionId, - config: SessionConfig, - ) -> Result { - let entries = store.read_all(session_id).await?; - let state = session_log::collect_state(&entries); - - let mut worker = Worker::new(client); - if let Some(ref prompt) = state.system_prompt { - worker.set_system_prompt(prompt); - } - worker.set_history(state.history); - worker.set_request_config(state.config); - worker.set_turn_count(state.turn_count); - worker.set_last_run_interrupted(state.last_run_interrupted); - - Ok(Self { - worker: Some(worker), - store, - session_id, - head_hash: state.head_hash, - _config: config, - }) - } - - fn w(&self) -> &Worker { - self.worker.as_ref().expect("worker taken during run") - } - - /// Reference to the underlying Worker. - pub fn worker(&self) -> &Worker { - self.w() - } - - /// Mutable reference to the underlying Worker. - pub fn worker_mut(&mut self) -> &mut Worker { - self.worker.as_mut().expect("worker taken during run") - } - - /// The session ID. - pub fn session_id(&self) -> SessionId { - self.session_id - } - - /// The current head hash of the session log chain. - pub fn head_hash(&self) -> Option<&EntryHash> { - self.head_hash.as_ref() - } - - /// Reference to the underlying store. - pub fn store(&self) -> &St { - &self.store - } - - /// Run a user turn, logging all state changes. - /// - /// Internally locks the Worker (flushing pending tools), runs the turn, - /// then unlocks back to Mutable state. - pub async fn run( - &mut self, - user_input: impl Into, - ) -> Result { - let input = user_input.into(); - self.ensure_head_or_fork().await?; - - let history_before = self.w().history().len(); - - // lock → run → unlock (use lock() directly to keep worker on error) - let worker = self.worker.take().expect("worker taken during run"); - let mut locked = worker.lock(); - let result = locked.run(input).await; - self.worker = Some(locked.unlock()); - - self.log_history_delta(history_before).await?; - self.log_turn_end().await?; - self.log_outcome(&result).await?; - - result.map_err(SessionError::Worker) - } - - /// Resume from a paused state, logging all state changes. - pub async fn resume(&mut self) -> Result { - self.ensure_head_or_fork().await?; - - let history_before = self.w().history().len(); - - // lock → resume → unlock - let worker = self.worker.take().expect("worker taken during run"); - let mut locked = worker.lock(); - let result = locked.resume().await; - self.worker = Some(locked.unlock()); - - self.log_history_delta(history_before).await?; - self.log_turn_end().await?; - self.log_outcome(&result).await?; - - result.map_err(SessionError::Worker) - } - - /// Fork this session at its current state. - pub async fn fork(&self) -> Result { - let fork_id = crate::new_session_id(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), - system_prompt: self.w().get_system_prompt().map(String::from), - config: self.w().request_config().clone(), - history: self.w().history().to_vec(), - }; - let hashed = session_log::compute_hash(None, &entry); - let hashed_entry = HashedEntry { - hash: hashed, - prev_hash: None, - entry, - }; - self.store - .create_session(fork_id, &[hashed_entry]) - .await?; - Ok(fork_id) - } - - /// Fork from an arbitrary point in a stored session's log. - pub async fn fork_at( - store: &St, - source_id: SessionId, - at_hash: &EntryHash, - ) -> Result { - let entries = store.read_all(source_id).await?; - let cut = entries - .iter() - .position(|e| &e.hash == at_hash) - .map(|i| i + 1) - .unwrap_or(entries.len()); - let state = session_log::collect_state(&entries[..cut]); - - let fork_id = crate::new_session_id(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), - system_prompt: state.system_prompt, - config: state.config, - history: state.history, - }; - let hashed = session_log::compute_hash(None, &entry); - let hashed_entry = HashedEntry { - hash: hashed, - prev_hash: None, - entry, - }; - store.create_session(fork_id, &[hashed_entry]).await?; - Ok(fork_id) - } - - /// Log a `Locked` entry. - pub async fn log_cache_locked( - &mut self, - locked_prefix_len: usize, - ) -> Result<(), StoreError> { - let entry = LogEntry::Locked { - ts: session_log::now_millis(), - locked_prefix_len, - }; - self.append_entry(entry).await - } - - /// Log a `CacheUnlocked` entry. - pub async fn log_cache_unlocked(&mut self) -> Result<(), StoreError> { - let entry = LogEntry::CacheUnlocked { - ts: session_log::now_millis(), - }; - self.append_entry(entry).await - } - - /// Log a `ConfigChanged` entry. - pub async fn log_config_changed(&mut self) -> Result<(), StoreError> { - let entry = LogEntry::ConfigChanged { - ts: session_log::now_millis(), - config: self.w().request_config().clone(), - }; - self.append_entry(entry).await - } - - // ── Private helpers ────────────────────────────────────────────────── - - async fn append_entry(&mut self, entry: LogEntry) -> Result<(), StoreError> { - let hash = session_log::compute_hash(self.head_hash.as_ref(), &entry); - let hashed_entry = HashedEntry { - hash: hash.clone(), - prev_hash: self.head_hash.clone(), - entry, - }; - self.store - .append(self.session_id, &hashed_entry) - .await?; - self.head_hash = Some(hash); - Ok(()) - } - - async fn ensure_head_or_fork(&mut self) -> Result<(), StoreError> { - let store_head = self.store.read_head_hash(self.session_id).await?; - if store_head == self.head_hash { - return Ok(()); - } - let fork_id = crate::new_session_id(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), - system_prompt: self.w().get_system_prompt().map(String::from), - config: self.w().request_config().clone(), - history: self.w().history().to_vec(), - }; - let hash = session_log::compute_hash(None, &entry); - let hashed_entry = HashedEntry { - hash: hash.clone(), - prev_hash: None, - entry, - }; - self.store - .create_session(fork_id, &[hashed_entry]) - .await?; - self.session_id = fork_id; - self.head_hash = Some(hash); - Ok(()) - } - - async fn log_history_delta(&mut self, before_len: usize) -> Result<(), StoreError> { - let history = self.w().history(); - if history.len() <= before_len { - return Ok(()); - } - - let ts = session_log::now_millis(); - let new_items = history[before_len..].to_vec(); - let mut i = 0; - - while i < new_items.len() { - let item = &new_items[i]; - if item.is_user_message() { - self.append_entry(LogEntry::UserInput { - ts, - item: new_items[i].clone(), - }) - .await?; - i += 1; - } else if item.is_tool_result() { - let start = i; - while i < new_items.len() && new_items[i].is_tool_result() { - i += 1; - } - self.append_entry(LogEntry::ToolResults { - ts, - items: new_items[start..i].to_vec(), - }) - .await?; - } else if item.is_assistant_message() - || item.is_tool_call() - || item.is_reasoning() - { - let start = i; - while i < new_items.len() - && (new_items[i].is_assistant_message() - || new_items[i].is_tool_call() - || new_items[i].is_reasoning()) - { - i += 1; - } - self.append_entry(LogEntry::AssistantItems { - ts, - items: new_items[start..i].to_vec(), - }) - .await?; - } else { - self.append_entry(LogEntry::HookInjectedItems { - ts, - items: vec![new_items[i].clone()], - }) - .await?; - i += 1; - } - } - Ok(()) - } - - async fn log_turn_end(&mut self) -> Result<(), StoreError> { - self.append_entry(LogEntry::TurnEnd { - ts: session_log::now_millis(), - turn_count: self.w().turn_count(), - }) - .await - } - - async fn log_outcome( - &mut self, - result: &Result, - ) -> Result<(), StoreError> { - let outcome = match result { - Ok(WorkerResult::Finished) => Outcome::Finished, - Ok(WorkerResult::Paused) => Outcome::Paused, - Ok(WorkerResult::LimitReached) => Outcome::LimitReached, - Err(e) => Outcome::Error { - message: e.to_string(), - }, - }; - self.append_entry(LogEntry::RunOutcome { - ts: session_log::now_millis(), - outcome, - interrupted: self.w().last_run_interrupted(), - }) - .await - } -} diff --git a/crates/pod/Cargo.toml b/crates/pod/Cargo.toml index f452b641..61562400 100644 --- a/crates/pod/Cargo.toml +++ b/crates/pod/Cargo.toml @@ -8,7 +8,7 @@ license.workspace = true async-trait = "0.1.89" clap = { version = "4.6.0", features = ["derive"] } llm-worker = { version = "0.2.1", path = "../llm-worker" } -llm-worker-persistence = { version = "0.1.0", path = "../llm-worker-persistence" } +session-store = { version = "0.1.0", path = "../session-store" } manifest = { version = "0.1.0", path = "../manifest" } protocol = { version = "0.1.0", path = "../protocol" } provider = { version = "0.1.0", path = "../provider" } @@ -23,6 +23,6 @@ tracing = "0.1.44" async-trait = "0.1.89" dotenv = "0.15.0" futures = "0.3.32" -llm-worker-persistence = { path = "../llm-worker-persistence" } +session-store = { path = "../session-store" } tempfile = "3.27.0" tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "time"] } diff --git a/crates/pod/examples/pod_cli.rs b/crates/pod/examples/pod_cli.rs index e6376385..f811c5ce 100644 --- a/crates/pod/examples/pod_cli.rs +++ b/crates/pod/examples/pod_cli.rs @@ -12,7 +12,7 @@ //! ``` use pod::{Pod, PodManifest, PodRunResult}; -use llm_worker_persistence::FsStore; +use session_store::FsStore; const MANIFEST_TOML: &str = r#" [pod] @@ -52,7 +52,7 @@ async fn main() -> Result<(), Box> { } // 5. Extract the assistant's reply from history - let history = pod.session_mut().worker().history(); + let history = pod.worker().history(); if let Some(text) = history .iter() .rev() diff --git a/crates/pod/examples/pod_protocol.rs b/crates/pod/examples/pod_protocol.rs index 991ba884..bcd8707f 100644 --- a/crates/pod/examples/pod_protocol.rs +++ b/crates/pod/examples/pod_protocol.rs @@ -6,7 +6,7 @@ //! ``` use pod::{Event, Method, PodController, PodManifest}; -use llm_worker_persistence::FsStore; +use session_store::FsStore; const MANIFEST_TOML: &str = r#" [pod] diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index ea114aba..a80465b9 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use llm_worker::llm_client::client::LlmClient; use llm_worker::WorkerError; -use llm_worker_persistence::Store; +use session_store::Store; use tokio::sync::{broadcast, mpsc}; use crate::pod::{Pod, PodRunResult, PodError}; @@ -85,7 +85,7 @@ impl PodController { // Register event bridge callbacks on the worker { - let worker = pod.session_mut().worker_mut(); + let worker = pod.worker_mut(); let tx = event_tx.clone(); worker.on_turn_start(move |turn| { @@ -158,7 +158,7 @@ impl PodController { } // Clone cancel sender before moving pod - let cancel_tx = pod.session_mut().worker_mut().cancel_sender(); + let cancel_tx = pod.worker_mut().cancel_sender(); tokio::spawn(async move { // Hold socket server alive for the lifetime of the controller task @@ -191,7 +191,7 @@ impl PodController { ) .await; - let items = pod.session_mut().worker_mut().history().to_vec(); + let items = pod.worker().history().to_vec(); shared_state.update_history(items); shared_state.set_status(new_status); let _ = runtime_dir.write_status(&shared_state).await; @@ -218,7 +218,7 @@ impl PodController { ) .await; - let items = pod.session_mut().worker_mut().history().to_vec(); + let items = pod.worker().history().to_vec(); shared_state.update_history(items); shared_state.set_status(new_status); let _ = runtime_dir.write_status(&shared_state).await; @@ -307,19 +307,12 @@ where fn worker_error_code(e: &PodError) -> ErrorCode { match e { - PodError::Session(se) => { - use llm_worker_persistence::SessionError; - match se { - SessionError::Worker(we) => match we { - WorkerError::Tool(_) => ErrorCode::ToolError, - WorkerError::Client(_) => ErrorCode::ProviderError, - _ => ErrorCode::Internal, - }, - _ => ErrorCode::Internal, - } - } + PodError::Worker(we) => match we { + WorkerError::Tool(_) => ErrorCode::ToolError, + WorkerError::Client(_) => ErrorCode::ProviderError, + _ => ErrorCode::Internal, + }, PodError::Provider(_) => ErrorCode::ProviderError, _ => ErrorCode::Internal, } } - diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index beda01fb..fb1442cb 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -2,7 +2,7 @@ use std::path::{Path, PathBuf}; use std::process::ExitCode; use clap::Parser; -use llm_worker_persistence::FsStore; +use session_store::FsStore; use pod::{Pod, PodController}; #[derive(Parser)] diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index dcf263f7..9d377aeb 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -3,9 +3,10 @@ use std::sync::Arc; use llm_worker::llm_client::client::LlmClient; use llm_worker::llm_client::RequestConfig; -use llm_worker::Worker; -use llm_worker_persistence::{ - Session, SessionConfig, SessionError, SessionId, Store, StoreError, +use llm_worker::state::Mutable; +use llm_worker::{Worker, WorkerError, WorkerResult}; +use session_store::{ + EntryHash, Outcome, SessionId, SessionStartState, Store, StoreError, }; use manifest::{PodManifest, Scope, WorkerManifest}; @@ -18,11 +19,15 @@ use crate::hook_interceptor::HookInterceptor; /// An independent agent execution unit. /// -/// Wraps a persistent [`Session`] with manifest metadata and an optional -/// directory scope. This is the primary abstraction in insomnia. +/// Holds a [`Worker`] directly and persists session state via +/// `session-store` functions after each turn. pub struct Pod { manifest: PodManifest, - session: Session, + /// Always `Some` outside of `run()`/`resume()`. + worker: Option>, + store: St, + session_id: SessionId, + head_hash: Option, scope: Option, hook_builder: HookRegistryBuilder, interceptor_installed: bool, @@ -30,20 +35,24 @@ pub struct Pod { impl Pod { /// Create a new Pod from a pre-built Worker and store. - /// - /// The caller is responsible for constructing the `LlmClient` from the - /// manifest's provider config. This keeps Pod free of provider-specific - /// dependencies. pub async fn new( manifest: PodManifest, worker: Worker, store: St, scope: Option, ) -> Result { - let session = Session::new(worker, store, SessionConfig::default()).await?; + let state = SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }; + let (session_id, head_hash) = session_store::create_session(&store, state).await?; Ok(Self { manifest, - session, + worker: Some(worker), + store, + session_id, + head_hash: Some(head_hash), scope, hook_builder: HookRegistryBuilder::new(), interceptor_installed: false, @@ -58,10 +67,22 @@ impl Pod { store: St, scope: Option, ) -> Result { - let session = Session::restore(client, store, session_id, SessionConfig::default()).await?; + let state = session_store::restore(&store, session_id).await?; + let mut worker = Worker::new(client); + if let Some(ref prompt) = state.system_prompt { + worker.set_system_prompt(prompt); + } + worker.set_history(state.history); + worker.set_request_config(state.config); + worker.set_turn_count(state.turn_count); + worker.set_last_run_interrupted(state.last_run_interrupted); + Ok(Self { manifest, - session, + worker: Some(worker), + store, + session_id, + head_hash: state.head_hash, scope, hook_builder: HookRegistryBuilder::new(), interceptor_installed: false, @@ -70,7 +91,7 @@ impl Pod { /// The session ID used for persistence. pub fn session_id(&self) -> SessionId { - self.session.session_id() + self.session_id } /// The Pod's manifest. @@ -83,18 +104,25 @@ impl Pod { self.scope.as_ref() } - /// Direct access to the underlying session. + /// Direct access to the underlying Worker. + pub fn worker(&self) -> &Worker { + self.worker.as_ref().expect("worker taken during run") + } + + /// Mutable access to the underlying Worker. /// - /// Use this to register tools, hooks, or subscribers on the worker - /// before calling [`run`](Self::run). - pub fn session_mut(&mut self) -> &mut Session { - &mut self.session + /// Use this to register tools, hooks, or subscribers before calling + /// [`run`](Self::run). + pub fn worker_mut(&mut self) -> &mut Worker { + self.worker.as_mut().expect("worker taken during run") + } + + /// Reference to the store. + pub fn store(&self) -> &St { + &self.store } // --- Hook registration --- - // - // Hooks must be registered before the first call to `run()` or `resume()`. - // Attempting to add a hook after execution has started will panic. fn assert_hooks_open(&self) { assert!( @@ -145,7 +173,7 @@ impl Pod { let builder = std::mem::take(&mut self.hook_builder); let registry = Arc::new(builder.build()); let interceptor = HookInterceptor::new(registry); - self.session.worker_mut().set_interceptor(interceptor); + self.worker_mut().set_interceptor(interceptor); self.interceptor_installed = true; } } @@ -153,23 +181,114 @@ impl Pod { /// Send user input and run until the LLM turn completes. pub async fn run(&mut self, input: impl Into) -> Result { self.ensure_interceptor_installed(); - let result = self.session.run(input).await?; - Ok(result.into()) + + // Split borrow: access worker field directly to allow concurrent + // mutable borrows on session_id / head_hash. + let w = self.worker.as_ref().unwrap(); + session_store::ensure_head_or_fork( + &self.store, + &mut self.session_id, + &mut self.head_hash, + SessionStartState { + system_prompt: w.get_system_prompt(), + config: w.request_config(), + history: w.history(), + }, + ) + .await?; + + let history_before = self.worker.as_ref().unwrap().history().len(); + + // lock → run → unlock + let worker = self.worker.take().expect("worker taken during run"); + let mut locked = worker.lock(); + let result = locked.run(input).await; + self.worker = Some(locked.unlock()); + + self.persist_turn(history_before, &result).await?; + result.map(PodRunResult::from).map_err(PodError::Worker) } /// Resume from a paused state. pub async fn resume(&mut self) -> Result { self.ensure_interceptor_installed(); - let result = self.session.resume().await?; - Ok(result.into()) + + let w = self.worker.as_ref().unwrap(); + session_store::ensure_head_or_fork( + &self.store, + &mut self.session_id, + &mut self.head_hash, + SessionStartState { + system_prompt: w.get_system_prompt(), + config: w.request_config(), + history: w.history(), + }, + ) + .await?; + + let history_before = self.worker.as_ref().unwrap().history().len(); + + // lock → resume → unlock + let worker = self.worker.take().expect("worker taken during run"); + let mut locked = worker.lock(); + let result = locked.resume().await; + self.worker = Some(locked.unlock()); + + self.persist_turn(history_before, &result).await?; + result.map(PodRunResult::from).map_err(PodError::Worker) + } + + /// Persist delta + turn end + outcome after a run/resume. + async fn persist_turn( + &mut self, + history_before: usize, + result: &Result, + ) -> Result<(), StoreError> { + // Use direct field access for split borrows (worker immutable, + // head_hash mutable). + let w = self.worker.as_ref().unwrap(); + let new_items = &w.history()[history_before..]; + session_store::save_delta( + &self.store, + self.session_id, + &mut self.head_hash, + new_items, + ) + .await?; + + let turn_count = self.worker.as_ref().unwrap().turn_count(); + session_store::save_turn_end( + &self.store, + self.session_id, + &mut self.head_hash, + turn_count, + ) + .await?; + + let interrupted = self.worker.as_ref().unwrap().last_run_interrupted(); + let outcome = match result { + Ok(WorkerResult::Finished) => Outcome::Finished, + Ok(WorkerResult::Paused) => Outcome::Paused, + Ok(WorkerResult::LimitReached) => Outcome::LimitReached, + Err(e) => Outcome::Error { + message: e.to_string(), + }, + }; + session_store::save_outcome( + &self.store, + self.session_id, + &mut self.head_hash, + outcome, + interrupted, + ) + .await?; + + Ok(()) } } impl Pod, St> { /// Create a Pod entirely from a manifest. - /// - /// Builds the LLM client from the provider config, applies worker - /// settings, and creates a new persistent session. pub async fn from_manifest( manifest: PodManifest, store: St, @@ -179,10 +298,19 @@ impl Pod, St> { let client = provider::build_client(&manifest.provider, manifest_dir.as_deref())?; let mut worker = Worker::new(client); apply_worker_manifest(&mut worker, &manifest.worker); - let session = Session::new(worker, store, SessionConfig::default()).await?; + + let state = SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }; + let (session_id, head_hash) = session_store::create_session(&store, state).await?; Ok(Self { manifest, - session, + worker: Some(worker), + store, + session_id, + head_hash: Some(head_hash), scope, hook_builder: HookRegistryBuilder::new(), interceptor_installed: false, @@ -217,12 +345,12 @@ pub enum PodRunResult { LimitReached, } -impl From for PodRunResult { - fn from(r: llm_worker::WorkerResult) -> Self { +impl From for PodRunResult { + fn from(r: WorkerResult) -> Self { match r { - llm_worker::WorkerResult::Finished => PodRunResult::Finished, - llm_worker::WorkerResult::Paused => PodRunResult::Paused, - llm_worker::WorkerResult::LimitReached => PodRunResult::LimitReached, + WorkerResult::Finished => PodRunResult::Finished, + WorkerResult::Paused => PodRunResult::Paused, + WorkerResult::LimitReached => PodRunResult::LimitReached, } } } @@ -231,7 +359,7 @@ impl From for PodRunResult { #[derive(Debug, thiserror::Error)] pub enum PodError { #[error(transparent)] - Session(#[from] SessionError), + Worker(#[from] WorkerError), #[error(transparent)] Store(#[from] StoreError), diff --git a/crates/pod/src/runtime_dir.rs b/crates/pod/src/runtime_dir.rs index 59a8f3ea..e16fe5d6 100644 --- a/crates/pod/src/runtime_dir.rs +++ b/crates/pod/src/runtime_dir.rs @@ -107,7 +107,7 @@ mod tests { fn test_state() -> PodSharedState { PodSharedState::new( "test-pod".into(), - llm_worker_persistence::new_session_id(), + session_store::new_session_id(), "[pod]\nname = \"test-pod\"".into(), ) } diff --git a/crates/pod/src/shared_state.rs b/crates/pod/src/shared_state.rs index a4c47235..7f94c239 100644 --- a/crates/pod/src/shared_state.rs +++ b/crates/pod/src/shared_state.rs @@ -1,7 +1,7 @@ use std::sync::RwLock; use llm_worker::llm_client::types::Item; -use llm_worker_persistence::SessionId; +use session_store::SessionId; use serde::{Deserialize, Serialize}; /// Shared state between PodController and runtime directory. @@ -88,7 +88,7 @@ mod tests { fn test_state() -> PodSharedState { PodSharedState::new( "test-pod".into(), - llm_worker_persistence::new_session_id(), + session_store::new_session_id(), "[pod]\nname = \"test-pod\"".into(), ) } diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 8526bbb4..ab89b98e 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -7,7 +7,7 @@ use futures::Stream; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::Worker; -use llm_worker_persistence::FsStore; +use session_store::FsStore; use pod::{ Event, Method, Pod, PodController, PodManifest, PodStatus, diff --git a/crates/llm-worker-persistence/Cargo.toml b/crates/session-store/Cargo.toml similarity index 83% rename from crates/llm-worker-persistence/Cargo.toml rename to crates/session-store/Cargo.toml index 41bf4bda..f7d9145c 100644 --- a/crates/llm-worker-persistence/Cargo.toml +++ b/crates/session-store/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "llm-worker-persistence" -description = "Session persistence for llm-worker via append-only JSONL logs" +name = "session-store" +description = "Session persistence via append-only JSONL logs" version = "0.1.0" edition.workspace = true license.workspace = true diff --git a/crates/llm-worker-persistence/README.md b/crates/session-store/README.md similarity index 100% rename from crates/llm-worker-persistence/README.md rename to crates/session-store/README.md diff --git a/crates/llm-worker-persistence/src/event_trace.rs b/crates/session-store/src/event_trace.rs similarity index 100% rename from crates/llm-worker-persistence/src/event_trace.rs rename to crates/session-store/src/event_trace.rs diff --git a/crates/llm-worker-persistence/src/fs_store.rs b/crates/session-store/src/fs_store.rs similarity index 100% rename from crates/llm-worker-persistence/src/fs_store.rs rename to crates/session-store/src/fs_store.rs diff --git a/crates/llm-worker-persistence/src/lib.rs b/crates/session-store/src/lib.rs similarity index 54% rename from crates/llm-worker-persistence/src/lib.rs rename to crates/session-store/src/lib.rs index d2ff8f9e..84dd67e5 100644 --- a/crates/llm-worker-persistence/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -1,10 +1,14 @@ -//! Session persistence for `llm-worker` via append-only JSONL logs. +//! Session persistence via append-only JSONL logs. //! //! # Architecture //! //! Sessions are recorded as a sequence of [`LogEntry`] values, one per line //! in a `.jsonl` file. Reading the log and collecting entries reconstructs -//! the full [`Worker`] state — no separate snapshots or checkpoints needed. +//! the full Worker state — no separate snapshots or checkpoints needed. +//! +//! This crate provides free functions for persistence operations. +//! The caller (typically Pod) holds the Worker directly and calls these +//! functions after state-mutating operations. //! //! Debug-mode [`TraceEntry`] records capture raw stream events in a separate //! `.trace.jsonl` file, independent of the session log. @@ -12,12 +16,14 @@ //! # Quick start //! //! ```ignore -//! use llm_worker_persistence::{Session, SessionConfig, FsStore}; +//! use session_store::{create_session, restore, save_delta, FsStore, SessionStartState}; //! //! let store = FsStore::new("./sessions").await?; -//! let worker = Worker::new(client); -//! let mut session = Session::new(worker, store, SessionConfig::default()).await?; -//! session.run("Hello!").await?; +//! let (session_id, head_hash) = create_session(&store, SessionStartState { +//! system_prompt: None, +//! config: &config, +//! history: &[], +//! }).await?; //! ``` pub mod event_trace; @@ -28,7 +34,10 @@ pub mod store; pub use event_trace::TraceEntry; pub use fs_store::FsStore; -pub use session::{Session, SessionConfig, SessionError}; +pub use session::{ + SessionStartState, create_session, ensure_head_or_fork, fork, fork_at, restore, save_cache_locked, + save_cache_unlocked, save_config_changed, save_delta, save_outcome, save_turn_end, +}; pub use session_log::{ EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, build_chain, collect_state, compute_hash, diff --git a/crates/session-store/src/session.rs b/crates/session-store/src/session.rs new file mode 100644 index 00000000..dc7ee987 --- /dev/null +++ b/crates/session-store/src/session.rs @@ -0,0 +1,291 @@ +//! Free functions for session persistence operations. +//! +//! These functions record and restore session state without owning a Worker. +//! The caller (typically Pod) holds the Worker directly and calls these +//! functions after state-mutating operations. + +use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, Outcome}; +use crate::store::{Store, StoreError}; +use crate::SessionId; +use llm_worker::llm_client::types::Item; +use llm_worker::llm_client::RequestConfig; + +/// State snapshot for creating a SessionStart entry. +pub struct SessionStartState<'a> { + pub system_prompt: Option<&'a str>, + pub config: &'a RequestConfig, + pub history: &'a [Item], +} + +/// Create a new session, writing the initial `SessionStart` entry. +/// +/// Returns the new session ID and head hash. +pub async fn create_session( + store: &impl Store, + state: SessionStartState<'_>, +) -> Result<(SessionId, EntryHash), StoreError> { + let session_id = crate::new_session_id(); + let entry = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: state.system_prompt.map(String::from), + config: state.config.clone(), + history: state.history.to_vec(), + }; + let hash = session_log::compute_hash(None, &entry); + let hashed_entry = HashedEntry { + hash: hash.clone(), + prev_hash: None, + entry, + }; + store.append(session_id, &hashed_entry).await?; + Ok((session_id, hash)) +} + +/// Restore session state from a stored log. +/// +/// Returns the reconstructed state. The caller is responsible for +/// applying it to a Worker. +pub async fn restore( + store: &impl Store, + session_id: SessionId, +) -> Result { + let entries = store.read_all(session_id).await?; + Ok(session_log::collect_state(&entries)) +} + +/// Check if the store's head still matches the expected head hash. +/// If not, auto-fork into a new session. +/// +/// Updates `session_id` and `head_hash` in place when a fork occurs. +pub async fn ensure_head_or_fork( + store: &impl Store, + session_id: &mut SessionId, + head_hash: &mut Option, + state: SessionStartState<'_>, +) -> Result<(), StoreError> { + let store_head = store.read_head_hash(*session_id).await?; + if store_head == *head_hash { + return Ok(()); + } + let fork_id = crate::new_session_id(); + let entry = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: state.system_prompt.map(String::from), + config: state.config.clone(), + history: state.history.to_vec(), + }; + let hash = session_log::compute_hash(None, &entry); + let hashed_entry = HashedEntry { + hash: hash.clone(), + prev_hash: None, + entry, + }; + store.create_session(fork_id, &[hashed_entry]).await?; + *session_id = fork_id; + *head_hash = Some(hash); + Ok(()) +} + +/// Log the history delta — new items added since the previous snapshot. +/// +/// Classifies items into UserInput, AssistantItems, ToolResults, and +/// HookInjectedItems entries automatically. +pub async fn save_delta( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, + new_items: &[Item], +) -> Result<(), StoreError> { + if new_items.is_empty() { + return Ok(()); + } + + let ts = session_log::now_millis(); + let mut i = 0; + + while i < new_items.len() { + let item = &new_items[i]; + if item.is_user_message() { + append_entry(store, session_id, head_hash, LogEntry::UserInput { + ts, + item: new_items[i].clone(), + }) + .await?; + i += 1; + } else if item.is_tool_result() { + let start = i; + while i < new_items.len() && new_items[i].is_tool_result() { + i += 1; + } + append_entry(store, session_id, head_hash, LogEntry::ToolResults { + ts, + items: new_items[start..i].to_vec(), + }) + .await?; + } else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() { + let start = i; + while i < new_items.len() + && (new_items[i].is_assistant_message() + || new_items[i].is_tool_call() + || new_items[i].is_reasoning()) + { + i += 1; + } + append_entry(store, session_id, head_hash, LogEntry::AssistantItems { + ts, + items: new_items[start..i].to_vec(), + }) + .await?; + } else { + append_entry(store, session_id, head_hash, LogEntry::HookInjectedItems { + ts, + items: vec![new_items[i].clone()], + }) + .await?; + i += 1; + } + } + Ok(()) +} + +/// Log a TurnEnd entry. +pub async fn save_turn_end( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, + turn_count: usize, +) -> Result<(), StoreError> { + append_entry(store, session_id, head_hash, LogEntry::TurnEnd { + ts: session_log::now_millis(), + turn_count, + }) + .await +} + +/// Log a RunOutcome entry. +pub async fn save_outcome( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, + outcome: Outcome, + interrupted: bool, +) -> Result<(), StoreError> { + append_entry(store, session_id, head_hash, LogEntry::RunOutcome { + ts: session_log::now_millis(), + outcome, + interrupted, + }) + .await +} + +/// Log a `Locked` entry (KV cache locked). +pub async fn save_cache_locked( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, + locked_prefix_len: usize, +) -> Result<(), StoreError> { + append_entry(store, session_id, head_hash, LogEntry::Locked { + ts: session_log::now_millis(), + locked_prefix_len, + }) + .await +} + +/// Log a `CacheUnlocked` entry. +pub async fn save_cache_unlocked( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, +) -> Result<(), StoreError> { + append_entry(store, session_id, head_hash, LogEntry::CacheUnlocked { + ts: session_log::now_millis(), + }) + .await +} + +/// Log a `ConfigChanged` entry. +pub async fn save_config_changed( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, + config: &RequestConfig, +) -> Result<(), StoreError> { + append_entry(store, session_id, head_hash, LogEntry::ConfigChanged { + ts: session_log::now_millis(), + config: config.clone(), + }) + .await +} + +/// Fork the current state into a new session. +pub async fn fork( + store: &impl Store, + state: SessionStartState<'_>, +) -> Result { + let fork_id = crate::new_session_id(); + let entry = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: state.system_prompt.map(String::from), + config: state.config.clone(), + history: state.history.to_vec(), + }; + let hash = session_log::compute_hash(None, &entry); + let hashed_entry = HashedEntry { + hash, + prev_hash: None, + entry, + }; + store.create_session(fork_id, &[hashed_entry]).await?; + Ok(fork_id) +} + +/// Fork from an arbitrary point in a stored session's log. +pub async fn fork_at( + store: &impl Store, + source_id: SessionId, + at_hash: &EntryHash, +) -> Result { + let entries = store.read_all(source_id).await?; + let cut = entries + .iter() + .position(|e| &e.hash == at_hash) + .map(|i| i + 1) + .unwrap_or(entries.len()); + let state = session_log::collect_state(&entries[..cut]); + + let fork_id = crate::new_session_id(); + let entry = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: state.system_prompt, + config: state.config, + history: state.history, + }; + let hash = session_log::compute_hash(None, &entry); + let hashed_entry = HashedEntry { + hash, + prev_hash: None, + entry, + }; + store.create_session(fork_id, &[hashed_entry]).await?; + Ok(fork_id) +} + +// ── Private helper ────────────────────────────────────────────────────── + +async fn append_entry( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, + entry: LogEntry, +) -> Result<(), StoreError> { + let hash = session_log::compute_hash(head_hash.as_ref(), &entry); + let hashed_entry = HashedEntry { + hash: hash.clone(), + prev_hash: head_hash.clone(), + entry, + }; + store.append(session_id, &hashed_entry).await?; + *head_hash = Some(hash); + Ok(()) +} diff --git a/crates/llm-worker-persistence/src/session_log.rs b/crates/session-store/src/session_log.rs similarity index 100% rename from crates/llm-worker-persistence/src/session_log.rs rename to crates/session-store/src/session_log.rs diff --git a/crates/llm-worker-persistence/src/store.rs b/crates/session-store/src/store.rs similarity index 100% rename from crates/llm-worker-persistence/src/store.rs rename to crates/session-store/src/store.rs diff --git a/crates/llm-worker-persistence/tests/common/mod.rs b/crates/session-store/tests/common/mod.rs similarity index 100% rename from crates/llm-worker-persistence/tests/common/mod.rs rename to crates/session-store/tests/common/mod.rs diff --git a/crates/llm-worker-persistence/tests/fs_store_test.rs b/crates/session-store/tests/fs_store_test.rs similarity index 99% rename from crates/llm-worker-persistence/tests/fs_store_test.rs rename to crates/session-store/tests/fs_store_test.rs index 0804f205..32c74afa 100644 --- a/crates/llm-worker-persistence/tests/fs_store_test.rs +++ b/crates/session-store/tests/fs_store_test.rs @@ -1,5 +1,5 @@ use llm_worker::llm_client::types::{Item, RequestConfig}; -use llm_worker_persistence::{ +use session_store::{ FsStore, LogEntry, Outcome, Store, TraceEntry, build_chain, collect_state, new_session_id, }; diff --git a/crates/llm-worker-persistence/tests/session_test.rs b/crates/session-store/tests/session_test.rs similarity index 58% rename from crates/llm-worker-persistence/tests/session_test.rs rename to crates/session-store/tests/session_test.rs index fc97c58e..01157cc4 100644 --- a/crates/llm-worker-persistence/tests/session_test.rs +++ b/crates/session-store/tests/session_test.rs @@ -9,8 +9,8 @@ use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent}; use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_worker::Worker; -use llm_worker_persistence::{ - FsStore, LogEntry, Outcome, Session, SessionConfig, Store, collect_state, +use session_store::{ + EntryHash, FsStore, LogEntry, Outcome, SessionStartState, Store, collect_state, }; // ============================================================================= @@ -92,6 +92,45 @@ async fn make_store() -> (tempfile::TempDir, FsStore) { (dir, store) } +/// Run a worker turn and persist via session-store functions. +/// Takes ownership of the worker (needed for lock/unlock) and returns it. +async fn run_and_persist( + worker: Worker, + store: &FsStore, + session_id: session_store::SessionId, + head_hash: &mut Option, + input: &str, +) -> (Worker, llm_worker::WorkerResult) { + let history_before = worker.history().len(); + + let mut locked = worker.lock(); + let result = locked.run(input).await; + let worker = locked.unlock(); + + let new_items = &worker.history()[history_before..]; + session_store::save_delta(store, session_id, head_hash, new_items) + .await + .unwrap(); + session_store::save_turn_end(store, session_id, head_hash, worker.turn_count()) + .await + .unwrap(); + + let outcome = match &result { + Ok(llm_worker::WorkerResult::Finished) => Outcome::Finished, + Ok(llm_worker::WorkerResult::Paused) => Outcome::Paused, + Ok(llm_worker::WorkerResult::LimitReached) => Outcome::LimitReached, + Err(e) => Outcome::Error { + message: e.to_string(), + }, + }; + session_store::save_outcome(store, session_id, head_hash, outcome, worker.last_run_interrupted()) + .await + .unwrap(); + + let r = result.unwrap(); + (worker, r) +} + // ============================================================================= // Tests // ============================================================================= @@ -102,12 +141,20 @@ async fn session_run_logs_entries() { let client = MockLlmClient::new(simple_text_events()); let worker = Worker::new(client); - let mut session = Session::new(worker, store.clone(), SessionConfig::default()) - .await - .unwrap(); - let sid = session.session_id(); + let (sid, head_hash) = session_store::create_session( + &store, + SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }, + ) + .await + .unwrap(); - session.run("Hi").await.unwrap(); + let mut head_hash = Some(head_hash); + let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await; + let _ = &worker; let entries = store.read_all(sid).await.unwrap(); @@ -152,31 +199,30 @@ async fn session_restore_round_trip() { let mut worker = Worker::new(client); worker.set_system_prompt("You are helpful."); - let mut session = Session::new(worker, store.clone(), SessionConfig::default()) - .await - .unwrap(); - let sid = session.session_id(); + let (sid, head_hash) = session_store::create_session( + &store, + SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }, + ) + .await + .unwrap(); + let mut head_hash = Some(head_hash); - session.run("Hi").await.unwrap(); + let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await; - let original_history = session.worker().history().to_vec(); - let original_turn_count = session.worker().turn_count(); - let original_head_hash = session.head_hash().cloned(); + let original_history_len = worker.history().len(); + let original_turn_count = worker.turn_count(); // Restore - let restore_client = MockLlmClient::new(vec![]); // won't be called - let restored = - Session::restore(restore_client, store.clone(), sid, SessionConfig::default()) - .await - .unwrap(); + let state = session_store::restore(&store, sid).await.unwrap(); - assert_eq!(restored.worker().history().len(), original_history.len()); - assert_eq!(restored.worker().turn_count(), original_turn_count); - assert_eq!( - restored.worker().get_system_prompt().map(String::from), - Some("You are helpful.".to_string()) - ); - assert_eq!(restored.head_hash(), original_head_hash.as_ref()); + assert_eq!(state.history.len(), original_history_len); + assert_eq!(state.turn_count, original_turn_count); + assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); + assert_eq!(state.head_hash, head_hash); } #[tokio::test] @@ -186,12 +232,19 @@ async fn session_run_with_tool_call() { let mut worker = Worker::new(client); worker.register_tool(weather_tool_definition()); - let mut session = Session::new(worker, store.clone(), SessionConfig::default()) - .await - .unwrap(); - let sid = session.session_id(); + let (sid, head_hash) = session_store::create_session( + &store, + SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }, + ) + .await + .unwrap(); + let mut head_hash = Some(head_hash); - session.run("What's the weather?").await.unwrap(); + let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await; let entries = store.read_all(sid).await.unwrap(); @@ -210,18 +263,25 @@ async fn session_run_with_tool_call() { async fn session_resume_after_pause() { let (_dir, store) = make_store().await; - // First run: tool call with pause hook → Paused + // First run: tool call with pause policy → Paused let client = MockLlmClient::with_responses(tool_call_events()); let mut worker = Worker::new(client); worker.register_tool(weather_tool_definition()); worker.set_interceptor(PausePolicy); - let mut session = Session::new(worker, store.clone(), SessionConfig::default()) - .await - .unwrap(); - let sid = session.session_id(); + let (sid, head_hash) = session_store::create_session( + &store, + SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }, + ) + .await + .unwrap(); + let mut head_hash = Some(head_hash); - let result = session.run("Weather?").await.unwrap(); + let (_worker, result) = run_and_persist(worker, &store, sid, &mut head_hash, "Weather?").await; assert!(matches!(result, llm_worker::WorkerResult::Paused)); // Check RunOutcome is Paused @@ -237,25 +297,9 @@ async fn session_resume_after_pause() { }); assert!(has_paused, "should have Paused outcome"); - // Restore and resume - let resume_client = MockLlmClient::with_responses(vec![vec![ - Event::text_block_start(0), - Event::text_delta(0, "After resume"), - Event::text_block_stop(0, None), - Event::Status(StatusEvent { - status: ResponseStatus::Completed, - }), - ]]); - let mut restored = - Session::restore(resume_client, store.clone(), sid, SessionConfig::default()) - .await - .unwrap(); - - assert!(restored.worker().last_run_interrupted()); - - // resume may or may not succeed depending on Worker internal state, - // but the restore itself should work - let _ = restored.resume().await; + // Restore state and verify + let state = session_store::restore(&store, sid).await.unwrap(); + assert!(state.last_run_interrupted); } #[tokio::test] @@ -265,14 +309,31 @@ async fn session_fork_preserves_state() { let mut worker = Worker::new(client); worker.set_system_prompt("System prompt"); - let mut session = Session::new(worker, store.clone(), SessionConfig::default()) - .await - .unwrap(); + let (sid, head_hash) = session_store::create_session( + &store, + SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }, + ) + .await + .unwrap(); + let mut head_hash = Some(head_hash); - session.run("Hello").await.unwrap(); + let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await; - let original_history_len = session.worker().history().len(); - let fork_id = session.fork().await.unwrap(); + let original_history_len = worker.history().len(); + let fork_id = session_store::fork( + &store, + SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }, + ) + .await + .unwrap(); // Fork should have a SessionStart with the current history let fork_entries = store.read_all(fork_id).await.unwrap(); @@ -293,21 +354,26 @@ async fn session_fork_at_truncates() { let client = MockLlmClient::new(simple_text_events()); let worker = Worker::new(client); - let mut session = Session::new(worker, store.clone(), SessionConfig::default()) - .await - .unwrap(); - let sid = session.session_id(); + let (sid, head_hash) = session_store::create_session( + &store, + SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }, + ) + .await + .unwrap(); + let mut head_hash = Some(head_hash); - session.run("Hello").await.unwrap(); + let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await; let all_entries = store.read_all(sid).await.unwrap(); assert!(all_entries.len() > 2); // Fork at the hash of the 2nd entry (SessionStart + UserInput) let at_hash = &all_entries[1].hash; - let fork_id = Session::::fork_at(&store, sid, at_hash) - .await - .unwrap(); + let fork_id = session_store::fork_at(&store, sid, at_hash).await.unwrap(); let fork_entries = store.read_all(fork_id).await.unwrap(); assert_eq!(fork_entries.len(), 1); // Just the new SessionStart @@ -325,18 +391,26 @@ async fn session_fork_at_truncates() { async fn session_config_changed_logged() { let (_dir, store) = make_store().await; let client = MockLlmClient::new(vec![]); - let worker = Worker::new(client); + let mut worker = Worker::new(client); - let mut session = Session::new(worker, store.clone(), SessionConfig::default()) + let (sid, head_hash) = session_store::create_session( + &store, + SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }, + ) + .await + .unwrap(); + let mut head_hash = Some(head_hash); + + // Modify config and log it + let new_config = RequestConfig::default().with_temperature(0.7); + worker.set_request_config(new_config.clone()); + session_store::save_config_changed(&store, sid, &mut head_hash, &new_config) .await .unwrap(); - let sid = session.session_id(); - - // Modify config via worker and log it - session - .worker_mut() - .set_request_config(RequestConfig::default().with_temperature(0.7)); - session.log_config_changed().await.unwrap(); let entries = store.read_all(sid).await.unwrap(); let has_config_changed = entries.iter().any(|e| { @@ -354,13 +428,24 @@ async fn session_cache_lock_unlock_logged() { let client = MockLlmClient::new(vec![]); let worker = Worker::new(client); - let mut session = Session::new(worker, store.clone(), SessionConfig::default()) + let (sid, head_hash) = session_store::create_session( + &store, + SessionStartState { + system_prompt: worker.get_system_prompt(), + config: worker.request_config(), + history: worker.history(), + }, + ) + .await + .unwrap(); + let mut head_hash = Some(head_hash); + + session_store::save_cache_locked(&store, sid, &mut head_hash, 5) + .await + .unwrap(); + session_store::save_cache_unlocked(&store, sid, &mut head_hash) .await .unwrap(); - let sid = session.session_id(); - - session.log_cache_locked(5).await.unwrap(); - session.log_cache_unlocked().await.unwrap(); let entries = store.read_all(sid).await.unwrap(); @@ -392,10 +477,19 @@ async fn session_auto_forks_on_conflict() { // Create a session let client_a = MockLlmClient::new(simple_text_events()); let worker_a = Worker::new(client_a); - let mut session_a = Session::new(worker_a, store.clone(), SessionConfig::default()) - .await - .unwrap(); - let original_sid = session_a.session_id(); + + let (original_sid, head_hash) = session_store::create_session( + &store, + SessionStartState { + system_prompt: worker_a.get_system_prompt(), + config: worker_a.request_config(), + history: worker_a.history(), + }, + ) + .await + .unwrap(); + let mut session_id = original_sid; + let mut head_hash = Some(head_hash); // Simulate another Pod writing to the same session behind our back let extra_entry = LogEntry::UserInput { @@ -403,22 +497,33 @@ async fn session_auto_forks_on_conflict() { item: Item::user_message("Interloper"), }; let current_head = store.read_head_hash(original_sid).await.unwrap(); - let hash = llm_worker_persistence::compute_hash(current_head.as_ref(), &extra_entry); - let hashed = llm_worker_persistence::HashedEntry { + let hash = session_store::compute_hash(current_head.as_ref(), &extra_entry); + let hashed = session_store::HashedEntry { hash, prev_hash: current_head, entry: extra_entry, }; store.append(original_sid, &hashed).await.unwrap(); - // Now session_a's head_hash is stale — run should auto-fork - session_a.run("Hello").await.unwrap(); + // Now head_hash is stale — ensure_head_or_fork should auto-fork + session_store::ensure_head_or_fork( + &store, + &mut session_id, + &mut head_hash, + SessionStartState { + system_prompt: worker_a.get_system_prompt(), + config: worker_a.request_config(), + history: worker_a.history(), + }, + ) + .await + .unwrap(); - // session_a should now have a different session_id - assert_ne!(session_a.session_id(), original_sid); + // session_id should now be different + assert_ne!(session_id, original_sid); // The fork session should exist and have entries - let fork_entries = store.read_all(session_a.session_id()).await.unwrap(); + let fork_entries = store.read_all(session_id).await.unwrap(); assert!(!fork_entries.is_empty()); // Original session should still have the interloper entry diff --git a/tickets/session-store-extraction.md b/tickets/session-store-extraction.md deleted file mode 100644 index 4963ff85..00000000 --- a/tickets/session-store-extraction.md +++ /dev/null @@ -1,57 +0,0 @@ -# session-store: persistence クレートの再構成 - -## 背景 - -`llm-worker-persistence` は名前・構造ともに llm-worker のサブクレートに見えるが、 -実態はセッション管理という上位層の関心を持っている。 - -現状の `Session` は Worker を wrap して `run()`/`resume()` をインターセプトするが、 -永続化のためにレイヤーとして呼び出しパスに噛む必要はない。 -Worker からセッション状態を抜き出して保存する/復元するだけで十分。 - -## 方針 - -- クレート名を `llm-worker-persistence` → `session-store` に変更 -- `Session` の Worker wrap を廃止し、save/restore の関数群にする -- Pod が Worker を直接保持し、run 後に session-store の関数を呼ぶ -- `llm-worker` への型依存(`Item`, `RequestConfig`)はそのまま残す(構造的に層にならなければ問題ない) - -## 現状の構造 - -``` -Controller → Pod → Session (wraps Worker) → Worker - ↑ run()/resume() をインターセプト -``` - -`pod.session_mut().worker_mut()` と2段潜る必要がある。 - -## 変更後の構造 - -``` -Controller → Pod → Worker (直接保持) - │ - └─ run 後に session_store::save_delta(store, ...) を呼ぶ - restore 時に session_store::restore(store, id) → state を返す -``` - -## 変更内容 - -### session-store クレート(旧 llm-worker-persistence) - -- `Session` struct を廃止 -- save 系関数を提供: history delta の記録、turn end、outcome 等 -- restore 関数: ログ再生 → `RestoredState` を返す(Worker は作らない) -- `Store` trait, `FsStore`, `LogEntry`, ハッシュチェーンはそのまま維持 -- fork / fork_at も関数として残す - -### pod クレート - -- `Pod` が `Worker` を直接フィールドに持つ -- `Pod::run()` 内で Worker を呼び、その後 session-store の save 関数を呼ぶ -- `Pod::restore()` は session-store から `RestoredState` を受け取り、Worker に適用 -- Controller は `pod.worker()` / `pod.worker_mut()` で直接アクセス - -### 影響範囲 - -- `Session` を使っている箇所: `pod.rs`, `controller.rs`, テスト -- `SessionError` が消えるので、`PodError` から `SessionError` variant を除去し、`StoreError` に置換