diff --git a/Cargo.lock b/Cargo.lock index 9a182b1b..271b69a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -102,6 +102,15 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" +[[package]] +name = "block-buffer" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdd35008169921d80bc60d3d0ab416eecb028c4cd653352907921d95084790be" +dependencies = [ + "hybrid-array", +] + [[package]] name = "bumpalo" version = "3.20.2" @@ -205,6 +214,12 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "const-oid" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c" + [[package]] name = "core-foundation" version = "0.10.1" @@ -221,6 +236,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crossterm" version = "0.28.1" @@ -246,6 +270,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "crypto-common" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77727bb15fa921304124b128af125e7e3b968275d1b108b379190264f4423710" +dependencies = [ + "hybrid-array", +] + [[package]] name = "daemon" version = "0.1.0" @@ -289,6 +322,17 @@ dependencies = [ "syn", ] +[[package]] +name = "digest" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4850db49bf08e663084f7fb5c87d202ef91a3907271aff24a94eb97ff039153c" +dependencies = [ + "block-buffer", + "const-oid", + "crypto-common", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -553,6 +597,12 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "1.4.0" @@ -592,6 +642,15 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "hybrid-array" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3944cf8cf766b40e2a1a333ee5e9b563f854d5fa49d6a8ca2764e97c6eddb214" +dependencies = [ + "typenum", +] + [[package]] name = "hyper" version = "1.9.0" @@ -940,9 +999,11 @@ version = "0.1.0" dependencies = [ "async-trait", "futures", + "hex", "llm-worker", "serde", "serde_json", + "sha2", "tempfile", "thiserror", "tokio", @@ -1559,6 +1620,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "sha2" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "446ba717509524cb3f22f17ecc096f10f4822d76ab5c0b9822c5f9c284e825f4" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -2009,6 +2081,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "typenum" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" + [[package]] name = "unicode-ident" version = "1.0.24" diff --git a/TODO.md b/TODO.md index c57d59d7..90c3e6d9 100644 --- a/TODO.md +++ b/TODO.md @@ -7,3 +7,5 @@ - [x] inspect ツール実装 - [x] max_turns: マニフェストによるターン数制限 - [x] pod バイナリエントリポイント +- [x] セッションエントリのハッシュチェーン +- [x] Subscriber → クロージャ API 移行 diff --git a/crates/llm-worker-persistence/Cargo.toml b/crates/llm-worker-persistence/Cargo.toml index f2b5ee13..41bf4bda 100644 --- a/crates/llm-worker-persistence/Cargo.toml +++ b/crates/llm-worker-persistence/Cargo.toml @@ -13,6 +13,8 @@ serde_json = "1.0" tokio = { version = "1.49", features = ["fs", "io-util"] } uuid = { version = "1", features = ["v7", "serde"] } thiserror = "2.0" +sha2 = "0.11.0" +hex = "0.4.3" [dev-dependencies] tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "fs", "io-util"] } diff --git a/crates/llm-worker-persistence/src/fs_store.rs b/crates/llm-worker-persistence/src/fs_store.rs index 63e73117..af308ec8 100644 --- a/crates/llm-worker-persistence/src/fs_store.rs +++ b/crates/llm-worker-persistence/src/fs_store.rs @@ -5,7 +5,7 @@ //! - Event trace: `{root}/{session_id}.trace.jsonl` use crate::event_trace::TraceEntry; -use crate::session_log::LogEntry; +use crate::session_log::{EntryHash, HashedEntry}; use crate::store::{Store, StoreError}; use crate::SessionId; use std::path::{Path, PathBuf}; @@ -70,12 +70,12 @@ impl FsStore { } impl Store for FsStore { - async fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError> { + async fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> { let line = serde_json::to_string(entry)?; self.append_line(&self.log_path(id), &line).await } - async fn read_all(&self, id: SessionId) -> Result, StoreError> { + async fn read_all(&self, id: SessionId) -> Result, StoreError> { let path = self.log_path(id); if !path.exists() { return Err(StoreError::NotFound(id)); @@ -106,7 +106,7 @@ impl Store for FsStore { async fn create_session( &self, id: SessionId, - entries: &[LogEntry], + entries: &[HashedEntry], ) -> Result<(), StoreError> { let path = self.log_path(id); let mut content = String::new(); @@ -122,6 +122,30 @@ impl Store for FsStore { Ok(self.log_path(id).exists()) } + async fn read_head_hash( + &self, + id: SessionId, + ) -> Result, StoreError> { + let path = self.log_path(id); + if !path.exists() { + return Err(StoreError::NotFound(id)); + } + let content = fs::read_to_string(&path).await?; + let last_line = content.lines().rev().find(|l| !l.trim().is_empty()); + match last_line { + Some(line) => { + let entry: HashedEntry = serde_json::from_str(line).map_err(|e| { + StoreError::Corrupt { + line: content.lines().count(), + message: e.to_string(), + } + })?; + Ok(Some(entry.hash)) + } + None => Ok(None), + } + } + async fn append_trace( &self, id: SessionId, diff --git a/crates/llm-worker-persistence/src/lib.rs b/crates/llm-worker-persistence/src/lib.rs index c329738c..111222d1 100644 --- a/crates/llm-worker-persistence/src/lib.rs +++ b/crates/llm-worker-persistence/src/lib.rs @@ -37,7 +37,10 @@ pub use event_trace::TraceEntry; pub use fs_blob_store::FsBlobStore; pub use fs_store::FsStore; pub use session::{Session, SessionConfig, SessionError}; -pub use session_log::{LogEntry, Outcome, RestoredState, collect_state}; +pub use session_log::{ + EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, build_chain, collect_state, + compute_hash, +}; pub use store::{Store, StoreError}; /// Session identifier. UUID v7 (time-ordered, lexicographically sortable). diff --git a/crates/llm-worker-persistence/src/session.rs b/crates/llm-worker-persistence/src/session.rs index 6a42fce6..9d2ea1b7 100644 --- a/crates/llm-worker-persistence/src/session.rs +++ b/crates/llm-worker-persistence/src/session.rs @@ -1,10 +1,14 @@ //! Persistent session wrapper around [`Worker`]. //! -//! [`Session`] intercepts `Worker` operations and appends [`LogEntry`] records +//! [`Session`] intercepts `Worker` operations and appends [`HashedEntry`] records //! to a [`Store`]. It does not modify `Worker` internals — all persistence //! happens by observing state before and after each operation. +//! +//! Each appended entry carries a hash that chains to the previous entry. +//! On append, the session checks whether the store's head still matches its +//! own `head_hash`; if not, it auto-forks into a new session. -use crate::session_log::{self, LogEntry, Outcome}; +use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, Outcome}; use crate::store::{Store, StoreError}; use crate::SessionId; use llm_worker::llm_client::client::LlmClient; @@ -47,6 +51,7 @@ pub struct Session { pub worker: Worker, store: St, session_id: SessionId, + head_hash: Option, _config: SessionConfig, } @@ -58,18 +63,25 @@ impl Session { config: SessionConfig, ) -> Result { let session_id = crate::new_session_id(); - let start = LogEntry::SessionStart { + let entry = LogEntry::SessionStart { ts: session_log::now_millis(), system_prompt: worker.get_system_prompt().map(String::from), config: worker.request_config().clone(), history: worker.history().to_vec(), }; - store.append(session_id, &start).await?; + let hashed = session_log::compute_hash(None, &entry); + let hashed_entry = HashedEntry { + hash: hashed.clone(), + prev_hash: None, + entry, + }; + store.append(session_id, &hashed_entry).await?; Ok(Self { worker, store, session_id, + head_hash: Some(hashed), _config: config, }) } @@ -100,6 +112,7 @@ impl Session { worker, store, session_id, + head_hash: state.head_hash, _config: config, }) } @@ -109,6 +122,11 @@ impl Session { self.session_id } + /// The current head hash of the session log chain. + pub fn head_hash(&self) -> Option<&EntryHash> { + self.head_hash.as_ref() + } + /// Reference to the underlying store. pub fn store(&self) -> &St { &self.store @@ -119,6 +137,8 @@ impl Session { &mut self, user_input: impl Into, ) -> Result { + self.ensure_head_or_fork().await?; + let history_before = self.worker.history().len(); let result = self.worker.run(user_input).await; @@ -132,6 +152,8 @@ impl Session { /// Resume from a paused state, logging all state changes. pub async fn resume(&mut self) -> Result { + self.ensure_head_or_fork().await?; + let history_before = self.worker.history().len(); let result = self.worker.resume().await; @@ -148,122 +170,160 @@ impl Session { /// seeded with the current history. pub async fn fork(&self) -> Result { let fork_id = crate::new_session_id(); - let start = LogEntry::SessionStart { + let entry = LogEntry::SessionStart { ts: session_log::now_millis(), system_prompt: self.worker.get_system_prompt().map(String::from), config: self.worker.request_config().clone(), history: self.worker.history().to_vec(), }; - self.store.create_session(fork_id, &[start]).await?; + let hashed = session_log::compute_hash(None, &entry); + let hashed_entry = HashedEntry { + hash: hashed, + prev_hash: None, + entry, + }; + self.store + .create_session(fork_id, &[hashed_entry]) + .await?; Ok(fork_id) } /// Fork from an arbitrary point in a stored session's log. - /// Replays entries up to `up_to_entry` and creates a new session - /// with that reconstructed state. + /// Finds the entry matching `at_hash` and creates a new session + /// with state reconstructed up to that point. pub async fn fork_at( store: &St, source_id: SessionId, - up_to_entry: usize, + at_hash: &EntryHash, ) -> Result { let entries = store.read_all(source_id).await?; - let truncated = &entries[..up_to_entry.min(entries.len())]; - let state = session_log::collect_state(truncated); + let cut = entries + .iter() + .position(|e| &e.hash == at_hash) + .map(|i| i + 1) + .unwrap_or(entries.len()); + let state = session_log::collect_state(&entries[..cut]); let fork_id = crate::new_session_id(); - let start = LogEntry::SessionStart { + let entry = LogEntry::SessionStart { ts: session_log::now_millis(), system_prompt: state.system_prompt, config: state.config, history: state.history, }; - store.create_session(fork_id, &[start]).await?; + let hashed = session_log::compute_hash(None, &entry); + let hashed_entry = HashedEntry { + hash: hashed, + prev_hash: None, + entry, + }; + store.create_session(fork_id, &[hashed_entry]).await?; Ok(fork_id) } /// Log a `CacheLocked` entry. pub async fn log_cache_locked( - &self, + &mut self, locked_prefix_len: usize, ) -> Result<(), StoreError> { - self.store - .append( - self.session_id, - &LogEntry::CacheLocked { - ts: session_log::now_millis(), - locked_prefix_len, - }, - ) - .await + let entry = LogEntry::CacheLocked { + ts: session_log::now_millis(), + locked_prefix_len, + }; + self.append_entry(entry).await } /// Log a `CacheUnlocked` entry. - pub async fn log_cache_unlocked(&self) -> Result<(), StoreError> { - self.store - .append( - self.session_id, - &LogEntry::CacheUnlocked { - ts: session_log::now_millis(), - }, - ) - .await + pub async fn log_cache_unlocked(&mut self) -> Result<(), StoreError> { + let entry = LogEntry::CacheUnlocked { + ts: session_log::now_millis(), + }; + self.append_entry(entry).await } /// Log a `ConfigChanged` entry. - pub async fn log_config_changed(&self) -> Result<(), StoreError> { - self.store - .append( - self.session_id, - &LogEntry::ConfigChanged { - ts: session_log::now_millis(), - config: self.worker.request_config().clone(), - }, - ) - .await + pub async fn log_config_changed(&mut self) -> Result<(), StoreError> { + let entry = LogEntry::ConfigChanged { + ts: session_log::now_millis(), + config: self.worker.request_config().clone(), + }; + self.append_entry(entry).await } // ── Private helpers ────────────────────────────────────────────────── - async fn log_history_delta(&self, before_len: usize) -> Result<(), StoreError> { + /// Append a `LogEntry`, computing its hash and updating `head_hash`. + async fn append_entry(&mut self, entry: LogEntry) -> Result<(), StoreError> { + let hash = session_log::compute_hash(self.head_hash.as_ref(), &entry); + let hashed_entry = HashedEntry { + hash: hash.clone(), + prev_hash: self.head_hash.clone(), + entry, + }; + self.store + .append(self.session_id, &hashed_entry) + .await?; + self.head_hash = Some(hash); + Ok(()) + } + + /// Check that the store's head still matches ours. If not, auto-fork. + async fn ensure_head_or_fork(&mut self) -> Result<(), StoreError> { + let store_head = self.store.read_head_hash(self.session_id).await?; + if store_head == self.head_hash { + return Ok(()); + } + // Another writer advanced this session — fork from our known state. + let fork_id = crate::new_session_id(); + let entry = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: self.worker.get_system_prompt().map(String::from), + config: self.worker.request_config().clone(), + history: self.worker.history().to_vec(), + }; + let hash = session_log::compute_hash(None, &entry); + let hashed_entry = HashedEntry { + hash: hash.clone(), + prev_hash: None, + entry, + }; + self.store + .create_session(fork_id, &[hashed_entry]) + .await?; + self.session_id = fork_id; + self.head_hash = Some(hash); + Ok(()) + } + + async fn log_history_delta(&mut self, before_len: usize) -> Result<(), StoreError> { let history = self.worker.history(); if history.len() <= before_len { return Ok(()); } let ts = session_log::now_millis(); - let new_items = &history[before_len..]; + let new_items = history[before_len..].to_vec(); let mut i = 0; - // Classify and group items by type. - // The actual items from history are used (not pre-constructed copies), - // so any modifications by hooks (e.g. on_prompt_submit) are captured correctly. while i < new_items.len() { let item = &new_items[i]; if item.is_user_message() { - self.store - .append( - self.session_id, - &LogEntry::UserInput { - ts, - item: new_items[i].clone(), - }, - ) - .await?; + self.append_entry(LogEntry::UserInput { + ts, + item: new_items[i].clone(), + }) + .await?; i += 1; } else if item.is_tool_result() { let start = i; while i < new_items.len() && new_items[i].is_tool_result() { i += 1; } - self.store - .append( - self.session_id, - &LogEntry::ToolResults { - ts, - items: new_items[start..i].to_vec(), - }, - ) - .await?; + self.append_entry(LogEntry::ToolResults { + ts, + items: new_items[start..i].to_vec(), + }) + .await?; } else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() @@ -276,45 +336,33 @@ impl Session { { i += 1; } - self.store - .append( - self.session_id, - &LogEntry::AssistantItems { - ts, - items: new_items[start..i].to_vec(), - }, - ) - .await?; + self.append_entry(LogEntry::AssistantItems { + ts, + items: new_items[start..i].to_vec(), + }) + .await?; } else { - self.store - .append( - self.session_id, - &LogEntry::HookInjectedItems { - ts, - items: vec![new_items[i].clone()], - }, - ) - .await?; + self.append_entry(LogEntry::HookInjectedItems { + ts, + items: vec![new_items[i].clone()], + }) + .await?; i += 1; } } Ok(()) } - async fn log_turn_end(&self) -> Result<(), StoreError> { - self.store - .append( - self.session_id, - &LogEntry::TurnEnd { - ts: session_log::now_millis(), - turn_count: self.worker.turn_count(), - }, - ) - .await + async fn log_turn_end(&mut self) -> Result<(), StoreError> { + self.append_entry(LogEntry::TurnEnd { + ts: session_log::now_millis(), + turn_count: self.worker.turn_count(), + }) + .await } async fn log_outcome( - &self, + &mut self, result: &Result, ) -> Result<(), StoreError> { let outcome = match result { @@ -325,15 +373,11 @@ impl Session { message: e.to_string(), }, }; - self.store - .append( - self.session_id, - &LogEntry::RunOutcome { - ts: session_log::now_millis(), - outcome, - interrupted: self.worker.last_run_interrupted(), - }, - ) - .await + self.append_entry(LogEntry::RunOutcome { + ts: session_log::now_millis(), + outcome, + interrupted: self.worker.last_run_interrupted(), + }) + .await } } diff --git a/crates/llm-worker-persistence/src/session_log.rs b/crates/llm-worker-persistence/src/session_log.rs index 0f1db783..be907877 100644 --- a/crates/llm-worker-persistence/src/session_log.rs +++ b/crates/llm-worker-persistence/src/session_log.rs @@ -3,9 +3,84 @@ //! Each [`LogEntry`] represents a single state transition in a session, //! serialized as one line in a `.jsonl` file. Reading all entries and //! collecting them via [`collect_state`] reconstructs the full [`Worker`] state. +//! +//! Entries are chained via [`EntryHash`]: each [`HashedEntry`] records the hash +//! of the previous entry, forming a tamper-evident append-only chain. This +//! enables safe fork detection when multiple writers share a session. use llm_worker::llm_client::types::{Item, RequestConfig}; use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; + +/// SHA-256 hash identifying a specific log entry in the chain. +/// +/// Computed as `sha256(prev_hash_bytes || canonical_json(entry))`. +/// Displayed and serialized as a lowercase hex string. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct EntryHash([u8; 32]); + +impl EntryHash { + pub fn as_bytes(&self) -> &[u8; 32] { + &self.0 + } + + pub fn to_hex(&self) -> String { + hex::encode(self.0) + } + + pub fn from_hex(s: &str) -> Result { + let mut buf = [0u8; 32]; + hex::decode_to_slice(s, &mut buf)?; + Ok(Self(buf)) + } +} + +impl std::fmt::Display for EntryHash { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.to_hex()) + } +} + +impl Serialize for EntryHash { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_str(&self.to_hex()) + } +} + +impl<'de> Deserialize<'de> for EntryHash { + fn deserialize>(deserializer: D) -> Result { + let s = String::deserialize(deserializer)?; + Self::from_hex(&s).map_err(serde::de::Error::custom) + } +} + +/// Compute the hash for a log entry given its predecessor's hash. +pub fn compute_hash(prev: Option<&EntryHash>, entry: &LogEntry) -> EntryHash { + let mut hasher = Sha256::new(); + + // Feed prev_hash bytes (32 zero bytes if None). + match prev { + Some(h) => hasher.update(h.as_bytes()), + None => hasher.update([0u8; 32]), + } + + // Canonical JSON of the entry. + let json = serde_json::to_string(entry).expect("LogEntry serialization cannot fail"); + hasher.update(json.as_bytes()); + + EntryHash(hasher.finalize().into()) +} + +/// A [`LogEntry`] with hash-chain metadata. +/// +/// This is the unit persisted to JSONL — one line per `HashedEntry`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HashedEntry { + pub hash: EntryHash, + pub prev_hash: Option, + #[serde(flatten)] + pub entry: LogEntry, +} /// A single session log entry, serialized as one JSONL line. /// @@ -80,10 +155,12 @@ pub struct RestoredState { pub turn_count: usize, pub locked_prefix_len: usize, pub last_run_interrupted: bool, + /// Hash of the last entry in the chain (None if empty). + pub head_hash: Option, } -/// Replay a sequence of log entries to reconstruct worker state. -pub fn collect_state(entries: &[LogEntry]) -> RestoredState { +/// Replay a sequence of hashed entries to reconstruct worker state. +pub fn collect_state(entries: &[HashedEntry]) -> RestoredState { let mut state = RestoredState { system_prompt: None, config: RequestConfig::default(), @@ -91,10 +168,13 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState { turn_count: 0, locked_prefix_len: 0, last_run_interrupted: false, + head_hash: None, }; - for entry in entries { - match entry { + for hashed in entries { + state.head_hash = Some(hashed.hash.clone()); + + match &hashed.entry { LogEntry::SessionStart { system_prompt, config, @@ -148,6 +228,26 @@ pub fn now_millis() -> u64 { .as_millis() as u64 } +/// Build a hash chain from plain `LogEntry` values. +/// +/// Useful for tests and for seeding new sessions from a list of entries. +pub fn build_chain(entries: &[LogEntry]) -> Vec { + let mut chain = Vec::with_capacity(entries.len()); + let mut prev: Option = None; + + for entry in entries { + let hash = compute_hash(prev.as_ref(), entry); + chain.push(HashedEntry { + hash: hash.clone(), + prev_hash: prev, + entry: entry.clone(), + }); + prev = Some(hash); + } + + chain +} + #[cfg(test)] mod tests { use super::*; @@ -158,25 +258,27 @@ mod tests { assert!(state.history.is_empty()); assert_eq!(state.turn_count, 0); assert_eq!(state.locked_prefix_len, 0); + assert!(state.head_hash.is_none()); } #[test] fn replay_session_start_sets_initial_state() { - let entries = vec![LogEntry::SessionStart { + let entries = build_chain(&[LogEntry::SessionStart { ts: 1000, system_prompt: Some("You are helpful.".into()), config: RequestConfig::default().with_max_tokens(1024), history: vec![Item::user_message("seed")], - }]; + }]); let state = collect_state(&entries); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); assert_eq!(state.config.max_tokens, Some(1024)); assert_eq!(state.history.len(), 1); + assert!(state.head_hash.is_some()); } #[test] fn replay_full_turn() { - let entries = vec![ + let entries = build_chain(&[ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -200,7 +302,7 @@ mod tests { outcome: Outcome::Finished, interrupted: false, }, - ]; + ]); let state = collect_state(&entries); assert_eq!(state.history.len(), 2); assert_eq!(state.turn_count, 1); @@ -209,7 +311,7 @@ mod tests { #[test] fn replay_with_tool_calls() { - let entries = vec![ + let entries = build_chain(&[ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -236,7 +338,7 @@ mod tests { ts: 4100, turn_count: 1, }, - ]; + ]); let state = collect_state(&entries); assert_eq!(state.history.len(), 4); assert!(state.history[1].is_tool_call()); @@ -245,7 +347,7 @@ mod tests { #[test] fn replay_cache_lock_unlock() { - let entries = vec![ + let entries = build_chain(&[ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -257,7 +359,7 @@ mod tests { locked_prefix_len: 2, }, LogEntry::CacheUnlocked { ts: 3000 }, - ]; + ]); let state = collect_state(&entries); assert_eq!(state.locked_prefix_len, 0); @@ -268,7 +370,7 @@ mod tests { #[test] fn replay_config_changed() { - let entries = vec![ + let entries = build_chain(&[ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -279,8 +381,57 @@ mod tests { ts: 2000, config: RequestConfig::default().with_temperature(0.5), }, - ]; + ]); let state = collect_state(&entries); assert_eq!(state.config.temperature, Some(0.5)); } + + #[test] + fn hash_chain_is_deterministic() { + let raw = vec![ + LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }, + LogEntry::UserInput { + ts: 2000, + item: Item::user_message("Hello"), + }, + ]; + let chain_a = build_chain(&raw); + let chain_b = build_chain(&raw); + assert_eq!(chain_a[0].hash, chain_b[0].hash); + assert_eq!(chain_a[1].hash, chain_b[1].hash); + } + + #[test] + fn different_content_produces_different_hash() { + let entry_a = LogEntry::UserInput { + ts: 1000, + item: Item::user_message("Hello"), + }; + let entry_b = LogEntry::UserInput { + ts: 1000, + item: Item::user_message("World"), + }; + let hash_a = compute_hash(None, &entry_a); + let hash_b = compute_hash(None, &entry_b); + assert_ne!(hash_a, hash_b); + } + + #[test] + fn hash_hex_round_trip() { + let entry = LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }; + let hash = compute_hash(None, &entry); + let hex = hash.to_hex(); + let parsed = EntryHash::from_hex(&hex).unwrap(); + assert_eq!(hash, parsed); + } } diff --git a/crates/llm-worker-persistence/src/store.rs b/crates/llm-worker-persistence/src/store.rs index feb7060a..9565af2a 100644 --- a/crates/llm-worker-persistence/src/store.rs +++ b/crates/llm-worker-persistence/src/store.rs @@ -4,7 +4,7 @@ //! Implementations handle the physical storage (filesystem, database, etc.). use crate::event_trace::TraceEntry; -use crate::session_log::LogEntry; +use crate::session_log::{EntryHash, HashedEntry}; use crate::SessionId; use std::future::Future; @@ -29,28 +29,29 @@ pub enum StoreError { /// All methods take `&self` — implementations should use interior mutability /// (e.g., append-mode file handles) when needed. pub trait Store: Send + Sync { - /// Append a single log entry to the session. + /// Append a single hashed entry to the session log. fn append( &self, id: SessionId, - entry: &LogEntry, + entry: &HashedEntry, ) -> impl Future> + Send; - /// Read all log entries for a session, in order. + /// Read all hashed entries for a session, in order. fn read_all( &self, id: SessionId, - ) -> impl Future, StoreError>> + Send; + ) -> impl Future, StoreError>> + Send; /// List all session IDs, most recent first. - fn list_sessions(&self) - -> impl Future, StoreError>> + Send; + fn list_sessions( + &self, + ) -> impl Future, StoreError>> + Send; /// Create a new session with initial entries. fn create_session( &self, id: SessionId, - entries: &[LogEntry], + entries: &[HashedEntry], ) -> impl Future> + Send; /// Check if a session exists. @@ -59,6 +60,14 @@ pub trait Store: Send + Sync { id: SessionId, ) -> impl Future> + Send; + /// Read the hash of the last entry in a session (the head). + /// + /// Returns `None` if the session is empty. + fn read_head_hash( + &self, + id: SessionId, + ) -> impl Future, StoreError>> + Send; + /// Append a trace entry to the debug event trace file. fn append_trace( &self, diff --git a/crates/llm-worker-persistence/tests/fs_store_test.rs b/crates/llm-worker-persistence/tests/fs_store_test.rs index 7385aeb0..0804f205 100644 --- a/crates/llm-worker-persistence/tests/fs_store_test.rs +++ b/crates/llm-worker-persistence/tests/fs_store_test.rs @@ -1,6 +1,6 @@ use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker_persistence::{ - FsStore, LogEntry, Outcome, Store, TraceEntry, new_session_id, collect_state, + FsStore, LogEntry, Outcome, Store, TraceEntry, build_chain, collect_state, new_session_id, }; #[tokio::test] @@ -9,7 +9,7 @@ async fn round_trip_write_and_read() { let store = FsStore::new(dir.path()).await.unwrap(); let id = new_session_id(); - let entries = vec![ + let raw = vec![ LogEntry::SessionStart { ts: 1000, system_prompt: Some("You are helpful.".into()), @@ -34,6 +34,7 @@ async fn round_trip_write_and_read() { interrupted: false, }, ]; + let entries = build_chain(&raw); // Write entries one by one for entry in &entries { @@ -44,6 +45,12 @@ async fn round_trip_write_and_read() { let read_back = store.read_all(id).await.unwrap(); assert_eq!(read_back.len(), entries.len()); + // Verify hashes survived round-trip + for (orig, read) in entries.iter().zip(read_back.iter()) { + assert_eq!(orig.hash, read.hash); + assert_eq!(orig.prev_hash, read.prev_hash); + } + // Replay and verify state let state = collect_state(&read_back); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); @@ -51,6 +58,7 @@ async fn round_trip_write_and_read() { assert_eq!(state.history.len(), 2); assert_eq!(state.turn_count, 1); assert!(!state.last_run_interrupted); + assert!(state.head_hash.is_some()); } #[tokio::test] @@ -59,14 +67,12 @@ async fn create_session_writes_all_entries() { let store = FsStore::new(dir.path()).await.unwrap(); let id = new_session_id(); - let entries = vec![ - LogEntry::SessionStart { - ts: 1000, - system_prompt: None, - config: RequestConfig::default(), - history: vec![Item::user_message("seed"), Item::assistant_message("ok")], - }, - ]; + let entries = build_chain(&[LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![Item::user_message("seed"), Item::assistant_message("ok")], + }]); store.create_session(id, &entries).await.unwrap(); let read_back = store.read_all(id).await.unwrap(); @@ -86,15 +92,21 @@ async fn list_sessions_returns_newest_first() { tokio::time::sleep(std::time::Duration::from_millis(2)).await; let id2 = new_session_id(); - let start = LogEntry::SessionStart { + let entries1 = build_chain(&[LogEntry::SessionStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), history: vec![], - }; + }]); + let entries2 = build_chain(&[LogEntry::SessionStart { + ts: 1001, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }]); - store.append(id1, &start).await.unwrap(); - store.append(id2, &start).await.unwrap(); + store.append(id1, &entries1[0]).await.unwrap(); + store.append(id2, &entries2[0]).await.unwrap(); let sessions = store.list_sessions().await.unwrap(); assert_eq!(sessions.len(), 2); @@ -110,18 +122,13 @@ async fn exists_returns_correct_state() { assert!(!store.exists(id).await.unwrap()); - store - .append( - id, - &LogEntry::SessionStart { - ts: 1000, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - }, - ) - .await - .unwrap(); + let entries = build_chain(&[LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }]); + store.append(id, &entries[0]).await.unwrap(); assert!(store.exists(id).await.unwrap()); } @@ -143,18 +150,13 @@ async fn trace_entries_in_separate_file() { let id = new_session_id(); // Write a log entry - store - .append( - id, - &LogEntry::SessionStart { - ts: 1000, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - }, - ) - .await - .unwrap(); + let entries = build_chain(&[LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }]); + store.append(id, &entries[0]).await.unwrap(); // Write a trace entry let trace = TraceEntry { @@ -174,3 +176,30 @@ async fn trace_entries_in_separate_file() { let trace_path = dir.path().join(format!("{id}.trace.jsonl")); assert!(trace_path.exists()); } + +#[tokio::test] +async fn read_head_hash_returns_last_entry_hash() { + let dir = tempfile::tempdir().unwrap(); + let store = FsStore::new(dir.path()).await.unwrap(); + let id = new_session_id(); + + let entries = build_chain(&[ + LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + }, + LogEntry::UserInput { + ts: 2000, + item: Item::user_message("Hello"), + }, + ]); + + for entry in &entries { + store.append(id, entry).await.unwrap(); + } + + let head = store.read_head_hash(id).await.unwrap(); + assert_eq!(head.as_ref(), Some(&entries[1].hash)); +} diff --git a/crates/llm-worker-persistence/tests/session_test.rs b/crates/llm-worker-persistence/tests/session_test.rs index 00ba3346..99149a17 100644 --- a/crates/llm-worker-persistence/tests/session_test.rs +++ b/crates/llm-worker-persistence/tests/session_test.rs @@ -112,17 +112,37 @@ async fn session_run_logs_entries() { let entries = store.read_all(sid).await.unwrap(); // SessionStart, UserInput, AssistantItems, TurnEnd, RunOutcome (at minimum) - assert!(entries.len() >= 4, "expected at least 4 entries, got {}", entries.len()); + assert!( + entries.len() >= 4, + "expected at least 4 entries, got {}", + entries.len() + ); // First entry is SessionStart - assert!(matches!(entries[0], LogEntry::SessionStart { .. })); + assert!(matches!(&entries[0].entry, LogEntry::SessionStart { .. })); // Has a RunOutcome with Finished - let has_finished = entries.iter().any(|e| matches!( - e, - LogEntry::RunOutcome { outcome: Outcome::Finished, .. } - )); + let has_finished = entries.iter().any(|e| { + matches!( + &e.entry, + LogEntry::RunOutcome { + outcome: Outcome::Finished, + .. + } + ) + }); assert!(has_finished, "should have a Finished outcome"); + + // Verify hash chain integrity + assert!(entries[0].prev_hash.is_none()); + for i in 1..entries.len() { + assert_eq!( + entries[i].prev_hash.as_ref(), + Some(&entries[i - 1].hash), + "hash chain broken at entry {}", + i + ); + } } #[tokio::test] @@ -141,12 +161,14 @@ async fn session_restore_round_trip() { let original_history = session.worker.history().to_vec(); let original_turn_count = session.worker.turn_count(); + let original_head_hash = session.head_hash().cloned(); // Restore let restore_client = MockLlmClient::new(vec![]); // won't be called - let restored = Session::restore(restore_client, store.clone(), sid, SessionConfig::default()) - .await - .unwrap(); + let restored = + Session::restore(restore_client, store.clone(), sid, SessionConfig::default()) + .await + .unwrap(); assert_eq!(restored.worker.history().len(), original_history.len()); assert_eq!(restored.worker.turn_count(), original_turn_count); @@ -154,6 +176,7 @@ async fn session_restore_round_trip() { restored.worker.get_system_prompt().map(String::from), Some("You are helpful.".to_string()) ); + assert_eq!(restored.head_hash(), original_head_hash.as_ref()); } #[tokio::test] @@ -172,10 +195,14 @@ async fn session_run_with_tool_call() { let entries = store.read_all(sid).await.unwrap(); - let has_tool_results = entries.iter().any(|e| matches!(e, LogEntry::ToolResults { .. })); + let has_tool_results = entries + .iter() + .any(|e| matches!(&e.entry, LogEntry::ToolResults { .. })); assert!(has_tool_results, "should have ToolResults entry"); - let has_assistant = entries.iter().any(|e| matches!(e, LogEntry::AssistantItems { .. })); + let has_assistant = entries + .iter() + .any(|e| matches!(&e.entry, LogEntry::AssistantItems { .. })); assert!(has_assistant, "should have AssistantItems entry"); } @@ -199,10 +226,15 @@ async fn session_resume_after_pause() { // Check RunOutcome is Paused let entries = store.read_all(sid).await.unwrap(); - let has_paused = entries.iter().any(|e| matches!( - e, - LogEntry::RunOutcome { outcome: Outcome::Paused, .. } - )); + let has_paused = entries.iter().any(|e| { + matches!( + &e.entry, + LogEntry::RunOutcome { + outcome: Outcome::Paused, + .. + } + ) + }); assert!(has_paused, "should have Paused outcome"); // Restore and resume @@ -214,9 +246,10 @@ async fn session_resume_after_pause() { status: ResponseStatus::Completed, }), ]]); - let mut restored = Session::restore(resume_client, store.clone(), sid, SessionConfig::default()) - .await - .unwrap(); + let mut restored = + Session::restore(resume_client, store.clone(), sid, SessionConfig::default()) + .await + .unwrap(); assert!(restored.worker.last_run_interrupted()); @@ -244,7 +277,10 @@ async fn session_fork_preserves_state() { // Fork should have a SessionStart with the current history let fork_entries = store.read_all(fork_id).await.unwrap(); assert_eq!(fork_entries.len(), 1); - assert!(matches!(&fork_entries[0], LogEntry::SessionStart { .. })); + assert!(matches!( + &fork_entries[0].entry, + LogEntry::SessionStart { .. } + )); let fork_state = collect_state(&fork_entries); assert_eq!(fork_state.history.len(), original_history_len); @@ -267,8 +303,9 @@ async fn session_fork_at_truncates() { let all_entries = store.read_all(sid).await.unwrap(); assert!(all_entries.len() > 2); - // Fork at entry 2 (SessionStart + UserInput only) - let fork_id = Session::::fork_at(&store, sid, 2) + // Fork at the hash of the 2nd entry (SessionStart + UserInput) + let at_hash = &all_entries[1].hash; + let fork_id = Session::::fork_at(&store, sid, at_hash) .await .unwrap(); @@ -278,7 +315,10 @@ async fn session_fork_at_truncates() { let fork_state = collect_state(&fork_entries); // Should have the state from replaying only the first 2 entries let original_truncated_state = collect_state(&all_entries[..2]); - assert_eq!(fork_state.history.len(), original_truncated_state.history.len()); + assert_eq!( + fork_state.history.len(), + original_truncated_state.history.len() + ); } #[tokio::test] @@ -293,14 +333,18 @@ async fn session_config_changed_logged() { let sid = session.session_id(); // Modify config via worker and log it - session.worker.set_request_config(RequestConfig::default().with_temperature(0.7)); + session + .worker + .set_request_config(RequestConfig::default().with_temperature(0.7)); session.log_config_changed().await.unwrap(); let entries = store.read_all(sid).await.unwrap(); - let has_config_changed = entries.iter().any(|e| matches!( - e, - LogEntry::ConfigChanged { config, .. } if config.temperature == Some(0.7) - )); + let has_config_changed = entries.iter().any(|e| { + matches!( + &e.entry, + LogEntry::ConfigChanged { config, .. } if config.temperature == Some(0.7) + ) + }); assert!(has_config_changed, "should have ConfigChanged entry"); } @@ -310,7 +354,7 @@ async fn session_cache_lock_unlock_logged() { let client = MockLlmClient::new(vec![]); let worker = Worker::new(client); - let session = Session::new(worker, store.clone(), SessionConfig::default()) + let mut session = Session::new(worker, store.clone(), SessionConfig::default()) .await .unwrap(); let sid = session.session_id(); @@ -320,16 +364,71 @@ async fn session_cache_lock_unlock_logged() { let entries = store.read_all(sid).await.unwrap(); - let has_locked = entries.iter().any(|e| matches!( - e, - LogEntry::CacheLocked { locked_prefix_len: 5, .. } - )); + let has_locked = entries.iter().any(|e| { + matches!( + &e.entry, + LogEntry::CacheLocked { + locked_prefix_len: 5, + .. + } + ) + }); assert!(has_locked, "should have CacheLocked entry"); - let has_unlocked = entries.iter().any(|e| matches!(e, LogEntry::CacheUnlocked { .. })); + let has_unlocked = entries + .iter() + .any(|e| matches!(&e.entry, LogEntry::CacheUnlocked { .. })); assert!(has_unlocked, "should have CacheUnlocked entry"); // State after all entries: unlocked let state = collect_state(&entries); assert_eq!(state.locked_prefix_len, 0); } + +#[tokio::test] +async fn session_auto_forks_on_conflict() { + let (_dir, store) = make_store().await; + + // Create a session + let client_a = MockLlmClient::new(simple_text_events()); + let worker_a = Worker::new(client_a); + let mut session_a = Session::new(worker_a, store.clone(), SessionConfig::default()) + .await + .unwrap(); + let original_sid = session_a.session_id(); + + // Simulate another Pod writing to the same session behind our back + let extra_entry = LogEntry::UserInput { + ts: 9999, + item: Item::user_message("Interloper"), + }; + let current_head = store.read_head_hash(original_sid).await.unwrap(); + let hash = llm_worker_persistence::compute_hash(current_head.as_ref(), &extra_entry); + let hashed = llm_worker_persistence::HashedEntry { + hash, + prev_hash: current_head, + entry: extra_entry, + }; + store.append(original_sid, &hashed).await.unwrap(); + + // Now session_a's head_hash is stale — run should auto-fork + session_a.run("Hello").await.unwrap(); + + // session_a should now have a different session_id + assert_ne!(session_a.session_id(), original_sid); + + // The fork session should exist and have entries + let fork_entries = store.read_all(session_a.session_id()).await.unwrap(); + assert!(!fork_entries.is_empty()); + + // Original session should still have the interloper entry + let original_entries = store.read_all(original_sid).await.unwrap(); + let has_interloper = original_entries.iter().any(|e| { + if let LogEntry::UserInput { item, .. } = &e.entry { + item.is_user_message() + } else { + false + } + }); + assert!(has_interloper); +} diff --git a/tickets/session-entry-hash.md b/tickets/session-entry-hash.md deleted file mode 100644 index 8b320afe..00000000 --- a/tickets/session-entry-hash.md +++ /dev/null @@ -1,26 +0,0 @@ -# セッションエントリのハッシュチェーン - -## 背景 - -複数の Pod が同じ Session を読み込んで作業を進めるケースがある。現状の設計では、同一セッションファイルへの同時書き込みがコンフリクトを起こす。また `fork_at` のエントリ指定が `usize`(インデックス)であるため、元セッションにエントリが追加されると同じインデックスが別の内容を指してしまう。 - -## やること - -- 各 `LogEntry` に `prev_hash: Option` を追加(先頭エントリは `None`) -- `EntryHash` は `sha256(prev_hash + serialized_content)` で算出 -- `Session` がロード時に head hash(末尾エントリのハッシュ)を保持 -- `Session::append` 時にファイル末尾のハッシュと保持している head hash を比較 - - 一致 → 通常 append、head hash を更新 - - 不一致 → auto-fork(新 SessionId で分岐) -- `fork_at` の引数を `usize` → `EntryHash` に変更 - -## アドレッシング - -- **SessionId (UUID v7)** → どのセッション(ファイル)か -- **EntryHash** → そのセッション内のどの時点か - -## 判断根拠 - -- ファイルサイズやエントリ数による変更検知は同じ長さの別内容を区別できず信頼性が低い -- ハッシュチェーンなら暗号的に一意であり、ログの整合性検証も副産物として得られる -- ストレージレイアウト(JSONL)は変更不要。LogEntry の構造変更だけで済む