diff --git a/Cargo.lock b/Cargo.lock index 0a05ebcd..6e334614 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2148,6 +2148,7 @@ checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" name = "pod" version = "0.1.0" dependencies = [ + "arc-swap", "async-trait", "chrono", "clap", @@ -2160,7 +2161,6 @@ dependencies = [ "manifest", "memory", "minijinja", - "parking_lot", "pod-registry", "protocol", "provider", @@ -2977,12 +2977,10 @@ version = "0.1.0" dependencies = [ "async-trait", "futures", - "hex", "llm-worker", "protocol", "serde", "serde_json", - "sha2 0.11.0", "tempfile", "thiserror 2.0.18", "tokio", diff --git a/Cargo.toml b/Cargo.toml index c9088d14..0294b309 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,6 @@ llm-worker-macros = { path = "crates/llm-worker-macros", version = "0.2" } manifest = { path = "crates/manifest" } lint-common = { path = "crates/lint-common" } memory = { path = "crates/memory" } -workflow = { path = "crates/workflow" } pod-registry = { path = "crates/pod-registry" } protocol = { path = "crates/protocol" } provider = { path = "crates/provider" } diff --git a/crates/memory/src/extract/pointer.rs b/crates/memory/src/extract/pointer.rs index df58f14c..59d86310 100644 --- a/crates/memory/src/extract/pointer.rs +++ b/crates/memory/src/extract/pointer.rs @@ -10,7 +10,7 @@ use super::EXTRACT_DOMAIN; /// として 1 回ずつ書かれ、最新の 1 件が現行 pointer として有効になる。 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ExtractPointerPayload { - /// 直近 extract が処理した最後の session-store HashedEntry の index。 + /// 直近 extract が処理した最後の session-store LogEntry の index。 /// 次回の `source.range.start` はこの値 + 1。 pub processed_through_entry: usize, /// 直近 extract 時点の `history.len()`。次回入力は diff --git a/crates/pod/Cargo.toml b/crates/pod/Cargo.toml index 2f663d62..350ad92d 100644 --- a/crates/pod/Cargo.toml +++ b/crates/pod/Cargo.toml @@ -30,7 +30,7 @@ memory = { workspace = true } workflow-crate = { package = "workflow", path = "../workflow" } uuid = { workspace = true, features = ["v7"] } session-metrics = { workspace = true } -parking_lot = "0.12.5" +arc-swap = "1.9.1" [dev-dependencies] dotenv = "0.15.0" diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 63d1ed01..de9322fa 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1,16 +1,15 @@ use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; +use arc_swap::ArcSwap; use llm_worker::Item; use llm_worker::llm_client::RequestConfig; use llm_worker::llm_client::client::LlmClient; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; -use parking_lot::Mutex as SyncMutex; use session_store::{ - EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, SystemItem, - session_log, to_logged, + LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, SystemItem, session_log, to_logged, }; use tracing::{info, warn}; @@ -43,22 +42,59 @@ use protocol::{AlertLevel, AlertSource, Event, Segment}; use tokio::sync::broadcast; use tokio::task::JoinHandle; -pub struct SessionHead { - pub session_id: SessionId, - pub head_hash: Option, +/// Lock-free shared session pointer. +/// +/// Holds the current `(session_id, entries_written)` pair so that the +/// Pod and every `LogWriterHandle` clone see a consistent view through +/// `Arc`-shared lock-free reads. `session_id` is wrapped in `ArcSwap` +/// so fork (a rare, run-start-only event) can atomically swap it +/// without taking a mutex on the append hot path. `entries_written` is +/// an `AtomicUsize` bumped on every successful append; the writer's +/// tally is compared against the store's on-disk count to detect +/// concurrent writers in `ensure_session_head`. +pub struct SessionState { + session_id: ArcSwap, + entries_written: AtomicUsize, } -/// Cheap-cloneable bundle of (store + session-head lock + sink) handed -/// to the worker callback and the interceptor so they can commit -/// `LogEntry` values directly without going through an mpsc ferry. -/// -/// All three fields are `Clone` (the latter two as `Arc` clones, the -/// store per its `Clone` impl) so the handle itself is a flat triple of -/// cheap copies. +impl SessionState { + pub fn new(session_id: SessionId, entries_written: usize) -> Arc { + Arc::new(Self { + session_id: ArcSwap::from_pointee(session_id), + entries_written: AtomicUsize::new(entries_written), + }) + } + + pub fn session_id(&self) -> SessionId { + **self.session_id.load() + } + + pub fn set_session_id(&self, id: SessionId) { + self.session_id.store(Arc::new(id)); + } + + pub fn entries_written(&self) -> usize { + self.entries_written.load(Ordering::Acquire) + } + + pub fn set_entries_written(&self, n: usize) { + self.entries_written.store(n, Ordering::Release); + } + + fn increment_entries(&self) { + self.entries_written.fetch_add(1, Ordering::Release); + } +} + +/// Cheap-cloneable bundle of (store + shared session pointer + sink) +/// handed to the worker callback and the interceptor so they can +/// commit `LogEntry` values directly without going through an mpsc +/// ferry. All fields are `Clone` (`store` per its `Clone` impl, +/// `state` and `sink` as `Arc` clones). #[derive(Clone)] pub struct LogWriterHandle { pub store: St, - pub session_head: Arc>, + pub state: Arc, pub sink: SessionLogSink, } @@ -66,18 +102,16 @@ impl LogWriterHandle where St: Store + Clone, { - /// Append `entry` to the log: disk write → in-memory mirror push → - /// broadcast — atomic w.r.t. `subscribe_with_snapshot` callers. - pub fn append_entry(&self, entry: LogEntry) -> Result { - let mut head = self.session_head.lock(); - let hash = session_store::append_entry_with_hash( - &self.store, - head.session_id, - &mut head.head_hash, - entry.clone(), - )?; + /// Append `entry` to the log: disk write → counter bump → in-memory + /// mirror push → broadcast. The kernel orders concurrent `O_APPEND` + /// writes for `< PIPE_BUF` lines, so no user-space serialization is + /// needed across appenders. + pub fn append_entry(&self, entry: LogEntry) -> Result<(), StoreError> { + let session_id = self.state.session_id(); + self.store.append(session_id, &entry)?; + self.state.increment_entries(); self.sink.publish(entry); - Ok(hash) + Ok(()) } } @@ -127,8 +161,10 @@ pub struct Pod { /// Always `Some` outside of `run()`/`resume()`. worker: Option>, store: St, - session_id: SessionId, - session_head: Arc>, + /// Shared session pointer. Source of truth for the Pod's current + /// `session_id` and append tally. `self.session_id()` is a thin + /// wrapper over `session_state.session_id()`. + session_state: Arc, /// Absolute working directory of the Pod. pwd: PathBuf, /// Shared, atomically-swappable view of the Pod's resolved scope. @@ -302,8 +338,7 @@ impl Pod { manifest: self.manifest.clone(), worker: Some(worker), store: self.store.clone(), - session_id: self.session_id, - session_head: self.session_head.clone(), + session_state: self.session_state.clone(), pwd: self.pwd.clone(), scope: self.scope.clone(), hook_builder: HookRegistryBuilder::new(), @@ -342,12 +377,12 @@ impl Pod { /// Build a `LogWriterHandle` carrying everything the worker /// callback / interceptor needs to commit `LogEntry` values - /// directly: store handle, the shared session-head lock, and the + /// directly: store handle, the shared session pointer, and the /// broadcast sink. All three are cheap clones. pub fn log_writer_handle(&self) -> LogWriterHandle { LogWriterHandle { store: self.store.clone(), - session_head: self.session_head.clone(), + state: self.session_state.clone(), sink: self.sink.clone(), } } @@ -443,11 +478,7 @@ impl Pod { manifest, worker: Some(worker), store, - session_id, - session_head: Arc::new(SyncMutex::new(SessionHead { - session_id, - head_hash: None, - })), + session_state: SessionState::new(session_id, 0), pwd, scope: SharedScope::new(scope), hook_builder: HookRegistryBuilder::new(), @@ -511,9 +542,10 @@ impl Pod { &self.prompts } - /// The session ID used for persistence. + /// The session ID used for persistence. Read lock-free from the + /// shared session pointer so fork-time swaps are observed immediately. pub fn session_id(&self) -> SessionId { - self.session_id + self.session_state.session_id() } /// The Pod's manifest. @@ -567,12 +599,12 @@ impl Pod { } /// Snapshot the current runtime scope in the session log. The entry - /// is intentionally appended as soon as a session head exists: if the + /// is intentionally appended as soon as a session log exists: if the /// process later exits while children keep their allocations, resume /// can restore the narrowed scope instead of reclaiming delegated /// writes. pub fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> { - if self.session_head.lock().head_hash.is_none() { + if self.session_state.entries_written() == 0 { return Ok(()); } let snapshot = { @@ -588,23 +620,18 @@ impl Pod { domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), payload, }) - .map(|_| ()) } /// Append `entry` to the session log AND publish it through the - /// broadcast sink. Holds the session-head sync lock across the - /// disk write and the sink publish so subscribers see a gap-free - /// `(snapshot, live)` stream consistent with what's on disk. - pub(crate) fn commit_entry(&self, entry: LogEntry) -> Result { - let mut head = self.session_head.lock(); - let hash = session_store::append_entry_with_hash( - &self.store, - head.session_id, - &mut head.head_hash, - entry.clone(), - )?; + /// broadcast sink. No user-space serialization is needed across + /// concurrent appenders — the kernel orders `O_APPEND` writes for + /// lines smaller than `PIPE_BUF`. + pub(crate) fn commit_entry(&self, entry: LogEntry) -> Result<(), StoreError> { + let session_id = self.session_state.session_id(); + self.store.append(session_id, &entry)?; + self.session_state.increment_entries(); self.sink.publish(entry); - Ok(hash) + Ok(()) } /// Cloneable sink handle. Exposed to the controller so the IPC @@ -1160,7 +1187,6 @@ impl Pod { // IDLE → active marker. Commits first so the next UserInput entry // is contained inside this Invoke range. See `tickets/invoke-turn-llmcall-semantics.md`. - self.session_id = self.session_head.lock().session_id; self.commit_entry(LogEntry::Invoke { ts: session_log::now_millis(), trigger: protocol::InvokeKind::UserSend, @@ -1350,7 +1376,7 @@ impl Pod { return; }; if let Err(err) = - memory::append_use_event(layout, self.session_id.to_string(), source, records) + memory::append_use_event(layout, self.session_id().to_string(), source, records) { warn!(error = %err, "failed to append memory usage event"); } @@ -1361,7 +1387,7 @@ impl Pod { return; }; if let Err(err) = - memory::append_resident_exposure_event(layout, self.session_id.to_string(), records) + memory::append_resident_exposure_event(layout, self.session_id().to_string(), records) { warn!(error = %err, "failed to append resident exposure event"); } @@ -1551,7 +1577,6 @@ impl Pod { // IDLE → active marker for the buffered notification / pod-event // drain. The trailing SystemItem entries (drained by the // PodInterceptor) carry the actual payload. - self.session_id = self.session_head.lock().session_id; self.commit_entry(LogEntry::Invoke { ts: session_log::now_millis(), trigger: kind, @@ -1582,23 +1607,20 @@ impl Pod { self.handle_worker_result(result, history_before).await } - /// Ensure the session exists and its head still matches ours. + /// Ensure the session exists and the writer's tally still matches + /// the on-disk entry count. /// /// On the first call for a Pod built via `from_manifest`, the session /// has not been written to the store yet — this is when we append the /// initial `SessionStart` entry, carrying the system prompt that /// `ensure_system_prompt_materialized` has just rendered. Subsequent - /// calls fall through to `ensure_head_or_fork`, which auto-forks when - /// another writer has advanced the store head behind our back. + /// calls fall through to entry-count comparison, which auto-forks + /// when another writer has appended behind our back. fn ensure_session_head(&mut self) -> Result<(), PodError> { let w = self.worker.as_ref().unwrap(); - let prev_session_id; - let initial_state = { - let head = self.session_head.lock(); - prev_session_id = head.session_id; - head.head_hash.is_none() - }; - if initial_state { + let prev_session_id = self.session_state.session_id(); + let entries_written = self.session_state.entries_written(); + if entries_written == 0 { let initial = LogEntry::SessionStart { ts: session_log::now_millis(), system_prompt: w.get_system_prompt().map(String::from), @@ -1611,13 +1633,12 @@ impl Pod { self.persist_scope_snapshot()?; return Ok(()); } - // Check store head + auto-fork if it drifted. - let store_head = self + // Check store count + auto-fork if it drifted. + let store_count = self .store - .read_head_hash(prev_session_id) + .read_entry_count(prev_session_id) .map_err(PodError::from)?; - let mut head = self.session_head.lock(); - if store_head == head.head_hash { + if store_count == entries_written { return Ok(()); } // Fork: mint a fresh session and switch to it. The new @@ -1632,20 +1653,12 @@ impl Pod { forked_from: None, compacted_from: None, }; - let hash = session_log::compute_hash(None, &entry); - let hashed = HashedEntry { - hash: hash.clone(), - prev_hash: None, - entry: entry.clone(), - }; self.store - .create_session(fork_id, &[hashed]) + .create_session(fork_id, &[entry.clone()]) .map_err(PodError::from)?; - head.session_id = fork_id; - head.head_hash = Some(hash); - self.session_id = fork_id; + self.session_state.set_session_id(fork_id); + self.session_state.set_entries_written(1); self.sink.reset_with_initial(entry); - drop(head); if self.scope_allocation.is_some() { pod_registry::update_session(&self.manifest.pod.name, fork_id)?; } @@ -1796,7 +1809,6 @@ impl Pod { // the callback fall through this branch: they classify the // slice from `history_before` inline so the test's // `restore`-style assertions still see entries on disk. - self.session_id = self.session_head.lock().session_id; if !self.history_persistence_wired { let new_items: Vec = self.worker.as_ref().unwrap().history()[history_before..] .iter() @@ -1989,7 +2001,7 @@ impl Pod { .compact_system() .map_err(PodError::PromptCatalog)?; let mut summary_worker = Worker::new(summary_client).system_prompt(summary_system_prompt); - summary_worker.set_cache_key(Some(self.session_id.to_string())); + summary_worker.set_cache_key(Some(self.session_id().to_string())); // Occupancy-based input-token meter + interceptor. The tracker pairs // each pre-request history length with the following UsageEvent, then @@ -2133,37 +2145,24 @@ impl Pod { // the broadcast sink so existing subscribers see the new // `SessionStart { compacted_from }` and reset their view. let new_session_id = session_store::new_session_id(); - let session_start = { - let mut head = self.session_head.lock(); - let old_session_id = head.session_id; - let old_head_hash = head - .head_hash - .clone() - .expect("head_hash should be set after at least one entry"); - let w = self.worker.as_ref().unwrap(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), - system_prompt: w.get_system_prompt().map(String::from), - config: w.request_config().clone(), - history: to_logged(&new_history), - forked_from: None, - compacted_from: Some(session_store::SessionOrigin { - session_id: old_session_id, - at_hash: old_head_hash, - }), - }; - let hash = session_log::compute_hash(None, &entry); - let hashed = HashedEntry { - hash: hash.clone(), - prev_hash: None, - entry: entry.clone(), - }; - self.store.create_session(new_session_id, &[hashed])?; - head.session_id = new_session_id; - head.head_hash = Some(hash); - self.session_id = new_session_id; - entry + let old_session_id = self.session_state.session_id(); + let source_turn_count = self.worker.as_ref().unwrap().turn_count(); + let w = self.worker.as_ref().unwrap(); + let entry = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: w.get_system_prompt().map(String::from), + config: w.request_config().clone(), + history: to_logged(&new_history), + forked_from: None, + compacted_from: Some(session_store::SessionOrigin { + session_id: old_session_id, + at_turn_index: source_turn_count, + }), }; + self.store.create_session(new_session_id, &[entry.clone()])?; + self.session_state.set_session_id(new_session_id); + self.session_state.set_entries_written(1); + let session_start = entry; // Broadcast the SessionStart through the sink. This atomically // resets the mirror to `[SessionStart]` so any subscriber // querying after this point sees the post-compaction prefix. @@ -2368,7 +2367,7 @@ impl Pod { // Read the session log to get the current entry count. This is // the boundary for the source.range end_entry. Called once per // extract, on a small local file. - let entries_now = self.store.read_all(self.session_id)?.len(); + let entries_now = self.store.read_all(self.session_id())?.len(); if entries_now == 0 { return Ok(ExtractDecision::Skipped); } @@ -2400,7 +2399,7 @@ impl Pod { .memory_extract_system(memory_language) .map_err(PodError::PromptCatalog)?; let mut extract_worker = Worker::new(client).system_prompt(extract_system_prompt); - extract_worker.set_cache_key(Some(self.session_id.to_string())); + extract_worker.set_cache_key(Some(self.session_id().to_string())); // Occupancy-based input-token meter + interceptor. The tracker pairs // each pre-request history length with the following UsageEvent, then @@ -2436,7 +2435,7 @@ impl Pod { extract::ExtractedPayload::default() }); - let source_session_id = self.session_head.lock().session_id; + let source_session_id = self.session_state.session_id(); let staging_id = if payload.is_empty() { String::new() } else { @@ -2460,9 +2459,7 @@ impl Pod { ts: session_log::now_millis(), domain: extract::EXTRACT_DOMAIN.into(), payload: payload_value, - }) - ?; - self.session_id = self.session_head.lock().session_id; + })?; *self .extract_pointer @@ -2601,7 +2598,7 @@ impl Pod { } }; let mut worker = Worker::new(client).system_prompt(consolidation_system_prompt); - worker.set_cache_key(Some(self.session_id.to_string())); + worker.set_cache_key(Some(self.session_id().to_string())); // Memory tools are self-contained — they bypass ScopedFs and write // directly under the workspace via WorkspaceLayout. Resident @@ -2613,7 +2610,7 @@ impl Pod { let query_cfg = memory::tool::QueryConfig::from(memory_cfg); worker.register_tool(memory::tool::read_tool_with_usage( layout.clone(), - self.session_id.to_string(), + self.session_id().to_string(), )); worker.register_tool(memory::tool::write_tool(layout.clone())); worker.register_tool(memory::tool::edit_tool(layout.clone())); @@ -2768,11 +2765,7 @@ impl Pod, St> { manifest, worker: Some(worker), store, - session_id, - session_head: Arc::new(SyncMutex::new(SessionHead { - session_id, - head_hash: None, - })), + session_state: SessionState::new(session_id, 0), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), @@ -2842,11 +2835,7 @@ impl Pod, St> { manifest, worker: Some(worker), store, - session_id, - session_head: Arc::new(SyncMutex::new(SessionHead { - session_id, - head_hash: None, - })), + session_state: SessionState::new(session_id, 0), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), @@ -2913,10 +2902,10 @@ impl Pod, St> { // sits on disk. let raw_entries = store.read_all(session_id)?; let state = session_store::collect_state(&raw_entries); - if state.head_hash.is_none() { + if state.entries_count == 0 { return Err(PodError::SessionEmpty { session_id }); } - let mirror_entries: Vec = raw_entries.iter().map(|e| e.entry.clone()).collect(); + let mirror_entries: Vec = raw_entries.clone(); let scope_snapshot = state .pod_scope .clone() @@ -2985,11 +2974,7 @@ impl Pod, St> { manifest, worker: Some(worker), store, - session_id, - session_head: Arc::new(SyncMutex::new(SessionHead { - session_id, - head_hash: state.head_hash, - })), + session_state: SessionState::new(session_id, state.entries_count), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), diff --git a/crates/pod/tests/session_metrics_test.rs b/crates/pod/tests/session_metrics_test.rs index 9e1a4d90..e72ea9f0 100644 --- a/crates/pod/tests/session_metrics_test.rs +++ b/crates/pod/tests/session_metrics_test.rs @@ -26,9 +26,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use session_metrics::{DOMAIN, Metric, metrics_from_extensions}; -use session_store::{ - EntryHash, FsStore, HashedEntry, LogEntry, SessionId, Store, StoreError, TraceEntry, -}; +use session_store::{FsStore, LogEntry, SessionId, Store, StoreError, TraceEntry}; use pod::{Pod, PodManifest}; @@ -329,32 +327,28 @@ struct MetricFailingStore { } impl Store for MetricFailingStore { - fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> { - if let LogEntry::Extension { domain, .. } = &entry.entry { + fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError> { + if let LogEntry::Extension { domain, .. } = entry { if domain == DOMAIN { return Err(StoreError::Io(std::io::Error::other("synthetic failure"))); } } self.inner.append(id, entry) } - fn read_all(&self, id: SessionId) -> Result, StoreError> { + fn read_all(&self, id: SessionId) -> Result, StoreError> { self.inner.read_all(id) } fn list_sessions(&self) -> Result, StoreError> { self.inner.list_sessions() } - fn create_session( - &self, - id: SessionId, - entries: &[HashedEntry], - ) -> Result<(), StoreError> { + fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> Result<(), StoreError> { self.inner.create_session(id, entries) } fn exists(&self, id: SessionId) -> Result { self.inner.exists(id) } - fn read_head_hash(&self, id: SessionId) -> Result, StoreError> { - self.inner.read_head_hash(id) + fn read_entry_count(&self, id: SessionId) -> Result { + self.inner.read_entry_count(id) } fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { self.inner.append_trace(id, entry) diff --git a/crates/pod/tests/system_prompt_template_test.rs b/crates/pod/tests/system_prompt_template_test.rs index 04789f79..cb6bca4f 100644 --- a/crates/pod/tests/system_prompt_template_test.rs +++ b/crates/pod/tests/system_prompt_template_test.rs @@ -184,7 +184,7 @@ async fn session_start_state_captures_rendered_prompt() { let entries = pod.store().read_all(pod.session_id()).unwrap(); let first = entries.first().expect("at least one entry"); - match &first.entry { + match first { LogEntry::SessionStart { system_prompt, .. } => { let sp = system_prompt.as_deref().expect("system prompt set"); assert!(sp.starts_with("hello cwd=")); diff --git a/crates/session-metrics/src/lib.rs b/crates/session-metrics/src/lib.rs index 6fd8d481..0600c597 100644 --- a/crates/session-metrics/src/lib.rs +++ b/crates/session-metrics/src/lib.rs @@ -18,7 +18,7 @@ use std::collections::BTreeMap; use serde::{Deserialize, Serialize}; -use session_store::{EntryHash, SessionId, Store, StoreError, save_extension, session_log}; +use session_store::{SessionId, Store, StoreError, save_extension, session_log}; /// Domain tag used in `LogEntry::Extension` for all metrics records. pub const DOMAIN: &str = "metrics"; @@ -78,11 +78,10 @@ impl Metric { pub fn record_metric( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, metric: &Metric, ) -> Result<(), StoreError> { let payload = serde_json::to_value(metric).expect("Metric serialization cannot fail"); - save_extension(store, session_id, head_hash, DOMAIN, payload) + save_extension(store, session_id, DOMAIN, payload) } /// `RestoredState.extensions` から metrics domain の payload を順に取り出し、 diff --git a/crates/session-store/Cargo.toml b/crates/session-store/Cargo.toml index 42a93e2f..e9f4da8f 100644 --- a/crates/session-store/Cargo.toml +++ b/crates/session-store/Cargo.toml @@ -11,8 +11,6 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } uuid = { workspace = true, features = ["v7", "serde"] } thiserror = { workspace = true } -sha2 = { workspace = true } -hex = "0.4.3" protocol = { workspace = true } tracing.workspace = true diff --git a/crates/session-store/src/fs_store.rs b/crates/session-store/src/fs_store.rs index f3e07b0e..0d04db74 100644 --- a/crates/session-store/src/fs_store.rs +++ b/crates/session-store/src/fs_store.rs @@ -6,7 +6,7 @@ use crate::SessionId; use crate::event_trace::TraceEntry; -use crate::session_log::{EntryHash, HashedEntry}; +use crate::session_log::LogEntry; use crate::store::{Store, StoreError}; use std::fs; use std::io::Write; @@ -65,12 +65,12 @@ impl FsStore { } impl Store for FsStore { - fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> { + fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError> { let line = serde_json::to_string(entry)?; self.append_line(&self.log_path(id), &line) } - fn read_all(&self, id: SessionId) -> Result, StoreError> { + fn read_all(&self, id: SessionId) -> Result, StoreError> { let path = self.log_path(id); if !path.exists() { return Err(StoreError::NotFound(id)); @@ -98,7 +98,7 @@ impl Store for FsStore { Ok(sessions) } - fn create_session(&self, id: SessionId, entries: &[HashedEntry]) -> Result<(), StoreError> { + fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> Result<(), StoreError> { let path = self.log_path(id); let mut content = String::new(); for entry in entries { @@ -113,24 +113,13 @@ impl Store for FsStore { Ok(self.log_path(id).exists()) } - fn read_head_hash(&self, id: SessionId) -> Result, StoreError> { + fn read_entry_count(&self, id: SessionId) -> Result { let path = self.log_path(id); if !path.exists() { return Err(StoreError::NotFound(id)); } let content = fs::read_to_string(&path)?; - let last_line = content.lines().rev().find(|l| !l.trim().is_empty()); - match last_line { - Some(line) => { - let entry: HashedEntry = - serde_json::from_str(line).map_err(|e| StoreError::Corrupt { - line: content.lines().count(), - message: e.to_string(), - })?; - Ok(Some(entry.hash)) - } - None => Ok(None), - } + Ok(content.lines().filter(|l| !l.trim().is_empty()).count()) } fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index a8cfcb0c..9979c691 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -40,15 +40,14 @@ pub use llm_worker::UsageRecord; pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; pub use session::{ - SessionStartState, append_entry, append_entry_with_hash, append_system_item, - classify_history_item, create_compacted_session, create_session, create_session_with_id, - ensure_head_or_fork, fork, fork_at, restore, save_config_changed, save_delta, save_extension, - save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage, - save_user_input, + SessionStartState, append_entry, append_system_item, classify_history_item, + create_compacted_session, create_session, create_session_with_id, ensure_head_or_fork, fork, + fork_at, restore, save_config_changed, save_delta, save_extension, save_pod_scope, + save_run_completed, save_run_errored, save_turn_end, save_usage, save_user_input, }; pub use session_log::{ - EntryHash, HashedEntry, LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, - SessionOrigin, build_chain, collect_state, compute_hash, + LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SessionOrigin, + collect_state, }; pub use system_item::{SystemItem, render_pod_event}; pub use store::{Store, StoreError}; diff --git a/crates/session-store/src/session.rs b/crates/session-store/src/session.rs index 2c472335..2bad2a08 100644 --- a/crates/session-store/src/session.rs +++ b/crates/session-store/src/session.rs @@ -6,7 +6,7 @@ use crate::SessionId; use crate::logged_item::{LoggedItem, to_logged}; -use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionOrigin}; +use crate::session_log::{self, LogEntry, PodScopeSnapshot, SessionOrigin}; use crate::store::{Store, StoreError}; use crate::system_item::SystemItem; use llm_worker::WorkerResult; @@ -22,27 +22,25 @@ pub struct SessionStartState<'a> { } /// Create a new session, writing the initial `SessionStart` entry. -/// -/// Returns the new session ID and head hash. pub fn create_session( store: &impl Store, state: SessionStartState<'_>, -) -> Result<(SessionId, EntryHash), StoreError> { +) -> Result { let session_id = crate::new_session_id(); - let hash = create_session_with_id(store, session_id, state)?; - Ok((session_id, hash)) + create_session_with_id(store, session_id, state)?; + Ok(session_id) } /// Write a fresh `SessionStart` entry using a pre-generated session ID. /// /// Used by callers that need to reserve a session ID synchronously but /// defer the initial log append (e.g. Pod, which resolves a templated -/// system prompt only at first turn). Returns the resulting head hash. +/// system prompt only at first turn). pub fn create_session_with_id( store: &impl Store, session_id: SessionId, state: SessionStartState<'_>, -) -> Result { +) -> Result<(), StoreError> { let entry = LogEntry::SessionStart { ts: session_log::now_millis(), system_prompt: state.system_prompt.map(String::from), @@ -51,26 +49,20 @@ pub fn create_session_with_id( forked_from: None, compacted_from: None, }; - let hash = session_log::compute_hash(None, &entry); - let hashed_entry = HashedEntry { - hash: hash.clone(), - prev_hash: None, - entry, - }; - store.append(session_id, &hashed_entry)?; - Ok(hash) + store.append(session_id, &entry) } /// Create a compacted session from an existing one. /// -/// Records `compacted_from` provenance linking back to the source session. -/// Returns the new session ID and head hash. +/// Records `compacted_from` provenance linking back to the source session +/// at the turn boundary captured by `source_turn_count` (the most recent +/// completed turn in the source). pub fn create_compacted_session( store: &impl Store, state: SessionStartState<'_>, source_session_id: SessionId, - source_head_hash: EntryHash, -) -> Result<(SessionId, EntryHash), StoreError> { + source_turn_count: usize, +) -> Result { let session_id = crate::new_session_id(); let entry = LogEntry::SessionStart { ts: session_log::now_millis(), @@ -80,17 +72,11 @@ pub fn create_compacted_session( forked_from: None, compacted_from: Some(SessionOrigin { session_id: source_session_id, - at_hash: source_head_hash, + at_turn_index: source_turn_count, }), }; - let hash = session_log::compute_hash(None, &entry); - let hashed_entry = HashedEntry { - hash: hash.clone(), - prev_hash: None, - entry, - }; - store.append(session_id, &hashed_entry)?; - Ok((session_id, hash)) + store.append(session_id, &entry)?; + Ok(session_id) } /// Restore session state from a stored log. @@ -105,18 +91,18 @@ pub fn restore( Ok(session_log::collect_state(&entries)) } -/// Check if the store's head still matches the expected head hash. +/// Check if the store's entry count still matches the writer's tally. /// If not, auto-fork into a new session. /// -/// Updates `session_id` and `head_hash` in place when a fork occurs. +/// Updates `session_id` and `entries_written` in place when a fork occurs. pub fn ensure_head_or_fork( store: &impl Store, session_id: &mut SessionId, - head_hash: &mut Option, + entries_written: &mut usize, state: SessionStartState<'_>, ) -> Result<(), StoreError> { - let store_head = store.read_head_hash(*session_id)?; - if store_head == *head_hash { + let store_count = store.read_entry_count(*session_id)?; + if store_count == *entries_written { return Ok(()); } let fork_id = crate::new_session_id(); @@ -128,15 +114,9 @@ pub fn ensure_head_or_fork( forked_from: None, compacted_from: None, }; - let hash = session_log::compute_hash(None, &entry); - let hashed_entry = HashedEntry { - hash: hash.clone(), - prev_hash: None, - entry, - }; - store.create_session(fork_id, &[hashed_entry])?; + store.create_session(fork_id, &[entry])?; *session_id = fork_id; - *head_hash = Some(hash); + *entries_written = 1; Ok(()) } @@ -149,13 +129,11 @@ pub fn ensure_head_or_fork( pub fn save_user_input( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, segments: Vec, ) -> Result<(), StoreError> { append_entry( store, session_id, - head_hash, LogEntry::UserInput { ts: session_log::now_millis(), segments, @@ -174,7 +152,6 @@ pub fn save_user_input( pub fn save_delta( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, new_items: &[Item], ) -> Result<(), StoreError> { if new_items.is_empty() { @@ -188,7 +165,7 @@ pub fn save_delta( continue; } let entry = classify_history_item(item, ts); - append_entry(store, session_id, head_hash, entry)?; + append_entry(store, session_id, entry)?; } Ok(()) } @@ -223,13 +200,11 @@ pub fn classify_history_item(item: &Item, ts: u64) -> LogEntry { pub fn append_system_item( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, item: SystemItem, -) -> Result { - append_entry_with_hash( +) -> Result<(), StoreError> { + append_entry( store, session_id, - head_hash, LogEntry::SystemItem { ts: session_log::now_millis(), item, @@ -241,13 +216,11 @@ pub fn append_system_item( pub fn save_turn_end( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, turn_count: usize, ) -> Result<(), StoreError> { append_entry( store, session_id, - head_hash, LogEntry::TurnEnd { ts: session_log::now_millis(), turn_count, @@ -259,14 +232,12 @@ pub fn save_turn_end( pub fn save_run_completed( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, result: WorkerResult, interrupted: bool, ) -> Result<(), StoreError> { append_entry( store, session_id, - head_hash, LogEntry::RunCompleted { ts: session_log::now_millis(), interrupted, @@ -282,14 +253,12 @@ pub fn save_run_completed( pub fn save_run_errored( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, message: String, interrupted: bool, ) -> Result<(), StoreError> { append_entry( store, session_id, - head_hash, LogEntry::RunErrored { ts: session_log::now_millis(), interrupted, @@ -307,7 +276,6 @@ pub fn save_run_errored( pub fn save_usage( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, history_len: usize, input_total_tokens: u64, cache_read_tokens: u64, @@ -317,7 +285,6 @@ pub fn save_usage( append_entry( store, session_id, - head_hash, LogEntry::LlmUsage { ts: session_log::now_millis(), history_len, @@ -337,14 +304,12 @@ pub fn save_usage( pub fn save_extension( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, domain: impl Into, payload: serde_json::Value, ) -> Result<(), StoreError> { append_entry( store, session_id, - head_hash, LogEntry::Extension { ts: session_log::now_millis(), domain: domain.into(), @@ -357,14 +322,12 @@ pub fn save_extension( pub fn save_pod_scope( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, snapshot: &PodScopeSnapshot, ) -> Result<(), StoreError> { let payload = serde_json::to_value(snapshot)?; save_extension( store, session_id, - head_hash, session_log::POD_SCOPE_EXTENSION_DOMAIN, payload, ) @@ -374,13 +337,11 @@ pub fn save_pod_scope( pub fn save_config_changed( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, config: &RequestConfig, ) -> Result<(), StoreError> { append_entry( store, session_id, - head_hash, LogEntry::ConfigChanged { ts: session_log::now_millis(), config: config.clone(), @@ -399,28 +360,36 @@ pub fn fork(store: &impl Store, state: SessionStartState<'_>) -> Result Result { let entries = store.read_all(source_id)?; - let cut = entries - .iter() - .position(|e| &e.hash == at_hash) - .map(|i| i + 1) - .unwrap_or(entries.len()); + let cut = if at_turn_index == 0 { + // Branch directly after the SessionStart (or whatever opens the + // segment), before any turn completes. + entries + .iter() + .position(|e| !matches!(e, LogEntry::SessionStart { .. })) + .unwrap_or(entries.len()) + } else { + entries + .iter() + .position(|e| matches!(e, LogEntry::TurnEnd { turn_count, .. } if *turn_count == at_turn_index)) + .map(|i| i + 1) + .unwrap_or(entries.len()) + }; let state = session_log::collect_state(&entries[..cut]); let fork_id = crate::new_session_id(); @@ -429,23 +398,17 @@ pub fn fork_at( system_prompt: state.system_prompt, config: state.config, history: to_logged(&state.history), - forked_from: Some(session_log::SessionOrigin { + forked_from: Some(SessionOrigin { session_id: source_id, - at_hash: at_hash.clone(), + at_turn_index, }), compacted_from: None, }; - let hash = session_log::compute_hash(None, &entry); - let hashed_entry = HashedEntry { - hash, - prev_hash: None, - entry, - }; - store.create_session(fork_id, &[hashed_entry])?; + store.create_session(fork_id, &[entry])?; Ok(fork_id) } -/// Append a single `LogEntry`, chaining the hash and updating `head_hash`. +/// Append a single `LogEntry`. /// /// Lower-level dual of the `save_*` convenience wrappers in this module. /// Use when the caller already builds the typed entry itself (e.g. when @@ -453,30 +416,7 @@ pub fn fork_at( pub fn append_entry( store: &impl Store, session_id: SessionId, - head_hash: &mut Option, entry: LogEntry, ) -> Result<(), StoreError> { - append_entry_with_hash(store, session_id, head_hash, entry)?; - Ok(()) -} - -/// Same as [`append_entry`] but returns the freshly computed entry hash. -/// -/// Used by paths that need the hash for downstream broadcast or mirror -/// updates (e.g. the Pod's `SessionLogSink`). -pub fn append_entry_with_hash( - store: &impl Store, - session_id: SessionId, - head_hash: &mut Option, - entry: LogEntry, -) -> Result { - let hash = session_log::compute_hash(head_hash.as_ref(), &entry); - let hashed_entry = HashedEntry { - hash: hash.clone(), - prev_hash: head_hash.clone(), - entry, - }; - store.append(session_id, &hashed_entry)?; - *head_hash = Some(hash.clone()); - Ok(hash) + store.append(session_id, &entry) } diff --git a/crates/session-store/src/session_log.rs b/crates/session-store/src/session_log.rs index ee391664..da7e4495 100644 --- a/crates/session-store/src/session_log.rs +++ b/crates/session-store/src/session_log.rs @@ -4,89 +4,18 @@ //! serialized as one line in a `.jsonl` file. Reading all entries and //! collecting them via [`collect_state`] reconstructs the full [`Worker`] state. //! -//! Entries are chained via [`EntryHash`]: each [`HashedEntry`] records the hash -//! of the previous entry, forming a tamper-evident append-only chain. This -//! enables safe fork detection when multiple writers share a session. +//! The on-disk format is one `LogEntry` per line — entries are positionally +//! ordered. Fork lineage references between segments use turn-number indices +//! (`SessionOrigin.at_turn_index`) rather than per-entry hashes. use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::{UsageRecord, WorkerResult}; use protocol::{InvokeKind, ScopeRule, Segment}; use serde::{Deserialize, Serialize}; -use sha2::{Digest, Sha256}; use crate::logged_item::LoggedItem; use crate::system_item::SystemItem; -/// SHA-256 hash identifying a specific log entry in the chain. -/// -/// Computed as `sha256(prev_hash_bytes || canonical_json(entry))`. -/// Displayed and serialized as a lowercase hex string. -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct EntryHash([u8; 32]); - -impl EntryHash { - pub fn as_bytes(&self) -> &[u8; 32] { - &self.0 - } - - pub fn to_hex(&self) -> String { - hex::encode(self.0) - } - - pub fn from_hex(s: &str) -> Result { - let mut buf = [0u8; 32]; - hex::decode_to_slice(s, &mut buf)?; - Ok(Self(buf)) - } -} - -impl std::fmt::Display for EntryHash { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(&self.to_hex()) - } -} - -impl Serialize for EntryHash { - fn serialize(&self, serializer: S) -> Result { - serializer.serialize_str(&self.to_hex()) - } -} - -impl<'de> Deserialize<'de> for EntryHash { - fn deserialize>(deserializer: D) -> Result { - let s = String::deserialize(deserializer)?; - Self::from_hex(&s).map_err(serde::de::Error::custom) - } -} - -/// Compute the hash for a log entry given its predecessor's hash. -pub fn compute_hash(prev: Option<&EntryHash>, entry: &LogEntry) -> EntryHash { - let mut hasher = Sha256::new(); - - // Feed prev_hash bytes (32 zero bytes if None). - match prev { - Some(h) => hasher.update(h.as_bytes()), - None => hasher.update([0u8; 32]), - } - - // Canonical JSON of the entry. - let json = serde_json::to_string(entry).expect("LogEntry serialization cannot fail"); - hasher.update(json.as_bytes()); - - EntryHash(hasher.finalize().into()) -} - -/// A [`LogEntry`] with hash-chain metadata. -/// -/// This is the unit persisted to JSONL — one line per `HashedEntry`. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct HashedEntry { - pub hash: EntryHash, - pub prev_hash: Option, - #[serde(flatten)] - pub entry: LogEntry, -} - /// A single session log entry, serialized as one JSONL line. /// /// Variants correspond to specific mutation points in `Worker`: @@ -110,10 +39,10 @@ pub enum LogEntry { system_prompt: Option, config: RequestConfig, history: Vec, - /// Origin: forked from another session at a specific entry. + /// Origin: forked from another session at a specific turn boundary. #[serde(default, skip_serializing_if = "Option::is_none")] forked_from: Option, - /// Origin: compacted from another session at a specific entry. + /// Origin: compacted from another session at a specific turn boundary. #[serde(default, skip_serializing_if = "Option::is_none")] compacted_from: Option, }, @@ -235,13 +164,16 @@ pub enum LogEntry { }, } -/// Provenance reference to a parent session. +/// Provenance reference to a parent segment. +/// +/// `at_turn_index` is the `turn_count` value of the most recent +/// `TurnEnd` entry preceding the split point in the source segment. +/// A value of `0` means the split happened before any turn completed +/// (e.g. immediately after `SessionStart`). #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct SessionOrigin { - /// Session ID of the source session. pub session_id: crate::SessionId, - /// Hash of the entry in the source session at the point of fork/compact. - pub at_hash: EntryHash, + pub at_turn_index: usize, } /// Domain used by Pod to persist its latest effective runtime scope. @@ -262,8 +194,10 @@ pub struct RestoredState { pub history: Vec, pub turn_count: usize, pub last_run_interrupted: bool, - /// Hash of the last entry in the chain (None if empty). - pub head_hash: Option, + /// Number of entries replayed. `0` means the session log was empty. + /// Writers track their own append count via the same counter so + /// `ensure_head_or_fork` can compare it with the on-disk count. + pub entries_count: usize, /// LLM リクエストごとの Usage スナップショット時系列。 /// `LogEntry::LlmUsage` を replay して時系列順に積まれる。 /// 任意位置のトークン数推定に使う。 @@ -283,25 +217,25 @@ pub struct RestoredState { pub user_segments: Vec>, } -/// Replay a sequence of hashed entries to reconstruct worker state. -pub fn collect_state(entries: &[HashedEntry]) -> RestoredState { +/// Replay a sequence of log entries to reconstruct worker state. +pub fn collect_state(entries: &[LogEntry]) -> RestoredState { let mut state = RestoredState { system_prompt: None, config: RequestConfig::default(), history: Vec::new(), turn_count: 0, last_run_interrupted: false, - head_hash: None, + entries_count: 0, usage_history: Vec::new(), extensions: Vec::new(), pod_scope: None, user_segments: Vec::new(), }; - for hashed in entries { - state.head_hash = Some(hashed.hash.clone()); + for entry in entries { + state.entries_count += 1; - match &hashed.entry { + match entry { LogEntry::SessionStart { system_prompt, config, @@ -403,26 +337,6 @@ pub fn now_millis() -> u64 { .as_millis() as u64 } -/// Build a hash chain from plain `LogEntry` values. -/// -/// Useful for tests and for seeding new sessions from a list of entries. -pub fn build_chain(entries: &[LogEntry]) -> Vec { - let mut chain = Vec::with_capacity(entries.len()); - let mut prev: Option = None; - - for entry in entries { - let hash = compute_hash(prev.as_ref(), entry); - chain.push(HashedEntry { - hash: hash.clone(), - prev_hash: prev, - entry: entry.clone(), - }); - prev = Some(hash); - } - - chain -} - #[cfg(test)] mod tests { use super::*; @@ -432,12 +346,12 @@ mod tests { let state = collect_state(&[]); assert!(state.history.is_empty()); assert_eq!(state.turn_count, 0); - assert!(state.head_hash.is_none()); + assert_eq!(state.entries_count, 0); } #[test] fn replay_session_start_sets_initial_state() { - let entries = build_chain(&[LogEntry::SessionStart { + let state = collect_state(&[LogEntry::SessionStart { ts: 1000, system_prompt: Some("You are helpful.".into()), config: RequestConfig::default().with_max_tokens(1024), @@ -445,16 +359,15 @@ mod tests { forked_from: None, compacted_from: None, }]); - let state = collect_state(&entries); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); assert_eq!(state.config.max_tokens, Some(1024)); assert_eq!(state.history.len(), 1); - assert!(state.head_hash.is_some()); + assert_eq!(state.entries_count, 1); } #[test] fn replay_full_turn() { - let entries = build_chain(&[ + let state = collect_state(&[ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -481,7 +394,6 @@ mod tests { result: WorkerResult::Finished, }, ]); - let state = collect_state(&entries); assert_eq!(state.history.len(), 2); assert_eq!(state.turn_count, 1); assert!(!state.last_run_interrupted); @@ -489,7 +401,7 @@ mod tests { #[test] fn replay_with_tool_calls() { - let entries = build_chain(&[ + let state = collect_state(&[ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -519,7 +431,6 @@ mod tests { turn_count: 1, }, ]); - let state = collect_state(&entries); assert_eq!(state.history.len(), 4); assert!(state.history[1].is_tool_call()); assert!(state.history[2].is_tool_result()); @@ -527,7 +438,7 @@ mod tests { #[test] fn replay_config_changed() { - let entries = build_chain(&[ + let state = collect_state(&[ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -541,50 +452,12 @@ mod tests { config: RequestConfig::default().with_temperature(0.5), }, ]); - let state = collect_state(&entries); assert_eq!(state.config.temperature, Some(0.5)); } - #[test] - fn hash_chain_is_deterministic() { - let raw = vec![ - LogEntry::SessionStart { - ts: 1000, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - forked_from: None, - compacted_from: None, - }, - LogEntry::UserInput { - ts: 2000, - segments: vec![Segment::text("Hello")], - }, - ]; - let chain_a = build_chain(&raw); - let chain_b = build_chain(&raw); - assert_eq!(chain_a[0].hash, chain_b[0].hash); - assert_eq!(chain_a[1].hash, chain_b[1].hash); - } - - #[test] - fn different_content_produces_different_hash() { - let entry_a = LogEntry::UserInput { - ts: 1000, - segments: vec![Segment::text("Hello")], - }; - let entry_b = LogEntry::UserInput { - ts: 1000, - segments: vec![Segment::text("World")], - }; - let hash_a = compute_hash(None, &entry_a); - let hash_b = compute_hash(None, &entry_b); - assert_ne!(hash_a, hash_b); - } - #[test] fn replay_llm_usage_appends_to_usage_history() { - let entries = build_chain(&[ + let state = collect_state(&[ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -618,7 +491,6 @@ mod tests { output_tokens: 5, }, ]); - let state = collect_state(&entries); // history は LlmUsage で変化しない assert_eq!(state.history.len(), 2); // usage_history は時系列順 @@ -631,8 +503,7 @@ mod tests { #[test] fn replay_without_llm_usage_keeps_usage_history_empty() { - // 既存ログ互換: LlmUsage entry が無くても collect_state は壊れない - let entries = build_chain(&[ + let state = collect_state(&[ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -646,7 +517,6 @@ mod tests { segments: vec![Segment::text("hi")], }, ]); - let state = collect_state(&entries); assert!(state.usage_history.is_empty()); } @@ -704,7 +574,7 @@ mod tests { #[test] fn replay_invoke_marker_does_not_mutate_state() { - let entries = build_chain(&[ + let state = collect_state(&[ LogEntry::SessionStart { ts: 0, system_prompt: None, @@ -730,14 +600,13 @@ mod tests { trigger: InvokeKind::Notify, }, ]); - let state = collect_state(&entries); assert_eq!(state.history.len(), 1); assert_eq!(state.turn_count, 1); } #[test] fn replay_extension_collects_domain_payload_pairs() { - let entries = build_chain(&[ + let state = collect_state(&[ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -762,7 +631,6 @@ mod tests { payload: serde_json::json!({ "x": 1 }), }, ]); - let state = collect_state(&entries); // 順序保持で全件積まれる。fold は呼び出し側の責務。 assert_eq!(state.extensions.len(), 3); assert_eq!(state.extensions[0].0, "memory.extract"); @@ -794,22 +662,6 @@ mod tests { } } - #[test] - fn hash_hex_round_trip() { - let entry = LogEntry::SessionStart { - ts: 1000, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - forked_from: None, - compacted_from: None, - }; - let hash = compute_hash(None, &entry); - let hex = hash.to_hex(); - let parsed = EntryHash::from_hex(&hex).unwrap(); - assert_eq!(hash, parsed); - } - /// Mixed segments survive a JSON round-trip through `LogEntry::UserInput`, /// and `collect_state` derives `Item::user_message` from the flattened /// text while preserving the original segments separately. This covers @@ -834,10 +686,10 @@ mod tests { ts: 4242, segments: segments.clone(), }; - // Hash + JSON round-trip preserves the variant byte-for-byte. + // JSON round-trip preserves the variant byte-for-byte. let json = serde_json::to_string(&entry).unwrap(); let parsed: LogEntry = serde_json::from_str(&json).unwrap(); - let entries = build_chain(&[ + let state = collect_state(&[ LogEntry::SessionStart { ts: 1, system_prompt: None, @@ -848,7 +700,6 @@ mod tests { }, parsed, ]); - let state = collect_state(&entries); // Worker history gets a flattened user_message item. assert_eq!(state.history.len(), 1); match &state.history[0] { diff --git a/crates/session-store/src/store.rs b/crates/session-store/src/store.rs index 6906ce0d..43981133 100644 --- a/crates/session-store/src/store.rs +++ b/crates/session-store/src/store.rs @@ -12,7 +12,7 @@ use crate::SessionId; use crate::event_trace::TraceEntry; -use crate::session_log::{EntryHash, HashedEntry}; +use crate::session_log::LogEntry; /// Errors from the persistence store. #[derive(Debug, thiserror::Error)] @@ -35,25 +35,30 @@ pub enum StoreError { /// All methods take `&self` — implementations should use interior mutability /// (e.g., append-mode file handles) when needed. pub trait Store: Send + Sync { - /// Append a single hashed entry to the session log. - fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError>; + /// Append a single log entry to the session log. + /// + /// One line per call. The kernel orders concurrent `O_APPEND` writes + /// for lines < `PIPE_BUF`, so user-space serialization is unnecessary. + fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError>; - /// Read all hashed entries for a session, in order. - fn read_all(&self, id: SessionId) -> Result, StoreError>; + /// Read all log entries for a session, in order. + fn read_all(&self, id: SessionId) -> Result, StoreError>; /// List all session IDs, most recent first. fn list_sessions(&self) -> Result, StoreError>; /// Create a new session with initial entries. - fn create_session(&self, id: SessionId, entries: &[HashedEntry]) -> Result<(), StoreError>; + fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> Result<(), StoreError>; /// Check if a session exists. fn exists(&self, id: SessionId) -> Result; - /// Read the hash of the last entry in a session (the head). + /// Count entries currently stored for a session. /// - /// Returns `None` if the session is empty. - fn read_head_hash(&self, id: SessionId) -> Result, StoreError>; + /// Used by `ensure_head_or_fork` to detect concurrent writers: + /// if the on-disk count exceeds the writer's own append tally, + /// another process has extended the log. + fn read_entry_count(&self, id: SessionId) -> Result; /// Append a trace entry to the debug event trace file. fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError>; diff --git a/crates/session-store/tests/fs_store_test.rs b/crates/session-store/tests/fs_store_test.rs index 52a9dee9..d7c27024 100644 --- a/crates/session-store/tests/fs_store_test.rs +++ b/crates/session-store/tests/fs_store_test.rs @@ -1,8 +1,6 @@ use llm_worker::WorkerResult; use llm_worker::llm_client::types::{Item, RequestConfig}; -use session_store::{ - FsStore, LogEntry, Store, TraceEntry, build_chain, collect_state, new_session_id, -}; +use session_store::{FsStore, LogEntry, Store, TraceEntry, collect_state, new_session_id}; #[test] fn round_trip_write_and_read() { @@ -10,7 +8,7 @@ fn round_trip_write_and_read() { let store = FsStore::new(dir.path()).unwrap(); let id = new_session_id(); - let raw = vec![ + let entries = vec![ LogEntry::SessionStart { ts: 1000, system_prompt: Some("You are helpful.".into()), @@ -37,31 +35,21 @@ fn round_trip_write_and_read() { result: WorkerResult::Finished, }, ]; - let entries = build_chain(&raw); - // Write entries one by one for entry in &entries { store.append(id, entry).unwrap(); } - // Read back let read_back = store.read_all(id).unwrap(); assert_eq!(read_back.len(), entries.len()); - // Verify hashes survived round-trip - for (orig, read) in entries.iter().zip(read_back.iter()) { - assert_eq!(orig.hash, read.hash); - assert_eq!(orig.prev_hash, read.prev_hash); - } - - // Replay and verify state let state = collect_state(&read_back); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); assert_eq!(state.config.max_tokens, Some(1024)); assert_eq!(state.history.len(), 2); assert_eq!(state.turn_count, 1); assert!(!state.last_run_interrupted); - assert!(state.head_hash.is_some()); + assert_eq!(state.entries_count, entries.len()); } #[test] @@ -70,7 +58,7 @@ fn create_session_writes_all_entries() { let store = FsStore::new(dir.path()).unwrap(); let id = new_session_id(); - let entries = build_chain(&[LogEntry::SessionStart { + let entries = [LogEntry::SessionStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -80,7 +68,7 @@ fn create_session_writes_all_entries() { ], forked_from: None, compacted_from: None, - }]); + }]; store.create_session(id, &entries).unwrap(); let read_back = store.read_all(id).unwrap(); @@ -100,25 +88,17 @@ fn list_sessions_returns_newest_first() { std::thread::sleep(std::time::Duration::from_millis(2)); let id2 = new_session_id(); - let entries1 = build_chain(&[LogEntry::SessionStart { + let entry = LogEntry::SessionStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), history: vec![], forked_from: None, compacted_from: None, - }]); - let entries2 = build_chain(&[LogEntry::SessionStart { - ts: 1001, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - forked_from: None, - compacted_from: None, - }]); + }; - store.append(id1, &entries1[0]).unwrap(); - store.append(id2, &entries2[0]).unwrap(); + store.append(id1, &entry).unwrap(); + store.append(id2, &entry).unwrap(); let sessions = store.list_sessions().unwrap(); assert_eq!(sessions.len(), 2); @@ -134,15 +114,19 @@ fn exists_returns_correct_state() { assert!(!store.exists(id).unwrap()); - let entries = build_chain(&[LogEntry::SessionStart { - ts: 1000, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - forked_from: None, - compacted_from: None, - }]); - store.append(id, &entries[0]).unwrap(); + store + .append( + id, + &LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + }, + ) + .unwrap(); assert!(store.exists(id).unwrap()); } @@ -163,18 +147,20 @@ fn trace_entries_in_separate_file() { let store = FsStore::new(dir.path()).unwrap(); let id = new_session_id(); - // Write a log entry - let entries = build_chain(&[LogEntry::SessionStart { - ts: 1000, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - forked_from: None, - compacted_from: None, - }]); - store.append(id, &entries[0]).unwrap(); + store + .append( + id, + &LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + }, + ) + .unwrap(); - // Write a trace entry let trace = TraceEntry { ts: 1500, turn: 0, @@ -194,12 +180,12 @@ fn trace_entries_in_separate_file() { } #[test] -fn read_head_hash_returns_last_entry_hash() { +fn read_entry_count_matches_append_tally() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); let id = new_session_id(); - let entries = build_chain(&[ + let entries = [ LogEntry::SessionStart { ts: 1000, system_prompt: None, @@ -212,12 +198,11 @@ fn read_head_hash_returns_last_entry_hash() { ts: 2000, segments: vec![protocol::Segment::text("Hello")], }, - ]); + ]; for entry in &entries { store.append(id, entry).unwrap(); } - let head = store.read_head_hash(id).unwrap(); - assert_eq!(head.as_ref(), Some(&entries[1].hash)); + assert_eq!(store.read_entry_count(id).unwrap(), entries.len()); } diff --git a/crates/session-store/tests/session_test.rs b/crates/session-store/tests/session_test.rs index 070f14d2..e393175c 100644 --- a/crates/session-store/tests/session_test.rs +++ b/crates/session-store/tests/session_test.rs @@ -9,7 +9,7 @@ use llm_worker::interceptor::{Interceptor, TurnEndAction}; use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent}; use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; -use session_store::{EntryHash, FsStore, LogEntry, SessionStartState, Store, collect_state}; +use session_store::{FsStore, LogEntry, SessionStartState, Store, collect_state}; // ============================================================================= // Helpers @@ -96,20 +96,13 @@ async fn run_and_persist( worker: Worker, store: &FsStore, session_id: session_store::SessionId, - head_hash: &mut Option, input: &str, ) -> (Worker, llm_worker::WorkerResult) { // Mirror Pod's run-entry contract: log the user input as segments // before the worker pushes its flattened user_message; save_delta // skips the resulting user_message item to avoid double-write. - session_store::save_user_input( - store, - session_id, - head_hash, - vec![protocol::Segment::text(input)], - ) - - .unwrap(); + session_store::save_user_input(store, session_id, vec![protocol::Segment::text(input)]) + .unwrap(); let history_before = worker.history().len(); @@ -118,34 +111,26 @@ async fn run_and_persist( let worker = locked.unlock(); let new_items = &worker.history()[history_before..]; - session_store::save_delta(store, session_id, head_hash, new_items) - - .unwrap(); - session_store::save_turn_end(store, session_id, head_hash, worker.turn_count()) - - .unwrap(); + session_store::save_delta(store, session_id, new_items).unwrap(); + session_store::save_turn_end(store, session_id, worker.turn_count()).unwrap(); match &result { Ok(r) => { session_store::save_run_completed( store, session_id, - head_hash, r.clone(), worker.last_run_interrupted(), ) - .unwrap(); } Err(e) => { session_store::save_run_errored( store, session_id, - head_hash, e.to_string(), worker.last_run_interrupted(), ) - .unwrap(); } } @@ -164,7 +149,7 @@ async fn session_run_logs_entries() { let client = MockLlmClient::new(simple_text_events()); let worker = Worker::new(client); - let (sid, head_hash) = session_store::create_session( + let sid = session_store::create_session( &store, SessionStartState { system_prompt: worker.get_system_prompt(), @@ -172,11 +157,9 @@ async fn session_run_logs_entries() { history: worker.history(), }, ) - .unwrap(); - let mut head_hash = Some(head_hash); - let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await; + let (worker, _) = run_and_persist(worker, &store, sid, "Hi").await; let _ = &worker; let entries = store.read_all(sid).unwrap(); @@ -189,12 +172,12 @@ async fn session_run_logs_entries() { ); // First entry is SessionStart - assert!(matches!(&entries[0].entry, LogEntry::SessionStart { .. })); + assert!(matches!(&entries[0], LogEntry::SessionStart { .. })); // Has a RunCompleted with Finished let has_finished = entries.iter().any(|e| { matches!( - &e.entry, + e, LogEntry::RunCompleted { result: llm_worker::WorkerResult::Finished, .. @@ -202,17 +185,6 @@ async fn session_run_logs_entries() { ) }); assert!(has_finished, "should have a Finished outcome"); - - // Verify hash chain integrity - assert!(entries[0].prev_hash.is_none()); - for i in 1..entries.len() { - assert_eq!( - entries[i].prev_hash.as_ref(), - Some(&entries[i - 1].hash), - "hash chain broken at entry {}", - i - ); - } } #[tokio::test] @@ -222,7 +194,7 @@ async fn session_restore_round_trip() { let mut worker = Worker::new(client); worker.set_system_prompt("You are helpful."); - let (sid, head_hash) = session_store::create_session( + let sid = session_store::create_session( &store, SessionStartState { system_prompt: worker.get_system_prompt(), @@ -230,11 +202,9 @@ async fn session_restore_round_trip() { history: worker.history(), }, ) - .unwrap(); - let mut head_hash = Some(head_hash); - let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await; + let (worker, _) = run_and_persist(worker, &store, sid, "Hi").await; let original_history_len = worker.history().len(); let original_turn_count = worker.turn_count(); @@ -245,7 +215,7 @@ async fn session_restore_round_trip() { assert_eq!(state.history.len(), original_history_len); assert_eq!(state.turn_count, original_turn_count); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); - assert_eq!(state.head_hash, head_hash); + assert_eq!(state.entries_count, store.read_entry_count(sid).unwrap()); } #[tokio::test] @@ -255,7 +225,7 @@ async fn session_run_with_tool_call() { let mut worker = Worker::new(client); worker.register_tool(weather_tool_definition()); - let (sid, head_hash) = session_store::create_session( + let sid = session_store::create_session( &store, SessionStartState { system_prompt: worker.get_system_prompt(), @@ -263,23 +233,20 @@ async fn session_run_with_tool_call() { history: worker.history(), }, ) - .unwrap(); - let mut head_hash = Some(head_hash); - let (_worker, _) = - run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await; + let (_worker, _) = run_and_persist(worker, &store, sid, "What's the weather?").await; let entries = store.read_all(sid).unwrap(); let has_tool_results = entries .iter() - .any(|e| matches!(&e.entry, LogEntry::ToolResult { .. })); + .any(|e| matches!(e, LogEntry::ToolResult { .. })); assert!(has_tool_results, "should have ToolResult entry"); let has_assistant = entries .iter() - .any(|e| matches!(&e.entry, LogEntry::AssistantItem { .. })); + .any(|e| matches!(e, LogEntry::AssistantItem { .. })); assert!(has_assistant, "should have AssistantItem entry"); } @@ -293,7 +260,7 @@ async fn session_resume_after_pause() { worker.register_tool(weather_tool_definition()); worker.set_interceptor(PausePolicy); - let (sid, head_hash) = session_store::create_session( + let sid = session_store::create_session( &store, SessionStartState { system_prompt: worker.get_system_prompt(), @@ -301,18 +268,16 @@ async fn session_resume_after_pause() { history: worker.history(), }, ) - .unwrap(); - let mut head_hash = Some(head_hash); - let (_worker, result) = run_and_persist(worker, &store, sid, &mut head_hash, "Weather?").await; + let (_worker, result) = run_and_persist(worker, &store, sid, "Weather?").await; assert!(matches!(result, llm_worker::WorkerResult::Paused)); // Check RunCompleted is Paused let entries = store.read_all(sid).unwrap(); let has_paused = entries.iter().any(|e| { matches!( - &e.entry, + e, LogEntry::RunCompleted { result: llm_worker::WorkerResult::Paused, .. @@ -333,7 +298,7 @@ async fn session_fork_preserves_state() { let mut worker = Worker::new(client); worker.set_system_prompt("System prompt"); - let (sid, head_hash) = session_store::create_session( + let sid = session_store::create_session( &store, SessionStartState { system_prompt: worker.get_system_prompt(), @@ -341,11 +306,9 @@ async fn session_fork_preserves_state() { history: worker.history(), }, ) - .unwrap(); - let mut head_hash = Some(head_hash); - let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await; + let (worker, _) = run_and_persist(worker, &store, sid, "Hello").await; let original_history_len = worker.history().len(); let fork_id = session_store::fork( @@ -356,16 +319,12 @@ async fn session_fork_preserves_state() { history: worker.history(), }, ) - .unwrap(); // Fork should have a SessionStart with the current history let fork_entries = store.read_all(fork_id).unwrap(); assert_eq!(fork_entries.len(), 1); - assert!(matches!( - &fork_entries[0].entry, - LogEntry::SessionStart { .. } - )); + assert!(matches!(&fork_entries[0], LogEntry::SessionStart { .. })); let fork_state = collect_state(&fork_entries); assert_eq!(fork_state.history.len(), original_history_len); @@ -378,7 +337,7 @@ async fn session_fork_at_truncates() { let client = MockLlmClient::new(simple_text_events()); let worker = Worker::new(client); - let (sid, head_hash) = session_store::create_session( + let sid = session_store::create_session( &store, SessionStartState { system_prompt: worker.get_system_prompt(), @@ -386,29 +345,28 @@ async fn session_fork_at_truncates() { history: worker.history(), }, ) - .unwrap(); - let mut head_hash = Some(head_hash); - let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await; + let (worker, _) = run_and_persist(worker, &store, sid, "Hello").await; let all_entries = store.read_all(sid).unwrap(); assert!(all_entries.len() > 2); - // Fork at the hash of the 2nd entry (SessionStart + UserInput) - let at_hash = &all_entries[1].hash; - let fork_id = session_store::fork_at(&store, sid, at_hash).unwrap(); + // Fork at turn 1 (one completed turn). + let fork_id = session_store::fork_at(&store, sid, worker.turn_count()).unwrap(); let fork_entries = store.read_all(fork_id).unwrap(); assert_eq!(fork_entries.len(), 1); // Just the new SessionStart let fork_state = collect_state(&fork_entries); - // Should have the state from replaying only the first 2 entries - let original_truncated_state = collect_state(&all_entries[..2]); - assert_eq!( - fork_state.history.len(), - original_truncated_state.history.len() - ); + // History at fork point should match history right after the TurnEnd in + // the source session. + let turn_end_pos = all_entries + .iter() + .position(|e| matches!(e, LogEntry::TurnEnd { turn_count, .. } if *turn_count == worker.turn_count())) + .expect("source session has the matching TurnEnd"); + let source_state_at_fork = collect_state(&all_entries[..=turn_end_pos]); + assert_eq!(fork_state.history.len(), source_state_at_fork.history.len()); } #[tokio::test] @@ -417,7 +375,7 @@ async fn session_config_changed_logged() { let client = MockLlmClient::new(vec![]); let mut worker = Worker::new(client); - let (sid, head_hash) = session_store::create_session( + let sid = session_store::create_session( &store, SessionStartState { system_prompt: worker.get_system_prompt(), @@ -425,21 +383,17 @@ async fn session_config_changed_logged() { history: worker.history(), }, ) - .unwrap(); - let mut head_hash = Some(head_hash); // Modify config and log it let new_config = RequestConfig::default().with_temperature(0.7); worker.set_request_config(new_config.clone()); - session_store::save_config_changed(&store, sid, &mut head_hash, &new_config) - - .unwrap(); + session_store::save_config_changed(&store, sid, &new_config).unwrap(); let entries = store.read_all(sid).unwrap(); let has_config_changed = entries.iter().any(|e| { matches!( - &e.entry, + e, LogEntry::ConfigChanged { config, .. } if config.temperature == Some(0.7) ) }); @@ -454,7 +408,7 @@ async fn session_auto_forks_on_conflict() { let client_a = MockLlmClient::new(simple_text_events()); let worker_a = Worker::new(client_a); - let (original_sid, head_hash) = session_store::create_session( + let original_sid = session_store::create_session( &store, SessionStartState { system_prompt: worker_a.get_system_prompt(), @@ -462,37 +416,29 @@ async fn session_auto_forks_on_conflict() { history: worker_a.history(), }, ) - .unwrap(); let mut session_id = original_sid; - let mut head_hash = Some(head_hash); + // Writer tracked: just the SessionStart we wrote. + let mut entries_written: usize = 1; - // Simulate another Pod writing to the same session behind our back + // Simulate another Pod writing to the same session behind our back. let extra_entry = LogEntry::UserInput { ts: 9999, segments: vec![protocol::Segment::text("Interloper")], }; - let current_head = store.read_head_hash(original_sid).unwrap(); - let hash = session_store::compute_hash(current_head.as_ref(), &extra_entry); - let hashed = session_store::HashedEntry { - hash, - prev_hash: current_head, - entry: extra_entry, - }; - store.append(original_sid, &hashed).unwrap(); + store.append(original_sid, &extra_entry).unwrap(); - // Now head_hash is stale — ensure_head_or_fork should auto-fork + // Now the on-disk count exceeds our tally — ensure_head_or_fork should auto-fork. session_store::ensure_head_or_fork( &store, &mut session_id, - &mut head_hash, + &mut entries_written, SessionStartState { system_prompt: worker_a.get_system_prompt(), config: worker_a.request_config(), history: worker_a.history(), }, ) - .unwrap(); // session_id should now be different @@ -506,6 +452,6 @@ async fn session_auto_forks_on_conflict() { let original_entries = store.read_all(original_sid).unwrap(); let has_interloper = original_entries .iter() - .any(|e| matches!(&e.entry, LogEntry::UserInput { .. })); + .any(|e| matches!(e, LogEntry::UserInput { .. })); assert!(has_interloper); } diff --git a/crates/tui/src/picker.rs b/crates/tui/src/picker.rs index 5839af48..e81bf3df 100644 --- a/crates/tui/src/picker.rs +++ b/crates/tui/src/picker.rs @@ -20,9 +20,7 @@ use ratatui::style::{Color, Modifier, Style}; use ratatui::text::{Line, Span}; use ratatui::widgets::Paragraph; use ratatui::{Frame, TerminalOptions, Viewport}; -use session_store::{ - FsStore, HashedEntry, LogEntry, LoggedContentPart, LoggedItem, SessionId, Store, -}; +use session_store::{FsStore, LogEntry, LoggedContentPart, LoggedItem, SessionId, Store}; const MAX_ROWS: usize = 10; const VIEWPORT_LINES: u16 = MAX_ROWS as u16 + 4; @@ -170,9 +168,9 @@ fn build_preview(store: &FsStore, id: SessionId) -> String { /// Walk the log from the tail looking for the most recent user-message /// or assistant-message entry, then render its first text fragment in /// a single line. -fn last_message_preview(entries: &[HashedEntry]) -> Option { - for hashed in entries.iter().rev() { - match &hashed.entry { +fn last_message_preview(entries: &[LogEntry]) -> Option { + for entry in entries.iter().rev() { + match entry { LogEntry::UserInput { segments, .. } => { let text = protocol::Segment::flatten_to_text(segments); if !text.is_empty() {