From d5fcbc21251126f11a079fb7336b932375323bd6 Mon Sep 17 00:00:00 2001 From: Hare Date: Wed, 20 May 2026 05:06:04 +0900 Subject: [PATCH] =?UTF-8?q?update:=20SessionId=20/=20SessionStart=20/=20Se?= =?UTF-8?q?ssionOrigin=20=E7=AD=89=E3=82=92=20Segment=20=E7=B3=BB=E5=90=8D?= =?UTF-8?q?=E7=A7=B0=E3=81=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Type/Function/Variantを Segment* 系へ統一 - SessionId/SessionStart/SessionOrigin/SessionStartState/SessionState/SessionLogSink/SessionLockInfo - new_session_id / session_id / create_session* / list_sessions / lookup_session / update_session / find_by_session - protocol Event::SessionRotated → SegmentRotated、CompactDone.new_session_id → new_segment_id - Module: session_log → segment_log / session → segment (file mv 含む) pod 側の session_log_sink → segment_log_sink も同様 - crate 名 (session-store)、CLI flag (--session)、ResumeWithSession (CLI tied) は据え置き - session-tests/session_metrics_test 等の Store impl も追従 --- .../scheme/openai_responses/request.rs | 2 +- crates/llm-worker/src/llm_client/types.rs | 2 +- crates/llm-worker/src/worker.rs | 2 +- crates/memory/src/consolidate/input.rs | 2 +- crates/memory/src/consolidate/lock.rs | 4 +- crates/memory/src/consolidate/staging.rs | 4 +- crates/memory/src/consolidate/tidy.rs | 2 +- crates/memory/src/extract/mod.rs | 2 +- crates/memory/src/extract/payload.rs | 2 +- crates/memory/src/extract/staging.rs | 6 +- crates/memory/src/schema/common.rs | 2 +- crates/memory/src/tool/read.rs | 8 +- crates/memory/src/usage.rs | 14 +- crates/pod-registry/src/error.rs | 6 +- crates/pod-registry/src/lib.rs | 4 +- crates/pod-registry/src/lifecycle.rs | 64 ++--- crates/pod-registry/src/mutate.rs | 26 +- crates/pod-registry/src/table.rs | 26 +- crates/pod-registry/src/test_util.rs | 6 +- crates/pod/examples/pod_cli.rs | 4 +- crates/pod/src/controller.rs | 12 +- crates/pod/src/ipc/server.rs | 6 +- crates/pod/src/lib.rs | 4 +- crates/pod/src/main.rs | 4 +- crates/pod/src/pod.rs | 248 +++++++++--------- crates/pod/src/runtime/dir.rs | 2 +- ...ession_log_sink.rs => segment_log_sink.rs} | 48 ++-- crates/pod/src/shared_state.rs | 14 +- crates/pod/src/spawn/comm_tools.rs | 4 +- crates/pod/tests/compact_events_test.rs | 24 +- crates/pod/tests/consolidation_test.rs | 2 +- crates/pod/tests/controller_test.rs | 2 +- crates/pod/tests/pod_comm_tools_test.rs | 2 +- crates/pod/tests/pod_events_test.rs | 2 +- crates/pod/tests/restore_test.rs | 16 +- crates/pod/tests/session_metrics_test.rs | 36 +-- crates/pod/tests/spawn_pod_test.rs | 2 +- .../pod/tests/system_prompt_template_test.rs | 6 +- crates/protocol/src/lib.rs | 34 +-- crates/session-metrics/src/lib.rs | 8 +- crates/session-store/src/event_trace.rs | 2 +- crates/session-store/src/fs_store.rs | 30 +-- crates/session-store/src/lib.rs | 30 +-- .../src/{session.rs => segment.rs} | 198 +++++++------- .../src/{session_log.rs => segment_log.rs} | 71 ++--- crates/session-store/src/store.rs | 40 +-- crates/session-store/src/system_item.rs | 2 +- crates/session-store/tests/fs_store_test.rs | 34 +-- crates/session-store/tests/session_test.rs | 74 +++--- crates/tui/src/app.rs | 20 +- crates/tui/src/block.rs | 2 +- crates/tui/src/main.rs | 10 +- crates/tui/src/picker.rs | 18 +- crates/tui/src/spawn.rs | 22 +- crates/tui/src/ui.rs | 4 +- 55 files changed, 611 insertions(+), 610 deletions(-) rename crates/pod/src/{session_log_sink.rs => segment_log_sink.rs} (89%) rename crates/session-store/src/{session.rs => segment.rs} (70%) rename crates/session-store/src/{session_log.rs => segment_log.rs} (93%) diff --git a/crates/llm-worker/src/llm_client/scheme/openai_responses/request.rs b/crates/llm-worker/src/llm_client/scheme/openai_responses/request.rs index 51b38ae1..c74a3d07 100644 --- a/crates/llm-worker/src/llm_client/scheme/openai_responses/request.rs +++ b/crates/llm-worker/src/llm_client/scheme/openai_responses/request.rs @@ -51,7 +51,7 @@ pub(crate) struct ResponsesRequest { #[serde(skip_serializing_if = "Option::is_none")] pub top_p: Option, /// 会話単位の安定キー。ChatGPT backend (codex-oauth) は明示キーが - /// 無いとプロンプトキャッシュがほぼ効かない。pod 側は `SessionId` + /// 無いとプロンプトキャッシュがほぼ効かない。pod 側は `SegmentId` /// を渡す。`Request::cache_key` が `None` のときはキー自体を送らない。 #[serde(skip_serializing_if = "Option::is_none")] pub prompt_cache_key: Option, diff --git a/crates/llm-worker/src/llm_client/types.rs b/crates/llm-worker/src/llm_client/types.rs index 26528078..f1350534 100644 --- a/crates/llm-worker/src/llm_client/types.rs +++ b/crates/llm-worker/src/llm_client/types.rs @@ -492,7 +492,7 @@ pub struct Request { /// 会話単位の安定キー。`prompt_cache_key` として送られる /// (OpenAI Responses)。ChatGPT backend (codex-oauth) は明示キーが /// 無いと org/project ハッシュ衝突でプロンプトキャッシュが - /// ほぼヒットしないため、pod 側で `SessionId` を渡す運用を想定。 + /// ほぼヒットしないため、pod 側で `SegmentId` を渡す運用を想定。 /// `cache_anchor` と違い名前空間キーであり、`prefix anchor` とは /// 別の概念。`cache_anchor` を読まない provider と同じく、 /// `prompt_cache_key` を持たない provider は無視する。 diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 88bcd813..529f2108 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -213,7 +213,7 @@ pub struct Worker { cache_anchor: Option, /// Conversation-scoped cache key, set by higher layers. Plumbed into /// [`Request::cache_key`] at request build time. Pod 側では - /// `SessionId` を渡す。 + /// `SegmentId` を渡す。 cache_key: Option, /// State marker _state: PhantomData, diff --git a/crates/memory/src/consolidate/input.rs b/crates/memory/src/consolidate/input.rs index 5152e10c..3b3911a7 100644 --- a/crates/memory/src/consolidate/input.rs +++ b/crates/memory/src/consolidate/input.rs @@ -243,7 +243,7 @@ mod tests { let (_id, _) = write_staging( &layout, SourceRef { - session_id: "s".into(), + segment_id: "s".into(), range: [0, 1], }, ExtractedPayload::default(), diff --git a/crates/memory/src/consolidate/lock.rs b/crates/memory/src/consolidate/lock.rs index bf429bc1..7b726018 100644 --- a/crates/memory/src/consolidate/lock.rs +++ b/crates/memory/src/consolidate/lock.rs @@ -260,7 +260,7 @@ mod tests { let (id_a, _) = write_staging( &layout, SourceRef { - session_id: "s".into(), + segment_id: "s".into(), range: [0, 0], }, ExtractedPayload::default(), @@ -269,7 +269,7 @@ mod tests { let (id_b, _) = write_staging( &layout, SourceRef { - session_id: "s".into(), + segment_id: "s".into(), range: [1, 1], }, ExtractedPayload::default(), diff --git a/crates/memory/src/consolidate/staging.rs b/crates/memory/src/consolidate/staging.rs index bd8b8f6e..08d55403 100644 --- a/crates/memory/src/consolidate/staging.rs +++ b/crates/memory/src/consolidate/staging.rs @@ -94,9 +94,9 @@ mod tests { ExtractedPayload::default() } - fn source(session_id: &str, range: [u64; 2]) -> SourceRef { + fn source(segment_id: &str, range: [u64; 2]) -> SourceRef { SourceRef { - session_id: session_id.into(), + segment_id: segment_id.into(), range, } } diff --git a/crates/memory/src/consolidate/tidy.rs b/crates/memory/src/consolidate/tidy.rs index 2c613f78..09cb2bb5 100644 --- a/crates/memory/src/consolidate/tidy.rs +++ b/crates/memory/src/consolidate/tidy.rs @@ -305,7 +305,7 @@ mod tests { fn flags_sources_overflow() { let (dir, layout) = workspace(); let many_sources: String = (0..15) - .map(|i| format!(" - session_id: s{i}\n range: [{i}, {i}]\n")) + .map(|i| format!(" - segment_id: s{i}\n range: [{i}, {i}]\n")) .collect(); write( &dir.path().join(".insomnia/memory/decisions/big.md"), diff --git a/crates/memory/src/extract/mod.rs b/crates/memory/src/extract/mod.rs index ea7ebe91..574523f9 100644 --- a/crates/memory/src/extract/mod.rs +++ b/crates/memory/src/extract/mod.rs @@ -13,7 +13,7 @@ //! (session-store の `LogEntry::Extension`、domain `"memory.extract"`)は //! Pod 側が責務を持つ。 //! -//! 出力 JSON の wrap は [`write_staging`] が `source: { session_id, range }` +//! 出力 JSON の wrap は [`write_staging`] が `source: { segment_id, range }` //! を機械付与する形で担当し、LLM には source を推論させない。 mod input; diff --git a/crates/memory/src/extract/payload.rs b/crates/memory/src/extract/payload.rs index cd2db29b..14863dc1 100644 --- a/crates/memory/src/extract/payload.rs +++ b/crates/memory/src/extract/payload.rs @@ -78,7 +78,7 @@ pub struct RequestEntry { /// staging に書き出される 1 ファイル分のレコード。 /// -/// `source` は Pod 側ラッパーが session_id と log entry range を +/// `source` は Pod 側ラッパーが segment_id と log entry range を /// 機械付与する。LLM はこのフィールドを見ない / 推論しない。 #[derive(Debug, Clone, Serialize, Deserialize)] pub struct StagingRecord { diff --git a/crates/memory/src/extract/staging.rs b/crates/memory/src/extract/staging.rs index b14d9912..0af78a0f 100644 --- a/crates/memory/src/extract/staging.rs +++ b/crates/memory/src/extract/staging.rs @@ -71,7 +71,7 @@ mod tests { let layout = WorkspaceLayout::new(tmp.path().to_path_buf()); let source = SourceRef { - session_id: "sess-1".into(), + segment_id: "sess-1".into(), range: [3, 7], }; let payload = ExtractedPayload { @@ -93,7 +93,7 @@ mod tests { let written: StagingRecord = serde_json::from_str(&fs::read_to_string(&path).unwrap()).unwrap(); - assert_eq!(written.source.session_id, "sess-1"); + assert_eq!(written.source.segment_id, "sess-1"); assert_eq!(written.source.range, [3, 7]); assert_eq!(written.payload.decisions.len(), 1); } @@ -103,7 +103,7 @@ mod tests { let tmp = tempfile::TempDir::new().unwrap(); let layout = WorkspaceLayout::new(tmp.path().to_path_buf()); let source = SourceRef { - session_id: "sess".into(), + segment_id: "sess".into(), range: [0, 0], }; let (_, path) = write_staging(&layout, source, ExtractedPayload::default()).unwrap(); diff --git a/crates/memory/src/schema/common.rs b/crates/memory/src/schema/common.rs index 91099999..a0752827 100644 --- a/crates/memory/src/schema/common.rs +++ b/crates/memory/src/schema/common.rs @@ -10,7 +10,7 @@ pub use lint_common::Frontmatter; /// `last_sources` arrays for traceability back to raw session logs. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct SourceRef { - pub session_id: String, + pub segment_id: String, /// `[start_entry, end_entry]` inclusive range of session-store entry indices. pub range: [u64; 2], } diff --git a/crates/memory/src/tool/read.rs b/crates/memory/src/tool/read.rs index 865ed5ff..0ad8ba28 100644 --- a/crates/memory/src/tool/read.rs +++ b/crates/memory/src/tool/read.rs @@ -60,7 +60,7 @@ impl Tool for ReadTool { })?; let text = String::from_utf8_lossy(&bytes).into_owned(); - if let Some(session_id) = self.usage_session_id.as_deref() { + if let Some(segment_id) = self.usage_session_id.as_deref() { let usage_slug = params.slug.as_deref().unwrap_or("summary"); let snapshot = usage::snapshot_record_from_bytes( params.kind.record_kind(), @@ -69,7 +69,7 @@ impl Tool for ReadTool { ); if let Err(err) = usage::append_use_event( &self.layout, - session_id.to_string(), + segment_id.to_string(), UsageSource::MemoryRead, vec![snapshot], ) { @@ -140,9 +140,9 @@ pub fn read_tool(layout: WorkspaceLayout) -> ToolDefinition { pub fn read_tool_with_usage( layout: WorkspaceLayout, - session_id: impl Into, + segment_id: impl Into, ) -> ToolDefinition { - read_tool_inner(layout, Some(session_id.into())) + read_tool_inner(layout, Some(segment_id.into())) } fn read_tool_inner(layout: WorkspaceLayout, usage_session_id: Option) -> ToolDefinition { diff --git a/crates/memory/src/usage.rs b/crates/memory/src/usage.rs index f2d3e8ab..57bd7235 100644 --- a/crates/memory/src/usage.rs +++ b/crates/memory/src/usage.rs @@ -64,7 +64,7 @@ impl UsageRecordSnapshot { pub struct UsageEvent { pub id: Uuid, pub occurred_at: DateTime, - pub session_id: String, + pub segment_id: String, pub event: UsageEventKind, pub source: UsageSource, pub records: Vec, @@ -72,7 +72,7 @@ pub struct UsageEvent { impl UsageEvent { pub fn new( - session_id: impl Into, + segment_id: impl Into, event: UsageEventKind, source: UsageSource, records: Vec, @@ -80,7 +80,7 @@ impl UsageEvent { Self { id: Uuid::now_v7(), occurred_at: Utc::now(), - session_id: session_id.into(), + segment_id: segment_id.into(), event, source, records, @@ -144,7 +144,7 @@ pub fn append_usage_event(layout: &WorkspaceLayout, event: &UsageEvent) -> io::R /// Convenience for a successful explicit record read. pub fn append_use_event( layout: &WorkspaceLayout, - session_id: impl Into, + segment_id: impl Into, source: UsageSource, records: Vec, ) -> io::Result<()> { @@ -153,14 +153,14 @@ pub fn append_use_event( } append_usage_event( layout, - &UsageEvent::new(session_id, UsageEventKind::Use, source, records), + &UsageEvent::new(segment_id, UsageEventKind::Use, source, records), ) } /// Convenience for resident model-invocation exposure cost telemetry. pub fn append_resident_exposure_event( layout: &WorkspaceLayout, - session_id: impl Into, + segment_id: impl Into, records: Vec, ) -> io::Result<()> { if records.is_empty() { @@ -169,7 +169,7 @@ pub fn append_resident_exposure_event( append_usage_event( layout, &UsageEvent::new( - session_id, + segment_id, UsageEventKind::ResidentExposure, UsageSource::ResidentInjection, records, diff --git a/crates/pod-registry/src/error.rs b/crates/pod-registry/src/error.rs index b685a82e..c27bb087 100644 --- a/crates/pod-registry/src/error.rs +++ b/crates/pod-registry/src/error.rs @@ -4,7 +4,7 @@ use std::io; use std::path::PathBuf; use manifest::ScopeRule; -use session_store::SessionId; +use session_store::SegmentId; /// Errors raised by the mutating pod-registry operations. #[derive(Debug, thiserror::Error)] @@ -27,11 +27,11 @@ pub enum ScopeLockError { #[error("pod `{0}` is not registered")] UnknownPod(String), #[error( - "session {session_id} is already held by pod `{pod_name}` at {}", + "session {segment_id} is already held by pod `{pod_name}` at {}", .socket.display() )] SessionConflict { - session_id: SessionId, + segment_id: SegmentId, pod_name: String, socket: PathBuf, }, diff --git a/crates/pod-registry/src/lib.rs b/crates/pod-registry/src/lib.rs index 6c92d099..45d67e57 100644 --- a/crates/pod-registry/src/lib.rs +++ b/crates/pod-registry/src/lib.rs @@ -27,8 +27,8 @@ pub use conflict::{ }; pub use error::ScopeLockError; pub use lifecycle::{ - ScopeAllocationGuard, SessionLockInfo, adopt_allocation, install_top_level, - install_top_level_with_deny, lookup_session, update_session, + ScopeAllocationGuard, SegmentLockInfo, adopt_allocation, install_top_level, + install_top_level_with_deny, lookup_segment, update_segment, }; pub use mutate::{ delegate_scope, reclaim_stale, reclaim_stale_with, register_pod, register_pod_with_deny, diff --git a/crates/pod-registry/src/lifecycle.rs b/crates/pod-registry/src/lifecycle.rs index 05572179..6e4bd584 100644 --- a/crates/pod-registry/src/lifecycle.rs +++ b/crates/pod-registry/src/lifecycle.rs @@ -5,7 +5,7 @@ use std::path::{Path, PathBuf}; use manifest::ScopeRule; -use session_store::SessionId; +use session_store::SegmentId; use crate::error::ScopeLockError; use crate::mutate::release_pod; @@ -45,9 +45,9 @@ pub fn install_top_level( pid: u32, socket: PathBuf, scope_allow: Vec, - session_id: SessionId, + segment_id: SegmentId, ) -> Result { - install_top_level_with_deny(pod_name, pid, socket, scope_allow, Vec::new(), session_id) + install_top_level_with_deny(pod_name, pid, socket, scope_allow, Vec::new(), segment_id) } /// Open the default lock file, register a top-level Pod with explicit @@ -59,7 +59,7 @@ pub fn install_top_level_with_deny( socket: PathBuf, scope_allow: Vec, scope_deny: Vec, - session_id: SessionId, + segment_id: SegmentId, ) -> Result { let lock_path = default_registry_path()?; let mut guard = LockFileGuard::open(&lock_path)?; @@ -70,7 +70,7 @@ pub fn install_top_level_with_deny( socket, scope_allow, scope_deny, - session_id, + segment_id, )?; Ok(ScopeAllocationGuard { pod_name, @@ -83,14 +83,14 @@ pub fn install_top_level_with_deny( /// /// The spawning flow is two-stage: the spawner calls /// [`crate::delegate_scope`] (with its own pid as a live placeholder, -/// `session_id = None`), then exec's the child; the child, once +/// `segment_id = None`), then exec's the child; the child, once /// running, calls this function to rewrite the allocation's pid + -/// session_id to its own and claim the [`ScopeAllocationGuard`] so +/// segment_id to its own and claim the [`ScopeAllocationGuard`] so /// the entry is released when the child exits. pub fn adopt_allocation( pod_name: String, new_pid: u32, - session_id: SessionId, + segment_id: SegmentId, ) -> Result { let lock_path = default_registry_path()?; let mut guard = LockFileGuard::open(&lock_path)?; @@ -99,7 +99,7 @@ pub fn adopt_allocation( .find_mut(&pod_name) .ok_or_else(|| ScopeLockError::UnknownPod(pod_name.clone()))?; alloc.pid = new_pid; - alloc.session_id = Some(session_id); + alloc.segment_id = Some(segment_id); guard.save()?; Ok(ScopeAllocationGuard { pod_name, @@ -107,32 +107,32 @@ pub fn adopt_allocation( }) } -/// Rewrite the `session_id` recorded for `pod_name` to -/// `new_session_id`. +/// Rewrite the `segment_id` recorded for `pod_name` to +/// `new_segment_id`. /// -/// The Pod's in-memory `session_id` can change underneath the +/// The Pod's in-memory `segment_id` can change underneath the /// allocation in two normal places: /// /// - `Pod::compact` mints a fresh session and swaps it in. /// - `session_store::ensure_head_or_fork` auto-forks when another /// writer has advanced the store head behind our back. /// -/// Both paths must call this so subsequent [`lookup_session`] queries +/// Both paths must call this so subsequent [`lookup_segment`] queries /// find the live session id, not the old one. Without this update a /// concurrent `restore_from_manifest(new_id)` would see "no live /// writer" and proceed to register a competing allocation on the /// session this Pod just moved into. /// /// The lock is opened once and the allocation is rewritten inside the -/// guard, so the session_id collision check is atomic with the +/// guard, so the segment_id collision check is atomic with the /// rewrite. -pub fn update_session(pod_name: &str, new_session_id: SessionId) -> Result<(), ScopeLockError> { +pub fn update_segment(pod_name: &str, new_segment_id: SegmentId) -> Result<(), ScopeLockError> { let lock_path = default_registry_path()?; let mut guard = LockFileGuard::open(&lock_path)?; - if let Some(other) = guard.data().find_by_session(new_session_id) { + if let Some(other) = guard.data().find_by_segment(new_segment_id) { if other.pod_name != pod_name { return Err(ScopeLockError::SessionConflict { - session_id: new_session_id, + segment_id: new_segment_id, pod_name: other.pod_name.clone(), socket: other.socket.clone(), }); @@ -142,7 +142,7 @@ pub fn update_session(pod_name: &str, new_session_id: SessionId) -> Result<(), S .data_mut() .find_mut(pod_name) .ok_or_else(|| ScopeLockError::UnknownPod(pod_name.into()))?; - alloc.session_id = Some(new_session_id); + alloc.segment_id = Some(new_segment_id); guard.save()?; Ok(()) } @@ -150,25 +150,25 @@ pub fn update_session(pod_name: &str, new_session_id: SessionId) -> Result<(), S /// Information about a Pod that currently holds an allocation for a /// given session. #[derive(Debug, Clone)] -pub struct SessionLockInfo { +pub struct SegmentLockInfo { pub pod_name: String, pub socket: PathBuf, pub pid: u32, } /// Open the default lock file, reclaim stale entries, and return the -/// allocation currently writing to `session_id`, if any. +/// allocation currently writing to `segment_id`, if any. /// /// Used by `Pod::restore_from_manifest` to refuse a resume that would /// race a live writer on the same source session. -pub fn lookup_session(session_id: SessionId) -> Result, ScopeLockError> { +pub fn lookup_segment(segment_id: SegmentId) -> Result, ScopeLockError> { let lock_path = default_registry_path()?; let mut guard = LockFileGuard::open(&lock_path)?; crate::mutate::reclaim_stale(&mut guard); Ok(guard .data() - .find_by_session(session_id) - .map(|a| SessionLockInfo { + .find_by_segment(segment_id) + .map(|a| SegmentLockInfo { pod_name: a.pod_name.clone(), socket: a.socket.clone(), pid: a.pid, @@ -193,7 +193,7 @@ mod tests { scope_allow: vec![write_rule("/tmp/child", true)], scope_deny: Vec::new(), delegated_from: None, - session_id: None, + segment_id: None, }); g.save().unwrap(); } @@ -267,12 +267,12 @@ mod tests { s, ) .unwrap(); - let info = lookup_session(s).unwrap().expect("expected live writer"); + let info = lookup_segment(s).unwrap().expect("expected live writer"); assert_eq!(info.pod_name, "live"); assert_eq!(info.socket, sock("live")); drop(guard); // After the guard's release, the lookup goes back to None. - assert!(lookup_session(s).unwrap().is_none()); + assert!(lookup_segment(s).unwrap().is_none()); } #[test] @@ -289,10 +289,10 @@ mod tests { original, ) .unwrap(); - update_session("p", updated).unwrap(); + update_segment("p", updated).unwrap(); // lookup against the original is now empty, the updated id wins. - assert!(lookup_session(original).unwrap().is_none()); - assert_eq!(lookup_session(updated).unwrap().unwrap().pod_name, "p"); + assert!(lookup_segment(original).unwrap().is_none()); + assert_eq!(lookup_segment(updated).unwrap().unwrap().pod_name, "p"); } #[test] @@ -318,15 +318,15 @@ mod tests { ) .unwrap(); // `a` cannot adopt b's live session id. - let err = update_session("a", s_b).unwrap_err(); + let err = update_segment("a", s_b).unwrap_err(); match err { ScopeLockError::SessionConflict { pod_name, - session_id, + segment_id, .. } => { assert_eq!(pod_name, "b"); - assert_eq!(session_id, s_b); + assert_eq!(segment_id, s_b); } other => panic!("expected SessionConflict, got {other:?}"), } diff --git a/crates/pod-registry/src/mutate.rs b/crates/pod-registry/src/mutate.rs index 1879a16a..cde9034b 100644 --- a/crates/pod-registry/src/mutate.rs +++ b/crates/pod-registry/src/mutate.rs @@ -5,7 +5,7 @@ use std::io; use std::path::PathBuf; use manifest::{Permission, ScopeRule}; -use session_store::SessionId; +use session_store::SegmentId; use crate::conflict::{find_conflict_owner, find_conflict_owners, is_within_effective_write}; use crate::error::ScopeLockError; @@ -16,7 +16,7 @@ use crate::table::{Allocation, LockFileGuard}; /// conflicts so a crashed Pod's allocation doesn't block the new one. /// /// Rejects when another live allocation is already writing to -/// `session_id`, so two `restore_from_manifest` calls under different +/// `segment_id`, so two `restore_from_manifest` calls under different /// `pod_name`s cannot both grab the same session log. pub fn register_pod( guard: &mut LockFileGuard, @@ -24,7 +24,7 @@ pub fn register_pod( pid: u32, socket: PathBuf, scope_allow: Vec, - session_id: SessionId, + segment_id: SegmentId, ) -> Result<(), ScopeLockError> { register_pod_with_deny( guard, @@ -33,7 +33,7 @@ pub fn register_pod( socket, scope_allow, Vec::new(), - session_id, + segment_id, ) } @@ -56,15 +56,15 @@ pub fn register_pod_with_deny( socket: PathBuf, scope_allow: Vec, scope_deny: Vec, - session_id: SessionId, + segment_id: SegmentId, ) -> Result<(), ScopeLockError> { reclaim_stale(guard); if guard.data().find(&pod_name).is_some() { return Err(ScopeLockError::DuplicatePodName(pod_name)); } - if let Some(existing) = guard.data().find_by_session(session_id) { + if let Some(existing) = guard.data().find_by_segment(segment_id) { return Err(ScopeLockError::SessionConflict { - session_id, + segment_id, pod_name: existing.pod_name.clone(), socket: existing.socket.clone(), }); @@ -99,7 +99,7 @@ pub fn register_pod_with_deny( scope_allow, scope_deny, delegated_from: None, - session_id: Some(session_id), + segment_id: Some(segment_id), }); guard.save()?; Ok(()) @@ -147,9 +147,9 @@ pub fn delegate_scope( scope_allow, scope_deny: Vec::new(), delegated_from: Some(spawner.into()), - // Pre-reservation. The child fills in its own session_id when + // Pre-reservation. The child fills in its own segment_id when // it calls `adopt_allocation` after the worker is built. - session_id: None, + segment_id: None, }); guard.save()?; Ok(()) @@ -587,7 +587,7 @@ mod tests { shared_session, ) .unwrap(); - // Second registration tries to grab the same session_id under + // Second registration tries to grab the same segment_id under // a different pod_name. Without the SessionConflict check both // would succeed and race on the same jsonl. let err = register_pod( @@ -601,11 +601,11 @@ mod tests { .unwrap_err(); match err { ScopeLockError::SessionConflict { - session_id, + segment_id, pod_name, .. } => { - assert_eq!(session_id, shared_session); + assert_eq!(segment_id, shared_session); assert_eq!(pod_name, "first"); } other => panic!("expected SessionConflict, got {other:?}"), diff --git a/crates/pod-registry/src/table.rs b/crates/pod-registry/src/table.rs index 4c9090b4..480448a5 100644 --- a/crates/pod-registry/src/table.rs +++ b/crates/pod-registry/src/table.rs @@ -8,7 +8,7 @@ use std::path::{Path, PathBuf}; use fs4::fs_std::FileExt; use manifest::{ScopeRule, paths}; use serde::{Deserialize, Serialize}; -use session_store::SessionId; +use session_store::SegmentId; /// On-disk representation of the allocation table. #[derive(Debug, Clone, Default, Serialize, Deserialize)] @@ -43,12 +43,12 @@ pub struct Allocation { /// Name of the Pod that delegated scope to this one, or `None` for /// a top-level Pod started directly by a human. pub delegated_from: Option, - /// Session ID this Pod is currently writing to. `None` means this + /// Segment ID this Pod is currently writing to. `None` means this /// is a pre-reservation made by a spawner via [`crate::delegate_scope`] /// before the child has come up; the child fills it in at /// [`crate::adopt_allocation`] time. #[serde(default)] - pub session_id: Option, + pub segment_id: Option, } impl LockFile { @@ -60,12 +60,12 @@ impl LockFile { self.allocations.iter_mut().find(|a| a.pod_name == pod_name) } - /// Find the allocation currently writing to `session_id`. Skips - /// pre-reservations whose `session_id` is still `None`. - pub fn find_by_session(&self, session_id: SessionId) -> Option<&Allocation> { + /// Find the allocation currently writing to `segment_id`. Skips + /// pre-reservations whose `segment_id` is still `None`. + pub fn find_by_segment(&self, segment_id: SegmentId) -> Option<&Allocation> { self.allocations .iter() - .find(|a| a.session_id == Some(session_id)) + .find(|a| a.segment_id == Some(segment_id)) } } @@ -225,8 +225,8 @@ mod tests { let dir = TempDir::new().unwrap(); let path = dir.path().join("pods.json"); let mut g = open_empty(&path); - // Pre-reservation: delegate_scope leaves session_id = None - // until adopt_allocation rewrites it. find_by_session must not + // Pre-reservation: delegate_scope leaves segment_id = None + // until adopt_allocation rewrites it. find_by_segment must not // match those placeholders, otherwise a freshly-spawning child // would shadow itself before it has even chosen a session. register_pod( @@ -249,13 +249,13 @@ mod tests { .unwrap(); let target_session = sid(); - // The placeholder allocation has session_id = None and must + // The placeholder allocation has segment_id = None and must // not be returned for any lookup. - assert!(g.data().find_by_session(target_session).is_none()); + assert!(g.data().find_by_segment(target_session).is_none()); // After adopt-style rewrite, the same allocation is now found. - g.data_mut().find_mut("child").unwrap().session_id = Some(target_session); - let found = g.data().find_by_session(target_session).unwrap(); + g.data_mut().find_mut("child").unwrap().segment_id = Some(target_session); + let found = g.data().find_by_segment(target_session).unwrap(); assert_eq!(found.pod_name, "child"); } } diff --git a/crates/pod-registry/src/test_util.rs b/crates/pod-registry/src/test_util.rs index 5d1609fd..894a5c54 100644 --- a/crates/pod-registry/src/test_util.rs +++ b/crates/pod-registry/src/test_util.rs @@ -6,12 +6,12 @@ use std::path::{Path, PathBuf}; use std::sync::{LazyLock, Mutex, MutexGuard}; use manifest::{Permission, ScopeRule}; -use session_store::SessionId; +use session_store::SegmentId; use crate::table::LockFileGuard; -pub(crate) fn sid() -> SessionId { - session_store::new_session_id() +pub(crate) fn sid() -> SegmentId { + session_store::new_segment_id() } /// Serialises tests that mutate runtime-dir env vars. The test diff --git a/crates/pod/examples/pod_cli.rs b/crates/pod/examples/pod_cli.rs index 414ed613..cc94e66f 100644 --- a/crates/pod/examples/pod_cli.rs +++ b/crates/pod/examples/pod_cli.rs @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box> { let mut pod = Pod::from_manifest_toml(&toml, store).await?; let manifest: &PodManifest = pod.manifest(); println!("Pod: {}", manifest.pod.name); - println!("Session: {}", pod.session_id()); + println!("Session: {}", pod.segment_id()); // 4. Run a prompt let result = pod.run_text("What is the capital of France?").await?; @@ -76,7 +76,7 @@ async fn main() -> Result<(), Box> { } // 6. Session ID for potential restore - println!("\nSession ID: {}", pod.session_id()); + println!("\nSession ID: {}", pod.segment_id()); Ok(()) } diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index f0ef4061..6841f133 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -11,7 +11,7 @@ use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::server::SocketServer; use crate::pod::{Pod, PodError, PodRunResult, SystemItemCommitter}; use crate::runtime::dir::RuntimeDir; -use crate::session_log_sink::SessionLogSink; +use crate::segment_log_sink::SegmentLogSink; use crate::shared_state::PodSharedState; use crate::spawn::comm_tools::{ list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, @@ -33,10 +33,10 @@ pub struct PodHandle { pub shared_state: Arc, pub runtime_dir: Arc, pub alerter: Alerter, - /// Session-log mirror + broadcast handle. The IPC server snapshots + /// Segment-log mirror + broadcast handle. The IPC server snapshots /// it on every new connection (Event::Snapshot) and forwards /// subsequent commits (Event::Entry) on the receiver. - pub sink: SessionLogSink, + pub sink: SegmentLogSink, } impl PodHandle { @@ -214,7 +214,7 @@ impl PodController { let greeting = build_greeting(&pod); let shared_state = Arc::new(PodSharedState::new( pod.manifest().pod.name.clone(), - pod.session_id(), + pod.segment_id(), manifest_toml.clone(), greeting, )); @@ -430,7 +430,7 @@ where let scope_handle = pod.scope().clone(); let pwd = pod.pwd().to_path_buf(); let task_store = pod.task_store(); - let session_id_for_usage = pod.session_id().to_string(); + let session_id_for_usage = pod.segment_id().to_string(); let scope_change_sink = pod.scope_change_sink(); let memory_config = pod.manifest().memory.clone(); let spawner_name = pod.manifest().pod.name.clone(); @@ -992,7 +992,7 @@ mod tests { let (cancel_tx, cancel_rx) = mpsc::channel::<()>(1); let shared_state = Arc::new(PodSharedState::new( "child-pod".to_string(), - session_store::new_session_id(), + session_store::new_segment_id(), String::new(), protocol::Greeting { pod_name: "child-pod".to_string(), diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index 77c9e566..930b6270 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -105,10 +105,10 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { match entry { Ok(entry) => { let outbound = match entry { - session_store::LogEntry::SessionStart { .. } => { + session_store::LogEntry::SegmentStart { .. } => { let value = serde_json::to_value(&entry) .expect("LogEntry is Serialize"); - Some(Event::SessionRotated { entry: value }) + Some(Event::SegmentRotated { entry: value }) } session_store::LogEntry::SystemItem { item, .. } => { let value = serde_json::to_value(&item) @@ -119,7 +119,7 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { Some(Event::InvokeStart { kind: trigger }) } other => { - // `SessionLogSink::is_live_relevant` keeps + // `SegmentLogSink::is_live_relevant` keeps // non-live-relevant variants off the // broadcast lane; reaching here means the // two are out of sync and we silently diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index b929796c..5f6f3df6 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -5,7 +5,7 @@ pub mod hook; pub mod ipc; pub mod prompt; pub mod runtime; -pub mod session_log_sink; +pub mod segment_log_sink; pub mod shared_state; pub mod spawn; pub mod workflow; @@ -31,5 +31,5 @@ pub use prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTem pub use protocol::{ErrorCode, Event, Method, PodStatus, TurnResult}; pub use provider::{ProviderError, build_client}; pub use runtime::dir::RuntimeDir; -pub use session_log_sink::SessionLogSink; +pub use segment_log_sink::SegmentLogSink; pub use shared_state::PodSharedState; diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index bd98894a..7c65337f 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -5,7 +5,7 @@ use std::process::ExitCode; use clap::Parser; use manifest::{PodManifest, paths}; use pod::{Pod, PodController, PodFactory, PromptLoader}; -use session_store::{FsStore, SessionId}; +use session_store::{FsStore, SegmentId}; const USER_MANIFEST_ENV: &str = "INSOMNIA_USER_MANIFEST"; @@ -53,7 +53,7 @@ struct Cli { /// Mutually exclusive with `--adopt` (spawned children always start /// fresh). #[arg(long, value_name = "UUID", conflicts_with = "adopt")] - session: Option, + session: Option, } fn resolve_manifest(cli: &Cli) -> Result<(PodManifest, PromptLoader), String> { diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index de9322fa..aa63f3f2 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -9,11 +9,11 @@ use llm_worker::llm_client::client::LlmClient; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; use session_store::{ - LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, SystemItem, session_log, to_logged, + LogEntry, PodScopeSnapshot, SegmentId, Store, StoreError, SystemItem, segment_log, to_logged, }; use tracing::{info, warn}; -use crate::session_log_sink::SessionLogSink; +use crate::segment_log_sink::SegmentLogSink; use manifest::{ Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError, @@ -44,33 +44,33 @@ use tokio::task::JoinHandle; /// Lock-free shared session pointer. /// -/// Holds the current `(session_id, entries_written)` pair so that the +/// Holds the current `(segment_id, entries_written)` pair so that the /// Pod and every `LogWriterHandle` clone see a consistent view through -/// `Arc`-shared lock-free reads. `session_id` is wrapped in `ArcSwap` +/// `Arc`-shared lock-free reads. `segment_id` is wrapped in `ArcSwap` /// so fork (a rare, run-start-only event) can atomically swap it /// without taking a mutex on the append hot path. `entries_written` is /// an `AtomicUsize` bumped on every successful append; the writer's /// tally is compared against the store's on-disk count to detect /// concurrent writers in `ensure_session_head`. -pub struct SessionState { - session_id: ArcSwap, +pub struct SegmentState { + segment_id: ArcSwap, entries_written: AtomicUsize, } -impl SessionState { - pub fn new(session_id: SessionId, entries_written: usize) -> Arc { +impl SegmentState { + pub fn new(segment_id: SegmentId, entries_written: usize) -> Arc { Arc::new(Self { - session_id: ArcSwap::from_pointee(session_id), + segment_id: ArcSwap::from_pointee(segment_id), entries_written: AtomicUsize::new(entries_written), }) } - pub fn session_id(&self) -> SessionId { - **self.session_id.load() + pub fn segment_id(&self) -> SegmentId { + **self.segment_id.load() } - pub fn set_session_id(&self, id: SessionId) { - self.session_id.store(Arc::new(id)); + pub fn set_session_id(&self, id: SegmentId) { + self.segment_id.store(Arc::new(id)); } pub fn entries_written(&self) -> usize { @@ -94,8 +94,8 @@ impl SessionState { #[derive(Clone)] pub struct LogWriterHandle { pub store: St, - pub state: Arc, - pub sink: SessionLogSink, + pub state: Arc, + pub sink: SegmentLogSink, } impl LogWriterHandle @@ -107,8 +107,8 @@ where /// writes for `< PIPE_BUF` lines, so no user-space serialization is /// needed across appenders. pub fn append_entry(&self, entry: LogEntry) -> Result<(), StoreError> { - let session_id = self.state.session_id(); - self.store.append(session_id, &entry)?; + let segment_id = self.state.segment_id(); + self.store.append(segment_id, &entry)?; self.state.increment_entries(); self.sink.publish(entry); Ok(()) @@ -128,7 +128,7 @@ where { fn commit_system_item(&self, item: SystemItem) { let entry = LogEntry::SystemItem { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), item, }; if let Err(err) = self.append_entry(entry) { @@ -162,9 +162,9 @@ pub struct Pod { worker: Option>, store: St, /// Shared session pointer. Source of truth for the Pod's current - /// `session_id` and append tally. `self.session_id()` is a thin - /// wrapper over `session_state.session_id()`. - session_state: Arc, + /// `segment_id` and append tally. `self.segment_id()` is a thin + /// wrapper over `session_state.segment_id()`. + session_state: Arc, /// Absolute working directory of the Pod. pwd: PathBuf, /// Shared, atomically-swappable view of the Pod's resolved scope. @@ -284,7 +284,7 @@ pub struct Pod { memory_task: Option>, /// Typed user submissions in submit order. K-th entry corresponds to /// the K-th `Item::user_message` in `worker.history()` (modulo seed - /// history loaded via `SessionStart.history`, whose original segments + /// history loaded via `SegmentStart.history`, whose original segments /// are not preserved). Populated from log on `restore_from_manifest`, /// appended after `save_user_input` on each `run`. Pre-`Event::Snapshot` /// this fed `PodSharedState.user_segments`; the new wire format @@ -295,7 +295,7 @@ pub struct Pod { /// every successful `session_store::append_entry` write so connected /// clients see a `(snapshot, live)` stream consistent with what's /// on disk. - sink: SessionLogSink, + sink: SegmentLogSink, /// `true` once `wire_history_persistence` has installed the /// `Worker::on_history_append` callback that commits each appended /// item as a singular `LogEntry::AssistantItem` / `ToolResult` @@ -369,7 +369,7 @@ impl Pod { // The memory-task clone never appends to the session log // (it only reads `worker.history()`), so a fresh sink is // fine — nothing observes its broadcast. - sink: SessionLogSink::new(), + sink: SegmentLogSink::new(), history_persistence_wired: false, log_writer: None, } @@ -422,7 +422,7 @@ impl Pod { ) { return; } - let entry = session_store::classify_history_item(item, session_log::now_millis()); + let entry = session_store::classify_history_item(item, segment_log::now_millis()); if let Err(err) = writer.append_entry(entry) { warn!(error = %err, "history append commit failed; dropping"); } @@ -469,16 +469,16 @@ impl Pod { pwd: PathBuf, scope: Scope, ) -> Result { - // Session creation is deferred to `ensure_session_head` at first + // Segment creation is deferred to `ensure_session_head` at first // run so a later-installed system-prompt template (see - // `set_system_prompt_template`) can be captured by `SessionStart`. - let session_id = session_store::new_session_id(); + // `set_system_prompt_template`) can be captured by `SegmentStart`. + let segment_id = session_store::new_segment_id(); let prompts = PromptCatalog::builtins_only()?; let mut pod = Self { manifest, worker: Some(worker), store, - session_state: SessionState::new(session_id, 0), + session_state: SegmentState::new(segment_id, 0), pwd, scope: SharedScope::new(scope), hook_builder: HookRegistryBuilder::new(), @@ -506,7 +506,7 @@ impl Pod { extract_pointer: Arc::new(Mutex::new(None)), memory_task: None, user_segments: Vec::new(), - sink: SessionLogSink::new(), + sink: SegmentLogSink::new(), history_persistence_wired: false, log_writer: None, }; @@ -544,8 +544,8 @@ impl Pod { /// The session ID used for persistence. Read lock-free from the /// shared session pointer so fork-time swaps are observed immediately. - pub fn session_id(&self) -> SessionId { - self.session_state.session_id() + pub fn segment_id(&self) -> SegmentId { + self.session_state.segment_id() } /// The Pod's manifest. @@ -616,7 +616,7 @@ impl Pod { }; let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize"); self.commit_entry(LogEntry::Extension { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), payload, }) @@ -627,8 +627,8 @@ impl Pod { /// concurrent appenders — the kernel orders `O_APPEND` writes for /// lines smaller than `PIPE_BUF`. pub(crate) fn commit_entry(&self, entry: LogEntry) -> Result<(), StoreError> { - let session_id = self.session_state.session_id(); - self.store.append(session_id, &entry)?; + let segment_id = self.session_state.segment_id(); + self.store.append(segment_id, &entry)?; self.session_state.increment_entries(); self.sink.publish(entry); Ok(()) @@ -637,7 +637,7 @@ impl Pod { /// Cloneable sink handle. Exposed to the controller so the IPC /// layer can `subscribe_with_snapshot` and stream entries to /// clients without consulting any other state. - pub fn sink(&self) -> SessionLogSink { + pub fn sink(&self) -> SegmentLogSink { self.sink.clone() } @@ -661,7 +661,7 @@ impl Pod { if let Some(snapshot) = snapshot { let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize"); self.commit_entry(LogEntry::Extension { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), payload, })?; @@ -716,7 +716,7 @@ impl Pod { /// Snapshot of the typed user segments tracked alongside worker /// history. The K-th entry corresponds to the K-th `Item::user_message` /// derived from `LogEntry::UserInput` entries (post-compaction); seed - /// history loaded via `SessionStart.history` does not contribute, + /// history loaded via `SegmentStart.history` does not contribute, /// which is acceptable because the original segments are unrecoverable. pub fn user_segments(&self) -> &[Vec] { &self.user_segments @@ -829,7 +829,7 @@ impl Pod { fn try_record_metric(&mut self, metric: &session_metrics::Metric) { let payload = serde_json::to_value(metric).expect("Metric is Serialize"); let entry = LogEntry::Extension { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), domain: session_metrics::DOMAIN.into(), payload, }; @@ -1188,7 +1188,7 @@ impl Pod { // IDLE → active marker. Commits first so the next UserInput entry // is contained inside this Invoke range. See `tickets/invoke-turn-llmcall-semantics.md`. self.commit_entry(LogEntry::Invoke { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), trigger: protocol::InvokeKind::UserSend, })?; @@ -1196,7 +1196,7 @@ impl Pod { // pushes its flattened copy into history. save_delta deliberately // skips the resulting `is_user_message()` item to avoid double-write. self.commit_entry(LogEntry::UserInput { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), segments: input.clone(), }) ?; @@ -1376,7 +1376,7 @@ impl Pod { return; }; if let Err(err) = - memory::append_use_event(layout, self.session_id().to_string(), source, records) + memory::append_use_event(layout, self.segment_id().to_string(), source, records) { warn!(error = %err, "failed to append memory usage event"); } @@ -1387,7 +1387,7 @@ impl Pod { return; }; if let Err(err) = - memory::append_resident_exposure_event(layout, self.session_id().to_string(), records) + memory::append_resident_exposure_event(layout, self.segment_id().to_string(), records) { warn!(error = %err, "failed to append resident exposure event"); } @@ -1578,7 +1578,7 @@ impl Pod { // drain. The trailing SystemItem entries (drained by the // PodInterceptor) carry the actual payload. self.commit_entry(LogEntry::Invoke { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), trigger: kind, })?; @@ -1612,17 +1612,17 @@ impl Pod { /// /// On the first call for a Pod built via `from_manifest`, the session /// has not been written to the store yet — this is when we append the - /// initial `SessionStart` entry, carrying the system prompt that + /// initial `SegmentStart` entry, carrying the system prompt that /// `ensure_system_prompt_materialized` has just rendered. Subsequent /// calls fall through to entry-count comparison, which auto-forks /// when another writer has appended behind our back. fn ensure_session_head(&mut self) -> Result<(), PodError> { let w = self.worker.as_ref().unwrap(); - let prev_session_id = self.session_state.session_id(); + let prev_session_id = self.session_state.segment_id(); let entries_written = self.session_state.entries_written(); if entries_written == 0 { - let initial = LogEntry::SessionStart { - ts: session_log::now_millis(), + let initial = LogEntry::SegmentStart { + ts: segment_log::now_millis(), system_prompt: w.get_system_prompt().map(String::from), config: w.request_config().clone(), history: to_logged(w.history()), @@ -1642,11 +1642,11 @@ impl Pod { return Ok(()); } // Fork: mint a fresh session and switch to it. The new - // SessionStart entry replaces the mirror and is broadcast + // SegmentStart entry replaces the mirror and is broadcast // through the sink so existing subscribers reset their view. - let fork_id = session_store::new_session_id(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), + let fork_id = session_store::new_segment_id(); + let entry = LogEntry::SegmentStart { + ts: segment_log::now_millis(), system_prompt: w.get_system_prompt().map(String::from), config: w.request_config().clone(), history: to_logged(w.history()), @@ -1654,13 +1654,13 @@ impl Pod { compacted_from: None, }; self.store - .create_session(fork_id, &[entry.clone()]) + .create_segment(fork_id, &[entry.clone()]) .map_err(PodError::from)?; self.session_state.set_session_id(fork_id); self.session_state.set_entries_written(1); self.sink.reset_with_initial(entry); if self.scope_allocation.is_some() { - pod_registry::update_session(&self.manifest.pod.name, fork_id)?; + pod_registry::update_segment(&self.manifest.pod.name, fork_id)?; } Ok(()) } @@ -1717,12 +1717,12 @@ impl Pod { self.send_event(Event::CompactStart); match self.compact(retained).await { - Ok(new_session_id) => { + Ok(new_segment_id) => { info!( - new_session_id = %new_session_id, + new_segment_id = %new_segment_id, "Compaction succeeded, resuming execution" ); - self.send_event(Event::CompactDone { new_session_id }); + self.send_event(Event::CompactDone { new_segment_id }); if let Some(ref state) = self.compact_state { state.record_compact_success(); } @@ -1767,12 +1767,12 @@ impl Pod { let retained = state.retained_tokens(); self.send_event(Event::CompactStart); match self.compact(retained).await { - Ok(new_session_id) => { + Ok(new_segment_id) => { info!( - new_session_id = %new_session_id, + new_segment_id = %new_segment_id, "Proactive pre-run compaction succeeded" ); - self.send_event(Event::CompactDone { new_session_id }); + self.send_event(Event::CompactDone { new_segment_id }); state.record_compact_success(); } Err(e) => { @@ -1814,7 +1814,7 @@ impl Pod { .iter() .cloned() .collect(); - let ts = session_log::now_millis(); + let ts = segment_log::now_millis(); for item in &new_items { if item.is_user_message() { continue; @@ -1837,7 +1837,7 @@ impl Pod { let turn_count = self.worker.as_ref().unwrap().turn_count(); self.commit_entry(LogEntry::TurnEnd { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), turn_count, }) ?; @@ -1874,7 +1874,7 @@ impl Pod { correlation_id, } = recorded; self.commit_entry(LogEntry::LlmUsage { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), history_len: record.history_len, input_total_tokens: record.input_total_tokens, cache_read_tokens: record.cache_read_tokens, @@ -1900,7 +1900,7 @@ impl Pod { match result { Ok(r) => { self.commit_entry(LogEntry::RunCompleted { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), interrupted, result: r.clone(), }) @@ -1908,7 +1908,7 @@ impl Pod { } Err(e) => { self.commit_entry(LogEntry::RunErrored { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), interrupted, message: e.to_string(), }) @@ -1928,7 +1928,7 @@ impl Pod { /// - a clone of the main LlmClient via `clone_boxed()`. /// /// Returns the new session ID. - pub async fn compact(&mut self, retained_tokens: u64) -> Result { + pub async fn compact(&mut self, retained_tokens: u64) -> Result { use crate::compact::worker::{ CompactWorkerContext, CompactWorkerInterceptor, add_reference_tool, mark_read_required_tool, write_summary_tool, @@ -2001,7 +2001,7 @@ impl Pod { .compact_system() .map_err(PodError::PromptCatalog)?; let mut summary_worker = Worker::new(summary_client).system_prompt(summary_system_prompt); - summary_worker.set_cache_key(Some(self.session_id().to_string())); + summary_worker.set_cache_key(Some(self.segment_id().to_string())); // Occupancy-based input-token meter + interceptor. The tracker pairs // each pre-request history length with the following UsageEvent, then @@ -2140,41 +2140,41 @@ impl Pod { task_snapshot_text.clone(), )); - // Build the SessionStart entry for the new compacted session, + // Build the SegmentStart entry for the new compacted session, // then atomically rotate to it: create on disk, swap head, reset // the broadcast sink so existing subscribers see the new - // `SessionStart { compacted_from }` and reset their view. - let new_session_id = session_store::new_session_id(); - let old_session_id = self.session_state.session_id(); + // `SegmentStart { compacted_from }` and reset their view. + let new_segment_id = session_store::new_segment_id(); + let old_session_id = self.session_state.segment_id(); let source_turn_count = self.worker.as_ref().unwrap().turn_count(); let w = self.worker.as_ref().unwrap(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), + let entry = LogEntry::SegmentStart { + ts: segment_log::now_millis(), system_prompt: w.get_system_prompt().map(String::from), config: w.request_config().clone(), history: to_logged(&new_history), forked_from: None, - compacted_from: Some(session_store::SessionOrigin { - session_id: old_session_id, + compacted_from: Some(session_store::SegmentOrigin { + segment_id: old_session_id, at_turn_index: source_turn_count, }), }; - self.store.create_session(new_session_id, &[entry.clone()])?; - self.session_state.set_session_id(new_session_id); + self.store.create_segment(new_segment_id, &[entry.clone()])?; + self.session_state.set_session_id(new_segment_id); self.session_state.set_entries_written(1); let session_start = entry; - // Broadcast the SessionStart through the sink. This atomically - // resets the mirror to `[SessionStart]` so any subscriber + // Broadcast the SegmentStart through the sink. This atomically + // resets the mirror to `[SegmentStart]` so any subscriber // querying after this point sees the post-compaction prefix. self.sink.reset_with_initial(session_start); - // Keep pods.json pointing at the live session_id. Without this - // a concurrent `restore_from_manifest(new_session_id)` would + // Keep pods.json pointing at the live segment_id. Without this + // a concurrent `restore_from_manifest(new_segment_id)` would // see no live writer and grab the session this Pod just moved // into, causing two writers to race on the same jsonl. Skipped // when no allocation is installed (e.g. compact under // `Pod::new` in tests). if self.scope_allocation.is_some() { - pod_registry::update_session(&self.manifest.pod.name, new_session_id)?; + pod_registry::update_segment(&self.manifest.pod.name, new_segment_id)?; } // Align user_segments with the post-compaction history. Items // before `retain_from` (now folded into the summary) lose their @@ -2188,8 +2188,8 @@ impl Pod { self.worker.as_mut().unwrap().set_history(new_history); // Compaction-introduced system messages are part of the new - // SessionStart's history (broadcast above) — clients derive - // their blocks from `SessionStart.history`. No per-item + // SegmentStart's history (broadcast above) — clients derive + // their blocks from `SegmentStart.history`. No per-item // broadcast is required. let _ = &compact_introduced_system_messages; let worker = self.worker.as_mut().unwrap(); @@ -2198,9 +2198,9 @@ impl Pod { // compact layout guarantees history[0] is the summary. worker.set_cache_anchor(Some(0)); // Re-key the OpenAI Responses prompt cache namespace to the new - // session_id so post-compact turns share a key with extract / + // segment_id so post-compact turns share a key with extract / // consolidate workers running in the same session. - worker.set_cache_key(Some(new_session_id.to_string())); + worker.set_cache_key(Some(new_segment_id.to_string())); self.usage_history .lock() .expect("usage_history poisoned") @@ -2219,7 +2219,7 @@ impl Pod { .lock() .expect("extract_pointer poisoned") = None; - Ok(new_session_id) + Ok(new_segment_id) } /// Build the LlmClient for the compactor Worker. @@ -2367,7 +2367,7 @@ impl Pod { // Read the session log to get the current entry count. This is // the boundary for the source.range end_entry. Called once per // extract, on a small local file. - let entries_now = self.store.read_all(self.session_id())?.len(); + let entries_now = self.store.read_all(self.segment_id())?.len(); if entries_now == 0 { return Ok(ExtractDecision::Skipped); } @@ -2399,7 +2399,7 @@ impl Pod { .memory_extract_system(memory_language) .map_err(PodError::PromptCatalog)?; let mut extract_worker = Worker::new(client).system_prompt(extract_system_prompt); - extract_worker.set_cache_key(Some(self.session_id().to_string())); + extract_worker.set_cache_key(Some(self.segment_id().to_string())); // Occupancy-based input-token meter + interceptor. The tracker pairs // each pre-request history length with the following UsageEvent, then @@ -2435,12 +2435,12 @@ impl Pod { extract::ExtractedPayload::default() }); - let source_session_id = self.session_state.session_id(); + let source_session_id = self.session_state.segment_id(); let staging_id = if payload.is_empty() { String::new() } else { let source = memory::schema::SourceRef { - session_id: source_session_id.to_string(), + segment_id: source_session_id.to_string(), range: [start_entry as u64, end_entry as u64], }; let (id, _) = extract::write_staging(&layout, source, payload) @@ -2456,7 +2456,7 @@ impl Pod { let payload_value = serde_json::to_value(&pointer_payload) .expect("ExtractPointerPayload is always JSON-serializable"); self.commit_entry(LogEntry::Extension { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), domain: extract::EXTRACT_DOMAIN.into(), payload: payload_value, })?; @@ -2598,7 +2598,7 @@ impl Pod { } }; let mut worker = Worker::new(client).system_prompt(consolidation_system_prompt); - worker.set_cache_key(Some(self.session_id().to_string())); + worker.set_cache_key(Some(self.segment_id().to_string())); // Memory tools are self-contained — they bypass ScopedFs and write // directly under the workspace via WorkspaceLayout. Resident @@ -2610,7 +2610,7 @@ impl Pod { let query_cfg = memory::tool::QueryConfig::from(memory_cfg); worker.register_tool(memory::tool::read_tool_with_usage( layout.clone(), - self.session_id().to_string(), + self.segment_id().to_string(), )); worker.register_tool(memory::tool::write_tool(layout.clone())); worker.register_tool(memory::tool::edit_tool(layout.clone())); @@ -2735,12 +2735,12 @@ impl Pod, St> { let mut common = prepare_pod_common(&manifest, &loader, /* parse_template */ true)?; let skill_shadows = std::mem::take(&mut common.skill_shadows); - // Session creation is deferred to the first run (see - // `ensure_session_head`) so the SessionStart entry can capture + // Segment creation is deferred to the first run (see + // `ensure_session_head`) so the SegmentStart entry can capture // the rendered system prompt, not the raw template source. The - // session_id is allocated here so the pod-registry registration + // segment_id is allocated here so the pod-registry registration // can record it from the start. - let session_id = session_store::new_session_id(); + let segment_id = session_store::new_segment_id(); // Register this Pod in the machine-wide pod-registry // before building anything else, so a spawn that conflicts on @@ -2754,18 +2754,18 @@ impl Pod, St> { std::process::id(), socket_path, common.scope.allow_rules(), - session_id, + segment_id, )?; let mut worker = Worker::new(common.client); apply_worker_manifest(&mut worker, &manifest.worker); - worker.set_cache_key(Some(session_id.to_string())); + worker.set_cache_key(Some(segment_id.to_string())); let mut pod = Self { manifest, worker: Some(worker), store, - session_state: SessionState::new(session_id, 0), + session_state: SegmentState::new(segment_id, 0), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), @@ -2793,7 +2793,7 @@ impl Pod, St> { extract_pointer: Arc::new(Mutex::new(None)), memory_task: None, user_segments: Vec::new(), - sink: SessionLogSink::new(), + sink: SegmentLogSink::new(), history_persistence_wired: false, log_writer: None, }; @@ -2820,22 +2820,22 @@ impl Pod, St> { let mut common = prepare_pod_common(&manifest, &loader, /* parse_template */ true)?; let skill_shadows = std::mem::take(&mut common.skill_shadows); - let session_id = session_store::new_session_id(); + let segment_id = session_store::new_segment_id(); let scope_allocation = pod_registry::adopt_allocation( manifest.pod.name.clone(), std::process::id(), - session_id, + segment_id, )?; let mut worker = Worker::new(common.client); apply_worker_manifest(&mut worker, &manifest.worker); - worker.set_cache_key(Some(session_id.to_string())); + worker.set_cache_key(Some(segment_id.to_string())); let mut pod = Self { manifest, worker: Some(worker), store, - session_state: SessionState::new(session_id, 0), + session_state: SegmentState::new(segment_id, 0), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), @@ -2863,7 +2863,7 @@ impl Pod, St> { extract_pointer: Arc::new(Mutex::new(None)), memory_task: None, user_segments: Vec::new(), - sink: SessionLogSink::new(), + sink: SegmentLogSink::new(), history_persistence_wired: false, log_writer: None, }; @@ -2878,13 +2878,13 @@ impl Pod, St> { /// Resolves the manifest cascade exactly like [`Self::from_manifest`] /// (pwd / scope / pod-registry / client / prompt catalog), seeds a /// fresh Worker from the source session's `RestoredState`, and - /// reuses the same `session_id` so subsequent turns append to the + /// reuses the same `segment_id` so subsequent turns append to the /// source jsonl as a continuation of the same conversation. /// /// Concurrent writers are prevented by the pod-registry: - /// the registration carries `session_id`, and this constructor - /// refuses to start when `pod_registry::lookup_session` already finds - /// a live Pod writing to `session_id`. So there is no need to fork — + /// the registration carries `segment_id`, and this constructor + /// refuses to start when `pod_registry::lookup_segment` already finds + /// a live Pod writing to `segment_id`. So there is no need to fork — /// resume is "the same session, a different process owning it". /// /// `system_prompt` is replayed verbatim from the session log — @@ -2892,7 +2892,7 @@ impl Pod, St> { /// session keeps a stable cache prefix even when the manifest's /// instruction template would render differently today. pub async fn restore_from_manifest( - session_id: SessionId, + segment_id: SegmentId, manifest: PodManifest, store: St, loader: PromptLoader, @@ -2900,16 +2900,16 @@ impl Pod, St> { // Read raw entries once so we can both reconstruct state and // seed the broadcast sink's mirror with the same prefix that // sits on disk. - let raw_entries = store.read_all(session_id)?; + let raw_entries = store.read_all(segment_id)?; let state = session_store::collect_state(&raw_entries); if state.entries_count == 0 { - return Err(PodError::SessionEmpty { session_id }); + return Err(PodError::SessionEmpty { segment_id }); } let mirror_entries: Vec = raw_entries.clone(); let scope_snapshot = state .pod_scope .clone() - .ok_or(PodError::SessionScopeMissing { session_id })?; + .ok_or(PodError::SessionScopeMissing { segment_id })?; let mut common = prepare_pod_common_with_scope( &manifest, @@ -2923,7 +2923,7 @@ impl Pod, St> { let skill_shadows = std::mem::take(&mut common.skill_shadows); // Atomic: register_pod inside install_top_level rejects when - // another live allocation already holds `session_id`. Wrapping + // another live allocation already holds `segment_id`. Wrapping // the lookup + install inside a single `LockFileGuard` is what // makes "no two live Pods write to the same session log" // actually structural rather than a hopeful pre-check. @@ -2937,14 +2937,14 @@ impl Pod, St> { socket_path, common.scope.allow_rules(), common.scope.deny_rules(), - session_id, + segment_id, )?; // Build the worker and apply the manifest defaults first, then // overwrite the pieces the session log is authoritative for. let mut worker = Worker::new(common.client); apply_worker_manifest(&mut worker, &manifest.worker); - worker.set_cache_key(Some(session_id.to_string())); + worker.set_cache_key(Some(segment_id.to_string())); if let Some(ref prompt) = state.system_prompt { worker.set_system_prompt(prompt); } @@ -2974,7 +2974,7 @@ impl Pod, St> { manifest, worker: Some(worker), store, - session_state: SessionState::new(session_id, state.entries_count), + session_state: SegmentState::new(segment_id, state.entries_count), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), @@ -3007,7 +3007,7 @@ impl Pod, St> { // Seed the mirror with the entries we just replayed so a // late-attaching client sees the full prefix without an // extra round trip. - sink: SessionLogSink::with_initial(mirror_entries), + sink: SegmentLogSink::with_initial(mirror_entries), history_persistence_wired: false, log_writer: None, }; @@ -3234,13 +3234,13 @@ pub enum PodError { #[error("workflow invocation failed: {0}")] WorkflowResolve(#[from] WorkflowResolveError), - #[error("session {session_id} has no entries to restore")] - SessionEmpty { session_id: SessionId }, + #[error("session {segment_id} has no entries to restore")] + SessionEmpty { segment_id: SegmentId }, #[error( - "session {session_id} has no persisted scope snapshot; refusing resume without explicit scope" + "session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope" )] - SessionScopeMissing { session_id: SessionId }, + SessionScopeMissing { segment_id: SegmentId }, } /// Bundle of resources that every high-level Pod constructor needs: diff --git a/crates/pod/src/runtime/dir.rs b/crates/pod/src/runtime/dir.rs index 07e59f7f..26e9ec51 100644 --- a/crates/pod/src/runtime/dir.rs +++ b/crates/pod/src/runtime/dir.rs @@ -131,7 +131,7 @@ mod tests { fn test_state() -> PodSharedState { PodSharedState::new( "test-pod".into(), - session_store::new_session_id(), + session_store::new_segment_id(), "[pod]\nname = \"test-pod\"".into(), protocol::Greeting { pod_name: "test-pod".into(), diff --git a/crates/pod/src/session_log_sink.rs b/crates/pod/src/segment_log_sink.rs similarity index 89% rename from crates/pod/src/session_log_sink.rs rename to crates/pod/src/segment_log_sink.rs index 48e583a3..d206a630 100644 --- a/crates/pod/src/session_log_sink.rs +++ b/crates/pod/src/segment_log_sink.rs @@ -10,11 +10,11 @@ //! Atomicity contract: //! //! 1. Pod writes the entry to disk via the `Store`. -//! 2. Pod calls [`SessionLogSink::publish`] which acquires the mirror +//! 2. Pod calls [`SegmentLogSink::publish`] which acquires the mirror //! mutex, pushes the entry, and fires `broadcast::send` — all under //! the same critical section. //! -//! [`SessionLogSink::subscribe_with_snapshot`] takes the same mutex, +//! [`SegmentLogSink::subscribe_with_snapshot`] takes the same mutex, //! so the `(snapshot, receiver)` pair returned to a connecting client //! splits the entry sequence cleanly: every entry shows up in exactly //! one of `snapshot` or on `receiver`. @@ -39,24 +39,24 @@ const BROADCAST_CAPACITY: usize = 256; /// for read-only `subscribe_with_snapshot` access and keeps one for /// its own write path. #[derive(Clone)] -pub struct SessionLogSink { +pub struct SegmentLogSink { inner: Arc, } struct SinkInner { /// Full session log mirror in commit order. Reset on session swap - /// (compaction / fork) via [`SessionLogSink::reset_with_initial`]. + /// (compaction / fork) via [`SegmentLogSink::reset_with_initial`]. mirror: StdMutex>, /// Broadcast channel for live entry updates. The same `Sender` /// survives session swaps so existing subscribers keep their /// receiver — they observe the swap as a freshly broadcast - /// `LogEntry::SessionStart` and reset their view accordingly. + /// `LogEntry::SegmentStart` and reset their view accordingly. broadcast_tx: broadcast::Sender, } -impl SessionLogSink { +impl SegmentLogSink { /// Create a fresh sink with an empty mirror. Used before any entry - /// has been written (deferred SessionStart) or as a placeholder in + /// has been written (deferred SegmentStart) or as a placeholder in /// tests. pub fn new() -> Self { let (broadcast_tx, _) = broadcast::channel(BROADCAST_CAPACITY); @@ -89,7 +89,7 @@ impl SessionLogSink { /// /// Live broadcast fires only for entries that the streaming-event /// lane does not cover: - /// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire. + /// - `LogEntry::SegmentStart` → `Event::SegmentRotated` on the wire. /// - `LogEntry::SystemItem` → `Event::SystemItem`. /// - `LogEntry::Invoke` → `Event::InvokeStart`. /// Everything else (AssistantItem, ToolResult, UserInput, TurnEnd, @@ -119,7 +119,7 @@ impl SessionLogSink { fn is_live_relevant(entry: &LogEntry) -> bool { matches!( entry, - LogEntry::SessionStart { .. } + LogEntry::SegmentStart { .. } | LogEntry::SystemItem { .. } | LogEntry::Invoke { .. } ) @@ -127,12 +127,12 @@ impl SessionLogSink { /// Atomically swap the mirror to `[initial]` and broadcast the new /// session-start entry. Used during compaction / fork: the new - /// `LogEntry::SessionStart` is the first entry of the replacement + /// `LogEntry::SegmentStart` is the first entry of the replacement /// session, and existing subscribers transition by replaying it /// like any other live entry. /// /// Existing snapshot prefixes seen by old subscribers stay valid - /// for the prior session; the new `SessionStart` on the broadcast + /// for the prior session; the new `SegmentStart` on the broadcast /// is the signal to reset their derived view. pub fn reset_with_initial(&self, initial: LogEntry) { let mut mirror = self @@ -188,7 +188,7 @@ impl SessionLogSink { } } -impl Default for SessionLogSink { +impl Default for SegmentLogSink { fn default() -> Self { Self::new() } @@ -198,10 +198,10 @@ impl Default for SessionLogSink { mod tests { use super::*; use llm_worker::llm_client::RequestConfig; - use session_store::session_log::now_millis; + use session_store::segment_log::now_millis; fn session_start() -> LogEntry { - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: now_millis(), system_prompt: None, config: RequestConfig::default(), @@ -220,13 +220,13 @@ mod tests { #[test] fn publish_then_subscribe_returns_history_in_snapshot() { - let sink = SessionLogSink::new(); + let sink = SegmentLogSink::new(); sink.publish(session_start()); sink.publish(turn_end(1)); let (snapshot, mut rx) = sink.subscribe_with_snapshot(); assert_eq!(snapshot.len(), 2); - assert!(matches!(snapshot[0], LogEntry::SessionStart { .. })); + assert!(matches!(snapshot[0], LogEntry::SegmentStart { .. })); assert!(matches!( snapshot[1], LogEntry::TurnEnd { turn_count: 1, .. } @@ -246,7 +246,7 @@ mod tests { #[test] fn subscribe_then_publish_delivers_only_live_relevant_entries() { - let sink = SessionLogSink::new(); + let sink = SegmentLogSink::new(); sink.publish(session_start()); let (snapshot, mut rx) = sink.subscribe_with_snapshot(); @@ -270,7 +270,7 @@ mod tests { #[test] fn snapshot_and_live_never_overlap() { - let sink = SessionLogSink::new(); + let sink = SegmentLogSink::new(); sink.publish(session_start()); let (snapshot, mut rx) = sink.subscribe_with_snapshot(); sink.publish(notification_entry("post-snapshot")); @@ -285,7 +285,7 @@ mod tests { #[test] fn reset_with_initial_clears_and_broadcasts() { - let sink = SessionLogSink::new(); + let sink = SegmentLogSink::new(); sink.publish(session_start()); sink.publish(turn_end(1)); @@ -293,18 +293,18 @@ mod tests { sink.reset_with_initial(session_start()); match rx.try_recv() { - Ok(LogEntry::SessionStart { .. }) => {} - other => panic!("expected SessionStart broadcast, got {other:?}"), + Ok(LogEntry::SegmentStart { .. }) => {} + other => panic!("expected SegmentStart broadcast, got {other:?}"), } let (post_snapshot, _) = sink.subscribe_with_snapshot(); assert_eq!(post_snapshot.len(), 1); - assert!(matches!(post_snapshot[0], LogEntry::SessionStart { .. })); + assert!(matches!(post_snapshot[0], LogEntry::SegmentStart { .. })); } #[test] fn replace_silent_does_not_broadcast() { - let sink = SessionLogSink::new(); + let sink = SegmentLogSink::new(); sink.publish(session_start()); let (_pre_snapshot, mut rx) = sink.subscribe_with_snapshot(); @@ -318,7 +318,7 @@ mod tests { #[test] fn with_initial_seeds_the_mirror() { - let sink = SessionLogSink::with_initial(vec![session_start(), turn_end(1)]); + let sink = SegmentLogSink::with_initial(vec![session_start(), turn_end(1)]); let (snapshot, _) = sink.subscribe_with_snapshot(); assert_eq!(snapshot.len(), 2); } diff --git a/crates/pod/src/shared_state.rs b/crates/pod/src/shared_state.rs index 5ecccbfe..b363d911 100644 --- a/crates/pod/src/shared_state.rs +++ b/crates/pod/src/shared_state.rs @@ -2,7 +2,7 @@ use std::sync::{OnceLock, RwLock}; use protocol::PodStatus; use serde_json::json; -use session_store::SessionId; +use session_store::SegmentId; use crate::fs_view::PodFsView; @@ -28,7 +28,7 @@ pub struct KnowledgeCandidate { /// greeting, and completion lookup hubs. pub struct PodSharedState { pub pod_name: String, - pub session_id: SessionId, + pub segment_id: SegmentId, pub manifest_toml: String, pub greeting: protocol::Greeting, pub status: RwLock, @@ -46,13 +46,13 @@ pub struct PodSharedState { impl PodSharedState { pub fn new( pod_name: String, - session_id: SessionId, + segment_id: SegmentId, manifest_toml: String, greeting: protocol::Greeting, ) -> Self { Self { pod_name, - session_id, + segment_id, manifest_toml, greeting, status: RwLock::new(PodStatus::Idle), @@ -123,7 +123,7 @@ impl PodSharedState { let status = self.get_status(); json!({ "state": status, - "session_id": self.session_id.to_string(), + "segment_id": self.segment_id.to_string(), "pod_name": self.pod_name, }) .to_string() @@ -137,7 +137,7 @@ mod tests { fn test_state() -> PodSharedState { PodSharedState::new( "test-pod".into(), - session_store::new_session_id(), + session_store::new_segment_id(), "[pod]\nname = \"test-pod\"".into(), test_greeting(), ) @@ -176,7 +176,7 @@ mod tests { let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert_eq!(parsed["state"], "idle"); assert_eq!(parsed["pod_name"], "test-pod"); - assert!(parsed["session_id"].is_string()); + assert!(parsed["segment_id"].is_string()); } #[test] diff --git a/crates/pod/src/spawn/comm_tools.rs b/crates/pod/src/spawn/comm_tools.rs index c6b88b54..3ef09a1b 100644 --- a/crates/pod/src/spawn/comm_tools.rs +++ b/crates/pod/src/spawn/comm_tools.rs @@ -430,13 +430,13 @@ fn extract_assistant_text(entries: &[serde_json::Value]) -> String { for value in entries { // The wire payload is the JSON form of `session_store::LogEntry`. // Walk Assistant items inside each entry that can carry them: - // post-compaction `SessionStart.history` (seed) and per-LLM-call + // post-compaction `SegmentStart.history` (seed) and per-LLM-call // `AssistantItems` deltas. let Ok(entry) = serde_json::from_value::(value.clone()) else { continue; }; let logged_items = match entry { - LogEntry::SessionStart { history, .. } => history, + LogEntry::SegmentStart { history, .. } => history, LogEntry::AssistantItems { items, .. } => items, _ => continue, }; diff --git a/crates/pod/tests/compact_events_test.rs b/crates/pod/tests/compact_events_test.rs index eeea8873..3c6d13de 100644 --- a/crates/pod/tests/compact_events_test.rs +++ b/crates/pod/tests/compact_events_test.rs @@ -178,7 +178,7 @@ fn drain(rx: &mut broadcast::Receiver) -> Vec { } /// Collect every system-message text that the post-compaction -/// `SessionStart.history` carries, by reading the sink mirror directly. +/// `SegmentStart.history` carries, by reading the sink mirror directly. fn system_texts_in_sink_session_start( pod: &pod::Pod< impl llm_worker::llm_client::client::LlmClient + Clone + 'static, @@ -187,7 +187,7 @@ fn system_texts_in_sink_session_start( ) -> Vec { let (entries, _rx) = pod.sink().subscribe_with_snapshot(); for entry in entries.into_iter().rev() { - if let session_store::LogEntry::SessionStart { history, .. } = entry { + if let session_store::LogEntry::SegmentStart { history, .. } = entry { return history .into_iter() .filter_map(|logged| { @@ -229,7 +229,7 @@ async fn compact_emits_session_start_carrying_summary_and_task_snapshot() { pod.compact(10_000).await.unwrap(); let system_texts = system_texts_in_sink_session_start(&pod); - // The post-compaction `SessionStart.history` carries the new system + // The post-compaction `SegmentStart.history` carries the new system // messages introduced by the compactor. Clients re-seed their view // from this entry alone, so it is the load-bearing payload. assert!( @@ -289,11 +289,11 @@ async fn pre_run_compact_success_broadcasts_start_and_done() { // CompactDone carries the new session id. let new_id_in_event = events.iter().find_map(|e| match e { - Event::CompactDone { new_session_id } => Some(*new_session_id), + Event::CompactDone { new_segment_id } => Some(*new_segment_id), _ => None, }); assert!(new_id_in_event.is_some(), "CompactDone missing"); - assert_eq!(new_id_in_event.unwrap(), pod.session_id()); + assert_eq!(new_id_in_event.unwrap(), pod.segment_id()); } #[tokio::test] @@ -345,10 +345,10 @@ async fn mid_turn_compact_success_broadcasts_start_and_done() { ); let new_id_in_event = events.iter().find_map(|e| match e { - Event::CompactDone { new_session_id } => Some(*new_session_id), + Event::CompactDone { new_segment_id } => Some(*new_segment_id), _ => None, }); - assert_eq!(new_id_in_event, Some(pod.session_id())); + assert_eq!(new_id_in_event, Some(pod.segment_id())); } /// Regression: `Pod::compact()` must reset the in-memory @@ -520,7 +520,7 @@ async fn pre_run_compact_failure_broadcasts_start_and_failed() { // --------------------------------------------------------------------------- // Detached post-run memory jobs (`spawn_post_run_memory_jobs` / // `wait_for_memory_jobs`). Covers the detach round-trip and the structural -// invariant that the cloned memory-task Pod shares `SessionState` with the +// invariant that the cloned memory-task Pod shares `SegmentState` with the // source Pod, so that `save_extension` from the background extract does not // leave the next turn's `save_user_input` looking at a stale session pointer. @@ -570,7 +570,7 @@ async fn spawn_and_wait_drives_extract_to_completion() { #[tokio::test] async fn detached_extract_does_not_fork_session_log() { - // Source pod and the cloned memory-task pod share `SessionState` via + // Source pod and the cloned memory-task pod share `SegmentState` via // `Arc<_>`. The detached extract advances the entry tally through // `save_extension`; the next `run` must see that same tally so // `ensure_head_or_fork` does not spawn a new session. @@ -583,18 +583,18 @@ async fn detached_extract_does_not_fork_session_log() { let mut pod = make_pod_with_manifest(EXTRACT_NO_COMPACT_MANIFEST, client).await; pod.run_text("first").await.unwrap(); - let session_before = pod.session_id(); + let session_before = pod.segment_id(); pod.spawn_post_run_memory_jobs(); pod.wait_for_memory_jobs().await; pod.run_text("second").await.unwrap(); - let session_after = pod.session_id(); + let session_after = pod.segment_id(); assert_eq!( session_before, session_after, "detached extract's save_extension and the next turn's save_user_input \ - must share the entry tally through SessionState — a fork here means the \ + must share the entry tally through SegmentState — a fork here means the \ clone carried its own counter" ); } diff --git a/crates/pod/tests/consolidation_test.rs b/crates/pod/tests/consolidation_test.rs index 7c11c335..2c45a8e5 100644 --- a/crates/pod/tests/consolidation_test.rs +++ b/crates/pod/tests/consolidation_test.rs @@ -172,7 +172,7 @@ fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec { let (id, _) = write_staging( layout, SourceRef { - session_id: format!("s-{i}"), + segment_id: format!("s-{i}"), range: [i as u64, i as u64], }, ExtractedPayload::default(), diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 2d97c5c6..ba79a007 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -22,7 +22,7 @@ fn history_from_sink(handle: &PodHandle) -> Vec { let mut items = Vec::new(); for entry in entries { match entry { - LogEntry::SessionStart { history, .. } => { + LogEntry::SegmentStart { history, .. } => { items.extend(history.into_iter().map(Item::from)); } LogEntry::UserInput { segments, .. } => { diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index 83483706..94f00891 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -349,7 +349,7 @@ async fn stop_pod_sends_shutdown_and_releases_scope() { permission: Permission::Write, recursive: true, }], - session_store::new_session_id(), + session_store::new_segment_id(), ) .unwrap(); pod_registry::delegate_scope( diff --git a/crates/pod/tests/pod_events_test.rs b/crates/pod/tests/pod_events_test.rs index 52b54d50..00c2526c 100644 --- a/crates/pod/tests/pod_events_test.rs +++ b/crates/pod/tests/pod_events_test.rs @@ -358,7 +358,7 @@ async fn shutdown_releases_scope_allocation_when_present() { std::process::id(), "/tmp/kid.sock".into(), vec![], - session_store::new_session_id(), + session_store::new_segment_id(), ) .unwrap(); std::mem::forget(guard); diff --git a/crates/pod/tests/restore_test.rs b/crates/pod/tests/restore_test.rs index ff49e7ba..c4d92217 100644 --- a/crates/pod/tests/restore_test.rs +++ b/crates/pod/tests/restore_test.rs @@ -8,7 +8,7 @@ use std::sync::{LazyLock, Mutex}; use pod::{Pod, PodError}; -use session_store::{FsStore, SessionId, StoreError}; +use session_store::{FsStore, SegmentId, StoreError}; const MINIMAL_MANIFEST_TOML: &str = r#" [pod] @@ -42,7 +42,7 @@ async fn restore_from_manifest_rejects_unknown_session() { // A freshly-minted id with no jsonl file at all → store returns // NotFound, which `Pod::restore_from_manifest` surfaces verbatim // as `PodError::Store`. - let unknown = session_store::new_session_id(); + let unknown = session_store::new_segment_id(); let result = Pod::restore_from_manifest(unknown, manifest, store, pod::PromptLoader::builtins_only()) .await; @@ -67,7 +67,7 @@ async fn restore_from_manifest_rejects_empty_session_log() { // `restore_from_manifest` rejects with `SessionEmpty` *before* it // gets as far as building the LLM client — so the test does not // need credentials or a runtime sandbox. - let id: SessionId = session_store::new_session_id(); + let id: SegmentId = session_store::new_segment_id(); let path = store_tmp.path().join(format!("{id}.jsonl")); std::fs::write(&path, b"").unwrap(); @@ -75,7 +75,7 @@ async fn restore_from_manifest_rejects_empty_session_log() { Pod::restore_from_manifest(id, manifest, store, pod::PromptLoader::builtins_only()).await; match result { - Err(PodError::SessionEmpty { session_id }) => assert_eq!(session_id, id), + Err(PodError::SessionEmpty { segment_id }) => assert_eq!(segment_id, id), Err(other) => panic!("expected SessionEmpty, got {other:?}"), Ok(_) => panic!("expected empty session log to fail"), } @@ -89,19 +89,19 @@ async fn restore_from_manifest_rejects_session_without_scope_snapshot() { let store = FsStore::new(store_tmp.path()).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); - let id = session_store::new_session_id(); - let state = session_store::SessionStartState { + let id = session_store::new_segment_id(); + let state = session_store::SegmentStartState { system_prompt: None, config: &Default::default(), history: &[], }; - session_store::create_session_with_id(&store, id, state).unwrap(); + session_store::create_segment_with_id(&store, id, state).unwrap(); let result = Pod::restore_from_manifest(id, manifest, store, pod::PromptLoader::builtins_only()).await; match result { - Err(PodError::SessionScopeMissing { session_id }) => assert_eq!(session_id, id), + Err(PodError::SessionScopeMissing { segment_id }) => assert_eq!(segment_id, id), Err(other) => panic!("expected SessionScopeMissing, got {other:?}"), Ok(_) => panic!("expected missing scope snapshot to fail"), } diff --git a/crates/pod/tests/session_metrics_test.rs b/crates/pod/tests/session_metrics_test.rs index e72ea9f0..659f47d5 100644 --- a/crates/pod/tests/session_metrics_test.rs +++ b/crates/pod/tests/session_metrics_test.rs @@ -26,7 +26,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use session_metrics::{DOMAIN, Metric, metrics_from_extensions}; -use session_store::{FsStore, LogEntry, SessionId, Store, StoreError, TraceEntry}; +use session_store::{FsStore, LogEntry, SegmentId, Store, StoreError, TraceEntry}; use pod::{Pod, PodManifest}; @@ -200,7 +200,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() { text_response_with_cache("done", 1234, 50), ]); let (mut pod, _store_tmp, _pwd_tmp) = make_pod(manifest_toml(1, 1), client, "big_tool").await; - let session_id = pod.session_id(); + let segment_id = pod.segment_id(); // Cloning the store handle to read the session log back after the // runs complete — the Pod retains its own copy. let store = pod.store().clone(); @@ -208,7 +208,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() { pod.run_text("first").await.unwrap(); pod.run_text("second").await.unwrap(); - let state = session_store::restore(&store, session_id).unwrap(); + let state = session_store::restore(&store, segment_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); // Run 1 has 2 LLM iterations (tool loop), each evaluates prune with @@ -288,13 +288,13 @@ async fn prune_metrics_record_below_min_savings_skip() { ]); let (mut pod, _store_tmp, _pwd_tmp) = make_pod(manifest_toml(1, u64::MAX), client, "big_tool").await; - let session_id = pod.session_id(); + let segment_id = pod.segment_id(); let store = pod.store().clone(); pod.run_text("first").await.unwrap(); pod.run_text("second").await.unwrap(); - let state = session_store::restore(&store, session_id).unwrap(); + let state = session_store::restore(&store, segment_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); let below = metrics .iter() @@ -327,7 +327,7 @@ struct MetricFailingStore { } impl Store for MetricFailingStore { - fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError> { + fn append(&self, id: SegmentId, entry: &LogEntry) -> Result<(), StoreError> { if let LogEntry::Extension { domain, .. } = entry { if domain == DOMAIN { return Err(StoreError::Io(std::io::Error::other("synthetic failure"))); @@ -335,22 +335,22 @@ impl Store for MetricFailingStore { } self.inner.append(id, entry) } - fn read_all(&self, id: SessionId) -> Result, StoreError> { + fn read_all(&self, id: SegmentId) -> Result, StoreError> { self.inner.read_all(id) } - fn list_sessions(&self) -> Result, StoreError> { - self.inner.list_sessions() + fn list_segments(&self) -> Result, StoreError> { + self.inner.list_segments() } - fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> Result<(), StoreError> { - self.inner.create_session(id, entries) + fn create_segment(&self, id: SegmentId, entries: &[LogEntry]) -> Result<(), StoreError> { + self.inner.create_segment(id, entries) } - fn exists(&self, id: SessionId) -> Result { + fn exists(&self, id: SegmentId) -> Result { self.inner.exists(id) } - fn read_entry_count(&self, id: SessionId) -> Result { + fn read_entry_count(&self, id: SegmentId) -> Result { self.inner.read_entry_count(id) } - fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { + fn append_trace(&self, id: SegmentId, entry: &TraceEntry) -> Result<(), StoreError> { self.inner.append_trace(id, entry) } } @@ -386,12 +386,12 @@ async fn metric_write_failure_emits_warn_alert_and_does_not_abort_run() { let alerter = pod::Alerter::new(tx); pod.attach_alerter(alerter); - let session_id = pod.session_id(); + let segment_id = pod.segment_id(); // Run completes successfully despite metric failure. pod.run_text("hello").await.unwrap(); // No metrics ended up in the log (writes were rejected). - let state = session_store::restore(&store, session_id).unwrap(); + let state = session_store::restore(&store, segment_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); assert!(metrics.is_empty(), "metrics must drop on write failure"); @@ -446,10 +446,10 @@ permission = "write" let mut pod = Pod::new(manifest, worker, store.clone(), pwd, scope) .await .unwrap(); - let session_id = pod.session_id(); + let segment_id = pod.segment_id(); pod.run_text("hello").await.unwrap(); - let state = session_store::restore(&store, session_id).unwrap(); + let state = session_store::restore(&store, segment_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); assert!( metrics.is_empty(), diff --git a/crates/pod/tests/spawn_pod_test.rs b/crates/pod/tests/spawn_pod_test.rs index 3d37ebaa..91f51578 100644 --- a/crates/pod/tests/spawn_pod_test.rs +++ b/crates/pod/tests/spawn_pod_test.rs @@ -73,7 +73,7 @@ async fn setup_spawner( permission: Permission::Write, recursive: true, }], - session_store::new_session_id(), + session_store::new_segment_id(), ) .unwrap(); // Leak the guard — the spawner allocation needs to outlive the diff --git a/crates/pod/tests/system_prompt_template_test.rs b/crates/pod/tests/system_prompt_template_test.rs index cb6bca4f..98dd7328 100644 --- a/crates/pod/tests/system_prompt_template_test.rs +++ b/crates/pod/tests/system_prompt_template_test.rs @@ -182,16 +182,16 @@ async fn session_start_state_captures_rendered_prompt() { .unwrap(); pod.run_text("hi").await.unwrap(); - let entries = pod.store().read_all(pod.session_id()).unwrap(); + let entries = pod.store().read_all(pod.segment_id()).unwrap(); let first = entries.first().expect("at least one entry"); match first { - LogEntry::SessionStart { system_prompt, .. } => { + LogEntry::SegmentStart { system_prompt, .. } => { let sp = system_prompt.as_deref().expect("system prompt set"); assert!(sp.starts_with("hello cwd=")); assert!(sp.contains(&pwd.display().to_string())); assert!(sp.contains("## Working boundaries")); } - other => panic!("expected SessionStart as first entry, got {other:?}"), + other => panic!("expected SegmentStart as first entry, got {other:?}"), } } diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 8e7c9770..1442a47b 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -364,7 +364,7 @@ pub enum Event { /// Live updates after the snapshot arrive through the streaming /// events (`TextDelta` / `ToolCall*` / `ToolResult` / etc.) plus /// the two role-specific entry events - /// (`SessionRotated` / `HookInjectedItems`) — there is no generic + /// (`SegmentRotated` / `HookInjectedItems`) — there is no generic /// "every committed entry" broadcast. Snapshot { entries: Vec, @@ -372,15 +372,15 @@ pub enum Event { #[serde(default)] status: PodStatus, }, - /// Server-side session log rotated to a fresh `SessionStart`. + /// Server-side segment log rotated to a fresh `SegmentStart`. /// /// Fires on compaction and on auto-fork when the store head drifts /// from the live writer's cached head. Clients drop their derived /// view and reseed from `entry.history` exactly the way they would /// from a connect-time `Snapshot`. /// - /// Payload is the JSON form of `session_store::LogEntry::SessionStart`. - SessionRotated { + /// Payload is the JSON form of `session_store::LogEntry::SegmentStart`. + SegmentRotated { entry: serde_json::Value, }, /// Current Pod controller status. Broadcast on every controller-level @@ -400,15 +400,15 @@ pub enum Event { /// Pod has started compacting the current session. /// /// Fired immediately before a compaction run. Success is signalled by - /// `CompactDone` (with the new `SessionId`); failure by `CompactFailed`. + /// `CompactDone` (with the new `SegmentId`); failure by `CompactFailed`. /// Broadcast to all clients; not replayed to late subscribers. CompactStart, /// Compaction completed and the session was rotated. /// - /// `new_session_id` is the UUID of the freshly created session that + /// `new_segment_id` is the UUID of the freshly created session that /// replaced the old history. CompactDone { - new_session_id: uuid::Uuid, + new_segment_id: uuid::Uuid, }, /// Compaction failed. The session is unchanged. CompactFailed { @@ -890,18 +890,18 @@ mod tests { } #[test] - fn event_session_rotated_roundtrip() { - let event = Event::SessionRotated { - entry: serde_json::json!({"kind": "session_start", "ts": 1, "history": []}), + fn event_segment_rotated_roundtrip() { + let event = Event::SegmentRotated { + entry: serde_json::json!({"kind": "segment_start", "ts": 1, "history": []}), }; let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); - assert_eq!(parsed["event"], "session_rotated"); - assert_eq!(parsed["data"]["entry"]["kind"], "session_start"); + assert_eq!(parsed["event"], "segment_rotated"); + assert_eq!(parsed["data"]["entry"]["kind"], "segment_start"); let decoded: Event = serde_json::from_str(&json).unwrap(); match decoded { - Event::SessionRotated { entry } => assert_eq!(entry["kind"], "session_start"), - other => panic!("expected SessionRotated, got {other:?}"), + Event::SegmentRotated { entry } => assert_eq!(entry["kind"], "segment_start"), + other => panic!("expected SegmentRotated, got {other:?}"), } } @@ -1060,17 +1060,17 @@ mod tests { #[test] fn event_compact_done_roundtrip() { let id = uuid::Uuid::parse_str("0192f0e8-4d84-7d6e-a000-000000000001").unwrap(); - let event = Event::CompactDone { new_session_id: id }; + let event = Event::CompactDone { new_segment_id: id }; let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert_eq!(parsed["event"], "compact_done"); assert_eq!( - parsed["data"]["new_session_id"], + parsed["data"]["new_segment_id"], "0192f0e8-4d84-7d6e-a000-000000000001" ); let decoded: Event = serde_json::from_str(&json).unwrap(); match decoded { - Event::CompactDone { new_session_id } => assert_eq!(new_session_id, id), + Event::CompactDone { new_segment_id } => assert_eq!(new_segment_id, id), other => panic!("expected CompactDone, got {other:?}"), } } diff --git a/crates/session-metrics/src/lib.rs b/crates/session-metrics/src/lib.rs index 0600c597..eb0d9c7a 100644 --- a/crates/session-metrics/src/lib.rs +++ b/crates/session-metrics/src/lib.rs @@ -18,7 +18,7 @@ use std::collections::BTreeMap; use serde::{Deserialize, Serialize}; -use session_store::{SessionId, Store, StoreError, save_extension, session_log}; +use session_store::{SegmentId, Store, StoreError, save_extension, segment_log}; /// Domain tag used in `LogEntry::Extension` for all metrics records. pub const DOMAIN: &str = "metrics"; @@ -48,7 +48,7 @@ impl Metric { pub fn now(name: impl Into) -> Self { Self { name: name.into(), - ts: session_log::now_millis(), + ts: segment_log::now_millis(), dimensions: BTreeMap::new(), value: None, correlation_id: None, @@ -77,11 +77,11 @@ impl Metric { /// (メトリクスのために本体処理を止めるかは呼び出し側の判断)。 pub fn record_metric( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, metric: &Metric, ) -> Result<(), StoreError> { let payload = serde_json::to_value(metric).expect("Metric serialization cannot fail"); - save_extension(store, session_id, DOMAIN, payload) + save_extension(store, segment_id, DOMAIN, payload) } /// `RestoredState.extensions` から metrics domain の payload を順に取り出し、 diff --git a/crates/session-store/src/event_trace.rs b/crates/session-store/src/event_trace.rs index 8fbfc0ec..7f152520 100644 --- a/crates/session-store/src/event_trace.rs +++ b/crates/session-store/src/event_trace.rs @@ -2,7 +2,7 @@ //! //! [`TraceEntry`] captures every LLM stream event verbatim for debugging //! and post-hoc analysis. Written to a separate `.trace.jsonl` file, -//! completely independent of the session log used for state restoration. +//! completely independent of the segment log used for state restoration. //! //! Disabled by default. Enable via `SessionConfig::record_event_trace`. diff --git a/crates/session-store/src/fs_store.rs b/crates/session-store/src/fs_store.rs index 0d04db74..8ebc4eca 100644 --- a/crates/session-store/src/fs_store.rs +++ b/crates/session-store/src/fs_store.rs @@ -1,12 +1,12 @@ //! Filesystem-backed JSONL store. //! //! Layout: -//! - Session log: `{root}/{session_id}.jsonl` -//! - Event trace: `{root}/{session_id}.trace.jsonl` +//! - Segment log: `{root}/{segment_id}.jsonl` +//! - Event trace: `{root}/{segment_id}.trace.jsonl` -use crate::SessionId; +use crate::SegmentId; use crate::event_trace::TraceEntry; -use crate::session_log::LogEntry; +use crate::segment_log::LogEntry; use crate::store::{Store, StoreError}; use std::fs; use std::io::Write; @@ -14,7 +14,7 @@ use std::path::{Path, PathBuf}; /// Filesystem-backed JSONL store. /// -/// Each session is stored as a single `.jsonl` file with one [`LogEntry`] +/// Each segment is stored as a single `.jsonl` file with one [`LogEntry`] /// per line. Writes use append mode for crash safety. #[derive(Clone)] pub struct FsStore { @@ -30,11 +30,11 @@ impl FsStore { Ok(Self { root }) } - fn log_path(&self, id: SessionId) -> PathBuf { + fn log_path(&self, id: SegmentId) -> PathBuf { self.root.join(format!("{id}.jsonl")) } - fn trace_path(&self, id: SessionId) -> PathBuf { + fn trace_path(&self, id: SegmentId) -> PathBuf { self.root.join(format!("{id}.trace.jsonl")) } @@ -65,12 +65,12 @@ impl FsStore { } impl Store for FsStore { - fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError> { + fn append(&self, id: SegmentId, entry: &LogEntry) -> Result<(), StoreError> { let line = serde_json::to_string(entry)?; self.append_line(&self.log_path(id), &line) } - fn read_all(&self, id: SessionId) -> Result, StoreError> { + fn read_all(&self, id: SegmentId) -> Result, StoreError> { let path = self.log_path(id); if !path.exists() { return Err(StoreError::NotFound(id)); @@ -79,7 +79,7 @@ impl Store for FsStore { Self::parse_jsonl(&content) } - fn list_sessions(&self) -> Result, StoreError> { + fn list_segments(&self) -> Result, StoreError> { let mut sessions = Vec::new(); for entry in fs::read_dir(&self.root)? { let entry = entry?; @@ -88,7 +88,7 @@ impl Store for FsStore { let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); if name.ends_with(".jsonl") && !name.ends_with(".trace.jsonl") { let stem = name.trim_end_matches(".jsonl"); - if let Ok(id) = stem.parse::() { + if let Ok(id) = stem.parse::() { sessions.push(id); } } @@ -98,7 +98,7 @@ impl Store for FsStore { Ok(sessions) } - fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> Result<(), StoreError> { + fn create_segment(&self, id: SegmentId, entries: &[LogEntry]) -> Result<(), StoreError> { let path = self.log_path(id); let mut content = String::new(); for entry in entries { @@ -109,11 +109,11 @@ impl Store for FsStore { Ok(()) } - fn exists(&self, id: SessionId) -> Result { + fn exists(&self, id: SegmentId) -> Result { Ok(self.log_path(id).exists()) } - fn read_entry_count(&self, id: SessionId) -> Result { + fn read_entry_count(&self, id: SegmentId) -> Result { let path = self.log_path(id); if !path.exists() { return Err(StoreError::NotFound(id)); @@ -122,7 +122,7 @@ impl Store for FsStore { Ok(content.lines().filter(|l| !l.trim().is_empty()).count()) } - fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { + fn append_trace(&self, id: SegmentId, entry: &TraceEntry) -> Result<(), StoreError> { let line = serde_json::to_string(entry)?; self.append_line(&self.trace_path(id), &line) } diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index d00f376c..3e5df709 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -1,4 +1,4 @@ -//! Session persistence via append-only JSONL logs. +//! Segment persistence via append-only JSONL logs. //! //! # Architecture //! @@ -11,15 +11,15 @@ //! functions after state-mutating operations. //! //! Debug-mode [`TraceEntry`] records capture raw stream events in a separate -//! `.trace.jsonl` file, independent of the session log. +//! `.trace.jsonl` file, independent of the segment log. //! //! # Quick start //! //! ```ignore -//! use session_store::{create_session, restore, save_delta, FsStore, SessionStartState}; +//! use session_store::{create_segment, restore, save_delta, FsStore, SegmentStartState}; //! //! let store = FsStore::new("./sessions")?; -//! let session_id = create_session(&store, SessionStartState { +//! let segment_id = create_segment(&store, SegmentStartState { //! system_prompt: None, //! config: &config, //! history: &[], @@ -29,8 +29,8 @@ pub mod event_trace; pub mod fs_store; pub mod logged_item; -pub mod session; -pub mod session_log; +pub mod segment; +pub mod segment_log; pub mod store; pub mod system_item; @@ -39,23 +39,23 @@ pub use fs_store::FsStore; pub use llm_worker::UsageRecord; pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; -pub use session::{ - SessionStartState, append_entry, append_system_item, classify_history_item, - create_compacted_session, create_session, create_session_with_id, ensure_head_or_fork, fork, +pub use segment::{ + SegmentStartState, append_entry, append_system_item, classify_history_item, + create_compacted_segment, create_segment, create_segment_with_id, ensure_head_or_fork, fork, fork_at, restore, save_config_changed, save_delta, save_extension, save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage, save_user_input, }; -pub use session_log::{ - LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SessionOrigin, +pub use segment_log::{ + LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SegmentOrigin, collect_state, }; pub use system_item::{SystemItem, render_pod_event}; pub use store::{Store, StoreError}; -/// Session identifier. UUID v7 (time-ordered, lexicographically sortable). -pub type SessionId = uuid::Uuid; +/// Segment identifier. UUID v7 (time-ordered, lexicographically sortable). +pub type SegmentId = uuid::Uuid; -/// Generate a new session ID. -pub fn new_session_id() -> SessionId { +/// Generate a new segment ID. +pub fn new_segment_id() -> SegmentId { uuid::Uuid::now_v7() } diff --git a/crates/session-store/src/session.rs b/crates/session-store/src/segment.rs similarity index 70% rename from crates/session-store/src/session.rs rename to crates/session-store/src/segment.rs index 2bad2a08..5a3d7115 100644 --- a/crates/session-store/src/session.rs +++ b/crates/session-store/src/segment.rs @@ -1,12 +1,12 @@ -//! Free functions for session persistence operations. +//! Free functions for segment persistence operations. //! -//! These functions record and restore session state without owning a Worker. +//! These functions record and restore segment state without owning a Worker. //! The caller (typically Pod) holds the Worker directly and calls these //! functions after state-mutating operations. -use crate::SessionId; +use crate::SegmentId; use crate::logged_item::{LoggedItem, to_logged}; -use crate::session_log::{self, LogEntry, PodScopeSnapshot, SessionOrigin}; +use crate::segment_log::{self, LogEntry, PodScopeSnapshot, SegmentOrigin}; use crate::store::{Store, StoreError}; use crate::system_item::SystemItem; use llm_worker::WorkerResult; @@ -14,108 +14,108 @@ use llm_worker::llm_client::RequestConfig; use llm_worker::llm_client::types::Item; use protocol::Segment; -/// State snapshot for creating a SessionStart entry. -pub struct SessionStartState<'a> { +/// State snapshot for creating a SegmentStart entry. +pub struct SegmentStartState<'a> { pub system_prompt: Option<&'a str>, pub config: &'a RequestConfig, pub history: &'a [Item], } -/// Create a new session, writing the initial `SessionStart` entry. -pub fn create_session( +/// Create a new segment, writing the initial `SegmentStart` entry. +pub fn create_segment( store: &impl Store, - state: SessionStartState<'_>, -) -> Result { - let session_id = crate::new_session_id(); - create_session_with_id(store, session_id, state)?; - Ok(session_id) + state: SegmentStartState<'_>, +) -> Result { + let segment_id = crate::new_segment_id(); + create_segment_with_id(store, segment_id, state)?; + Ok(segment_id) } -/// Write a fresh `SessionStart` entry using a pre-generated session ID. +/// Write a fresh `SegmentStart` entry using a pre-generated segment ID. /// -/// Used by callers that need to reserve a session ID synchronously but +/// Used by callers that need to reserve a segment ID synchronously but /// defer the initial log append (e.g. Pod, which resolves a templated /// system prompt only at first turn). -pub fn create_session_with_id( +pub fn create_segment_with_id( store: &impl Store, - session_id: SessionId, - state: SessionStartState<'_>, + segment_id: SegmentId, + state: SegmentStartState<'_>, ) -> Result<(), StoreError> { - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), + let entry = LogEntry::SegmentStart { + ts: segment_log::now_millis(), system_prompt: state.system_prompt.map(String::from), config: state.config.clone(), history: to_logged(state.history), forked_from: None, compacted_from: None, }; - store.append(session_id, &entry) + store.append(segment_id, &entry) } -/// Create a compacted session from an existing one. +/// Create a compacted segment from an existing one. /// -/// Records `compacted_from` provenance linking back to the source session +/// Records `compacted_from` provenance linking back to the source segment /// at the turn boundary captured by `source_turn_count` (the most recent /// completed turn in the source). -pub fn create_compacted_session( +pub fn create_compacted_segment( store: &impl Store, - state: SessionStartState<'_>, - source_session_id: SessionId, + state: SegmentStartState<'_>, + source_session_id: SegmentId, source_turn_count: usize, -) -> Result { - let session_id = crate::new_session_id(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), +) -> Result { + let segment_id = crate::new_segment_id(); + let entry = LogEntry::SegmentStart { + ts: segment_log::now_millis(), system_prompt: state.system_prompt.map(String::from), config: state.config.clone(), history: to_logged(state.history), forked_from: None, - compacted_from: Some(SessionOrigin { - session_id: source_session_id, + compacted_from: Some(SegmentOrigin { + segment_id: source_session_id, at_turn_index: source_turn_count, }), }; - store.append(session_id, &entry)?; - Ok(session_id) + store.append(segment_id, &entry)?; + Ok(segment_id) } -/// Restore session state from a stored log. +/// Restore segment state from a stored log. /// /// Returns the reconstructed state. The caller is responsible for /// applying it to a Worker. pub fn restore( store: &impl Store, - session_id: SessionId, -) -> Result { - let entries = store.read_all(session_id)?; - Ok(session_log::collect_state(&entries)) + segment_id: SegmentId, +) -> Result { + let entries = store.read_all(segment_id)?; + Ok(segment_log::collect_state(&entries)) } /// Check if the store's entry count still matches the writer's tally. -/// If not, auto-fork into a new session. +/// If not, auto-fork into a new segment. /// -/// Updates `session_id` and `entries_written` in place when a fork occurs. +/// Updates `segment_id` and `entries_written` in place when a fork occurs. pub fn ensure_head_or_fork( store: &impl Store, - session_id: &mut SessionId, + segment_id: &mut SegmentId, entries_written: &mut usize, - state: SessionStartState<'_>, + state: SegmentStartState<'_>, ) -> Result<(), StoreError> { - let store_count = store.read_entry_count(*session_id)?; + let store_count = store.read_entry_count(*segment_id)?; if store_count == *entries_written { return Ok(()); } - let fork_id = crate::new_session_id(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), + let fork_id = crate::new_segment_id(); + let entry = LogEntry::SegmentStart { + ts: segment_log::now_millis(), system_prompt: state.system_prompt.map(String::from), config: state.config.clone(), history: to_logged(state.history), forked_from: None, compacted_from: None, }; - store.create_session(fork_id, &[entry])?; - *session_id = fork_id; + store.create_segment(fork_id, &[entry])?; + *segment_id = fork_id; *entries_written = 1; Ok(()) } @@ -128,14 +128,14 @@ pub fn ensure_head_or_fork( /// [`Segment::flatten_to_text`]. pub fn save_user_input( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, segments: Vec, ) -> Result<(), StoreError> { append_entry( store, - session_id, + segment_id, LogEntry::UserInput { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), segments, }, ) @@ -151,21 +151,21 @@ pub fn save_user_input( /// `UserInput` entry. pub fn save_delta( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, new_items: &[Item], ) -> Result<(), StoreError> { if new_items.is_empty() { return Ok(()); } - let ts = session_log::now_millis(); + let ts = segment_log::now_millis(); for item in new_items { if item.is_user_message() { // Already persisted by save_user_input at submit time. continue; } let entry = classify_history_item(item, ts); - append_entry(store, session_id, entry)?; + append_entry(store, segment_id, entry)?; } Ok(()) } @@ -199,14 +199,14 @@ pub fn classify_history_item(item: &Item, ts: u64) -> LogEntry { /// commit shape used for assistant / tool result entries. pub fn append_system_item( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, item: SystemItem, ) -> Result<(), StoreError> { append_entry( store, - session_id, + segment_id, LogEntry::SystemItem { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), item, }, ) @@ -215,14 +215,14 @@ pub fn append_system_item( /// Log a TurnEnd entry. pub fn save_turn_end( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, turn_count: usize, ) -> Result<(), StoreError> { append_entry( store, - session_id, + segment_id, LogEntry::TurnEnd { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), turn_count, }, ) @@ -231,15 +231,15 @@ pub fn save_turn_end( /// Log a `RunCompleted` entry — `run()` / `resume()` returned `Ok(WorkerResult)`. pub fn save_run_completed( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, result: WorkerResult, interrupted: bool, ) -> Result<(), StoreError> { append_entry( store, - session_id, + segment_id, LogEntry::RunCompleted { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), interrupted, result, }, @@ -252,15 +252,15 @@ pub fn save_run_completed( /// `to_string()` rendering as `message`. pub fn save_run_errored( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, message: String, interrupted: bool, ) -> Result<(), StoreError> { append_entry( store, - session_id, + segment_id, LogEntry::RunErrored { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), interrupted, message, }, @@ -275,7 +275,7 @@ pub fn save_run_errored( /// 済ませた値を渡す。 pub fn save_usage( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, history_len: usize, input_total_tokens: u64, cache_read_tokens: u64, @@ -284,9 +284,9 @@ pub fn save_usage( ) -> Result<(), StoreError> { append_entry( store, - session_id, + segment_id, LogEntry::LlmUsage { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), history_len, input_total_tokens, cache_read_tokens, @@ -303,15 +303,15 @@ pub fn save_usage( /// Use `RestoredState.extensions` to read entries back at restore time. pub fn save_extension( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, domain: impl Into, payload: serde_json::Value, ) -> Result<(), StoreError> { append_entry( store, - session_id, + segment_id, LogEntry::Extension { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), domain: domain.into(), payload, }, @@ -321,14 +321,14 @@ pub fn save_extension( /// Log the Pod's latest runtime scope snapshot. pub fn save_pod_scope( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, snapshot: &PodScopeSnapshot, ) -> Result<(), StoreError> { let payload = serde_json::to_value(snapshot)?; save_extension( store, - session_id, - session_log::POD_SCOPE_EXTENSION_DOMAIN, + segment_id, + segment_log::POD_SCOPE_EXTENSION_DOMAIN, payload, ) } @@ -336,35 +336,35 @@ pub fn save_pod_scope( /// Log a `ConfigChanged` entry. pub fn save_config_changed( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, config: &RequestConfig, ) -> Result<(), StoreError> { append_entry( store, - session_id, + segment_id, LogEntry::ConfigChanged { - ts: session_log::now_millis(), + ts: segment_log::now_millis(), config: config.clone(), }, ) } -/// Fork the current state into a new session. -pub fn fork(store: &impl Store, state: SessionStartState<'_>) -> Result { - let fork_id = crate::new_session_id(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), +/// Fork the current state into a new segment. +pub fn fork(store: &impl Store, state: SegmentStartState<'_>) -> Result { + let fork_id = crate::new_segment_id(); + let entry = LogEntry::SegmentStart { + ts: segment_log::now_millis(), system_prompt: state.system_prompt.map(String::from), config: state.config.clone(), history: to_logged(state.history), forked_from: None, compacted_from: None, }; - store.create_session(fork_id, &[entry])?; + store.create_segment(fork_id, &[entry])?; Ok(fork_id) } -/// Fork from a turn boundary in a stored session's log. +/// Fork from a turn boundary in a stored segment log. /// /// `at_turn_index` is the `turn_count` of the most recent completed /// `TurnEnd` in the source segment that the fork should branch from. @@ -372,16 +372,16 @@ pub fn fork(store: &impl Store, state: SessionStartState<'_>) -> Result Result { +) -> Result { let entries = store.read_all(source_id)?; let cut = if at_turn_index == 0 { - // Branch directly after the SessionStart (or whatever opens the + // Branch directly after the SegmentStart (or whatever opens the // segment), before any turn completes. entries .iter() - .position(|e| !matches!(e, LogEntry::SessionStart { .. })) + .position(|e| !matches!(e, LogEntry::SegmentStart { .. })) .unwrap_or(entries.len()) } else { entries @@ -390,21 +390,21 @@ pub fn fork_at( .map(|i| i + 1) .unwrap_or(entries.len()) }; - let state = session_log::collect_state(&entries[..cut]); + let state = segment_log::collect_state(&entries[..cut]); - let fork_id = crate::new_session_id(); - let entry = LogEntry::SessionStart { - ts: session_log::now_millis(), + let fork_id = crate::new_segment_id(); + let entry = LogEntry::SegmentStart { + ts: segment_log::now_millis(), system_prompt: state.system_prompt, config: state.config, history: to_logged(&state.history), - forked_from: Some(SessionOrigin { - session_id: source_id, + forked_from: Some(SegmentOrigin { + segment_id: source_id, at_turn_index, }), compacted_from: None, }; - store.create_session(fork_id, &[entry])?; + store.create_segment(fork_id, &[entry])?; Ok(fork_id) } @@ -415,8 +415,8 @@ pub fn fork_at( /// it needs the same value for an in-memory mirror + broadcast). pub fn append_entry( store: &impl Store, - session_id: SessionId, + segment_id: SegmentId, entry: LogEntry, ) -> Result<(), StoreError> { - store.append(session_id, &entry) + store.append(segment_id, &entry) } diff --git a/crates/session-store/src/session_log.rs b/crates/session-store/src/segment_log.rs similarity index 93% rename from crates/session-store/src/session_log.rs rename to crates/session-store/src/segment_log.rs index da7e4495..15835a63 100644 --- a/crates/session-store/src/session_log.rs +++ b/crates/session-store/src/segment_log.rs @@ -1,12 +1,13 @@ -//! Session log types for append-only JSONL persistence. +//! Segment log types for append-only JSONL persistence. //! -//! Each [`LogEntry`] represents a single state transition in a session, -//! serialized as one line in a `.jsonl` file. Reading all entries and -//! collecting them via [`collect_state`] reconstructs the full [`Worker`] state. +//! Each [`LogEntry`] represents a single state transition within one +//! segment, serialized as one line in a `.jsonl` file. Reading all +//! entries and collecting them via [`collect_state`] reconstructs the +//! full [`Worker`] state at that segment. //! //! The on-disk format is one `LogEntry` per line — entries are positionally //! ordered. Fork lineage references between segments use turn-number indices -//! (`SessionOrigin.at_turn_index`) rather than per-entry hashes. +//! (`SegmentOrigin.at_turn_index`) rather than per-entry hashes. use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::{UsageRecord, WorkerResult}; @@ -16,10 +17,10 @@ use serde::{Deserialize, Serialize}; use crate::logged_item::LoggedItem; use crate::system_item::SystemItem; -/// A single session log entry, serialized as one JSONL line. +/// A single segment log entry, serialized as one JSONL line. /// /// Variants correspond to specific mutation points in `Worker`: -/// - `SessionStart` — always the first entry; captures initial state +/// - `SegmentStart` — always the first entry; captures initial state /// - `Invoke` — IDLE → active marker (start of a new self-driving cycle) /// - `UserInput` / `AssistantItems` / `ToolResults` / `HookInjectedItems` — history appends /// - `TurnEnd` — AgentTurn boundary marker; carries the post-increment @@ -32,19 +33,19 @@ use crate::system_item::SystemItem; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum LogEntry { - /// Session start. Always the first entry in a log. - /// For forked sessions, `history` contains the seed state from the parent. - SessionStart { + /// Segment start. Always the first entry in a segment log. + /// For forked segments, `history` contains the seed state from the parent. + SegmentStart { ts: u64, system_prompt: Option, config: RequestConfig, history: Vec, - /// Origin: forked from another session at a specific turn boundary. + /// Origin: forked from another segment at a specific turn boundary. #[serde(default, skip_serializing_if = "Option::is_none")] - forked_from: Option, - /// Origin: compacted from another session at a specific turn boundary. + forked_from: Option, + /// Origin: compacted from another segment at a specific turn boundary. #[serde(default, skip_serializing_if = "Option::is_none")] - compacted_from: Option, + compacted_from: Option, }, /// IDLE → active marker. Records the start of a new self-driving @@ -66,7 +67,7 @@ pub enum LogEntry { /// User input accepted at submit time. Carries the original typed /// `Vec` so clients can re-render typed atoms (paste chips, - /// file/knowledge refs, workflow invocations) on session restore. + /// file/knowledge refs, workflow invocations) on segment restore. /// Replay flattens these into a `Item::user_message` for the worker /// history; the worker layer never sees segments directly. UserInput { ts: u64, segments: Vec }, @@ -87,7 +88,7 @@ pub enum LogEntry { /// dispatch on `kind` for typed rendering. SystemItem { ts: u64, item: SystemItem }, - /// Legacy plural form: kept **read-only** so old session logs still + /// Legacy plural form: kept **read-only** so old segment logs still /// open. New writes always use the singular `AssistantItem`. Items /// are flattened on replay. AssistantItems { ts: u64, items: Vec }, @@ -169,10 +170,10 @@ pub enum LogEntry { /// `at_turn_index` is the `turn_count` value of the most recent /// `TurnEnd` entry preceding the split point in the source segment. /// A value of `0` means the split happened before any turn completed -/// (e.g. immediately after `SessionStart`). +/// (e.g. immediately after `SegmentStart`). #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct SessionOrigin { - pub session_id: crate::SessionId, +pub struct SegmentOrigin { + pub segment_id: crate::SegmentId, pub at_turn_index: usize, } @@ -194,7 +195,7 @@ pub struct RestoredState { pub history: Vec, pub turn_count: usize, pub last_run_interrupted: bool, - /// Number of entries replayed. `0` means the session log was empty. + /// Number of entries replayed. `0` means the segment log was empty. /// Writers track their own append count via the same counter so /// `ensure_head_or_fork` can compare it with the on-disk count. pub entries_count: usize, @@ -206,14 +207,14 @@ pub struct RestoredState { /// session-store は domain を不透明扱いし、各ドメインが自前で fold する。 pub extensions: Vec<(String, serde_json::Value)>, /// Latest runtime scope snapshot persisted by the Pod. `None` means - /// the session predates scope persistence or the payload was corrupt. + /// the segment predates scope persistence or the payload was corrupt. pub pod_scope: Option, /// User submissions in original typed form, in submit order. /// One entry per `LogEntry::UserInput`; the K-th entry corresponds to /// the K-th `Item::user_message` derived during replay (modulo - /// pre-compaction history seeded via `SessionStart.history`, whose + /// pre-compaction history seeded via `SegmentStart.history`, whose /// original segments are not preserved). Used by clients to re-render - /// typed atoms (paste chips, refs) on session restore. + /// typed atoms (paste chips, refs) on segment restore. pub user_segments: Vec>, } @@ -236,7 +237,7 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState { state.entries_count += 1; match entry { - LogEntry::SessionStart { + LogEntry::SegmentStart { system_prompt, config, history, @@ -316,7 +317,7 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState { Err(err) => { tracing::warn!( error = %err, - "discarding malformed pod.scope snapshot from session log" + "discarding malformed pod.scope snapshot from segment log" ); } } @@ -350,8 +351,8 @@ mod tests { } #[test] - fn replay_session_start_sets_initial_state() { - let state = collect_state(&[LogEntry::SessionStart { + fn replay_segment_start_sets_initial_state() { + let state = collect_state(&[LogEntry::SegmentStart { ts: 1000, system_prompt: Some("You are helpful.".into()), config: RequestConfig::default().with_max_tokens(1024), @@ -368,7 +369,7 @@ mod tests { #[test] fn replay_full_turn() { let state = collect_state(&[ - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -402,7 +403,7 @@ mod tests { #[test] fn replay_with_tool_calls() { let state = collect_state(&[ - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -439,7 +440,7 @@ mod tests { #[test] fn replay_config_changed() { let state = collect_state(&[ - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -458,7 +459,7 @@ mod tests { #[test] fn replay_llm_usage_appends_to_usage_history() { let state = collect_state(&[ - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -504,7 +505,7 @@ mod tests { #[test] fn replay_without_llm_usage_keeps_usage_history_empty() { let state = collect_state(&[ - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -575,7 +576,7 @@ mod tests { #[test] fn replay_invoke_marker_does_not_mutate_state() { let state = collect_state(&[ - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: 0, system_prompt: None, config: RequestConfig::default(), @@ -607,7 +608,7 @@ mod tests { #[test] fn replay_extension_collects_domain_payload_pairs() { let state = collect_state(&[ - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -690,7 +691,7 @@ mod tests { let json = serde_json::to_string(&entry).unwrap(); let parsed: LogEntry = serde_json::from_str(&json).unwrap(); let state = collect_state(&[ - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: 1, system_prompt: None, config: RequestConfig::default(), diff --git a/crates/session-store/src/store.rs b/crates/session-store/src/store.rs index 43981133..1406571e 100644 --- a/crates/session-store/src/store.rs +++ b/crates/session-store/src/store.rs @@ -1,18 +1,18 @@ //! Persistence backend abstraction. //! -//! [`Store`] defines the sync interface for reading and writing session logs. +//! [`Store`] defines the sync interface for reading and writing segment logs. //! Implementations handle the physical storage (filesystem, database, etc.). //! -//! Sync (rather than async) is intentional: a session log append is a single +//! Sync (rather than async) is intentional: a segment log append is a single //! `< 1 KiB` line on local fs and completes well below a millisecond. Going //! through `tokio::fs` would force every caller — including `Worker`'s sync //! `on_history_append` callback — to bridge sync → async via a channel + //! drain task. Keeping the store sync lets the worker callback, Pod commit //! paths, and `PodInterceptor` all share one direct `append_entry` call. -use crate::SessionId; +use crate::SegmentId; use crate::event_trace::TraceEntry; -use crate::session_log::LogEntry; +use crate::segment_log::LogEntry; /// Errors from the persistence store. #[derive(Debug, thiserror::Error)] @@ -23,43 +23,43 @@ pub enum StoreError { #[error("serialization error: {0}")] Serde(#[from] serde_json::Error), - #[error("session not found: {0}")] - NotFound(SessionId), + #[error("segment not found: {0}")] + NotFound(SegmentId), #[error("log corrupted at line {line}: {message}")] Corrupt { line: usize, message: String }, } -/// Sync persistence backend for session logs. +/// Sync persistence backend for segment logs. /// /// All methods take `&self` — implementations should use interior mutability /// (e.g., append-mode file handles) when needed. pub trait Store: Send + Sync { - /// Append a single log entry to the session log. + /// Append a single log entry to the segment log. /// /// One line per call. The kernel orders concurrent `O_APPEND` writes /// for lines < `PIPE_BUF`, so user-space serialization is unnecessary. - fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError>; + fn append(&self, id: SegmentId, entry: &LogEntry) -> Result<(), StoreError>; - /// Read all log entries for a session, in order. - fn read_all(&self, id: SessionId) -> Result, StoreError>; + /// Read all log entries for a segment, in order. + fn read_all(&self, id: SegmentId) -> Result, StoreError>; - /// List all session IDs, most recent first. - fn list_sessions(&self) -> Result, StoreError>; + /// List all segment IDs, most recent first. + fn list_segments(&self) -> Result, StoreError>; - /// Create a new session with initial entries. - fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> Result<(), StoreError>; + /// Create a new segment with initial entries. + fn create_segment(&self, id: SegmentId, entries: &[LogEntry]) -> Result<(), StoreError>; - /// Check if a session exists. - fn exists(&self, id: SessionId) -> Result; + /// Check if a segment exists. + fn exists(&self, id: SegmentId) -> Result; - /// Count entries currently stored for a session. + /// Count entries currently stored for a segment. /// /// Used by `ensure_head_or_fork` to detect concurrent writers: /// if the on-disk count exceeds the writer's own append tally, /// another process has extended the log. - fn read_entry_count(&self, id: SessionId) -> Result; + fn read_entry_count(&self, id: SegmentId) -> Result; /// Append a trace entry to the debug event trace file. - fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError>; + fn append_trace(&self, id: SegmentId, entry: &TraceEntry) -> Result<(), StoreError>; } diff --git a/crates/session-store/src/system_item.rs b/crates/session-store/src/system_item.rs index 749ca0ae..c85e29a0 100644 --- a/crates/session-store/src/system_item.rs +++ b/crates/session-store/src/system_item.rs @@ -29,7 +29,7 @@ use serde::{Deserialize, Serialize}; /// path / knowledge slug / workflow slug / etc.), plus a pre-rendered /// `body` (where applicable) that is the exact `role:system` text the /// LLM actually saw at commit time. `body` is denormalised so that -/// session log replay reconstructs worker history byte-identical to +/// segment log replay reconstructs worker history byte-identical to /// what was on the wire — even when prompt overrides (e.g. custom /// `notify_wrapper` template) re-shape the live rendering on a later /// resume. diff --git a/crates/session-store/tests/fs_store_test.rs b/crates/session-store/tests/fs_store_test.rs index d7c27024..3fca4113 100644 --- a/crates/session-store/tests/fs_store_test.rs +++ b/crates/session-store/tests/fs_store_test.rs @@ -1,15 +1,15 @@ use llm_worker::WorkerResult; use llm_worker::llm_client::types::{Item, RequestConfig}; -use session_store::{FsStore, LogEntry, Store, TraceEntry, collect_state, new_session_id}; +use session_store::{FsStore, LogEntry, Store, TraceEntry, collect_state, new_segment_id}; #[test] fn round_trip_write_and_read() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_session_id(); + let id = new_segment_id(); let entries = vec![ - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: 1000, system_prompt: Some("You are helpful.".into()), config: RequestConfig::default().with_max_tokens(1024), @@ -56,9 +56,9 @@ fn round_trip_write_and_read() { fn create_session_writes_all_entries() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_session_id(); + let id = new_segment_id(); - let entries = [LogEntry::SessionStart { + let entries = [LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -70,7 +70,7 @@ fn create_session_writes_all_entries() { compacted_from: None, }]; - store.create_session(id, &entries).unwrap(); + store.create_segment(id, &entries).unwrap(); let read_back = store.read_all(id).unwrap(); assert_eq!(read_back.len(), 1); @@ -83,12 +83,12 @@ fn list_sessions_returns_newest_first() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id1 = new_session_id(); + let id1 = new_segment_id(); // Small delay to ensure different UUID v7 timestamps std::thread::sleep(std::time::Duration::from_millis(2)); - let id2 = new_session_id(); + let id2 = new_segment_id(); - let entry = LogEntry::SessionStart { + let entry = LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -100,7 +100,7 @@ fn list_sessions_returns_newest_first() { store.append(id1, &entry).unwrap(); store.append(id2, &entry).unwrap(); - let sessions = store.list_sessions().unwrap(); + let sessions = store.list_segments().unwrap(); assert_eq!(sessions.len(), 2); assert_eq!(sessions[0], id2); // newest first assert_eq!(sessions[1], id1); @@ -110,14 +110,14 @@ fn list_sessions_returns_newest_first() { fn exists_returns_correct_state() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_session_id(); + let id = new_segment_id(); assert!(!store.exists(id).unwrap()); store .append( id, - &LogEntry::SessionStart { + &LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -135,7 +135,7 @@ fn exists_returns_correct_state() { fn not_found_error_for_missing_session() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_session_id(); + let id = new_segment_id(); let result = store.read_all(id); assert!(result.is_err()); @@ -145,12 +145,12 @@ fn not_found_error_for_missing_session() { fn trace_entries_in_separate_file() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_session_id(); + let id = new_segment_id(); store .append( id, - &LogEntry::SessionStart { + &LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), @@ -183,10 +183,10 @@ fn trace_entries_in_separate_file() { fn read_entry_count_matches_append_tally() { let dir = tempfile::tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); - let id = new_session_id(); + let id = new_segment_id(); let entries = [ - LogEntry::SessionStart { + LogEntry::SegmentStart { ts: 1000, system_prompt: None, config: RequestConfig::default(), diff --git a/crates/session-store/tests/session_test.rs b/crates/session-store/tests/session_test.rs index e393175c..b931a4c8 100644 --- a/crates/session-store/tests/session_test.rs +++ b/crates/session-store/tests/session_test.rs @@ -9,7 +9,7 @@ use llm_worker::interceptor::{Interceptor, TurnEndAction}; use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent}; use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; -use session_store::{FsStore, LogEntry, SessionStartState, Store, collect_state}; +use session_store::{FsStore, LogEntry, SegmentStartState, Store, collect_state}; // ============================================================================= // Helpers @@ -95,13 +95,13 @@ fn make_store() -> (tempfile::TempDir, FsStore) { async fn run_and_persist( worker: Worker, store: &FsStore, - session_id: session_store::SessionId, + segment_id: session_store::SegmentId, input: &str, ) -> (Worker, llm_worker::WorkerResult) { // Mirror Pod's run-entry contract: log the user input as segments // before the worker pushes its flattened user_message; save_delta // skips the resulting user_message item to avoid double-write. - session_store::save_user_input(store, session_id, vec![protocol::Segment::text(input)]) + session_store::save_user_input(store, segment_id, vec![protocol::Segment::text(input)]) .unwrap(); let history_before = worker.history().len(); @@ -111,14 +111,14 @@ async fn run_and_persist( let worker = locked.unlock(); let new_items = &worker.history()[history_before..]; - session_store::save_delta(store, session_id, new_items).unwrap(); - session_store::save_turn_end(store, session_id, worker.turn_count()).unwrap(); + session_store::save_delta(store, segment_id, new_items).unwrap(); + session_store::save_turn_end(store, segment_id, worker.turn_count()).unwrap(); match &result { Ok(r) => { session_store::save_run_completed( store, - session_id, + segment_id, r.clone(), worker.last_run_interrupted(), ) @@ -127,7 +127,7 @@ async fn run_and_persist( Err(e) => { session_store::save_run_errored( store, - session_id, + segment_id, e.to_string(), worker.last_run_interrupted(), ) @@ -149,9 +149,9 @@ async fn session_run_logs_entries() { let client = MockLlmClient::new(simple_text_events()); let worker = Worker::new(client); - let sid = session_store::create_session( + let sid = session_store::create_segment( &store, - SessionStartState { + SegmentStartState { system_prompt: worker.get_system_prompt(), config: worker.request_config(), history: worker.history(), @@ -164,15 +164,15 @@ async fn session_run_logs_entries() { let entries = store.read_all(sid).unwrap(); - // SessionStart, UserInput, AssistantItems, TurnEnd, RunCompleted (at minimum) + // SegmentStart, UserInput, AssistantItems, TurnEnd, RunCompleted (at minimum) assert!( entries.len() >= 4, "expected at least 4 entries, got {}", entries.len() ); - // First entry is SessionStart - assert!(matches!(&entries[0], LogEntry::SessionStart { .. })); + // First entry is SegmentStart + assert!(matches!(&entries[0], LogEntry::SegmentStart { .. })); // Has a RunCompleted with Finished let has_finished = entries.iter().any(|e| { @@ -194,9 +194,9 @@ async fn session_restore_round_trip() { let mut worker = Worker::new(client); worker.set_system_prompt("You are helpful."); - let sid = session_store::create_session( + let sid = session_store::create_segment( &store, - SessionStartState { + SegmentStartState { system_prompt: worker.get_system_prompt(), config: worker.request_config(), history: worker.history(), @@ -225,9 +225,9 @@ async fn session_run_with_tool_call() { let mut worker = Worker::new(client); worker.register_tool(weather_tool_definition()); - let sid = session_store::create_session( + let sid = session_store::create_segment( &store, - SessionStartState { + SegmentStartState { system_prompt: worker.get_system_prompt(), config: worker.request_config(), history: worker.history(), @@ -260,9 +260,9 @@ async fn session_resume_after_pause() { worker.register_tool(weather_tool_definition()); worker.set_interceptor(PausePolicy); - let sid = session_store::create_session( + let sid = session_store::create_segment( &store, - SessionStartState { + SegmentStartState { system_prompt: worker.get_system_prompt(), config: worker.request_config(), history: worker.history(), @@ -298,9 +298,9 @@ async fn session_fork_preserves_state() { let mut worker = Worker::new(client); worker.set_system_prompt("System prompt"); - let sid = session_store::create_session( + let sid = session_store::create_segment( &store, - SessionStartState { + SegmentStartState { system_prompt: worker.get_system_prompt(), config: worker.request_config(), history: worker.history(), @@ -313,7 +313,7 @@ async fn session_fork_preserves_state() { let original_history_len = worker.history().len(); let fork_id = session_store::fork( &store, - SessionStartState { + SegmentStartState { system_prompt: worker.get_system_prompt(), config: worker.request_config(), history: worker.history(), @@ -321,10 +321,10 @@ async fn session_fork_preserves_state() { ) .unwrap(); - // Fork should have a SessionStart with the current history + // Fork should have a SegmentStart with the current history let fork_entries = store.read_all(fork_id).unwrap(); assert_eq!(fork_entries.len(), 1); - assert!(matches!(&fork_entries[0], LogEntry::SessionStart { .. })); + assert!(matches!(&fork_entries[0], LogEntry::SegmentStart { .. })); let fork_state = collect_state(&fork_entries); assert_eq!(fork_state.history.len(), original_history_len); @@ -337,9 +337,9 @@ async fn session_fork_at_truncates() { let client = MockLlmClient::new(simple_text_events()); let worker = Worker::new(client); - let sid = session_store::create_session( + let sid = session_store::create_segment( &store, - SessionStartState { + SegmentStartState { system_prompt: worker.get_system_prompt(), config: worker.request_config(), history: worker.history(), @@ -356,7 +356,7 @@ async fn session_fork_at_truncates() { let fork_id = session_store::fork_at(&store, sid, worker.turn_count()).unwrap(); let fork_entries = store.read_all(fork_id).unwrap(); - assert_eq!(fork_entries.len(), 1); // Just the new SessionStart + assert_eq!(fork_entries.len(), 1); // Just the new SegmentStart let fork_state = collect_state(&fork_entries); // History at fork point should match history right after the TurnEnd in @@ -375,9 +375,9 @@ async fn session_config_changed_logged() { let client = MockLlmClient::new(vec![]); let mut worker = Worker::new(client); - let sid = session_store::create_session( + let sid = session_store::create_segment( &store, - SessionStartState { + SegmentStartState { system_prompt: worker.get_system_prompt(), config: worker.request_config(), history: worker.history(), @@ -408,17 +408,17 @@ async fn session_auto_forks_on_conflict() { let client_a = MockLlmClient::new(simple_text_events()); let worker_a = Worker::new(client_a); - let original_sid = session_store::create_session( + let original_sid = session_store::create_segment( &store, - SessionStartState { + SegmentStartState { system_prompt: worker_a.get_system_prompt(), config: worker_a.request_config(), history: worker_a.history(), }, ) .unwrap(); - let mut session_id = original_sid; - // Writer tracked: just the SessionStart we wrote. + let mut segment_id = original_sid; + // Writer tracked: just the SegmentStart we wrote. let mut entries_written: usize = 1; // Simulate another Pod writing to the same session behind our back. @@ -431,9 +431,9 @@ async fn session_auto_forks_on_conflict() { // Now the on-disk count exceeds our tally — ensure_head_or_fork should auto-fork. session_store::ensure_head_or_fork( &store, - &mut session_id, + &mut segment_id, &mut entries_written, - SessionStartState { + SegmentStartState { system_prompt: worker_a.get_system_prompt(), config: worker_a.request_config(), history: worker_a.history(), @@ -441,11 +441,11 @@ async fn session_auto_forks_on_conflict() { ) .unwrap(); - // session_id should now be different - assert_ne!(session_id, original_sid); + // segment_id should now be different + assert_ne!(segment_id, original_sid); // The fork session should exist and have entries - let fork_entries = store.read_all(session_id).unwrap(); + let fork_entries = store.read_all(segment_id).unwrap(); assert!(!fork_entries.is_empty()); // Original session should still have the interloper entry diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index a17ffa8c..d06561db 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -483,7 +483,7 @@ impl App { self.blocks.push(Block::UserMessage { segments }); self.assistant_streaming = false; } - Event::SessionRotated { entry } => { + Event::SegmentRotated { entry } => { self.reset_for_rotation(); self.apply_log_entry_raw(&entry); self.assistant_streaming = false; @@ -685,7 +685,7 @@ impl App { started_at: Instant::now(), })); } - Event::CompactDone { new_session_id } => { + Event::CompactDone { new_segment_id } => { if let Some(evt) = self.last_streaming_compact_mut() { let elapsed_secs = match evt { CompactEvent::Streaming { started_at } => { @@ -694,12 +694,12 @@ impl App { _ => None, }; *evt = CompactEvent::Done { - new_session_id, + new_segment_id, elapsed_secs, }; } else { self.blocks.push(Block::Compact(CompactEvent::Done { - new_session_id, + new_segment_id, elapsed_secs: None, })); } @@ -932,7 +932,7 @@ impl App { } /// Drop the derived view in preparation for replaying a new - /// `SessionStart` (compaction / fork). Greeting is preserved + /// `SegmentStart` (compaction / fork). Greeting is preserved /// because the Pod identity hasn't changed. fn reset_for_rotation(&mut self) { let greeting = self.blocks.iter().find_map(|b| match b { @@ -958,7 +958,7 @@ impl App { return; }; match entry { - session_store::LogEntry::SessionStart { history, .. } => { + session_store::LogEntry::SegmentStart { history, .. } => { for logged in history { let item: llm_worker::Item = logged.into(); let item_value = serde_json::to_value(&item).expect("Item is Serialize"); @@ -1445,7 +1445,7 @@ mod completion_flow_tests { #[test] fn snapshot_renders_system_message_block_from_session_start() { let mut app = App::new("test".into()); - let session_start = session_store::LogEntry::SessionStart { + let session_start = session_store::LogEntry::SegmentStart { ts: 1, system_prompt: None, config: Default::default(), @@ -1525,15 +1525,15 @@ mod completion_flow_tests { let id = uuid::Uuid::parse_str("12345678-1234-5678-1234-567812345678").unwrap(); app.handle_pod_event(Event::CompactStart); - app.handle_pod_event(Event::CompactDone { new_session_id: id }); + app.handle_pod_event(Event::CompactDone { new_segment_id: id }); assert_eq!(compact_block_count(&app), 1); assert!(matches!( app.blocks.as_slice(), [Block::Compact(CompactEvent::Done { - new_session_id, + new_segment_id, elapsed_secs: Some(_), - })] if *new_session_id == id + })] if *new_segment_id == id )); } diff --git a/crates/tui/src/block.rs b/crates/tui/src/block.rs index f3a5faa2..f884a060 100644 --- a/crates/tui/src/block.rs +++ b/crates/tui/src/block.rs @@ -83,7 +83,7 @@ pub enum CompactEvent { Streaming { started_at: Instant }, /// Compaction ended cleanly with `CompactDone`. Done { - new_session_id: uuid::Uuid, + new_segment_id: uuid::Uuid, elapsed_secs: Option, }, /// Compaction ended with `CompactFailed`. diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index 95e96f2a..920c770c 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -25,7 +25,7 @@ use crossterm::terminal::{ use protocol::{Method, PodStatus}; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; -use session_store::SessionId; +use session_store::SegmentId; use client::PodClient; @@ -56,7 +56,7 @@ enum Mode { Resume, /// `tui --session `: skip the picker, go straight to the /// resume name dialog with `id` baked in. - ResumeWithSession(SessionId), + ResumeWithSession(SegmentId), } enum ParseError { @@ -78,7 +78,7 @@ impl std::fmt::Display for ParseError { fn parse_args() -> Result { let args: Vec = std::env::args().skip(1).collect(); let mut resume = false; - let mut session: Option = None; + let mut session: Option = None; let mut socket_override: Option = None; let mut positional: Option = None; @@ -94,7 +94,7 @@ fn parse_args() -> Result { .get(i + 1) .ok_or(ParseError::MissingValue("--session"))?; session = Some( - raw.parse::() + raw.parse::() .map_err(|_| ParseError::InvalidSession(raw.clone()))?, ); i += 2; @@ -216,7 +216,7 @@ async fn run_resume() -> Result<(), Box> { run_spawn(Some(id)).await } -async fn run_spawn(resume_from: Option) -> Result<(), Box> { +async fn run_spawn(resume_from: Option) -> Result<(), Box> { let ready = match spawn::run(resume_from).await? { SpawnOutcome::Ready(r) => r, SpawnOutcome::Cancelled => return Ok(()), diff --git a/crates/tui/src/picker.rs b/crates/tui/src/picker.rs index e81bf3df..921b6f19 100644 --- a/crates/tui/src/picker.rs +++ b/crates/tui/src/picker.rs @@ -2,7 +2,7 @@ //! //! Reads the most recent sessions from the configured store, lets the //! user pick one with the arrow keys, and returns the chosen -//! `SessionId`. Closes its inline viewport before returning so the +//! `SegmentId`. Closes its inline viewport before returning so the //! caller can open a fresh viewport for the name dialog. //! //! The picker only handles selection. Forking, pod-registry checks, and @@ -12,7 +12,7 @@ use std::io; use std::time::Duration; use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers}; -use pod_registry::lookup_session; +use pod_registry::lookup_segment; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use ratatui::layout::{Constraint, Layout}; @@ -20,7 +20,7 @@ use ratatui::style::{Color, Modifier, Style}; use ratatui::text::{Line, Span}; use ratatui::widgets::Paragraph; use ratatui::{Frame, TerminalOptions, Viewport}; -use session_store::{FsStore, LogEntry, LoggedContentPart, LoggedItem, SessionId, Store}; +use session_store::{FsStore, LogEntry, LoggedContentPart, LoggedItem, SegmentId, Store}; const MAX_ROWS: usize = 10; const VIEWPORT_LINES: u16 = MAX_ROWS as u16 + 4; @@ -60,14 +60,14 @@ impl From for PickerError { } pub enum PickerOutcome { - Picked(SessionId), + Picked(SegmentId), Cancelled, } /// One row in the picker view. Rendered from the session log so the /// user can recognise their session at a glance without parsing UUIDs. struct Row { - id: SessionId, + id: SegmentId, /// Last user / assistant snippet, or a `[corrupt]` placeholder. preview: String, /// `Some(pod_name)` when a live Pod currently holds an allocation @@ -79,7 +79,7 @@ struct Row { pub async fn run() -> Result { let store = open_default_store()?; - let ids = store.list_sessions()?; + let ids = store.list_segments()?; if ids.is_empty() { return Err(PickerError::NoSessions); } @@ -89,7 +89,7 @@ pub async fn run() -> Result { // Best-effort live check. A pods.json I/O hiccup downgrades // the row to "no badge" rather than killing the picker — the // user still gets to see the listing. - let live_pod = lookup_session(id).ok().flatten().map(|info| info.pod_name); + let live_pod = lookup_segment(id).ok().flatten().map(|info| info.pod_name); rows.push(Row { id, preview, @@ -158,7 +158,7 @@ fn open_default_store() -> Result { Ok(FsStore::new(&dir)?) } -fn build_preview(store: &FsStore, id: SessionId) -> String { +fn build_preview(store: &FsStore, id: SegmentId) -> String { match store.read_all(id) { Ok(entries) => last_message_preview(&entries).unwrap_or_else(|| "[empty]".to_string()), Err(_) => "[corrupt]".to_string(), @@ -313,7 +313,7 @@ fn row_line(row: &Row, selected: bool) -> Line<'_> { Line::from(spans) } -fn short_session(id: SessionId) -> String { +fn short_session(id: SegmentId) -> String { let s = id.to_string(); s.chars().take(8).collect() } diff --git a/crates/tui/src/spawn.rs b/crates/tui/src/spawn.rs index 0654c7f8..c59df878 100644 --- a/crates/tui/src/spawn.rs +++ b/crates/tui/src/spawn.rs @@ -28,7 +28,7 @@ use ratatui::style::{Color, Modifier, Style}; use ratatui::text::{Line, Span}; use ratatui::widgets::Paragraph; use ratatui::{Frame, TerminalOptions, Viewport}; -use session_store::SessionId; +use session_store::SegmentId; const VIEWPORT_LINES: u16 = 6; @@ -46,7 +46,7 @@ pub enum SpawnOutcome { pub enum SpawnError { Io(io::Error), Store(session_store::StoreError), - MissingResumeScope { session_id: SessionId }, + MissingResumeScope { segment_id: SegmentId }, Spawn(client::SpawnError), } @@ -55,9 +55,9 @@ impl std::fmt::Display for SpawnError { match self { Self::Io(e) => write!(f, "io error: {e}"), Self::Store(e) => write!(f, "failed to read session log: {e}"), - Self::MissingResumeScope { session_id } => write!( + Self::MissingResumeScope { segment_id } => write!( f, - "session {session_id} has no persisted scope snapshot; refusing resume without explicit scope" + "session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope" ), Self::Spawn(e) => write!(f, "{e}"), } @@ -89,7 +89,7 @@ type InlineTerminal = Terminal>; /// Source session for a resume run. `None` = fresh spawn (current /// behaviour); `Some(id)` swaps the dialog into "Resume Pod" mode and /// passes `--session ` to the spawned `pod` child. -pub async fn run(resume_from: Option) -> Result { +pub async fn run(resume_from: Option) -> Result { let cwd = std::env::current_dir().map_err(SpawnError::Io)?; // Run the same merge pod itself uses, then read what's missing @@ -321,7 +321,7 @@ fn build_overlay_toml(form: &Form) -> String { toml::to_string(&toml::Value::Table(root)).expect("overlay serialisation cannot fail") } -async fn load_resume_scope(session_id: SessionId) -> Result { +async fn load_resume_scope(segment_id: SegmentId) -> Result { let store_dir = manifest::paths::sessions_dir().ok_or_else(|| { io::Error::new( io::ErrorKind::NotFound, @@ -329,10 +329,10 @@ async fn load_resume_scope(session_id: SessionId) -> Result` so it restores /// from `id` and appends to the same session log. - resume_from: Option, + resume_from: Option, /// Scope snapshot recovered from the source session log. Set only for /// resume runs, and serialized into the overlay instead of cwd-default /// scope so resume does not silently broaden access. @@ -473,7 +473,7 @@ fn draw_form(f: &mut Frame<'_>, form: &Form) { /// First 8 hex digits of a UUID — short enough to skim, long enough /// to disambiguate inside a 10-row picker. -pub(crate) fn short_session(id: SessionId) -> String { +pub(crate) fn short_session(id: SegmentId) -> String { let s = id.to_string(); s.chars().take(8).collect() } @@ -584,7 +584,7 @@ mod tests { #[test] fn overlay_uses_resume_scope_snapshot() { let mut f = form("agent-r", false); - f.resume_from = Some(session_store::new_session_id()); + f.resume_from = Some(session_store::new_segment_id()); f.resume_scope = Some(ScopeConfig { allow: vec![manifest::ScopeRule { target: PathBuf::from("/work/example"), diff --git a/crates/tui/src/ui.rs b/crates/tui/src/ui.rs index 884031a5..f6f923ad 100644 --- a/crates/tui/src/ui.rs +++ b/crates/tui/src/ui.rs @@ -1019,10 +1019,10 @@ fn render_compact(lines: &mut Vec>, evt: &CompactEvent, width: u16 ) } CompactEvent::Done { - new_session_id, + new_segment_id, elapsed_secs, } => { - let short = new_session_id + let short = new_segment_id .to_string() .chars() .take(8)