diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index e9ff6d45..01cb2055 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -5,7 +5,7 @@ use std::process::ExitCode; use clap::Parser; use manifest::{PodManifest, paths}; use pod::{Pod, PodController, PodFactory, PromptLoader}; -use session_store::{FsStore, SegmentId}; +use session_store::{FsStore, SegmentId, Store}; const USER_MANIFEST_ENV: &str = "INSOMNIA_USER_MANIFEST"; @@ -186,7 +186,28 @@ async fn main() -> ExitCode { } } } else if let Some(source_segment_id) = cli.session { - match Pod::restore_from_manifest(source_segment_id, manifest, store, loader).await { + let source_session_id = match store.lookup_session_of(source_segment_id) { + Ok(Some(sid)) => sid, + Ok(None) => { + eprintln!( + "error: --session {source_segment_id}: segment is not registered to any session" + ); + return ExitCode::FAILURE; + } + Err(e) => { + eprintln!("error: lookup_session_of failed: {e}"); + return ExitCode::FAILURE; + } + }; + match Pod::restore_from_manifest( + source_session_id, + source_segment_id, + manifest, + store, + loader, + ) + .await + { Ok(p) => p, Err(e) => { eprintln!("error: failed to restore pod: {e}"); diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 137f8106..db155db1 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -9,7 +9,8 @@ use llm_worker::llm_client::client::LlmClient; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; use session_store::{ - LogEntry, PodScopeSnapshot, SegmentId, Store, StoreError, SystemItem, segment_log, to_logged, + LogEntry, PodScopeSnapshot, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, + to_logged, }; use tracing::{info, warn}; @@ -42,35 +43,54 @@ use protocol::{AlertLevel, AlertSource, Event, Segment}; use tokio::sync::broadcast; use tokio::task::JoinHandle; -/// Lock-free shared session pointer. 
+/// `(SessionId, SegmentId)` pair the Pod is currently writing to. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct SegmentLocation { + pub session_id: SessionId, + pub segment_id: SegmentId, +} + +/// Lock-free shared session/segment pointer. /// -/// Holds the current `(segment_id, entries_written)` pair so that the -/// Pod and every `LogWriterHandle` clone see a consistent view through -/// `Arc`-shared lock-free reads. `segment_id` is wrapped in `ArcSwap` -/// so fork (a rare, run-start-only event) can atomically swap it -/// without taking a mutex on the append hot path. `entries_written` is -/// an `AtomicUsize` bumped on every successful append; the writer's -/// tally is compared against the store's on-disk count to detect -/// concurrent writers in `ensure_segment_head`. +/// Holds the current `(SessionId, SegmentId)` pair and the append tally +/// so that the Pod and every `LogWriterHandle` clone see a consistent +/// view through `Arc`-shared lock-free reads. The location is wrapped in +/// `ArcSwap` so fork (a rare, run-start-only event) can atomically swap +/// session_id + segment_id together without taking a mutex on the +/// append hot path. `entries_written` is an `AtomicUsize` bumped on +/// every successful append; the writer's tally is compared against the +/// store's on-disk count to detect concurrent writers in +/// `ensure_segment_head`. 
pub struct SegmentState { - segment_id: ArcSwap, + location: ArcSwap, entries_written: AtomicUsize, } impl SegmentState { - pub fn new(segment_id: SegmentId, entries_written: usize) -> Arc { + pub fn new(session_id: SessionId, segment_id: SegmentId, entries_written: usize) -> Arc { Arc::new(Self { - segment_id: ArcSwap::from_pointee(segment_id), + location: ArcSwap::from_pointee(SegmentLocation { + session_id, + segment_id, + }), entries_written: AtomicUsize::new(entries_written), }) } - pub fn segment_id(&self) -> SegmentId { - **self.segment_id.load() + pub fn location(&self) -> SegmentLocation { + **self.location.load() } - pub fn set_segment_id(&self, id: SegmentId) { - self.segment_id.store(Arc::new(id)); + pub fn session_id(&self) -> SessionId { + self.location().session_id + } + + pub fn segment_id(&self) -> SegmentId { + self.location().segment_id + } + + pub fn set_location(&self, loc: SegmentLocation) { + self.location.store(Arc::new(loc)); } pub fn entries_written(&self) -> usize { @@ -107,8 +127,8 @@ where /// writes for `< PIPE_BUF` lines, so no user-space serialization is /// needed across appenders. pub fn append_entry(&self, entry: LogEntry) -> Result<(), StoreError> { - let segment_id = self.state.segment_id(); - self.store.append(segment_id, &entry)?; + let loc = self.state.location(); + self.store.append(loc.session_id, loc.segment_id, &entry)?; self.state.increment_entries(); self.sink.publish(entry); Ok(()) @@ -472,13 +492,14 @@ impl Pod { // Segment creation is deferred to `ensure_segment_head` at first // run so a later-installed system-prompt template (see // `set_system_prompt_template`) can be captured by `SegmentStart`. 
+ let session_id = session_store::new_session_id(); let segment_id = session_store::new_segment_id(); let prompts = PromptCatalog::builtins_only()?; let mut pod = Self { manifest, worker: Some(worker), store, - segment_state: SegmentState::new(segment_id, 0), + segment_state: SegmentState::new(session_id, segment_id, 0), pwd, scope: SharedScope::new(scope), hook_builder: HookRegistryBuilder::new(), @@ -542,12 +563,18 @@ impl Pod { &self.prompts } - /// The session ID used for persistence. Read lock-free from the - /// shared session pointer so fork-time swaps are observed immediately. + /// The current segment ID. Read lock-free from the shared session + /// pointer so fork-time swaps are observed immediately. pub fn segment_id(&self) -> SegmentId { self.segment_state.segment_id() } + /// The Session this Pod belongs to. Stable across compaction and + /// in-Session fork — only `fork` (a brand-new Session) changes it. + pub fn session_id(&self) -> SessionId { + self.segment_state.session_id() + } + /// The Pod's manifest. pub fn manifest(&self) -> &PodManifest { &self.manifest @@ -627,8 +654,8 @@ impl Pod { /// concurrent appenders — the kernel orders `O_APPEND` writes for /// lines smaller than `PIPE_BUF`. pub(crate) fn commit_entry(&self, entry: LogEntry) -> Result<(), StoreError> { - let segment_id = self.segment_state.segment_id(); - self.store.append(segment_id, &entry)?; + let loc = self.segment_state.location(); + self.store.append(loc.session_id, loc.segment_id, &entry)?; self.segment_state.increment_entries(); self.sink.publish(entry); Ok(()) @@ -1618,11 +1645,12 @@ impl Pod { /// when another writer has appended behind our back. 
fn ensure_segment_head(&mut self) -> Result<(), PodError> { let w = self.worker.as_ref().unwrap(); - let prev_segment_id = self.segment_state.segment_id(); + let loc = self.segment_state.location(); let entries_written = self.segment_state.entries_written(); if entries_written == 0 { let initial = LogEntry::SegmentStart { ts: segment_log::now_millis(), + session_id: loc.session_id, system_prompt: w.get_system_prompt().map(String::from), config: w.request_config().clone(), history: to_logged(w.history()), @@ -1636,17 +1664,19 @@ impl Pod { // Check store count + auto-fork if it drifted. let store_count = self .store - .read_entry_count(prev_segment_id) + .read_entry_count(loc.session_id, loc.segment_id) .map_err(PodError::from)?; if store_count == entries_written { return Ok(()); } - // Fork: mint a fresh session and switch to it. The new - // SegmentStart entry replaces the mirror and is broadcast - // through the sink so existing subscribers reset their view. - let fork_id = session_store::new_segment_id(); + // Auto-fork within the same Session: mint a fresh Segment and + // switch to it. The new SegmentStart entry replaces the mirror + // and is broadcast through the sink so existing subscribers + // reset their view. 
+ let fork_segment_id = session_store::new_segment_id(); let entry = LogEntry::SegmentStart { ts: segment_log::now_millis(), + session_id: loc.session_id, system_prompt: w.get_system_prompt().map(String::from), config: w.request_config().clone(), history: to_logged(w.history()), @@ -1654,13 +1684,16 @@ impl Pod { compacted_from: None, }; self.store - .create_segment(fork_id, &[entry.clone()]) + .create_segment(loc.session_id, fork_segment_id, &[entry.clone()]) .map_err(PodError::from)?; - self.segment_state.set_segment_id(fork_id); + self.segment_state.set_location(SegmentLocation { + session_id: loc.session_id, + segment_id: fork_segment_id, + }); self.segment_state.set_entries_written(1); self.sink.reset_with_initial(entry); if self.scope_allocation.is_some() { - pod_registry::update_segment(&self.manifest.pod.name, fork_id)?; + pod_registry::update_segment(&self.manifest.pod.name, fork_segment_id)?; } Ok(()) } @@ -2140,27 +2173,34 @@ impl Pod { task_snapshot_text.clone(), )); - // Build the SegmentStart entry for the new compacted session, - // then atomically rotate to it: create on disk, swap head, reset - // the broadcast sink so existing subscribers see the new - // `SegmentStart { compacted_from }` and reset their view. + // Build the SegmentStart entry for the new compacted segment. + // Inherits the source Segment's session_id so the compacted + // lineage stays grouped under the same Session. Atomically + // rotate: create on disk, swap location, reset the broadcast + // sink so existing subscribers see the new `SegmentStart + // { compacted_from }` and reset their view. 
let new_segment_id = session_store::new_segment_id(); - let old_session_id = self.segment_state.segment_id(); + let old_loc = self.segment_state.location(); let source_turn_count = self.worker.as_ref().unwrap().turn_count(); let w = self.worker.as_ref().unwrap(); let entry = LogEntry::SegmentStart { ts: segment_log::now_millis(), + session_id: old_loc.session_id, system_prompt: w.get_system_prompt().map(String::from), config: w.request_config().clone(), history: to_logged(&new_history), forked_from: None, compacted_from: Some(session_store::SegmentOrigin { - segment_id: old_session_id, + segment_id: old_loc.segment_id, at_turn_index: source_turn_count, }), }; - self.store.create_segment(new_segment_id, &[entry.clone()])?; - self.segment_state.set_segment_id(new_segment_id); + self.store + .create_segment(old_loc.session_id, new_segment_id, &[entry.clone()])?; + self.segment_state.set_location(SegmentLocation { + session_id: old_loc.session_id, + segment_id: new_segment_id, + }); self.segment_state.set_entries_written(1); let session_start = entry; // Broadcast the SegmentStart through the sink. This atomically @@ -2367,7 +2407,10 @@ impl Pod { // Read the session log to get the current entry count. This is // the boundary for the source.range end_entry. Called once per // extract, on a small local file. - let entries_now = self.store.read_all(self.segment_id())?.len(); + let entries_now = self + .store + .read_all(self.session_id(), self.segment_id())? + .len(); if entries_now == 0 { return Ok(ExtractDecision::Skipped); } @@ -2738,8 +2781,9 @@ impl Pod, St> { // Segment creation is deferred to the first run (see // `ensure_segment_head`) so the SegmentStart entry can capture // the rendered system prompt, not the raw template source. The - // segment_id is allocated here so the pod-registry registration - // can record it from the start. + // session_id + segment_id are allocated here so the pod-registry + // registration can record them from the start. 
+ let session_id = session_store::new_session_id(); let segment_id = session_store::new_segment_id(); // Register this Pod in the machine-wide pod-registry @@ -2765,7 +2809,7 @@ impl Pod, St> { manifest, worker: Some(worker), store, - segment_state: SegmentState::new(segment_id, 0), + segment_state: SegmentState::new(session_id, segment_id, 0), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), @@ -2820,6 +2864,7 @@ impl Pod, St> { let mut common = prepare_pod_common(&manifest, &loader, /* parse_template */ true)?; let skill_shadows = std::mem::take(&mut common.skill_shadows); + let session_id = session_store::new_session_id(); let segment_id = session_store::new_segment_id(); let scope_allocation = pod_registry::adopt_allocation( manifest.pod.name.clone(), @@ -2835,7 +2880,7 @@ impl Pod, St> { manifest, worker: Some(worker), store, - segment_state: SegmentState::new(segment_id, 0), + segment_state: SegmentState::new(session_id, segment_id, 0), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), @@ -2892,6 +2937,7 @@ impl Pod, St> { /// session keeps a stable cache prefix even when the manifest's /// instruction template would render differently today. pub async fn restore_from_manifest( + session_id: SessionId, segment_id: SegmentId, manifest: PodManifest, store: St, @@ -2900,7 +2946,7 @@ impl Pod, St> { // Read raw entries once so we can both reconstruct state and // seed the broadcast sink's mirror with the same prefix that // sits on disk. 
- let raw_entries = store.read_all(segment_id)?; + let raw_entries = store.read_all(session_id, segment_id)?; let state = session_store::collect_state(&raw_entries); if state.entries_count == 0 { return Err(PodError::SegmentEmpty { segment_id }); @@ -2974,7 +3020,7 @@ impl Pod, St> { manifest, worker: Some(worker), store, - segment_state: SegmentState::new(segment_id, state.entries_count), + segment_state: SegmentState::new(session_id, segment_id, state.entries_count), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), diff --git a/crates/pod/src/segment_log_sink.rs b/crates/pod/src/segment_log_sink.rs index d206a630..f59c4d68 100644 --- a/crates/pod/src/segment_log_sink.rs +++ b/crates/pod/src/segment_log_sink.rs @@ -203,6 +203,7 @@ mod tests { fn session_start() -> LogEntry { LogEntry::SegmentStart { ts: now_millis(), + session_id: uuid::Uuid::nil(), system_prompt: None, config: RequestConfig::default(), history: vec![], diff --git a/crates/pod/tests/restore_test.rs b/crates/pod/tests/restore_test.rs index 4028b82a..62044e5d 100644 --- a/crates/pod/tests/restore_test.rs +++ b/crates/pod/tests/restore_test.rs @@ -8,7 +8,7 @@ use std::sync::{LazyLock, Mutex}; use pod::{Pod, PodError}; -use session_store::{FsStore, SegmentId, StoreError}; +use session_store::{FsStore, StoreError}; const MINIMAL_MANIFEST_TOML: &str = r#" [pod] @@ -32,7 +32,7 @@ permission = "write" static ENV_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); #[tokio::test] -async fn restore_from_manifest_rejects_unknown_session() { +async fn restore_from_manifest_rejects_unknown_segment() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); @@ -42,66 +42,76 @@ async fn restore_from_manifest_rejects_unknown_session() { // A freshly-minted id with no jsonl file at all → store returns // NotFound, which `Pod::restore_from_manifest` surfaces verbatim // as `PodError::Store`. 
- let unknown = session_store::new_segment_id(); + let unknown_sid = session_store::new_session_id(); + let unknown_seg = session_store::new_segment_id(); + let result = Pod::restore_from_manifest( + unknown_sid, + unknown_seg, + manifest, + store, + pod::PromptLoader::builtins_only(), + ) + .await; + + match result { + Err(PodError::Store(StoreError::NotFound(id))) => assert_eq!(id, unknown_seg), + Err(other) => panic!("expected Store(NotFound), got {other:?}"), + Ok(_) => panic!("expected unknown segment to fail"), + } +} + +#[tokio::test] +async fn restore_from_manifest_rejects_empty_segment_log() { + let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + + let store_tmp = tempfile::tempdir().unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); + let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); + + // Pre-create an empty `<session_id>/<segment_id>.jsonl` so `read_all` succeeds + // with no entries. `collect_state` returns `entries_count = 0`, + // which `restore_from_manifest` rejects with `SegmentEmpty` *before* + // it gets as far as building the LLM client. 
+ let sid = session_store::new_session_id(); + let segid = session_store::new_segment_id(); + let dir = store_tmp.path().join(sid.to_string()); + std::fs::create_dir_all(&dir).unwrap(); + std::fs::write(dir.join(format!("{segid}.jsonl")), b"").unwrap(); + let result = - Pod::restore_from_manifest(unknown, manifest, store, pod::PromptLoader::builtins_only()) + Pod::restore_from_manifest(sid, segid, manifest, store, pod::PromptLoader::builtins_only()) .await; match result { - Err(PodError::Store(StoreError::NotFound(id))) => assert_eq!(id, unknown), - Err(other) => panic!("expected Store(NotFound), got {other:?}"), - Ok(_) => panic!("expected unknown session to fail"), - } -} - -#[tokio::test] -async fn restore_from_manifest_rejects_empty_session_log() { - let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); - - let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); - let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); - - // Pre-create an empty `.jsonl` so `read_all` succeeds with no - // entries. `collect_state` returns `entries_count = 0`, which - // `restore_from_manifest` rejects with `SegmentEmpty` *before* it - // gets as far as building the LLM client — so the test does not - // need credentials or a runtime sandbox. 
- let id: SegmentId = session_store::new_segment_id(); - let path = store_tmp.path().join(format!("{id}.jsonl")); - std::fs::write(&path, b"").unwrap(); - - let result = - Pod::restore_from_manifest(id, manifest, store, pod::PromptLoader::builtins_only()).await; - - match result { - Err(PodError::SegmentEmpty { segment_id }) => assert_eq!(segment_id, id), + Err(PodError::SegmentEmpty { segment_id }) => assert_eq!(segment_id, segid), Err(other) => panic!("expected SegmentEmpty, got {other:?}"), - Ok(_) => panic!("expected empty session log to fail"), + Ok(_) => panic!("expected empty segment log to fail"), } } #[tokio::test] -async fn restore_from_manifest_rejects_session_without_scope_snapshot() { +async fn restore_from_manifest_rejects_segment_without_scope_snapshot() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); let store = FsStore::new(store_tmp.path()).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); - let id = session_store::new_segment_id(); + let sid = session_store::new_session_id(); + let segid = session_store::new_segment_id(); let state = session_store::SegmentStartState { system_prompt: None, config: &Default::default(), history: &[], }; - session_store::create_segment_with_id(&store, id, state).unwrap(); + session_store::create_segment_with_ids(&store, sid, segid, state).unwrap(); let result = - Pod::restore_from_manifest(id, manifest, store, pod::PromptLoader::builtins_only()).await; + Pod::restore_from_manifest(sid, segid, manifest, store, pod::PromptLoader::builtins_only()) + .await; match result { - Err(PodError::SegmentScopeMissing { segment_id }) => assert_eq!(segment_id, id), + Err(PodError::SegmentScopeMissing { segment_id }) => assert_eq!(segment_id, segid), Err(other) => panic!("expected SegmentScopeMissing, got {other:?}"), Ok(_) => panic!("expected missing scope snapshot to fail"), } diff --git a/crates/pod/tests/session_metrics_test.rs 
b/crates/pod/tests/session_metrics_test.rs index 659f47d5..4b2ba875 100644 --- a/crates/pod/tests/session_metrics_test.rs +++ b/crates/pod/tests/session_metrics_test.rs @@ -26,7 +26,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use session_metrics::{DOMAIN, Metric, metrics_from_extensions}; -use session_store::{FsStore, LogEntry, SegmentId, Store, StoreError, TraceEntry}; +use session_store::{FsStore, LogEntry, SegmentId, SessionId, Store, StoreError, TraceEntry}; use pod::{Pod, PodManifest}; @@ -200,6 +200,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() { text_response_with_cache("done", 1234, 50), ]); let (mut pod, _store_tmp, _pwd_tmp) = make_pod(manifest_toml(1, 1), client, "big_tool").await; + let session_id = pod.session_id(); let segment_id = pod.segment_id(); // Cloning the store handle to read the session log back after the // runs complete — the Pod retains its own copy. 
@@ -208,7 +209,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() { pod.run_text("first").await.unwrap(); pod.run_text("second").await.unwrap(); - let state = session_store::restore(&store, segment_id).unwrap(); + let state = session_store::restore(&store, session_id, segment_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); // Run 1 has 2 LLM iterations (tool loop), each evaluates prune with @@ -288,13 +289,14 @@ async fn prune_metrics_record_below_min_savings_skip() { ]); let (mut pod, _store_tmp, _pwd_tmp) = make_pod(manifest_toml(1, u64::MAX), client, "big_tool").await; + let session_id = pod.session_id(); let segment_id = pod.segment_id(); let store = pod.store().clone(); pod.run_text("first").await.unwrap(); pod.run_text("second").await.unwrap(); - let state = session_store::restore(&store, segment_id).unwrap(); + let state = session_store::restore(&store, session_id, segment_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); let below = metrics .iter() @@ -327,31 +329,60 @@ struct MetricFailingStore { } impl Store for MetricFailingStore { - fn append(&self, id: SegmentId, entry: &LogEntry) -> Result<(), StoreError> { + fn append( + &self, + session_id: SessionId, + segment_id: SegmentId, + entry: &LogEntry, + ) -> Result<(), StoreError> { if let LogEntry::Extension { domain, .. 
} = entry { if domain == DOMAIN { return Err(StoreError::Io(std::io::Error::other("synthetic failure"))); } } - self.inner.append(id, entry) + self.inner.append(session_id, segment_id, entry) } - fn read_all(&self, id: SegmentId) -> Result, StoreError> { - self.inner.read_all(id) + fn read_all( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result, StoreError> { + self.inner.read_all(session_id, segment_id) } - fn list_segments(&self) -> Result, StoreError> { - self.inner.list_segments() + fn list_sessions(&self) -> Result, StoreError> { + self.inner.list_sessions() } - fn create_segment(&self, id: SegmentId, entries: &[LogEntry]) -> Result<(), StoreError> { - self.inner.create_segment(id, entries) + fn list_segments(&self, session_id: SessionId) -> Result, StoreError> { + self.inner.list_segments(session_id) } - fn exists(&self, id: SegmentId) -> Result { - self.inner.exists(id) + fn lookup_session_of(&self, segment_id: SegmentId) -> Result, StoreError> { + self.inner.lookup_session_of(segment_id) } - fn read_entry_count(&self, id: SegmentId) -> Result { - self.inner.read_entry_count(id) + fn create_segment( + &self, + session_id: SessionId, + segment_id: SegmentId, + entries: &[LogEntry], + ) -> Result<(), StoreError> { + self.inner.create_segment(session_id, segment_id, entries) } - fn append_trace(&self, id: SegmentId, entry: &TraceEntry) -> Result<(), StoreError> { - self.inner.append_trace(id, entry) + fn exists(&self, session_id: SessionId, segment_id: SegmentId) -> Result { + self.inner.exists(session_id, segment_id) + } + fn read_entry_count( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result { + self.inner.read_entry_count(session_id, segment_id) + } + fn append_trace( + &self, + session_id: SessionId, + segment_id: SegmentId, + entry: &TraceEntry, + ) -> Result<(), StoreError> { + self.inner.append_trace(session_id, segment_id, entry) } } @@ -386,12 +417,13 @@ async fn 
metric_write_failure_emits_warn_alert_and_does_not_abort_run() { let alerter = pod::Alerter::new(tx); pod.attach_alerter(alerter); + let session_id = pod.session_id(); let segment_id = pod.segment_id(); // Run completes successfully despite metric failure. pod.run_text("hello").await.unwrap(); // No metrics ended up in the log (writes were rejected). - let state = session_store::restore(&store, segment_id).unwrap(); + let state = session_store::restore(&store, session_id, segment_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); assert!(metrics.is_empty(), "metrics must drop on write failure"); @@ -446,10 +478,11 @@ permission = "write" let mut pod = Pod::new(manifest, worker, store.clone(), pwd, scope) .await .unwrap(); + let session_id = pod.session_id(); let segment_id = pod.segment_id(); pod.run_text("hello").await.unwrap(); - let state = session_store::restore(&store, segment_id).unwrap(); + let state = session_store::restore(&store, session_id, segment_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); assert!( metrics.is_empty(), diff --git a/crates/pod/tests/system_prompt_template_test.rs b/crates/pod/tests/system_prompt_template_test.rs index 98dd7328..8433968a 100644 --- a/crates/pod/tests/system_prompt_template_test.rs +++ b/crates/pod/tests/system_prompt_template_test.rs @@ -182,7 +182,10 @@ async fn session_start_state_captures_rendered_prompt() { .unwrap(); pod.run_text("hi").await.unwrap(); - let entries = pod.store().read_all(pod.segment_id()).unwrap(); + let entries = pod + .store() + .read_all(pod.session_id(), pod.segment_id()) + .unwrap(); let first = entries.first().expect("at least one entry"); match first { LogEntry::SegmentStart { system_prompt, .. 
} => { diff --git a/crates/session-metrics/src/lib.rs b/crates/session-metrics/src/lib.rs index eb0d9c7a..f7deb9db 100644 --- a/crates/session-metrics/src/lib.rs +++ b/crates/session-metrics/src/lib.rs @@ -18,7 +18,7 @@ use std::collections::BTreeMap; use serde::{Deserialize, Serialize}; -use session_store::{SegmentId, Store, StoreError, save_extension, segment_log}; +use session_store::{SegmentId, SessionId, Store, StoreError, save_extension, segment_log}; /// Domain tag used in `LogEntry::Extension` for all metrics records. pub const DOMAIN: &str = "metrics"; @@ -77,11 +77,12 @@ impl Metric { /// (メトリクスのために本体処理を止めるかは呼び出し側の判断)。 pub fn record_metric( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, metric: &Metric, ) -> Result<(), StoreError> { let payload = serde_json::to_value(metric).expect("Metric serialization cannot fail"); - save_extension(store, segment_id, DOMAIN, payload) + save_extension(store, session_id, segment_id, DOMAIN, payload) } /// `RestoredState.extensions` から metrics domain の payload を順に取り出し、 diff --git a/crates/session-store/src/fs_store.rs b/crates/session-store/src/fs_store.rs index 50dde365..076455e0 100644 --- a/crates/session-store/src/fs_store.rs +++ b/crates/session-store/src/fs_store.rs @@ -1,13 +1,16 @@ //! Filesystem-backed JSONL store. //! //! Layout: -//! - Segment log: `{root}/{segment_id}.jsonl` -//! - Event trace: `{root}/{segment_id}.trace.jsonl` +//! - Segment log: `{root}/{session_id}/{segment_id}.jsonl` +//! - Event trace: `{root}/{session_id}/{segment_id}.trace.jsonl` +//! +//! The per-Session directory makes `list_segments(session_id)` an O(dir) +//! scan and gives the fork tree a visible grouping in the filesystem. 
-use crate::SegmentId; use crate::event_trace::TraceEntry; use crate::segment_log::LogEntry; use crate::store::{Store, StoreError}; +use crate::{SegmentId, SessionId}; use std::fs; use std::io::Write; use std::path::{Path, PathBuf}; @@ -30,15 +33,24 @@ impl FsStore { Ok(Self { root }) } - fn log_path(&self, id: SegmentId) -> PathBuf { - self.root.join(format!("{id}.jsonl")) + fn session_dir(&self, session_id: SessionId) -> PathBuf { + self.root.join(session_id.to_string()) } - fn trace_path(&self, id: SegmentId) -> PathBuf { - self.root.join(format!("{id}.trace.jsonl")) + fn log_path(&self, session_id: SessionId, segment_id: SegmentId) -> PathBuf { + self.session_dir(session_id) + .join(format!("{segment_id}.jsonl")) + } + + fn trace_path(&self, session_id: SessionId, segment_id: SegmentId) -> PathBuf { + self.session_dir(session_id) + .join(format!("{segment_id}.trace.jsonl")) } fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } let mut file = fs::OpenOptions::new().create(true).append(true).open(path)?; file.write_all(line.as_bytes())?; file.write_all(b"\n")?; @@ -65,23 +77,56 @@ impl FsStore { } impl Store for FsStore { - fn append(&self, id: SegmentId, entry: &LogEntry) -> Result<(), StoreError> { + fn append( + &self, + session_id: SessionId, + segment_id: SegmentId, + entry: &LogEntry, + ) -> Result<(), StoreError> { let line = serde_json::to_string(entry)?; - self.append_line(&self.log_path(id), &line) + self.append_line(&self.log_path(session_id, segment_id), &line) } - fn read_all(&self, id: SegmentId) -> Result, StoreError> { - let path = self.log_path(id); + fn read_all( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result, StoreError> { + let path = self.log_path(session_id, segment_id); if !path.exists() { - return Err(StoreError::NotFound(id)); + return Err(StoreError::NotFound(segment_id)); } let content = fs::read_to_string(&path)?; 
Self::parse_jsonl(&content) } - fn list_segments(&self) -> Result, StoreError> { - let mut segments = Vec::new(); + fn list_sessions(&self) -> Result, StoreError> { + let mut sessions = Vec::new(); + if !self.root.exists() { + return Ok(sessions); + } for entry in fs::read_dir(&self.root)? { + let entry = entry?; + if !entry.file_type()?.is_dir() { + continue; + } + if let Some(name) = entry.file_name().to_str() { + if let Ok(id) = name.parse::() { + sessions.push(id); + } + } + } + sessions.sort_by(|a, b| b.cmp(a)); + Ok(sessions) + } + + fn list_segments(&self, session_id: SessionId) -> Result, StoreError> { + let dir = self.session_dir(session_id); + let mut segments = Vec::new(); + if !dir.exists() { + return Ok(segments); + } + for entry in fs::read_dir(&dir)? { let entry = entry?; let path = entry.path(); // Only match .jsonl files, not .trace.jsonl @@ -98,8 +143,36 @@ impl Store for FsStore { Ok(segments) } - fn create_segment(&self, id: SegmentId, entries: &[LogEntry]) -> Result<(), StoreError> { - let path = self.log_path(id); + fn lookup_session_of(&self, segment_id: SegmentId) -> Result, StoreError> { + if !self.root.exists() { + return Ok(None); + } + let needle = format!("{segment_id}.jsonl"); + for entry in fs::read_dir(&self.root)? 
{ + let entry = entry?; + if !entry.file_type()?.is_dir() { + continue; + } + if entry.path().join(&needle).exists() + && let Some(name) = entry.file_name().to_str() + && let Ok(id) = name.parse::() + { + return Ok(Some(id)); + } + } + Ok(None) + } + + fn create_segment( + &self, + session_id: SessionId, + segment_id: SegmentId, + entries: &[LogEntry], + ) -> Result<(), StoreError> { + let path = self.log_path(session_id, segment_id); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } let mut content = String::new(); for entry in entries { content.push_str(&serde_json::to_string(entry)?); @@ -109,21 +182,30 @@ impl Store for FsStore { Ok(()) } - fn exists(&self, id: SegmentId) -> Result { - Ok(self.log_path(id).exists()) + fn exists(&self, session_id: SessionId, segment_id: SegmentId) -> Result { + Ok(self.log_path(session_id, segment_id).exists()) } - fn read_entry_count(&self, id: SegmentId) -> Result { - let path = self.log_path(id); + fn read_entry_count( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result { + let path = self.log_path(session_id, segment_id); if !path.exists() { - return Err(StoreError::NotFound(id)); + return Err(StoreError::NotFound(segment_id)); } let content = fs::read_to_string(&path)?; Ok(content.lines().filter(|l| !l.trim().is_empty()).count()) } - fn append_trace(&self, id: SegmentId, entry: &TraceEntry) -> Result<(), StoreError> { + fn append_trace( + &self, + session_id: SessionId, + segment_id: SegmentId, + entry: &TraceEntry, + ) -> Result<(), StoreError> { let line = serde_json::to_string(entry)?; - self.append_line(&self.trace_path(id), &line) + self.append_line(&self.trace_path(session_id, segment_id), &line) } } diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index 3e5df709..d2754081 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -1,10 +1,14 @@ -//! Segment persistence via append-only JSONL logs. +//! 
Session persistence via append-only JSONL logs. //! //! # Architecture //! -//! Sessions are recorded as a sequence of [`LogEntry`] values, one per line -//! in a `.jsonl` file. Reading the log and collecting entries reconstructs -//! the full Worker state — no separate snapshots or checkpoints needed. +//! A [`Session`](SessionId) is a fork-tree of [`Segment`](SegmentId)s +//! belonging to the same logical conversation. Each Segment is recorded +//! as a sequence of [`LogEntry`] values, one per line in a `.jsonl` +//! file. Reading a segment log and collecting entries reconstructs the +//! Worker state at that segment — no separate snapshots or checkpoints +//! needed. Compaction and fork operations mint a fresh Segment within +//! the same Session. //! //! This crate provides free functions for persistence operations. //! The caller (typically Pod) holds the Worker directly and calls these @@ -19,7 +23,7 @@ //! use session_store::{create_segment, restore, save_delta, FsStore, SegmentStartState}; //! //! let store = FsStore::new("./sessions")?; -//! let segment_id = create_segment(&store, SegmentStartState { +//! let (session_id, segment_id) = create_segment(&store, SegmentStartState { //! system_prompt: None, //! config: &config, //! 
history: &[], @@ -41,9 +45,10 @@ pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; pub use segment::{ SegmentStartState, append_entry, append_system_item, classify_history_item, - create_compacted_segment, create_segment, create_segment_with_id, ensure_head_or_fork, fork, - fork_at, restore, save_config_changed, save_delta, save_extension, save_pod_scope, - save_run_completed, save_run_errored, save_turn_end, save_usage, save_user_input, + create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork, + fork_at, restore, restore_by_segment, save_config_changed, save_delta, save_extension, + save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage, + save_user_input, }; pub use segment_log::{ LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SegmentOrigin, @@ -52,9 +57,21 @@ pub use segment_log::{ pub use system_item::{SystemItem, render_pod_event}; pub use store::{Store, StoreError}; +/// Session identifier — the fork-tree root. UUID v7 (time-ordered). +/// +/// All Segments belonging to the same Session share this ID. Compaction +/// and fork operations create a new Segment within the same Session, so +/// `WHERE session_id = ?` retrieves the full lineage. +pub type SessionId = uuid::Uuid; + /// Segment identifier. UUID v7 (time-ordered, lexicographically sortable). pub type SegmentId = uuid::Uuid; +/// Generate a new session ID. +pub fn new_session_id() -> SessionId { + uuid::Uuid::now_v7() +} + /// Generate a new segment ID. pub fn new_segment_id() -> SegmentId { uuid::Uuid::now_v7() diff --git a/crates/session-store/src/segment.rs b/crates/session-store/src/segment.rs index 07d6daf9..00b62f65 100644 --- a/crates/session-store/src/segment.rs +++ b/crates/session-store/src/segment.rs @@ -4,7 +4,7 @@ //! The caller (typically Pod) holds the Worker directly and calls these //! 
functions after state-mutating operations. -use crate::SegmentId; +use crate::{SegmentId, SessionId}; use crate::logged_item::{LoggedItem, to_logged}; use crate::segment_log::{self, LogEntry, PodScopeSnapshot, SegmentOrigin}; use crate::store::{Store, StoreError}; @@ -21,38 +21,43 @@ pub struct SegmentStartState<'a> { pub history: &'a [Item], } -/// Create a new segment, writing the initial `SegmentStart` entry. +/// Create a new session + initial segment, writing the initial +/// `SegmentStart` entry. Returns the freshly minted `(session_id, segment_id)`. pub fn create_segment( store: &impl Store, state: SegmentStartState<'_>, -) -> Result { +) -> Result<(SessionId, SegmentId), StoreError> { + let session_id = crate::new_session_id(); let segment_id = crate::new_segment_id(); - create_segment_with_id(store, segment_id, state)?; - Ok(segment_id) + create_segment_with_ids(store, session_id, segment_id, state)?; + Ok((session_id, segment_id)) } -/// Write a fresh `SegmentStart` entry using a pre-generated segment ID. +/// Write a fresh `SegmentStart` entry using pre-generated IDs. /// -/// Used by callers that need to reserve a segment ID synchronously but -/// defer the initial log append (e.g. Pod, which resolves a templated -/// system prompt only at first turn). -pub fn create_segment_with_id( +/// Used by callers that need to reserve `(session_id, segment_id)` +/// synchronously but defer the initial log append (e.g. Pod, which +/// resolves a templated system prompt only at first turn). 
+pub fn create_segment_with_ids( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, state: SegmentStartState<'_>, ) -> Result<(), StoreError> { let entry = LogEntry::SegmentStart { ts: segment_log::now_millis(), + session_id, system_prompt: state.system_prompt.map(String::from), config: state.config.clone(), history: to_logged(state.history), forked_from: None, compacted_from: None, }; - store.append(segment_id, &entry) + store.append(session_id, segment_id, &entry) } -/// Create a compacted segment from an existing one. +/// Create a compacted segment from an existing one. Inherits the source's +/// `session_id` so the compacted lineage stays within the same Session. /// /// Records `compacted_from` provenance linking back to the source segment /// at the turn boundary captured by `source_turn_count` (the most recent @@ -60,12 +65,14 @@ pub fn create_segment_with_id( pub fn create_compacted_segment( store: &impl Store, state: SegmentStartState<'_>, + source_session_id: SessionId, source_segment_id: SegmentId, source_turn_count: usize, ) -> Result { let segment_id = crate::new_segment_id(); let entry = LogEntry::SegmentStart { ts: segment_log::now_millis(), + session_id: source_session_id, system_prompt: state.system_prompt.map(String::from), config: state.config.clone(), history: to_logged(state.history), @@ -75,7 +82,7 @@ pub fn create_compacted_segment( at_turn_index: source_turn_count, }), }; - store.append(segment_id, &entry)?; + store.append(source_session_id, segment_id, &entry)?; Ok(segment_id) } @@ -85,36 +92,54 @@ pub fn create_compacted_segment( /// applying it to a Worker. pub fn restore( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, ) -> Result { - let entries = store.read_all(segment_id)?; + let entries = store.read_all(session_id, segment_id)?; Ok(segment_log::collect_state(&entries)) } +/// Restore segment state when only the segment ID is known. 
Uses +/// [`Store::lookup_session_of`] to resolve the parent Session. +/// +/// Shim for legacy entry points (`pod-cli --session ` etc.) that +/// receive a Segment ID without a Session ID. +pub fn restore_by_segment( + store: &impl Store, + segment_id: SegmentId, +) -> Result { + let session_id = store + .lookup_session_of(segment_id)? + .ok_or(StoreError::NotFound(segment_id))?; + restore(store, session_id, segment_id) +} + /// Check if the store's entry count still matches the writer's tally. -/// If not, auto-fork into a new segment. +/// If not, auto-fork into a new segment within the same Session. /// /// Updates `segment_id` and `entries_written` in place when a fork occurs. pub fn ensure_head_or_fork( store: &impl Store, + session_id: SessionId, segment_id: &mut SegmentId, entries_written: &mut usize, state: SegmentStartState<'_>, ) -> Result<(), StoreError> { - let store_count = store.read_entry_count(*segment_id)?; + let store_count = store.read_entry_count(session_id, *segment_id)?; if store_count == *entries_written { return Ok(()); } let fork_id = crate::new_segment_id(); let entry = LogEntry::SegmentStart { ts: segment_log::now_millis(), + session_id, system_prompt: state.system_prompt.map(String::from), config: state.config.clone(), history: to_logged(state.history), forked_from: None, compacted_from: None, }; - store.create_segment(fork_id, &[entry])?; + store.create_segment(session_id, fork_id, &[entry])?; *segment_id = fork_id; *entries_written = 1; Ok(()) @@ -128,11 +153,13 @@ pub fn ensure_head_or_fork( /// [`Segment::flatten_to_text`]. pub fn save_user_input( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, segments: Vec, ) -> Result<(), StoreError> { append_entry( store, + session_id, segment_id, LogEntry::UserInput { ts: segment_log::now_millis(), @@ -151,6 +178,7 @@ pub fn save_user_input( /// `UserInput` entry. 
pub fn save_delta( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, new_items: &[Item], ) -> Result<(), StoreError> { @@ -165,7 +193,7 @@ pub fn save_delta( continue; } let entry = classify_history_item(item, ts); - append_entry(store, segment_id, entry)?; + append_entry(store, session_id, segment_id, entry)?; } Ok(()) } @@ -199,11 +227,13 @@ pub fn classify_history_item(item: &Item, ts: u64) -> LogEntry { /// commit shape used for assistant / tool result entries. pub fn append_system_item( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, item: SystemItem, ) -> Result<(), StoreError> { append_entry( store, + session_id, segment_id, LogEntry::SystemItem { ts: segment_log::now_millis(), @@ -215,11 +245,13 @@ pub fn append_system_item( /// Log a TurnEnd entry. pub fn save_turn_end( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, turn_count: usize, ) -> Result<(), StoreError> { append_entry( store, + session_id, segment_id, LogEntry::TurnEnd { ts: segment_log::now_millis(), @@ -231,12 +263,14 @@ pub fn save_turn_end( /// Log a `RunCompleted` entry — `run()` / `resume()` returned `Ok(WorkerResult)`. pub fn save_run_completed( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, result: WorkerResult, interrupted: bool, ) -> Result<(), StoreError> { append_entry( store, + session_id, segment_id, LogEntry::RunCompleted { ts: segment_log::now_millis(), @@ -252,12 +286,14 @@ pub fn save_run_completed( /// `to_string()` rendering as `message`. 
pub fn save_run_errored( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, message: String, interrupted: bool, ) -> Result<(), StoreError> { append_entry( store, + session_id, segment_id, LogEntry::RunErrored { ts: segment_log::now_millis(), @@ -275,6 +311,7 @@ pub fn save_run_errored( /// 済ませた値を渡す。 pub fn save_usage( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, history_len: usize, input_total_tokens: u64, @@ -284,6 +321,7 @@ pub fn save_usage( ) -> Result<(), StoreError> { append_entry( store, + session_id, segment_id, LogEntry::LlmUsage { ts: segment_log::now_millis(), @@ -303,12 +341,14 @@ pub fn save_usage( /// Use `RestoredState.extensions` to read entries back at restore time. pub fn save_extension( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, domain: impl Into, payload: serde_json::Value, ) -> Result<(), StoreError> { append_entry( store, + session_id, segment_id, LogEntry::Extension { ts: segment_log::now_millis(), @@ -321,12 +361,14 @@ pub fn save_extension( /// Log the Pod's latest runtime scope snapshot. pub fn save_pod_scope( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, snapshot: &PodScopeSnapshot, ) -> Result<(), StoreError> { let payload = serde_json::to_value(snapshot)?; save_extension( store, + session_id, segment_id, segment_log::POD_SCOPE_EXTENSION_DOMAIN, payload, @@ -336,11 +378,13 @@ pub fn save_pod_scope( /// Log a `ConfigChanged` entry. pub fn save_config_changed( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, config: &RequestConfig, ) -> Result<(), StoreError> { append_entry( store, + session_id, segment_id, LogEntry::ConfigChanged { ts: segment_log::now_millis(), @@ -349,22 +393,33 @@ pub fn save_config_changed( ) } -/// Fork the current state into a new segment. -pub fn fork(store: &impl Store, state: SegmentStartState<'_>) -> Result { +/// Fork the current state into a brand-new Session (no parent lineage). 
+/// +/// Use this for "start a fresh conversation from this state" — the +/// returned segment does not share `session_id` with any prior segment. +/// In-Session forks (live auto-fork / past-turn fork) go through +/// [`fork_at`] or [`ensure_head_or_fork`] instead. +pub fn fork( + store: &impl Store, + state: SegmentStartState<'_>, +) -> Result<(SessionId, SegmentId), StoreError> { + let session_id = crate::new_session_id(); let fork_id = crate::new_segment_id(); let entry = LogEntry::SegmentStart { ts: segment_log::now_millis(), + session_id, system_prompt: state.system_prompt.map(String::from), config: state.config.clone(), history: to_logged(state.history), forked_from: None, compacted_from: None, }; - store.create_segment(fork_id, &[entry])?; - Ok(fork_id) + store.create_segment(session_id, fork_id, &[entry])?; + Ok((session_id, fork_id)) } -/// Fork from a turn boundary in a stored segment log. +/// Fork from a turn boundary in a stored segment log, keeping the new +/// segment in the same Session as `source_id`. /// /// `at_turn_index` is the `turn_count` of the most recent completed /// `TurnEnd` in the source segment that the fork should branch from. @@ -372,10 +427,11 @@ pub fn fork(store: &impl Store, state: SegmentStartState<'_>) -> Result Result { - let entries = store.read_all(source_id)?; + let entries = store.read_all(source_session_id, source_id)?; let cut = if at_turn_index == 0 { // Branch directly after the SegmentStart (or whatever opens the // segment), before any turn completes. 
@@ -395,6 +451,7 @@ pub fn fork_at( let fork_id = crate::new_segment_id(); let entry = LogEntry::SegmentStart { ts: segment_log::now_millis(), + session_id: source_session_id, system_prompt: state.system_prompt, config: state.config, history: to_logged(&state.history), @@ -404,7 +461,7 @@ pub fn fork_at( }), compacted_from: None, }; - store.create_segment(fork_id, &[entry])?; + store.create_segment(source_session_id, fork_id, &[entry])?; Ok(fork_id) } @@ -415,8 +472,9 @@ pub fn fork_at( /// it needs the same value for an in-memory mirror + broadcast). pub fn append_entry( store: &impl Store, + session_id: SessionId, segment_id: SegmentId, entry: LogEntry, ) -> Result<(), StoreError> { - store.append(segment_id, &entry) + store.append(session_id, segment_id, &entry) } diff --git a/crates/session-store/src/segment_log.rs b/crates/session-store/src/segment_log.rs index 15835a63..b26626db 100644 --- a/crates/session-store/src/segment_log.rs +++ b/crates/session-store/src/segment_log.rs @@ -37,13 +37,19 @@ pub enum LogEntry { /// For forked segments, `history` contains the seed state from the parent. SegmentStart { ts: u64, + /// Session this segment belongs to. Compaction / fork inherits + /// the source segment's session_id; only fresh "new conversation" + /// segments mint a new session_id. + session_id: crate::SessionId, system_prompt: Option, config: RequestConfig, history: Vec, - /// Origin: forked from another segment at a specific turn boundary. + /// Origin: forked from a sibling segment at a specific turn boundary. + /// The referenced segment is guaranteed to share `session_id`. #[serde(default, skip_serializing_if = "Option::is_none")] forked_from: Option, - /// Origin: compacted from another segment at a specific turn boundary. + /// Origin: compacted from a sibling segment at a specific turn boundary. + /// The referenced segment is guaranteed to share `session_id`. 
#[serde(default, skip_serializing_if = "Option::is_none")] compacted_from: Option, }, @@ -190,6 +196,10 @@ pub struct PodScopeSnapshot { /// State collected from log entries. #[derive(Debug, Clone)] pub struct RestoredState { + /// Session the replayed segment belongs to. Sourced from the + /// `SegmentStart` entry; `None` only if the log was empty (in which + /// case `entries_count == 0`). + pub session_id: Option, pub system_prompt: Option, pub config: RequestConfig, pub history: Vec, @@ -221,6 +231,7 @@ pub struct RestoredState { /// Replay a sequence of log entries to reconstruct worker state. pub fn collect_state(entries: &[LogEntry]) -> RestoredState { let mut state = RestoredState { + session_id: None, system_prompt: None, config: RequestConfig::default(), history: Vec::new(), @@ -238,11 +249,13 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState { match entry { LogEntry::SegmentStart { + session_id, system_prompt, config, history, .. } => { + state.session_id = Some(*session_id); state.system_prompt = system_prompt.clone(); state.config = config.clone(); state.history = history.iter().cloned().map(Item::from).collect(); @@ -354,6 +367,7 @@ mod tests { fn replay_segment_start_sets_initial_state() { let state = collect_state(&[LogEntry::SegmentStart { ts: 1000, + session_id: uuid::Uuid::nil(), system_prompt: Some("You are helpful.".into()), config: RequestConfig::default().with_max_tokens(1024), history: vec![Item::user_message("seed").into()], @@ -371,6 +385,7 @@ mod tests { let state = collect_state(&[ LogEntry::SegmentStart { ts: 1000, + session_id: uuid::Uuid::nil(), system_prompt: None, config: RequestConfig::default(), history: vec![], @@ -405,6 +420,7 @@ mod tests { let state = collect_state(&[ LogEntry::SegmentStart { ts: 1000, + session_id: uuid::Uuid::nil(), system_prompt: None, config: RequestConfig::default(), history: vec![], @@ -442,6 +458,7 @@ mod tests { let state = collect_state(&[ LogEntry::SegmentStart { ts: 1000, + session_id: 
uuid::Uuid::nil(), system_prompt: None, config: RequestConfig::default(), history: vec![], @@ -461,6 +478,7 @@ mod tests { let state = collect_state(&[ LogEntry::SegmentStart { ts: 1000, + session_id: uuid::Uuid::nil(), system_prompt: None, config: RequestConfig::default(), history: vec![], @@ -507,6 +525,7 @@ mod tests { let state = collect_state(&[ LogEntry::SegmentStart { ts: 1000, + session_id: uuid::Uuid::nil(), system_prompt: None, config: RequestConfig::default(), history: vec![], @@ -578,6 +597,7 @@ mod tests { let state = collect_state(&[ LogEntry::SegmentStart { ts: 0, + session_id: uuid::Uuid::nil(), system_prompt: None, config: RequestConfig::default(), history: vec![], @@ -610,6 +630,7 @@ mod tests { let state = collect_state(&[ LogEntry::SegmentStart { ts: 1000, + session_id: uuid::Uuid::nil(), system_prompt: None, config: RequestConfig::default(), history: vec![], @@ -693,6 +714,7 @@ mod tests { let state = collect_state(&[ LogEntry::SegmentStart { ts: 1, + session_id: uuid::Uuid::nil(), system_prompt: None, config: RequestConfig::default(), history: vec![], diff --git a/crates/session-store/src/store.rs b/crates/session-store/src/store.rs index 1406571e..0896395d 100644 --- a/crates/session-store/src/store.rs +++ b/crates/session-store/src/store.rs @@ -1,7 +1,8 @@ //! Persistence backend abstraction. //! -//! [`Store`] defines the sync interface for reading and writing segment logs. -//! Implementations handle the physical storage (filesystem, database, etc.). +//! [`Store`] defines the sync interface for reading and writing segment logs +//! within a [`Session`](crate::SessionId). Implementations handle the +//! physical storage (filesystem, database, etc.). //! //! Sync (rather than async) is intentional: a segment log append is a single //! `< 1 KiB` line on local fs and completes well below a millisecond. Going @@ -10,9 +11,9 @@ //! drain task. Keeping the store sync lets the worker callback, Pod commit //! 
paths, and `PodInterceptor` all share one direct `append_entry` call. -use crate::SegmentId; use crate::event_trace::TraceEntry; use crate::segment_log::LogEntry; +use crate::{SegmentId, SessionId}; /// Errors from the persistence store. #[derive(Debug, thiserror::Error)] @@ -33,33 +34,67 @@ pub enum StoreError { /// Sync persistence backend for segment logs. /// /// All methods take `&self` — implementations should use interior mutability -/// (e.g., append-mode file handles) when needed. +/// (e.g., append-mode file handles) when needed. Most read/write methods +/// take `(SessionId, SegmentId)` so segments can be physically grouped +/// per Session on disk (or per session_id in a DB). pub trait Store: Send + Sync { /// Append a single log entry to the segment log. /// /// One line per call. The kernel orders concurrent `O_APPEND` writes /// for lines < `PIPE_BUF`, so user-space serialization is unnecessary. - fn append(&self, id: SegmentId, entry: &LogEntry) -> Result<(), StoreError>; + fn append( + &self, + session_id: SessionId, + segment_id: SegmentId, + entry: &LogEntry, + ) -> Result<(), StoreError>; /// Read all log entries for a segment, in order. - fn read_all(&self, id: SegmentId) -> Result, StoreError>; + fn read_all( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result, StoreError>; - /// List all segment IDs, most recent first. - fn list_segments(&self) -> Result, StoreError>; + /// List all session IDs, most recent first. + fn list_sessions(&self) -> Result, StoreError>; - /// Create a new segment with initial entries. - fn create_segment(&self, id: SegmentId, entries: &[LogEntry]) -> Result<(), StoreError>; + /// List segment IDs belonging to `session_id`, most recent first. + fn list_segments(&self, session_id: SessionId) -> Result, StoreError>; + + /// Look up which session a given segment belongs to. Returns `None` + /// when the segment is not known to any session. 
Implementations + /// may scan storage; intended for shim entry points that receive a + /// segment ID without its session ID (e.g. legacy `--session `). + fn lookup_session_of(&self, segment_id: SegmentId) -> Result, StoreError>; + + /// Create a new segment within `session_id`, with initial entries. + fn create_segment( + &self, + session_id: SessionId, + segment_id: SegmentId, + entries: &[LogEntry], + ) -> Result<(), StoreError>; /// Check if a segment exists. - fn exists(&self, id: SegmentId) -> Result; + fn exists(&self, session_id: SessionId, segment_id: SegmentId) -> Result; /// Count entries currently stored for a segment. /// /// Used by `ensure_head_or_fork` to detect concurrent writers: /// if the on-disk count exceeds the writer's own append tally, /// another process has extended the log. - fn read_entry_count(&self, id: SegmentId) -> Result; + fn read_entry_count( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result; /// Append a trace entry to the debug event trace file. 
- fn append_trace(&self, id: SegmentId, entry: &TraceEntry) -> Result<(), StoreError>; + fn append_trace( + &self, + session_id: SessionId, + segment_id: SegmentId, + entry: &TraceEntry, + ) -> Result<(), StoreError>; } diff --git a/crates/session-store/tests/fs_store_test.rs b/crates/session-store/tests/fs_store_test.rs index 3fca4113..6cb60963 100644 --- a/crates/session-store/tests/fs_store_test.rs +++ b/crates/session-store/tests/fs_store_test.rs @@ -1,16 +1,32 @@ use llm_worker::WorkerResult; use llm_worker::llm_client::types::{Item, RequestConfig}; -use session_store::{FsStore, LogEntry, Store, TraceEntry, collect_state, new_segment_id}; +use session_store::{ + FsStore, LogEntry, Store, TraceEntry, collect_state, new_segment_id, new_session_id, +}; + +fn nil_session_start(ts: u64, session_id: uuid::Uuid) -> LogEntry { + LogEntry::SegmentStart { + ts, + session_id, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + } +} #[test] fn round_trip_write_and_read() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_segment_id(); + let sid = new_session_id(); + let segid = new_segment_id(); let entries = vec![ LogEntry::SegmentStart { ts: 1000, + session_id: sid, system_prompt: Some("You are helpful.".into()), config: RequestConfig::default().with_max_tokens(1024), history: vec![], @@ -37,13 +53,14 @@ fn round_trip_write_and_read() { ]; for entry in &entries { - store.append(id, entry).unwrap(); + store.append(sid, segid, entry).unwrap(); } - let read_back = store.read_all(id).unwrap(); + let read_back = store.read_all(sid, segid).unwrap(); assert_eq!(read_back.len(), entries.len()); let state = collect_state(&read_back); + assert_eq!(state.session_id, Some(sid)); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); assert_eq!(state.config.max_tokens, Some(1024)); assert_eq!(state.history.len(), 2); @@ -53,13 +70,15 @@ fn 
round_trip_write_and_read() { } #[test] -fn create_session_writes_all_entries() { +fn create_segment_writes_all_entries() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_segment_id(); + let sid = new_session_id(); + let segid = new_segment_id(); let entries = [LogEntry::SegmentStart { ts: 1000, + session_id: sid, system_prompt: None, config: RequestConfig::default(), history: vec![ @@ -70,74 +89,65 @@ fn create_session_writes_all_entries() { compacted_from: None, }]; - store.create_segment(id, &entries).unwrap(); - let read_back = store.read_all(id).unwrap(); + store.create_segment(sid, segid, &entries).unwrap(); + let read_back = store.read_all(sid, segid).unwrap(); assert_eq!(read_back.len(), 1); let state = collect_state(&read_back); assert_eq!(state.history.len(), 2); + assert_eq!(state.session_id, Some(sid)); } #[test] -fn list_sessions_returns_newest_first() { +fn list_sessions_and_segments() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id1 = new_segment_id(); - // Small delay to ensure different UUID v7 timestamps + let sid_a = new_session_id(); std::thread::sleep(std::time::Duration::from_millis(2)); - let id2 = new_segment_id(); + let sid_b = new_session_id(); - let entry = LogEntry::SegmentStart { - ts: 1000, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - forked_from: None, - compacted_from: None, - }; + let seg_a1 = new_segment_id(); + let seg_a2 = new_segment_id(); + let seg_b1 = new_segment_id(); - store.append(id1, &entry).unwrap(); - store.append(id2, &entry).unwrap(); + store.append(sid_a, seg_a1, &nil_session_start(1, sid_a)).unwrap(); + store.append(sid_a, seg_a2, &nil_session_start(2, sid_a)).unwrap(); + store.append(sid_b, seg_b1, &nil_session_start(3, sid_b)).unwrap(); - let sessions = store.list_segments().unwrap(); - assert_eq!(sessions.len(), 2); - assert_eq!(sessions[0], id2); // newest first - 
assert_eq!(sessions[1], id1); + let sessions = store.list_sessions().unwrap(); + assert_eq!(sessions, vec![sid_b, sid_a]); // newest first + + let segs_a = store.list_segments(sid_a).unwrap(); + assert!(segs_a.contains(&seg_a1) && segs_a.contains(&seg_a2)); + assert_eq!(segs_a.len(), 2); + + let segs_b = store.list_segments(sid_b).unwrap(); + assert_eq!(segs_b, vec![seg_b1]); } #[test] fn exists_returns_correct_state() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_segment_id(); + let sid = new_session_id(); + let segid = new_segment_id(); - assert!(!store.exists(id).unwrap()); + assert!(!store.exists(sid, segid).unwrap()); - store - .append( - id, - &LogEntry::SegmentStart { - ts: 1000, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - forked_from: None, - compacted_from: None, - }, - ) - .unwrap(); + store.append(sid, segid, &nil_session_start(1000, sid)).unwrap(); - assert!(store.exists(id).unwrap()); + assert!(store.exists(sid, segid).unwrap()); } #[test] -fn not_found_error_for_missing_session() { +fn not_found_error_for_missing_segment() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_segment_id(); + let sid = new_session_id(); + let segid = new_segment_id(); - let result = store.read_all(id); + let result = store.read_all(sid, segid); assert!(result.is_err()); } @@ -145,21 +155,10 @@ fn not_found_error_for_missing_session() { fn trace_entries_in_separate_file() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_segment_id(); + let sid = new_session_id(); + let segid = new_segment_id(); - store - .append( - id, - &LogEntry::SegmentStart { - ts: 1000, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - forked_from: None, - compacted_from: None, - }, - ) - .unwrap(); + store.append(sid, segid, &nil_session_start(1000, sid)).unwrap(); let 
trace = TraceEntry { ts: 1500, @@ -168,14 +167,17 @@ fn trace_entries_in_separate_file() { llm_worker::llm_client::event::PingEvent { timestamp: None }, ), }; - store.append_trace(id, &trace).unwrap(); + store.append_trace(sid, segid, &trace).unwrap(); // Log should have 1 entry, unaffected by trace - let log = store.read_all(id).unwrap(); + let log = store.read_all(sid, segid).unwrap(); assert_eq!(log.len(), 1); // Trace file should exist separately - let trace_path = dir.path().join(format!("{id}.trace.jsonl")); + let trace_path = dir + .path() + .join(sid.to_string()) + .join(format!("{segid}.trace.jsonl")); assert!(trace_path.exists()); } @@ -183,11 +185,13 @@ fn trace_entries_in_separate_file() { fn read_entry_count_matches_append_tally() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_segment_id(); + let sid = new_session_id(); + let segid = new_segment_id(); let entries = [ LogEntry::SegmentStart { ts: 1000, + session_id: sid, system_prompt: None, config: RequestConfig::default(), history: vec![], @@ -201,8 +205,22 @@ fn read_entry_count_matches_append_tally() { ]; for entry in &entries { - store.append(id, entry).unwrap(); + store.append(sid, segid, entry).unwrap(); } - assert_eq!(store.read_entry_count(id).unwrap(), entries.len()); + assert_eq!(store.read_entry_count(sid, segid).unwrap(), entries.len()); +} + +#[test] +fn lookup_session_of_finds_owning_session() { + let dir = tempfile::tempdir().unwrap(); + let store = FsStore::new(dir.path()).unwrap(); + let sid = new_session_id(); + let segid = new_segment_id(); + + assert_eq!(store.lookup_session_of(segid).unwrap(), None); + + store.append(sid, segid, &nil_session_start(1, sid)).unwrap(); + + assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid)); } diff --git a/crates/session-store/tests/session_test.rs b/crates/session-store/tests/session_test.rs index b931a4c8..e05ab309 100644 --- a/crates/session-store/tests/session_test.rs +++ 
b/crates/session-store/tests/session_test.rs @@ -95,14 +95,20 @@ fn make_store() -> (tempfile::TempDir, FsStore) { async fn run_and_persist( worker: Worker, store: &FsStore, + session_id: session_store::SessionId, segment_id: session_store::SegmentId, input: &str, ) -> (Worker, llm_worker::WorkerResult) { // Mirror Pod's run-entry contract: log the user input as segments // before the worker pushes its flattened user_message; save_delta // skips the resulting user_message item to avoid double-write. - session_store::save_user_input(store, segment_id, vec![protocol::Segment::text(input)]) - .unwrap(); + session_store::save_user_input( + store, + session_id, + segment_id, + vec![protocol::Segment::text(input)], + ) + .unwrap(); let history_before = worker.history().len(); @@ -111,13 +117,14 @@ async fn run_and_persist( let worker = locked.unlock(); let new_items = &worker.history()[history_before..]; - session_store::save_delta(store, segment_id, new_items).unwrap(); - session_store::save_turn_end(store, segment_id, worker.turn_count()).unwrap(); + session_store::save_delta(store, session_id, segment_id, new_items).unwrap(); + session_store::save_turn_end(store, session_id, segment_id, worker.turn_count()).unwrap(); match &result { Ok(r) => { session_store::save_run_completed( store, + session_id, segment_id, r.clone(), worker.last_run_interrupted(), @@ -127,6 +134,7 @@ async fn run_and_persist( Err(e) => { session_store::save_run_errored( store, + session_id, segment_id, e.to_string(), worker.last_run_interrupted(), @@ -149,7 +157,7 @@ async fn session_run_logs_entries() { let client = MockLlmClient::new(simple_text_events()); let worker = Worker::new(client); - let sid = session_store::create_segment( + let (sid, segid) = session_store::create_segment( &store, SegmentStartState { system_prompt: worker.get_system_prompt(), @@ -159,10 +167,10 @@ async fn session_run_logs_entries() { ) .unwrap(); - let (worker, _) = run_and_persist(worker, &store, sid, "Hi").await; + 
let (worker, _) = run_and_persist(worker, &store, sid, segid, "Hi").await; let _ = &worker; - let entries = store.read_all(sid).unwrap(); + let entries = store.read_all(sid, segid).unwrap(); // SegmentStart, UserInput, AssistantItems, TurnEnd, RunCompleted (at minimum) assert!( @@ -194,7 +202,7 @@ async fn session_restore_round_trip() { let mut worker = Worker::new(client); worker.set_system_prompt("You are helpful."); - let sid = session_store::create_segment( + let (sid, segid) = session_store::create_segment( &store, SegmentStartState { system_prompt: worker.get_system_prompt(), @@ -204,18 +212,26 @@ async fn session_restore_round_trip() { ) .unwrap(); - let (worker, _) = run_and_persist(worker, &store, sid, "Hi").await; + let (worker, _) = run_and_persist(worker, &store, sid, segid, "Hi").await; let original_history_len = worker.history().len(); let original_turn_count = worker.turn_count(); // Restore - let state = session_store::restore(&store, sid).unwrap(); + let state = session_store::restore(&store, sid, segid).unwrap(); + assert_eq!(state.session_id, Some(sid)); assert_eq!(state.history.len(), original_history_len); assert_eq!(state.turn_count, original_turn_count); assert_eq!(state.system_prompt.as_deref(), Some("You are helpful.")); - assert_eq!(state.entries_count, store.read_entry_count(sid).unwrap()); + assert_eq!( + state.entries_count, + store.read_entry_count(sid, segid).unwrap() + ); + + // Shim by segment ID alone. 
+ let by_segment = session_store::restore_by_segment(&store, segid).unwrap(); + assert_eq!(by_segment.session_id, Some(sid)); } #[tokio::test] @@ -225,7 +241,7 @@ async fn session_run_with_tool_call() { let mut worker = Worker::new(client); worker.register_tool(weather_tool_definition()); - let sid = session_store::create_segment( + let (sid, segid) = session_store::create_segment( &store, SegmentStartState { system_prompt: worker.get_system_prompt(), @@ -235,9 +251,9 @@ async fn session_run_with_tool_call() { ) .unwrap(); - let (_worker, _) = run_and_persist(worker, &store, sid, "What's the weather?").await; + let (_worker, _) = run_and_persist(worker, &store, sid, segid, "What's the weather?").await; - let entries = store.read_all(sid).unwrap(); + let entries = store.read_all(sid, segid).unwrap(); let has_tool_results = entries .iter() @@ -260,7 +276,7 @@ async fn session_resume_after_pause() { worker.register_tool(weather_tool_definition()); worker.set_interceptor(PausePolicy); - let sid = session_store::create_segment( + let (sid, segid) = session_store::create_segment( &store, SegmentStartState { system_prompt: worker.get_system_prompt(), @@ -270,11 +286,11 @@ async fn session_resume_after_pause() { ) .unwrap(); - let (_worker, result) = run_and_persist(worker, &store, sid, "Weather?").await; + let (_worker, result) = run_and_persist(worker, &store, sid, segid, "Weather?").await; assert!(matches!(result, llm_worker::WorkerResult::Paused)); // Check RunCompleted is Paused - let entries = store.read_all(sid).unwrap(); + let entries = store.read_all(sid, segid).unwrap(); let has_paused = entries.iter().any(|e| { matches!( e, @@ -287,18 +303,18 @@ async fn session_resume_after_pause() { assert!(has_paused, "should have Paused outcome"); // Restore state and verify - let state = session_store::restore(&store, sid).unwrap(); + let state = session_store::restore(&store, sid, segid).unwrap(); assert!(state.last_run_interrupted); } #[tokio::test] -async fn 
session_fork_preserves_state() { +async fn session_fork_creates_new_session() { let (_dir, store) = make_store(); let client = MockLlmClient::new(simple_text_events()); let mut worker = Worker::new(client); worker.set_system_prompt("System prompt"); - let sid = session_store::create_segment( + let (sid, segid) = session_store::create_segment( &store, SegmentStartState { system_prompt: worker.get_system_prompt(), @@ -308,10 +324,10 @@ async fn session_fork_preserves_state() { ) .unwrap(); - let (worker, _) = run_and_persist(worker, &store, sid, "Hello").await; + let (worker, _) = run_and_persist(worker, &store, sid, segid, "Hello").await; let original_history_len = worker.history().len(); - let fork_id = session_store::fork( + let (fork_sid, fork_segid) = session_store::fork( &store, SegmentStartState { system_prompt: worker.get_system_prompt(), @@ -320,24 +336,26 @@ async fn session_fork_preserves_state() { }, ) .unwrap(); + assert_ne!(fork_sid, sid, "`fork` mints a fresh Session"); // Fork should have a SegmentStart with the current history - let fork_entries = store.read_all(fork_id).unwrap(); + let fork_entries = store.read_all(fork_sid, fork_segid).unwrap(); assert_eq!(fork_entries.len(), 1); assert!(matches!(&fork_entries[0], LogEntry::SegmentStart { .. 
})); let fork_state = collect_state(&fork_entries); + assert_eq!(fork_state.session_id, Some(fork_sid)); assert_eq!(fork_state.history.len(), original_history_len); assert_eq!(fork_state.system_prompt.as_deref(), Some("System prompt")); } #[tokio::test] -async fn session_fork_at_truncates() { +async fn session_fork_at_truncates_within_session() { let (_dir, store) = make_store(); let client = MockLlmClient::new(simple_text_events()); let worker = Worker::new(client); - let sid = session_store::create_segment( + let (sid, segid) = session_store::create_segment( &store, SegmentStartState { system_prompt: worker.get_system_prompt(), @@ -347,26 +365,33 @@ async fn session_fork_at_truncates() { ) .unwrap(); - let (worker, _) = run_and_persist(worker, &store, sid, "Hello").await; + let (worker, _) = run_and_persist(worker, &store, sid, segid, "Hello").await; - let all_entries = store.read_all(sid).unwrap(); + let all_entries = store.read_all(sid, segid).unwrap(); assert!(all_entries.len() > 2); - // Fork at turn 1 (one completed turn). - let fork_id = session_store::fork_at(&store, sid, worker.turn_count()).unwrap(); + // Fork at turn 1 (one completed turn). Stays in same Session. + let fork_segid = session_store::fork_at(&store, sid, segid, worker.turn_count()).unwrap(); - let fork_entries = store.read_all(fork_id).unwrap(); + let fork_entries = store.read_all(sid, fork_segid).unwrap(); assert_eq!(fork_entries.len(), 1); // Just the new SegmentStart let fork_state = collect_state(&fork_entries); + assert_eq!(fork_state.session_id, Some(sid), "fork_at inherits Session"); + // History at fork point should match history right after the TurnEnd in - // the source session. + // the source segment. let turn_end_pos = all_entries .iter() .position(|e| matches!(e, LogEntry::TurnEnd { turn_count, .. 
} if *turn_count == worker.turn_count())) - .expect("source session has the matching TurnEnd"); + .expect("source segment has the matching TurnEnd"); let source_state_at_fork = collect_state(&all_entries[..=turn_end_pos]); assert_eq!(fork_state.history.len(), source_state_at_fork.history.len()); + + // list_segments should show both source and fork in the same Session. + let segs = store.list_segments(sid).unwrap(); + assert!(segs.contains(&segid)); + assert!(segs.contains(&fork_segid)); } #[tokio::test] @@ -375,7 +400,7 @@ async fn session_config_changed_logged() { let client = MockLlmClient::new(vec![]); let mut worker = Worker::new(client); - let sid = session_store::create_segment( + let (sid, segid) = session_store::create_segment( &store, SegmentStartState { system_prompt: worker.get_system_prompt(), @@ -388,9 +413,9 @@ async fn session_config_changed_logged() { // Modify config and log it let new_config = RequestConfig::default().with_temperature(0.7); worker.set_request_config(new_config.clone()); - session_store::save_config_changed(&store, sid, &new_config).unwrap(); + session_store::save_config_changed(&store, sid, segid, &new_config).unwrap(); - let entries = store.read_all(sid).unwrap(); + let entries = store.read_all(sid, segid).unwrap(); let has_config_changed = entries.iter().any(|e| { matches!( e, @@ -404,11 +429,11 @@ async fn session_config_changed_logged() { async fn session_auto_forks_on_conflict() { let (_dir, store) = make_store(); - // Create a session + // Create a segment let client_a = MockLlmClient::new(simple_text_events()); let worker_a = Worker::new(client_a); - let original_sid = session_store::create_segment( + let (sid, original_segid) = session_store::create_segment( &store, SegmentStartState { system_prompt: worker_a.get_system_prompt(), @@ -417,20 +442,21 @@ async fn session_auto_forks_on_conflict() { }, ) .unwrap(); - let mut segment_id = original_sid; + let mut segment_id = original_segid; // Writer tracked: just the 
SegmentStart we wrote. let mut entries_written: usize = 1; - // Simulate another Pod writing to the same session behind our back. + // Simulate another Pod writing to the same segment behind our back. let extra_entry = LogEntry::UserInput { ts: 9999, segments: vec![protocol::Segment::text("Interloper")], }; - store.append(original_sid, &extra_entry).unwrap(); + store.append(sid, original_segid, &extra_entry).unwrap(); // Now the on-disk count exceeds our tally — ensure_head_or_fork should auto-fork. session_store::ensure_head_or_fork( &store, + sid, &mut segment_id, &mut entries_written, SegmentStartState { @@ -441,15 +467,17 @@ async fn session_auto_forks_on_conflict() { ) .unwrap(); - // segment_id should now be different - assert_ne!(segment_id, original_sid); + // segment_id should now be different but live in the same Session. + assert_ne!(segment_id, original_segid); - // The fork session should exist and have entries - let fork_entries = store.read_all(segment_id).unwrap(); + // The fork segment should exist and have entries + let fork_entries = store.read_all(sid, segment_id).unwrap(); assert!(!fork_entries.is_empty()); + let fork_state = collect_state(&fork_entries); + assert_eq!(fork_state.session_id, Some(sid), "auto-fork inherits Session"); - // Original session should still have the interloper entry - let original_entries = store.read_all(original_sid).unwrap(); + // Original segment should still have the interloper entry + let original_entries = store.read_all(sid, original_segid).unwrap(); let has_interloper = original_entries .iter() .any(|e| matches!(e, LogEntry::UserInput { .. 
})); diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index d06561db..df6de499 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -1447,6 +1447,7 @@ mod completion_flow_tests { let mut app = App::new("test".into()); let session_start = session_store::LogEntry::SegmentStart { ts: 1, + session_id: uuid::Uuid::nil(), system_prompt: None, config: Default::default(), history: vec![session_store::LoggedItem::from( diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index 920c770c..4fed5d70 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -209,11 +209,11 @@ async fn run_resume() -> Result<(), Box> { // Phase 1: pick a session in its own inline viewport, dropping the // viewport before the name dialog opens so each phase gets fresh // vertical room. - let id = match picker::run().await? { - PickerOutcome::Picked(id) => id, + let leaf_segment_id = match picker::run().await? { + PickerOutcome::Picked { segment_id, .. } => segment_id, PickerOutcome::Cancelled => return Ok(()), }; - run_spawn(Some(id)).await + run_spawn(Some(leaf_segment_id)).await } async fn run_spawn(resume_from: Option) -> Result<(), Box> { diff --git a/crates/tui/src/picker.rs b/crates/tui/src/picker.rs index fe46e8b9..51054192 100644 --- a/crates/tui/src/picker.rs +++ b/crates/tui/src/picker.rs @@ -20,7 +20,9 @@ use ratatui::style::{Color, Modifier, Style}; use ratatui::text::{Line, Span}; use ratatui::widgets::Paragraph; use ratatui::{Frame, TerminalOptions, Viewport}; -use session_store::{FsStore, LogEntry, LoggedContentPart, LoggedItem, SegmentId, Store}; +use session_store::{ + FsStore, LogEntry, LoggedContentPart, LoggedItem, SegmentId, SessionId, Store, +}; const MAX_ROWS: usize = 10; const VIEWPORT_LINES: u16 = MAX_ROWS as u16 + 4; @@ -60,38 +62,55 @@ impl From for PickerError { } pub enum PickerOutcome { - Picked(SegmentId), + /// User picked a session; resume at its leaf segment. 
+ Picked { + session_id: SessionId, + segment_id: SegmentId, + }, Cancelled, } -/// One row in the picker view. Rendered from the session log so the -/// user can recognise their session at a glance without parsing UUIDs. +/// One row in the picker view. Rendered from the leaf segment of a +/// Session so the user can recognise their conversation at a glance +/// without parsing UUIDs. struct Row { - id: SegmentId, + session_id: SessionId, + leaf_segment_id: SegmentId, /// Last user / assistant snippet, or a `[corrupt]` placeholder. preview: String, /// `Some(pod_name)` when a live Pod currently holds an allocation - /// for this session in `pods.json`. Picking such a row launches - /// `pod --session ` which will fail with `SegmentConflict` — - /// the badge warns the user up-front. + /// for this session's leaf segment in `pods.json`. Picking such a + /// row launches `pod --session ` which will fail with + /// `SegmentConflict` — the badge warns the user up-front. live_pod: Option, } pub async fn run() -> Result { let store = open_default_store()?; - let ids = store.list_segments()?; - if ids.is_empty() { + let sessions = store.list_sessions()?; + if sessions.is_empty() { return Err(PickerError::NoSessions); } let mut rows: Vec = Vec::with_capacity(MAX_ROWS); - for id in ids.into_iter().take(MAX_ROWS) { - let preview = build_preview(&store, id); + for session_id in sessions.into_iter().take(MAX_ROWS) { + let Some(leaf_segment_id) = store + .list_segments(session_id)? + .into_iter() + .next() + else { + continue; + }; + let preview = build_preview(&store, session_id, leaf_segment_id); // Best-effort live check. A pods.json I/O hiccup downgrades // the row to "no badge" rather than killing the picker — the // user still gets to see the listing. 
- let live_pod = lookup_segment(id).ok().flatten().map(|info| info.pod_name); + let live_pod = lookup_segment(leaf_segment_id) + .ok() + .flatten() + .map(|info| info.pod_name); rows.push(Row { - id, + session_id, + leaf_segment_id, preview, live_pod, }); @@ -115,7 +134,11 @@ pub async fn run() -> Result { } Some(Action::Submit) => { close_viewport(&mut terminal)?; - return Ok(PickerOutcome::Picked(rows[selected].id)); + let row = &rows[selected]; + return Ok(PickerOutcome::Picked { + session_id: row.session_id, + segment_id: row.leaf_segment_id, + }); } Some(Action::Cancel) => { close_viewport(&mut terminal)?; @@ -158,8 +181,8 @@ fn open_default_store() -> Result { Ok(FsStore::new(&dir)?) } -fn build_preview(store: &FsStore, id: SegmentId) -> String { - match store.read_all(id) { +fn build_preview(store: &FsStore, session_id: SessionId, segment_id: SegmentId) -> String { + match store.read_all(session_id, segment_id) { Ok(entries) => last_message_preview(&entries).unwrap_or_else(|| "[empty]".to_string()), Err(_) => "[corrupt]".to_string(), } @@ -300,7 +323,7 @@ fn row_line(row: &Row, selected: bool) -> Line<'_> { }; let mut spans = vec![ Span::raw(marker), - Span::styled(short_segment(row.id), id_style), + Span::styled(short_segment(row.session_id), id_style), Span::raw(" "), ]; if let Some(ref pod_name) = row.live_pod { @@ -313,7 +336,7 @@ fn row_line(row: &Row, selected: bool) -> Line<'_> { Line::from(spans) } -fn short_segment(id: SegmentId) -> String { +fn short_segment(id: SessionId) -> String { let s = id.to_string(); s.chars().take(8).collect() } diff --git a/crates/tui/src/spawn.rs b/crates/tui/src/spawn.rs index 5cc66c7a..5a0ffdb8 100644 --- a/crates/tui/src/spawn.rs +++ b/crates/tui/src/spawn.rs @@ -329,7 +329,7 @@ async fn load_resume_scope(segment_id: SegmentId) -> Result