Sessionのハッシュ

This commit is contained in:
Keisuke Hirata 2026-04-11 15:14:02 +09:00
parent 59bfd89940
commit 3d2a49e1e4
11 changed files with 643 additions and 228 deletions

78
Cargo.lock generated
View File

@ -102,6 +102,15 @@ version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af" checksum = "843867be96c8daad0d758b57df9392b6d8d271134fce549de6ce169ff98a92af"
[[package]]
name = "block-buffer"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdd35008169921d80bc60d3d0ab416eecb028c4cd653352907921d95084790be"
dependencies = [
"hybrid-array",
]
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.20.2" version = "3.20.2"
@ -205,6 +214,12 @@ dependencies = [
"static_assertions", "static_assertions",
] ]
[[package]]
name = "const-oid"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6ef517f0926dd24a1582492c791b6a4818a4d94e789a334894aa15b0d12f55c"
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.10.1" version = "0.10.1"
@ -221,6 +236,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "cpufeatures"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "crossterm" name = "crossterm"
version = "0.28.1" version = "0.28.1"
@ -246,6 +270,15 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "crypto-common"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77727bb15fa921304124b128af125e7e3b968275d1b108b379190264f4423710"
dependencies = [
"hybrid-array",
]
[[package]] [[package]]
name = "daemon" name = "daemon"
version = "0.1.0" version = "0.1.0"
@ -289,6 +322,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "digest"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4850db49bf08e663084f7fb5c87d202ef91a3907271aff24a94eb97ff039153c"
dependencies = [
"block-buffer",
"const-oid",
"crypto-common",
]
[[package]] [[package]]
name = "displaydoc" name = "displaydoc"
version = "0.2.5" version = "0.2.5"
@ -553,6 +597,12 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "http" name = "http"
version = "1.4.0" version = "1.4.0"
@ -592,6 +642,15 @@ version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "hybrid-array"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3944cf8cf766b40e2a1a333ee5e9b563f854d5fa49d6a8ca2764e97c6eddb214"
dependencies = [
"typenum",
]
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "1.9.0" version = "1.9.0"
@ -940,9 +999,11 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"futures", "futures",
"hex",
"llm-worker", "llm-worker",
"serde", "serde",
"serde_json", "serde_json",
"sha2",
"tempfile", "tempfile",
"thiserror", "thiserror",
"tokio", "tokio",
@ -1559,6 +1620,17 @@ dependencies = [
"serde_core", "serde_core",
] ]
[[package]]
name = "sha2"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "446ba717509524cb3f22f17ecc096f10f4822d76ab5c0b9822c5f9c284e825f4"
dependencies = [
"cfg-if",
"cpufeatures",
"digest",
]
[[package]] [[package]]
name = "sharded-slab" name = "sharded-slab"
version = "0.1.7" version = "0.1.7"
@ -2009,6 +2081,12 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "typenum"
version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
[[package]] [[package]]
name = "unicode-ident" name = "unicode-ident"
version = "1.0.24" version = "1.0.24"

View File

@ -7,3 +7,5 @@
- [x] inspect ツール実装 - [x] inspect ツール実装
- [x] max_turns: マニフェストによるターン数制限 - [x] max_turns: マニフェストによるターン数制限
- [x] pod バイナリエントリポイント - [x] pod バイナリエントリポイント
- [x] セッションエントリのハッシュチェーン
- [x] Subscriber → クロージャ API 移行

View File

@ -13,6 +13,8 @@ serde_json = "1.0"
tokio = { version = "1.49", features = ["fs", "io-util"] } tokio = { version = "1.49", features = ["fs", "io-util"] }
uuid = { version = "1", features = ["v7", "serde"] } uuid = { version = "1", features = ["v7", "serde"] }
thiserror = "2.0" thiserror = "2.0"
sha2 = "0.11.0"
hex = "0.4.3"
[dev-dependencies] [dev-dependencies]
tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "fs", "io-util"] } tokio = { version = "1.49", features = ["macros", "rt-multi-thread", "fs", "io-util"] }

View File

@ -5,7 +5,7 @@
//! - Event trace: `{root}/{session_id}.trace.jsonl` //! - Event trace: `{root}/{session_id}.trace.jsonl`
use crate::event_trace::TraceEntry; use crate::event_trace::TraceEntry;
use crate::session_log::LogEntry; use crate::session_log::{EntryHash, HashedEntry};
use crate::store::{Store, StoreError}; use crate::store::{Store, StoreError};
use crate::SessionId; use crate::SessionId;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
@ -70,12 +70,12 @@ impl FsStore {
} }
impl Store for FsStore { impl Store for FsStore {
async fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError> { async fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> {
let line = serde_json::to_string(entry)?; let line = serde_json::to_string(entry)?;
self.append_line(&self.log_path(id), &line).await self.append_line(&self.log_path(id), &line).await
} }
async fn read_all(&self, id: SessionId) -> Result<Vec<LogEntry>, StoreError> { async fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError> {
let path = self.log_path(id); let path = self.log_path(id);
if !path.exists() { if !path.exists() {
return Err(StoreError::NotFound(id)); return Err(StoreError::NotFound(id));
@ -106,7 +106,7 @@ impl Store for FsStore {
async fn create_session( async fn create_session(
&self, &self,
id: SessionId, id: SessionId,
entries: &[LogEntry], entries: &[HashedEntry],
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
let path = self.log_path(id); let path = self.log_path(id);
let mut content = String::new(); let mut content = String::new();
@ -122,6 +122,30 @@ impl Store for FsStore {
Ok(self.log_path(id).exists()) Ok(self.log_path(id).exists())
} }
async fn read_head_hash(
&self,
id: SessionId,
) -> Result<Option<EntryHash>, StoreError> {
let path = self.log_path(id);
if !path.exists() {
return Err(StoreError::NotFound(id));
}
let content = fs::read_to_string(&path).await?;
let last_line = content.lines().rev().find(|l| !l.trim().is_empty());
match last_line {
Some(line) => {
let entry: HashedEntry = serde_json::from_str(line).map_err(|e| {
StoreError::Corrupt {
line: content.lines().count(),
message: e.to_string(),
}
})?;
Ok(Some(entry.hash))
}
None => Ok(None),
}
}
async fn append_trace( async fn append_trace(
&self, &self,
id: SessionId, id: SessionId,

View File

@ -37,7 +37,10 @@ pub use event_trace::TraceEntry;
pub use fs_blob_store::FsBlobStore; pub use fs_blob_store::FsBlobStore;
pub use fs_store::FsStore; pub use fs_store::FsStore;
pub use session::{Session, SessionConfig, SessionError}; pub use session::{Session, SessionConfig, SessionError};
pub use session_log::{LogEntry, Outcome, RestoredState, collect_state}; pub use session_log::{
EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, build_chain, collect_state,
compute_hash,
};
pub use store::{Store, StoreError}; pub use store::{Store, StoreError};
/// Session identifier. UUID v7 (time-ordered, lexicographically sortable). /// Session identifier. UUID v7 (time-ordered, lexicographically sortable).

View File

@ -1,10 +1,14 @@
//! Persistent session wrapper around [`Worker`]. //! Persistent session wrapper around [`Worker`].
//! //!
//! [`Session`] intercepts `Worker` operations and appends [`LogEntry`] records //! [`Session`] intercepts `Worker` operations and appends [`HashedEntry`] records
//! to a [`Store`]. It does not modify `Worker` internals — all persistence //! to a [`Store`]. It does not modify `Worker` internals — all persistence
//! happens by observing state before and after each operation. //! happens by observing state before and after each operation.
//!
//! Each appended entry carries a hash that chains to the previous entry.
//! On append, the session checks whether the store's head still matches its
//! own `head_hash`; if not, it auto-forks into a new session.
use crate::session_log::{self, LogEntry, Outcome}; use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, Outcome};
use crate::store::{Store, StoreError}; use crate::store::{Store, StoreError};
use crate::SessionId; use crate::SessionId;
use llm_worker::llm_client::client::LlmClient; use llm_worker::llm_client::client::LlmClient;
@ -47,6 +51,7 @@ pub struct Session<C: LlmClient, St: Store> {
pub worker: Worker<C, Mutable>, pub worker: Worker<C, Mutable>,
store: St, store: St,
session_id: SessionId, session_id: SessionId,
head_hash: Option<EntryHash>,
_config: SessionConfig, _config: SessionConfig,
} }
@ -58,18 +63,25 @@ impl<C: LlmClient, St: Store> Session<C, St> {
config: SessionConfig, config: SessionConfig,
) -> Result<Self, StoreError> { ) -> Result<Self, StoreError> {
let session_id = crate::new_session_id(); let session_id = crate::new_session_id();
let start = LogEntry::SessionStart { let entry = LogEntry::SessionStart {
ts: session_log::now_millis(), ts: session_log::now_millis(),
system_prompt: worker.get_system_prompt().map(String::from), system_prompt: worker.get_system_prompt().map(String::from),
config: worker.request_config().clone(), config: worker.request_config().clone(),
history: worker.history().to_vec(), history: worker.history().to_vec(),
}; };
store.append(session_id, &start).await?; let hashed = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash: hashed.clone(),
prev_hash: None,
entry,
};
store.append(session_id, &hashed_entry).await?;
Ok(Self { Ok(Self {
worker, worker,
store, store,
session_id, session_id,
head_hash: Some(hashed),
_config: config, _config: config,
}) })
} }
@ -100,6 +112,7 @@ impl<C: LlmClient, St: Store> Session<C, St> {
worker, worker,
store, store,
session_id, session_id,
head_hash: state.head_hash,
_config: config, _config: config,
}) })
} }
@ -109,6 +122,11 @@ impl<C: LlmClient, St: Store> Session<C, St> {
self.session_id self.session_id
} }
/// The current head hash of the session log chain.
pub fn head_hash(&self) -> Option<&EntryHash> {
self.head_hash.as_ref()
}
/// Reference to the underlying store. /// Reference to the underlying store.
pub fn store(&self) -> &St { pub fn store(&self) -> &St {
&self.store &self.store
@ -119,6 +137,8 @@ impl<C: LlmClient, St: Store> Session<C, St> {
&mut self, &mut self,
user_input: impl Into<String>, user_input: impl Into<String>,
) -> Result<WorkerResult, SessionError> { ) -> Result<WorkerResult, SessionError> {
self.ensure_head_or_fork().await?;
let history_before = self.worker.history().len(); let history_before = self.worker.history().len();
let result = self.worker.run(user_input).await; let result = self.worker.run(user_input).await;
@ -132,6 +152,8 @@ impl<C: LlmClient, St: Store> Session<C, St> {
/// Resume from a paused state, logging all state changes. /// Resume from a paused state, logging all state changes.
pub async fn resume(&mut self) -> Result<WorkerResult, SessionError> { pub async fn resume(&mut self) -> Result<WorkerResult, SessionError> {
self.ensure_head_or_fork().await?;
let history_before = self.worker.history().len(); let history_before = self.worker.history().len();
let result = self.worker.resume().await; let result = self.worker.resume().await;
@ -148,122 +170,160 @@ impl<C: LlmClient, St: Store> Session<C, St> {
/// seeded with the current history. /// seeded with the current history.
pub async fn fork(&self) -> Result<SessionId, StoreError> { pub async fn fork(&self) -> Result<SessionId, StoreError> {
let fork_id = crate::new_session_id(); let fork_id = crate::new_session_id();
let start = LogEntry::SessionStart { let entry = LogEntry::SessionStart {
ts: session_log::now_millis(), ts: session_log::now_millis(),
system_prompt: self.worker.get_system_prompt().map(String::from), system_prompt: self.worker.get_system_prompt().map(String::from),
config: self.worker.request_config().clone(), config: self.worker.request_config().clone(),
history: self.worker.history().to_vec(), history: self.worker.history().to_vec(),
}; };
self.store.create_session(fork_id, &[start]).await?; let hashed = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash: hashed,
prev_hash: None,
entry,
};
self.store
.create_session(fork_id, &[hashed_entry])
.await?;
Ok(fork_id) Ok(fork_id)
} }
/// Fork from an arbitrary point in a stored session's log. /// Fork from an arbitrary point in a stored session's log.
/// Replays entries up to `up_to_entry` and creates a new session /// Finds the entry matching `at_hash` and creates a new session
/// with that reconstructed state. /// with state reconstructed up to that point.
pub async fn fork_at( pub async fn fork_at(
store: &St, store: &St,
source_id: SessionId, source_id: SessionId,
up_to_entry: usize, at_hash: &EntryHash,
) -> Result<SessionId, StoreError> { ) -> Result<SessionId, StoreError> {
let entries = store.read_all(source_id).await?; let entries = store.read_all(source_id).await?;
let truncated = &entries[..up_to_entry.min(entries.len())]; let cut = entries
let state = session_log::collect_state(truncated); .iter()
.position(|e| &e.hash == at_hash)
.map(|i| i + 1)
.unwrap_or(entries.len());
let state = session_log::collect_state(&entries[..cut]);
let fork_id = crate::new_session_id(); let fork_id = crate::new_session_id();
let start = LogEntry::SessionStart { let entry = LogEntry::SessionStart {
ts: session_log::now_millis(), ts: session_log::now_millis(),
system_prompt: state.system_prompt, system_prompt: state.system_prompt,
config: state.config, config: state.config,
history: state.history, history: state.history,
}; };
store.create_session(fork_id, &[start]).await?; let hashed = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash: hashed,
prev_hash: None,
entry,
};
store.create_session(fork_id, &[hashed_entry]).await?;
Ok(fork_id) Ok(fork_id)
} }
/// Log a `CacheLocked` entry. /// Log a `CacheLocked` entry.
pub async fn log_cache_locked( pub async fn log_cache_locked(
&self, &mut self,
locked_prefix_len: usize, locked_prefix_len: usize,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
self.store let entry = LogEntry::CacheLocked {
.append( ts: session_log::now_millis(),
self.session_id, locked_prefix_len,
&LogEntry::CacheLocked { };
ts: session_log::now_millis(), self.append_entry(entry).await
locked_prefix_len,
},
)
.await
} }
/// Log a `CacheUnlocked` entry. /// Log a `CacheUnlocked` entry.
pub async fn log_cache_unlocked(&self) -> Result<(), StoreError> { pub async fn log_cache_unlocked(&mut self) -> Result<(), StoreError> {
self.store let entry = LogEntry::CacheUnlocked {
.append( ts: session_log::now_millis(),
self.session_id, };
&LogEntry::CacheUnlocked { self.append_entry(entry).await
ts: session_log::now_millis(),
},
)
.await
} }
/// Log a `ConfigChanged` entry. /// Log a `ConfigChanged` entry.
pub async fn log_config_changed(&self) -> Result<(), StoreError> { pub async fn log_config_changed(&mut self) -> Result<(), StoreError> {
self.store let entry = LogEntry::ConfigChanged {
.append( ts: session_log::now_millis(),
self.session_id, config: self.worker.request_config().clone(),
&LogEntry::ConfigChanged { };
ts: session_log::now_millis(), self.append_entry(entry).await
config: self.worker.request_config().clone(),
},
)
.await
} }
// ── Private helpers ────────────────────────────────────────────────── // ── Private helpers ──────────────────────────────────────────────────
async fn log_history_delta(&self, before_len: usize) -> Result<(), StoreError> { /// Append a `LogEntry`, computing its hash and updating `head_hash`.
async fn append_entry(&mut self, entry: LogEntry) -> Result<(), StoreError> {
let hash = session_log::compute_hash(self.head_hash.as_ref(), &entry);
let hashed_entry = HashedEntry {
hash: hash.clone(),
prev_hash: self.head_hash.clone(),
entry,
};
self.store
.append(self.session_id, &hashed_entry)
.await?;
self.head_hash = Some(hash);
Ok(())
}
/// Check that the store's head still matches ours. If not, auto-fork.
async fn ensure_head_or_fork(&mut self) -> Result<(), StoreError> {
let store_head = self.store.read_head_hash(self.session_id).await?;
if store_head == self.head_hash {
return Ok(());
}
// Another writer advanced this session — fork from our known state.
let fork_id = crate::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: self.worker.get_system_prompt().map(String::from),
config: self.worker.request_config().clone(),
history: self.worker.history().to_vec(),
};
let hash = session_log::compute_hash(None, &entry);
let hashed_entry = HashedEntry {
hash: hash.clone(),
prev_hash: None,
entry,
};
self.store
.create_session(fork_id, &[hashed_entry])
.await?;
self.session_id = fork_id;
self.head_hash = Some(hash);
Ok(())
}
async fn log_history_delta(&mut self, before_len: usize) -> Result<(), StoreError> {
let history = self.worker.history(); let history = self.worker.history();
if history.len() <= before_len { if history.len() <= before_len {
return Ok(()); return Ok(());
} }
let ts = session_log::now_millis(); let ts = session_log::now_millis();
let new_items = &history[before_len..]; let new_items = history[before_len..].to_vec();
let mut i = 0; let mut i = 0;
// Classify and group items by type.
// The actual items from history are used (not pre-constructed copies),
// so any modifications by hooks (e.g. on_prompt_submit) are captured correctly.
while i < new_items.len() { while i < new_items.len() {
let item = &new_items[i]; let item = &new_items[i];
if item.is_user_message() { if item.is_user_message() {
self.store self.append_entry(LogEntry::UserInput {
.append( ts,
self.session_id, item: new_items[i].clone(),
&LogEntry::UserInput { })
ts, .await?;
item: new_items[i].clone(),
},
)
.await?;
i += 1; i += 1;
} else if item.is_tool_result() { } else if item.is_tool_result() {
let start = i; let start = i;
while i < new_items.len() && new_items[i].is_tool_result() { while i < new_items.len() && new_items[i].is_tool_result() {
i += 1; i += 1;
} }
self.store self.append_entry(LogEntry::ToolResults {
.append( ts,
self.session_id, items: new_items[start..i].to_vec(),
&LogEntry::ToolResults { })
ts, .await?;
items: new_items[start..i].to_vec(),
},
)
.await?;
} else if item.is_assistant_message() } else if item.is_assistant_message()
|| item.is_tool_call() || item.is_tool_call()
|| item.is_reasoning() || item.is_reasoning()
@ -276,45 +336,33 @@ impl<C: LlmClient, St: Store> Session<C, St> {
{ {
i += 1; i += 1;
} }
self.store self.append_entry(LogEntry::AssistantItems {
.append( ts,
self.session_id, items: new_items[start..i].to_vec(),
&LogEntry::AssistantItems { })
ts, .await?;
items: new_items[start..i].to_vec(),
},
)
.await?;
} else { } else {
self.store self.append_entry(LogEntry::HookInjectedItems {
.append( ts,
self.session_id, items: vec![new_items[i].clone()],
&LogEntry::HookInjectedItems { })
ts, .await?;
items: vec![new_items[i].clone()],
},
)
.await?;
i += 1; i += 1;
} }
} }
Ok(()) Ok(())
} }
async fn log_turn_end(&self) -> Result<(), StoreError> { async fn log_turn_end(&mut self) -> Result<(), StoreError> {
self.store self.append_entry(LogEntry::TurnEnd {
.append( ts: session_log::now_millis(),
self.session_id, turn_count: self.worker.turn_count(),
&LogEntry::TurnEnd { })
ts: session_log::now_millis(), .await
turn_count: self.worker.turn_count(),
},
)
.await
} }
async fn log_outcome( async fn log_outcome(
&self, &mut self,
result: &Result<WorkerResult, WorkerError>, result: &Result<WorkerResult, WorkerError>,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
let outcome = match result { let outcome = match result {
@ -325,15 +373,11 @@ impl<C: LlmClient, St: Store> Session<C, St> {
message: e.to_string(), message: e.to_string(),
}, },
}; };
self.store self.append_entry(LogEntry::RunOutcome {
.append( ts: session_log::now_millis(),
self.session_id, outcome,
&LogEntry::RunOutcome { interrupted: self.worker.last_run_interrupted(),
ts: session_log::now_millis(), })
outcome, .await
interrupted: self.worker.last_run_interrupted(),
},
)
.await
} }
} }

View File

@ -3,9 +3,84 @@
//! Each [`LogEntry`] represents a single state transition in a session, //! Each [`LogEntry`] represents a single state transition in a session,
//! serialized as one line in a `.jsonl` file. Reading all entries and //! serialized as one line in a `.jsonl` file. Reading all entries and
//! collecting them via [`collect_state`] reconstructs the full [`Worker`] state. //! collecting them via [`collect_state`] reconstructs the full [`Worker`] state.
//!
//! Entries are chained via [`EntryHash`]: each [`HashedEntry`] records the hash
//! of the previous entry, forming a tamper-evident append-only chain. This
//! enables safe fork detection when multiple writers share a session.
use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::llm_client::types::{Item, RequestConfig};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
/// SHA-256 hash identifying a specific log entry in the chain.
///
/// Computed as `sha256(prev_hash_bytes || canonical_json(entry))`.
/// Displayed and serialized as a lowercase hex string.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct EntryHash([u8; 32]);
impl EntryHash {
pub fn as_bytes(&self) -> &[u8; 32] {
&self.0
}
pub fn to_hex(&self) -> String {
hex::encode(self.0)
}
pub fn from_hex(s: &str) -> Result<Self, hex::FromHexError> {
let mut buf = [0u8; 32];
hex::decode_to_slice(s, &mut buf)?;
Ok(Self(buf))
}
}
impl std::fmt::Display for EntryHash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.to_hex())
}
}
impl Serialize for EntryHash {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_str(&self.to_hex())
}
}
impl<'de> Deserialize<'de> for EntryHash {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
let s = String::deserialize(deserializer)?;
Self::from_hex(&s).map_err(serde::de::Error::custom)
}
}
/// Compute the hash for a log entry given its predecessor's hash.
pub fn compute_hash(prev: Option<&EntryHash>, entry: &LogEntry) -> EntryHash {
let mut hasher = Sha256::new();
// Feed prev_hash bytes (32 zero bytes if None).
match prev {
Some(h) => hasher.update(h.as_bytes()),
None => hasher.update([0u8; 32]),
}
// Canonical JSON of the entry.
let json = serde_json::to_string(entry).expect("LogEntry serialization cannot fail");
hasher.update(json.as_bytes());
EntryHash(hasher.finalize().into())
}
/// A [`LogEntry`] with hash-chain metadata.
///
/// This is the unit persisted to JSONL — one line per `HashedEntry`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HashedEntry {
pub hash: EntryHash,
pub prev_hash: Option<EntryHash>,
#[serde(flatten)]
pub entry: LogEntry,
}
/// A single session log entry, serialized as one JSONL line. /// A single session log entry, serialized as one JSONL line.
/// ///
@ -80,10 +155,12 @@ pub struct RestoredState {
pub turn_count: usize, pub turn_count: usize,
pub locked_prefix_len: usize, pub locked_prefix_len: usize,
pub last_run_interrupted: bool, pub last_run_interrupted: bool,
/// Hash of the last entry in the chain (None if empty).
pub head_hash: Option<EntryHash>,
} }
/// Replay a sequence of log entries to reconstruct worker state. /// Replay a sequence of hashed entries to reconstruct worker state.
pub fn collect_state(entries: &[LogEntry]) -> RestoredState { pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
let mut state = RestoredState { let mut state = RestoredState {
system_prompt: None, system_prompt: None,
config: RequestConfig::default(), config: RequestConfig::default(),
@ -91,10 +168,13 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState {
turn_count: 0, turn_count: 0,
locked_prefix_len: 0, locked_prefix_len: 0,
last_run_interrupted: false, last_run_interrupted: false,
head_hash: None,
}; };
for entry in entries { for hashed in entries {
match entry { state.head_hash = Some(hashed.hash.clone());
match &hashed.entry {
LogEntry::SessionStart { LogEntry::SessionStart {
system_prompt, system_prompt,
config, config,
@ -148,6 +228,26 @@ pub fn now_millis() -> u64 {
.as_millis() as u64 .as_millis() as u64
} }
/// Build a hash chain from plain `LogEntry` values.
///
/// Useful for tests and for seeding new sessions from a list of entries.
pub fn build_chain(entries: &[LogEntry]) -> Vec<HashedEntry> {
let mut chain = Vec::with_capacity(entries.len());
let mut prev: Option<EntryHash> = None;
for entry in entries {
let hash = compute_hash(prev.as_ref(), entry);
chain.push(HashedEntry {
hash: hash.clone(),
prev_hash: prev,
entry: entry.clone(),
});
prev = Some(hash);
}
chain
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -158,25 +258,27 @@ mod tests {
assert!(state.history.is_empty()); assert!(state.history.is_empty());
assert_eq!(state.turn_count, 0); assert_eq!(state.turn_count, 0);
assert_eq!(state.locked_prefix_len, 0); assert_eq!(state.locked_prefix_len, 0);
assert!(state.head_hash.is_none());
} }
#[test] #[test]
fn replay_session_start_sets_initial_state() { fn replay_session_start_sets_initial_state() {
let entries = vec![LogEntry::SessionStart { let entries = build_chain(&[LogEntry::SessionStart {
ts: 1000, ts: 1000,
system_prompt: Some("You are helpful.".into()), system_prompt: Some("You are helpful.".into()),
config: RequestConfig::default().with_max_tokens(1024), config: RequestConfig::default().with_max_tokens(1024),
history: vec![Item::user_message("seed")], history: vec![Item::user_message("seed")],
}]; }]);
let state = collect_state(&entries); let state = collect_state(&entries);
assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful."));
assert_eq!(state.config.max_tokens, Some(1024)); assert_eq!(state.config.max_tokens, Some(1024));
assert_eq!(state.history.len(), 1); assert_eq!(state.history.len(), 1);
assert!(state.head_hash.is_some());
} }
#[test] #[test]
fn replay_full_turn() { fn replay_full_turn() {
let entries = vec![ let entries = build_chain(&[
LogEntry::SessionStart { LogEntry::SessionStart {
ts: 1000, ts: 1000,
system_prompt: None, system_prompt: None,
@ -200,7 +302,7 @@ mod tests {
outcome: Outcome::Finished, outcome: Outcome::Finished,
interrupted: false, interrupted: false,
}, },
]; ]);
let state = collect_state(&entries); let state = collect_state(&entries);
assert_eq!(state.history.len(), 2); assert_eq!(state.history.len(), 2);
assert_eq!(state.turn_count, 1); assert_eq!(state.turn_count, 1);
@ -209,7 +311,7 @@ mod tests {
#[test] #[test]
fn replay_with_tool_calls() { fn replay_with_tool_calls() {
let entries = vec![ let entries = build_chain(&[
LogEntry::SessionStart { LogEntry::SessionStart {
ts: 1000, ts: 1000,
system_prompt: None, system_prompt: None,
@ -236,7 +338,7 @@ mod tests {
ts: 4100, ts: 4100,
turn_count: 1, turn_count: 1,
}, },
]; ]);
let state = collect_state(&entries); let state = collect_state(&entries);
assert_eq!(state.history.len(), 4); assert_eq!(state.history.len(), 4);
assert!(state.history[1].is_tool_call()); assert!(state.history[1].is_tool_call());
@ -245,7 +347,7 @@ mod tests {
#[test] #[test]
fn replay_cache_lock_unlock() { fn replay_cache_lock_unlock() {
let entries = vec![ let entries = build_chain(&[
LogEntry::SessionStart { LogEntry::SessionStart {
ts: 1000, ts: 1000,
system_prompt: None, system_prompt: None,
@ -257,7 +359,7 @@ mod tests {
locked_prefix_len: 2, locked_prefix_len: 2,
}, },
LogEntry::CacheUnlocked { ts: 3000 }, LogEntry::CacheUnlocked { ts: 3000 },
]; ]);
let state = collect_state(&entries); let state = collect_state(&entries);
assert_eq!(state.locked_prefix_len, 0); assert_eq!(state.locked_prefix_len, 0);
@ -268,7 +370,7 @@ mod tests {
#[test] #[test]
fn replay_config_changed() { fn replay_config_changed() {
let entries = vec![ let entries = build_chain(&[
LogEntry::SessionStart { LogEntry::SessionStart {
ts: 1000, ts: 1000,
system_prompt: None, system_prompt: None,
@ -279,8 +381,57 @@ mod tests {
ts: 2000, ts: 2000,
config: RequestConfig::default().with_temperature(0.5), config: RequestConfig::default().with_temperature(0.5),
}, },
]; ]);
let state = collect_state(&entries); let state = collect_state(&entries);
assert_eq!(state.config.temperature, Some(0.5)); assert_eq!(state.config.temperature, Some(0.5));
} }
#[test]
fn hash_chain_is_deterministic() {
let raw = vec![
LogEntry::SessionStart {
ts: 1000,
system_prompt: None,
config: RequestConfig::default(),
history: vec![],
},
LogEntry::UserInput {
ts: 2000,
item: Item::user_message("Hello"),
},
];
let chain_a = build_chain(&raw);
let chain_b = build_chain(&raw);
assert_eq!(chain_a[0].hash, chain_b[0].hash);
assert_eq!(chain_a[1].hash, chain_b[1].hash);
}
#[test]
fn different_content_produces_different_hash() {
let entry_a = LogEntry::UserInput {
ts: 1000,
item: Item::user_message("Hello"),
};
let entry_b = LogEntry::UserInput {
ts: 1000,
item: Item::user_message("World"),
};
let hash_a = compute_hash(None, &entry_a);
let hash_b = compute_hash(None, &entry_b);
assert_ne!(hash_a, hash_b);
}
#[test]
fn hash_hex_round_trip() {
let entry = LogEntry::SessionStart {
ts: 1000,
system_prompt: None,
config: RequestConfig::default(),
history: vec![],
};
let hash = compute_hash(None, &entry);
let hex = hash.to_hex();
let parsed = EntryHash::from_hex(&hex).unwrap();
assert_eq!(hash, parsed);
}
} }

View File

@ -4,7 +4,7 @@
//! Implementations handle the physical storage (filesystem, database, etc.). //! Implementations handle the physical storage (filesystem, database, etc.).
use crate::event_trace::TraceEntry; use crate::event_trace::TraceEntry;
use crate::session_log::LogEntry; use crate::session_log::{EntryHash, HashedEntry};
use crate::SessionId; use crate::SessionId;
use std::future::Future; use std::future::Future;
@ -29,28 +29,29 @@ pub enum StoreError {
/// All methods take `&self` — implementations should use interior mutability /// All methods take `&self` — implementations should use interior mutability
/// (e.g., append-mode file handles) when needed. /// (e.g., append-mode file handles) when needed.
pub trait Store: Send + Sync { pub trait Store: Send + Sync {
/// Append a single log entry to the session. /// Append a single hashed entry to the session log.
fn append( fn append(
&self, &self,
id: SessionId, id: SessionId,
entry: &LogEntry, entry: &HashedEntry,
) -> impl Future<Output = Result<(), StoreError>> + Send; ) -> impl Future<Output = Result<(), StoreError>> + Send;
/// Read all log entries for a session, in order. /// Read all hashed entries for a session, in order.
fn read_all( fn read_all(
&self, &self,
id: SessionId, id: SessionId,
) -> impl Future<Output = Result<Vec<LogEntry>, StoreError>> + Send; ) -> impl Future<Output = Result<Vec<HashedEntry>, StoreError>> + Send;
/// List all session IDs, most recent first. /// List all session IDs, most recent first.
fn list_sessions(&self) fn list_sessions(
-> impl Future<Output = Result<Vec<SessionId>, StoreError>> + Send; &self,
) -> impl Future<Output = Result<Vec<SessionId>, StoreError>> + Send;
/// Create a new session with initial entries. /// Create a new session with initial entries.
fn create_session( fn create_session(
&self, &self,
id: SessionId, id: SessionId,
entries: &[LogEntry], entries: &[HashedEntry],
) -> impl Future<Output = Result<(), StoreError>> + Send; ) -> impl Future<Output = Result<(), StoreError>> + Send;
/// Check if a session exists. /// Check if a session exists.
@ -59,6 +60,14 @@ pub trait Store: Send + Sync {
id: SessionId, id: SessionId,
) -> impl Future<Output = Result<bool, StoreError>> + Send; ) -> impl Future<Output = Result<bool, StoreError>> + Send;
/// Read the hash of the last entry in a session (the head).
///
/// Returns `None` if the session is empty.
fn read_head_hash(
&self,
id: SessionId,
) -> impl Future<Output = Result<Option<EntryHash>, StoreError>> + Send;
/// Append a trace entry to the debug event trace file. /// Append a trace entry to the debug event trace file.
fn append_trace( fn append_trace(
&self, &self,

View File

@ -1,6 +1,6 @@
use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::llm_client::types::{Item, RequestConfig};
use llm_worker_persistence::{ use llm_worker_persistence::{
FsStore, LogEntry, Outcome, Store, TraceEntry, new_session_id, collect_state, FsStore, LogEntry, Outcome, Store, TraceEntry, build_chain, collect_state, new_session_id,
}; };
#[tokio::test] #[tokio::test]
@ -9,7 +9,7 @@ async fn round_trip_write_and_read() {
let store = FsStore::new(dir.path()).await.unwrap(); let store = FsStore::new(dir.path()).await.unwrap();
let id = new_session_id(); let id = new_session_id();
let entries = vec![ let raw = vec![
LogEntry::SessionStart { LogEntry::SessionStart {
ts: 1000, ts: 1000,
system_prompt: Some("You are helpful.".into()), system_prompt: Some("You are helpful.".into()),
@ -34,6 +34,7 @@ async fn round_trip_write_and_read() {
interrupted: false, interrupted: false,
}, },
]; ];
let entries = build_chain(&raw);
// Write entries one by one // Write entries one by one
for entry in &entries { for entry in &entries {
@ -44,6 +45,12 @@ async fn round_trip_write_and_read() {
let read_back = store.read_all(id).await.unwrap(); let read_back = store.read_all(id).await.unwrap();
assert_eq!(read_back.len(), entries.len()); assert_eq!(read_back.len(), entries.len());
// Verify hashes survived round-trip
for (orig, read) in entries.iter().zip(read_back.iter()) {
assert_eq!(orig.hash, read.hash);
assert_eq!(orig.prev_hash, read.prev_hash);
}
// Replay and verify state // Replay and verify state
let state = collect_state(&read_back); let state = collect_state(&read_back);
assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful."));
@ -51,6 +58,7 @@ async fn round_trip_write_and_read() {
assert_eq!(state.history.len(), 2); assert_eq!(state.history.len(), 2);
assert_eq!(state.turn_count, 1); assert_eq!(state.turn_count, 1);
assert!(!state.last_run_interrupted); assert!(!state.last_run_interrupted);
assert!(state.head_hash.is_some());
} }
#[tokio::test] #[tokio::test]
@ -59,14 +67,12 @@ async fn create_session_writes_all_entries() {
let store = FsStore::new(dir.path()).await.unwrap(); let store = FsStore::new(dir.path()).await.unwrap();
let id = new_session_id(); let id = new_session_id();
let entries = vec![ let entries = build_chain(&[LogEntry::SessionStart {
LogEntry::SessionStart { ts: 1000,
ts: 1000, system_prompt: None,
system_prompt: None, config: RequestConfig::default(),
config: RequestConfig::default(), history: vec![Item::user_message("seed"), Item::assistant_message("ok")],
history: vec![Item::user_message("seed"), Item::assistant_message("ok")], }]);
},
];
store.create_session(id, &entries).await.unwrap(); store.create_session(id, &entries).await.unwrap();
let read_back = store.read_all(id).await.unwrap(); let read_back = store.read_all(id).await.unwrap();
@ -86,15 +92,21 @@ async fn list_sessions_returns_newest_first() {
tokio::time::sleep(std::time::Duration::from_millis(2)).await; tokio::time::sleep(std::time::Duration::from_millis(2)).await;
let id2 = new_session_id(); let id2 = new_session_id();
let start = LogEntry::SessionStart { let entries1 = build_chain(&[LogEntry::SessionStart {
ts: 1000, ts: 1000,
system_prompt: None, system_prompt: None,
config: RequestConfig::default(), config: RequestConfig::default(),
history: vec![], history: vec![],
}; }]);
let entries2 = build_chain(&[LogEntry::SessionStart {
ts: 1001,
system_prompt: None,
config: RequestConfig::default(),
history: vec![],
}]);
store.append(id1, &start).await.unwrap(); store.append(id1, &entries1[0]).await.unwrap();
store.append(id2, &start).await.unwrap(); store.append(id2, &entries2[0]).await.unwrap();
let sessions = store.list_sessions().await.unwrap(); let sessions = store.list_sessions().await.unwrap();
assert_eq!(sessions.len(), 2); assert_eq!(sessions.len(), 2);
@ -110,18 +122,13 @@ async fn exists_returns_correct_state() {
assert!(!store.exists(id).await.unwrap()); assert!(!store.exists(id).await.unwrap());
store let entries = build_chain(&[LogEntry::SessionStart {
.append( ts: 1000,
id, system_prompt: None,
&LogEntry::SessionStart { config: RequestConfig::default(),
ts: 1000, history: vec![],
system_prompt: None, }]);
config: RequestConfig::default(), store.append(id, &entries[0]).await.unwrap();
history: vec![],
},
)
.await
.unwrap();
assert!(store.exists(id).await.unwrap()); assert!(store.exists(id).await.unwrap());
} }
@ -143,18 +150,13 @@ async fn trace_entries_in_separate_file() {
let id = new_session_id(); let id = new_session_id();
// Write a log entry // Write a log entry
store let entries = build_chain(&[LogEntry::SessionStart {
.append( ts: 1000,
id, system_prompt: None,
&LogEntry::SessionStart { config: RequestConfig::default(),
ts: 1000, history: vec![],
system_prompt: None, }]);
config: RequestConfig::default(), store.append(id, &entries[0]).await.unwrap();
history: vec![],
},
)
.await
.unwrap();
// Write a trace entry // Write a trace entry
let trace = TraceEntry { let trace = TraceEntry {
@ -174,3 +176,30 @@ async fn trace_entries_in_separate_file() {
let trace_path = dir.path().join(format!("{id}.trace.jsonl")); let trace_path = dir.path().join(format!("{id}.trace.jsonl"));
assert!(trace_path.exists()); assert!(trace_path.exists());
} }
#[tokio::test]
async fn read_head_hash_returns_last_entry_hash() {
let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).await.unwrap();
let id = new_session_id();
let entries = build_chain(&[
LogEntry::SessionStart {
ts: 1000,
system_prompt: None,
config: RequestConfig::default(),
history: vec![],
},
LogEntry::UserInput {
ts: 2000,
item: Item::user_message("Hello"),
},
]);
for entry in &entries {
store.append(id, entry).await.unwrap();
}
let head = store.read_head_hash(id).await.unwrap();
assert_eq!(head.as_ref(), Some(&entries[1].hash));
}

View File

@ -112,17 +112,37 @@ async fn session_run_logs_entries() {
let entries = store.read_all(sid).await.unwrap(); let entries = store.read_all(sid).await.unwrap();
// SessionStart, UserInput, AssistantItems, TurnEnd, RunOutcome (at minimum) // SessionStart, UserInput, AssistantItems, TurnEnd, RunOutcome (at minimum)
assert!(entries.len() >= 4, "expected at least 4 entries, got {}", entries.len()); assert!(
entries.len() >= 4,
"expected at least 4 entries, got {}",
entries.len()
);
// First entry is SessionStart // First entry is SessionStart
assert!(matches!(entries[0], LogEntry::SessionStart { .. })); assert!(matches!(&entries[0].entry, LogEntry::SessionStart { .. }));
// Has a RunOutcome with Finished // Has a RunOutcome with Finished
let has_finished = entries.iter().any(|e| matches!( let has_finished = entries.iter().any(|e| {
e, matches!(
LogEntry::RunOutcome { outcome: Outcome::Finished, .. } &e.entry,
)); LogEntry::RunOutcome {
outcome: Outcome::Finished,
..
}
)
});
assert!(has_finished, "should have a Finished outcome"); assert!(has_finished, "should have a Finished outcome");
// Verify hash chain integrity
assert!(entries[0].prev_hash.is_none());
for i in 1..entries.len() {
assert_eq!(
entries[i].prev_hash.as_ref(),
Some(&entries[i - 1].hash),
"hash chain broken at entry {}",
i
);
}
} }
#[tokio::test] #[tokio::test]
@ -141,12 +161,14 @@ async fn session_restore_round_trip() {
let original_history = session.worker.history().to_vec(); let original_history = session.worker.history().to_vec();
let original_turn_count = session.worker.turn_count(); let original_turn_count = session.worker.turn_count();
let original_head_hash = session.head_hash().cloned();
// Restore // Restore
let restore_client = MockLlmClient::new(vec![]); // won't be called let restore_client = MockLlmClient::new(vec![]); // won't be called
let restored = Session::restore(restore_client, store.clone(), sid, SessionConfig::default()) let restored =
.await Session::restore(restore_client, store.clone(), sid, SessionConfig::default())
.unwrap(); .await
.unwrap();
assert_eq!(restored.worker.history().len(), original_history.len()); assert_eq!(restored.worker.history().len(), original_history.len());
assert_eq!(restored.worker.turn_count(), original_turn_count); assert_eq!(restored.worker.turn_count(), original_turn_count);
@ -154,6 +176,7 @@ async fn session_restore_round_trip() {
restored.worker.get_system_prompt().map(String::from), restored.worker.get_system_prompt().map(String::from),
Some("You are helpful.".to_string()) Some("You are helpful.".to_string())
); );
assert_eq!(restored.head_hash(), original_head_hash.as_ref());
} }
#[tokio::test] #[tokio::test]
@ -172,10 +195,14 @@ async fn session_run_with_tool_call() {
let entries = store.read_all(sid).await.unwrap(); let entries = store.read_all(sid).await.unwrap();
let has_tool_results = entries.iter().any(|e| matches!(e, LogEntry::ToolResults { .. })); let has_tool_results = entries
.iter()
.any(|e| matches!(&e.entry, LogEntry::ToolResults { .. }));
assert!(has_tool_results, "should have ToolResults entry"); assert!(has_tool_results, "should have ToolResults entry");
let has_assistant = entries.iter().any(|e| matches!(e, LogEntry::AssistantItems { .. })); let has_assistant = entries
.iter()
.any(|e| matches!(&e.entry, LogEntry::AssistantItems { .. }));
assert!(has_assistant, "should have AssistantItems entry"); assert!(has_assistant, "should have AssistantItems entry");
} }
@ -199,10 +226,15 @@ async fn session_resume_after_pause() {
// Check RunOutcome is Paused // Check RunOutcome is Paused
let entries = store.read_all(sid).await.unwrap(); let entries = store.read_all(sid).await.unwrap();
let has_paused = entries.iter().any(|e| matches!( let has_paused = entries.iter().any(|e| {
e, matches!(
LogEntry::RunOutcome { outcome: Outcome::Paused, .. } &e.entry,
)); LogEntry::RunOutcome {
outcome: Outcome::Paused,
..
}
)
});
assert!(has_paused, "should have Paused outcome"); assert!(has_paused, "should have Paused outcome");
// Restore and resume // Restore and resume
@ -214,9 +246,10 @@ async fn session_resume_after_pause() {
status: ResponseStatus::Completed, status: ResponseStatus::Completed,
}), }),
]]); ]]);
let mut restored = Session::restore(resume_client, store.clone(), sid, SessionConfig::default()) let mut restored =
.await Session::restore(resume_client, store.clone(), sid, SessionConfig::default())
.unwrap(); .await
.unwrap();
assert!(restored.worker.last_run_interrupted()); assert!(restored.worker.last_run_interrupted());
@ -244,7 +277,10 @@ async fn session_fork_preserves_state() {
// Fork should have a SessionStart with the current history // Fork should have a SessionStart with the current history
let fork_entries = store.read_all(fork_id).await.unwrap(); let fork_entries = store.read_all(fork_id).await.unwrap();
assert_eq!(fork_entries.len(), 1); assert_eq!(fork_entries.len(), 1);
assert!(matches!(&fork_entries[0], LogEntry::SessionStart { .. })); assert!(matches!(
&fork_entries[0].entry,
LogEntry::SessionStart { .. }
));
let fork_state = collect_state(&fork_entries); let fork_state = collect_state(&fork_entries);
assert_eq!(fork_state.history.len(), original_history_len); assert_eq!(fork_state.history.len(), original_history_len);
@ -267,8 +303,9 @@ async fn session_fork_at_truncates() {
let all_entries = store.read_all(sid).await.unwrap(); let all_entries = store.read_all(sid).await.unwrap();
assert!(all_entries.len() > 2); assert!(all_entries.len() > 2);
// Fork at entry 2 (SessionStart + UserInput only) // Fork at the hash of the 2nd entry (SessionStart + UserInput)
let fork_id = Session::<MockLlmClient, FsStore>::fork_at(&store, sid, 2) let at_hash = &all_entries[1].hash;
let fork_id = Session::<MockLlmClient, FsStore>::fork_at(&store, sid, at_hash)
.await .await
.unwrap(); .unwrap();
@ -278,7 +315,10 @@ async fn session_fork_at_truncates() {
let fork_state = collect_state(&fork_entries); let fork_state = collect_state(&fork_entries);
// Should have the state from replaying only the first 2 entries // Should have the state from replaying only the first 2 entries
let original_truncated_state = collect_state(&all_entries[..2]); let original_truncated_state = collect_state(&all_entries[..2]);
assert_eq!(fork_state.history.len(), original_truncated_state.history.len()); assert_eq!(
fork_state.history.len(),
original_truncated_state.history.len()
);
} }
#[tokio::test] #[tokio::test]
@ -293,14 +333,18 @@ async fn session_config_changed_logged() {
let sid = session.session_id(); let sid = session.session_id();
// Modify config via worker and log it // Modify config via worker and log it
session.worker.set_request_config(RequestConfig::default().with_temperature(0.7)); session
.worker
.set_request_config(RequestConfig::default().with_temperature(0.7));
session.log_config_changed().await.unwrap(); session.log_config_changed().await.unwrap();
let entries = store.read_all(sid).await.unwrap(); let entries = store.read_all(sid).await.unwrap();
let has_config_changed = entries.iter().any(|e| matches!( let has_config_changed = entries.iter().any(|e| {
e, matches!(
LogEntry::ConfigChanged { config, .. } if config.temperature == Some(0.7) &e.entry,
)); LogEntry::ConfigChanged { config, .. } if config.temperature == Some(0.7)
)
});
assert!(has_config_changed, "should have ConfigChanged entry"); assert!(has_config_changed, "should have ConfigChanged entry");
} }
@ -310,7 +354,7 @@ async fn session_cache_lock_unlock_logged() {
let client = MockLlmClient::new(vec![]); let client = MockLlmClient::new(vec![]);
let worker = Worker::new(client); let worker = Worker::new(client);
let session = Session::new(worker, store.clone(), SessionConfig::default()) let mut session = Session::new(worker, store.clone(), SessionConfig::default())
.await .await
.unwrap(); .unwrap();
let sid = session.session_id(); let sid = session.session_id();
@ -320,16 +364,71 @@ async fn session_cache_lock_unlock_logged() {
let entries = store.read_all(sid).await.unwrap(); let entries = store.read_all(sid).await.unwrap();
let has_locked = entries.iter().any(|e| matches!( let has_locked = entries.iter().any(|e| {
e, matches!(
LogEntry::CacheLocked { locked_prefix_len: 5, .. } &e.entry,
)); LogEntry::CacheLocked {
locked_prefix_len: 5,
..
}
)
});
assert!(has_locked, "should have CacheLocked entry"); assert!(has_locked, "should have CacheLocked entry");
let has_unlocked = entries.iter().any(|e| matches!(e, LogEntry::CacheUnlocked { .. })); let has_unlocked = entries
.iter()
.any(|e| matches!(&e.entry, LogEntry::CacheUnlocked { .. }));
assert!(has_unlocked, "should have CacheUnlocked entry"); assert!(has_unlocked, "should have CacheUnlocked entry");
// State after all entries: unlocked // State after all entries: unlocked
let state = collect_state(&entries); let state = collect_state(&entries);
assert_eq!(state.locked_prefix_len, 0); assert_eq!(state.locked_prefix_len, 0);
} }
#[tokio::test]
async fn session_auto_forks_on_conflict() {
let (_dir, store) = make_store().await;
// Create a session
let client_a = MockLlmClient::new(simple_text_events());
let worker_a = Worker::new(client_a);
let mut session_a = Session::new(worker_a, store.clone(), SessionConfig::default())
.await
.unwrap();
let original_sid = session_a.session_id();
// Simulate another Pod writing to the same session behind our back
let extra_entry = LogEntry::UserInput {
ts: 9999,
item: Item::user_message("Interloper"),
};
let current_head = store.read_head_hash(original_sid).await.unwrap();
let hash = llm_worker_persistence::compute_hash(current_head.as_ref(), &extra_entry);
let hashed = llm_worker_persistence::HashedEntry {
hash,
prev_hash: current_head,
entry: extra_entry,
};
store.append(original_sid, &hashed).await.unwrap();
// Now session_a's head_hash is stale — run should auto-fork
session_a.run("Hello").await.unwrap();
// session_a should now have a different session_id
assert_ne!(session_a.session_id(), original_sid);
// The fork session should exist and have entries
let fork_entries = store.read_all(session_a.session_id()).await.unwrap();
assert!(!fork_entries.is_empty());
// Original session should still have the interloper entry
let original_entries = store.read_all(original_sid).await.unwrap();
let has_interloper = original_entries.iter().any(|e| {
if let LogEntry::UserInput { item, .. } = &e.entry {
item.is_user_message()
} else {
false
}
});
assert!(has_interloper);
}

View File

@ -1,26 +0,0 @@
# セッションエントリのハッシュチェーン
## 背景
複数の Pod が同じ Session を読み込んで作業を進めるケースがある。現状の設計では、同一セッションファイルへの同時書き込みがコンフリクトを起こす。また `fork_at` のエントリ指定が `usize`(インデックス)であるため、元セッションにエントリが追加されると同じインデックスが別の内容を指してしまう。
## やること
- 各 `LogEntry``prev_hash: Option<EntryHash>` を追加(先頭エントリは `None`
- `EntryHash``sha256(prev_hash + serialized_content)` で算出
- `Session` がロード時に head hash末尾エントリのハッシュを保持
- `Session::append` 時にファイル末尾のハッシュと保持している head hash を比較
- 一致 → 通常 append、head hash を更新
- 不一致 → auto-fork新 SessionId で分岐)
- `fork_at` の引数を `usize``EntryHash` に変更
## アドレッシング
- **SessionId (UUID v7)** → どのセッション(ファイル)か
- **EntryHash** → そのセッション内のどの時点か
## 判断根拠
- ファイルサイズやエントリ数による変更検知は同じ長さの別内容を区別できず信頼性が低い
- ハッシュチェーンなら暗号的に一意であり、ログの整合性検証も副産物として得られる
- ストレージレイアウトJSONLは変更不要。LogEntry の構造変更だけで済む