Merge: session-grouping-introduce
This commit is contained in:
commit
b5d5c03412
1
TODO.md
1
TODO.md
|
|
@ -8,7 +8,6 @@
|
|||
- Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
|
||||
- Pod: Inbound PodEvent ハンドリングの重複を統合 → [tickets/pod-inbound-pod-event-dedup.md](tickets/pod-inbound-pod-event-dedup.md)
|
||||
- 永続化層整理 (Storage)
|
||||
- Session (Segment 群の grouping) 導入 → [tickets/session-grouping-introduce.md](tickets/session-grouping-introduce.md)
|
||||
- live auto-fork の marker 形式確定 → [tickets/live-fork-marker.md](tickets/live-fork-marker.md)
|
||||
- Pod 単位永続化
|
||||
- Pod state backend と FsStore 実装 → [tickets/pod-state-backend.md](tickets/pod-state-backend.md)
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ use std::process::ExitCode;
|
|||
use clap::Parser;
|
||||
use manifest::{PodManifest, paths};
|
||||
use pod::{Pod, PodController, PodFactory, PromptLoader};
|
||||
use session_store::{FsStore, SegmentId};
|
||||
use session_store::{FsStore, SegmentId, Store};
|
||||
|
||||
const USER_MANIFEST_ENV: &str = "INSOMNIA_USER_MANIFEST";
|
||||
|
||||
|
|
@ -186,7 +186,28 @@ async fn main() -> ExitCode {
|
|||
}
|
||||
}
|
||||
} else if let Some(source_segment_id) = cli.session {
|
||||
match Pod::restore_from_manifest(source_segment_id, manifest, store, loader).await {
|
||||
let source_session_id = match store.lookup_session_of(source_segment_id) {
|
||||
Ok(Some(sid)) => sid,
|
||||
Ok(None) => {
|
||||
eprintln!(
|
||||
"error: --session {source_segment_id}: segment is not registered to any session"
|
||||
);
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("error: lookup_session_of failed: {e}");
|
||||
return ExitCode::FAILURE;
|
||||
}
|
||||
};
|
||||
match Pod::restore_from_manifest(
|
||||
source_session_id,
|
||||
source_segment_id,
|
||||
manifest,
|
||||
store,
|
||||
loader,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
eprintln!("error: failed to restore pod: {e}");
|
||||
|
|
|
|||
|
|
@ -9,7 +9,8 @@ use llm_worker::llm_client::client::LlmClient;
|
|||
use llm_worker::state::Mutable;
|
||||
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
|
||||
use session_store::{
|
||||
LogEntry, PodScopeSnapshot, SegmentId, Store, StoreError, SystemItem, segment_log, to_logged,
|
||||
LogEntry, PodScopeSnapshot, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log,
|
||||
to_logged,
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
|
||||
|
|
@ -42,35 +43,54 @@ use protocol::{AlertLevel, AlertSource, Event, Segment};
|
|||
use tokio::sync::broadcast;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// Lock-free shared session pointer.
|
||||
/// `(SessionId, SegmentId)` pair the Pod is currently writing to.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct SegmentLocation {
|
||||
pub session_id: SessionId,
|
||||
pub segment_id: SegmentId,
|
||||
}
|
||||
|
||||
/// Lock-free shared session/segment pointer.
|
||||
///
|
||||
/// Holds the current `(segment_id, entries_written)` pair so that the
|
||||
/// Pod and every `LogWriterHandle` clone see a consistent view through
|
||||
/// `Arc`-shared lock-free reads. `segment_id` is wrapped in `ArcSwap`
|
||||
/// so fork (a rare, run-start-only event) can atomically swap it
|
||||
/// without taking a mutex on the append hot path. `entries_written` is
|
||||
/// an `AtomicUsize` bumped on every successful append; the writer's
|
||||
/// tally is compared against the store's on-disk count to detect
|
||||
/// concurrent writers in `ensure_segment_head`.
|
||||
/// Holds the current `(SessionId, SegmentId)` pair and the append tally
|
||||
/// so that the Pod and every `LogWriterHandle` clone see a consistent
|
||||
/// view through `Arc`-shared lock-free reads. The location is wrapped in
|
||||
/// `ArcSwap` so fork (a rare, run-start-only event) or compaction can atomically swap
|
||||
/// session_id + segment_id together without taking a mutex on the
|
||||
/// append hot path. `entries_written` is an `AtomicUsize` bumped on
|
||||
/// every successful append; the writer's tally is compared against the
|
||||
/// store's on-disk count to detect concurrent writers in
|
||||
/// `ensure_segment_head`.
|
||||
pub struct SegmentState {
|
||||
segment_id: ArcSwap<SegmentId>,
|
||||
location: ArcSwap<SegmentLocation>,
|
||||
entries_written: AtomicUsize,
|
||||
}
|
||||
|
||||
impl SegmentState {
|
||||
pub fn new(segment_id: SegmentId, entries_written: usize) -> Arc<Self> {
|
||||
pub fn new(session_id: SessionId, segment_id: SegmentId, entries_written: usize) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
segment_id: ArcSwap::from_pointee(segment_id),
|
||||
location: ArcSwap::from_pointee(SegmentLocation {
|
||||
session_id,
|
||||
segment_id,
|
||||
}),
|
||||
entries_written: AtomicUsize::new(entries_written),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn segment_id(&self) -> SegmentId {
|
||||
**self.segment_id.load()
|
||||
pub fn location(&self) -> SegmentLocation {
|
||||
**self.location.load()
|
||||
}
|
||||
|
||||
pub fn set_segment_id(&self, id: SegmentId) {
|
||||
self.segment_id.store(Arc::new(id));
|
||||
pub fn session_id(&self) -> SessionId {
|
||||
self.location().session_id
|
||||
}
|
||||
|
||||
pub fn segment_id(&self) -> SegmentId {
|
||||
self.location().segment_id
|
||||
}
|
||||
|
||||
pub fn set_location(&self, loc: SegmentLocation) {
|
||||
self.location.store(Arc::new(loc));
|
||||
}
|
||||
|
||||
pub fn entries_written(&self) -> usize {
|
||||
|
|
@ -107,8 +127,8 @@ where
|
|||
/// writes for `< PIPE_BUF` lines, so no user-space serialization is
|
||||
/// needed across appenders.
|
||||
pub fn append_entry(&self, entry: LogEntry) -> Result<(), StoreError> {
|
||||
let segment_id = self.state.segment_id();
|
||||
self.store.append(segment_id, &entry)?;
|
||||
let loc = self.state.location();
|
||||
self.store.append(loc.session_id, loc.segment_id, &entry)?;
|
||||
self.state.increment_entries();
|
||||
self.sink.publish(entry);
|
||||
Ok(())
|
||||
|
|
@ -472,13 +492,14 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
// Segment creation is deferred to `ensure_segment_head` at first
|
||||
// run so a later-installed system-prompt template (see
|
||||
// `set_system_prompt_template`) can be captured by `SegmentStart`.
|
||||
let session_id = session_store::new_session_id();
|
||||
let segment_id = session_store::new_segment_id();
|
||||
let prompts = PromptCatalog::builtins_only()?;
|
||||
let mut pod = Self {
|
||||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
segment_state: SegmentState::new(segment_id, 0),
|
||||
segment_state: SegmentState::new(session_id, segment_id, 0),
|
||||
pwd,
|
||||
scope: SharedScope::new(scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -542,12 +563,20 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
&self.prompts
|
||||
}
|
||||
|
||||
/// The session ID used for persistence. Read lock-free from the
|
||||
/// shared session pointer so fork-time swaps are observed immediately.
|
||||
/// The current segment ID. Read lock-free from the shared session
|
||||
/// pointer so fork-time swaps are observed immediately.
|
||||
pub fn segment_id(&self) -> SegmentId {
|
||||
self.segment_state.segment_id()
|
||||
}
|
||||
|
||||
/// The Session this Pod belongs to. Stable across compaction and
|
||||
/// auto-fork (both stay within the same Session); there is no
|
||||
/// Pod-level operation today that moves a running Pod to a different
|
||||
/// Session.
|
||||
pub fn session_id(&self) -> SessionId {
|
||||
self.segment_state.session_id()
|
||||
}
|
||||
|
||||
/// The Pod's manifest.
|
||||
pub fn manifest(&self) -> &PodManifest {
|
||||
&self.manifest
|
||||
|
|
@ -627,8 +656,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// concurrent appenders — the kernel orders `O_APPEND` writes for
|
||||
/// lines smaller than `PIPE_BUF`.
|
||||
pub(crate) fn commit_entry(&self, entry: LogEntry) -> Result<(), StoreError> {
|
||||
let segment_id = self.segment_state.segment_id();
|
||||
self.store.append(segment_id, &entry)?;
|
||||
let loc = self.segment_state.location();
|
||||
self.store.append(loc.session_id, loc.segment_id, &entry)?;
|
||||
self.segment_state.increment_entries();
|
||||
self.sink.publish(entry);
|
||||
Ok(())
|
||||
|
|
@ -1618,11 +1647,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// when another writer has appended behind our back.
|
||||
fn ensure_segment_head(&mut self) -> Result<(), PodError> {
|
||||
let w = self.worker.as_ref().unwrap();
|
||||
let prev_segment_id = self.segment_state.segment_id();
|
||||
let loc = self.segment_state.location();
|
||||
let entries_written = self.segment_state.entries_written();
|
||||
if entries_written == 0 {
|
||||
let initial = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
session_id: loc.session_id,
|
||||
system_prompt: w.get_system_prompt().map(String::from),
|
||||
config: w.request_config().clone(),
|
||||
history: to_logged(w.history()),
|
||||
|
|
@ -1636,17 +1666,19 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
// Check store count + auto-fork if it drifted.
|
||||
let store_count = self
|
||||
.store
|
||||
.read_entry_count(prev_segment_id)
|
||||
.read_entry_count(loc.session_id, loc.segment_id)
|
||||
.map_err(PodError::from)?;
|
||||
if store_count == entries_written {
|
||||
return Ok(());
|
||||
}
|
||||
// Fork: mint a fresh session and switch to it. The new
|
||||
// SegmentStart entry replaces the mirror and is broadcast
|
||||
// through the sink so existing subscribers reset their view.
|
||||
let fork_id = session_store::new_segment_id();
|
||||
// Auto-fork within the same Session: mint a fresh Segment and
|
||||
// switch to it. The new SegmentStart entry replaces the mirror
|
||||
// and is broadcast through the sink so existing subscribers
|
||||
// reset their view.
|
||||
let fork_segment_id = session_store::new_segment_id();
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
session_id: loc.session_id,
|
||||
system_prompt: w.get_system_prompt().map(String::from),
|
||||
config: w.request_config().clone(),
|
||||
history: to_logged(w.history()),
|
||||
|
|
@ -1654,13 +1686,16 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
compacted_from: None,
|
||||
};
|
||||
self.store
|
||||
.create_segment(fork_id, &[entry.clone()])
|
||||
.create_segment(loc.session_id, fork_segment_id, &[entry.clone()])
|
||||
.map_err(PodError::from)?;
|
||||
self.segment_state.set_segment_id(fork_id);
|
||||
self.segment_state.set_location(SegmentLocation {
|
||||
session_id: loc.session_id,
|
||||
segment_id: fork_segment_id,
|
||||
});
|
||||
self.segment_state.set_entries_written(1);
|
||||
self.sink.reset_with_initial(entry);
|
||||
if self.scope_allocation.is_some() {
|
||||
pod_registry::update_segment(&self.manifest.pod.name, fork_id)?;
|
||||
pod_registry::update_segment(&self.manifest.pod.name, fork_segment_id)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -2140,27 +2175,34 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
task_snapshot_text.clone(),
|
||||
));
|
||||
|
||||
// Build the SegmentStart entry for the new compacted session,
|
||||
// then atomically rotate to it: create on disk, swap head, reset
|
||||
// the broadcast sink so existing subscribers see the new
|
||||
// `SegmentStart { compacted_from }` and reset their view.
|
||||
// Build the SegmentStart entry for the new compacted segment.
|
||||
// Inherits the source Segment's session_id so the compacted
|
||||
// lineage stays grouped under the same Session. Atomically
|
||||
// rotate: create on disk, swap location, reset the broadcast
|
||||
// sink so existing subscribers see the new `SegmentStart
|
||||
// { compacted_from }` and reset their view.
|
||||
let new_segment_id = session_store::new_segment_id();
|
||||
let old_session_id = self.segment_state.segment_id();
|
||||
let old_loc = self.segment_state.location();
|
||||
let source_turn_count = self.worker.as_ref().unwrap().turn_count();
|
||||
let w = self.worker.as_ref().unwrap();
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
session_id: old_loc.session_id,
|
||||
system_prompt: w.get_system_prompt().map(String::from),
|
||||
config: w.request_config().clone(),
|
||||
history: to_logged(&new_history),
|
||||
forked_from: None,
|
||||
compacted_from: Some(session_store::SegmentOrigin {
|
||||
segment_id: old_session_id,
|
||||
segment_id: old_loc.segment_id,
|
||||
at_turn_index: source_turn_count,
|
||||
}),
|
||||
};
|
||||
self.store.create_segment(new_segment_id, &[entry.clone()])?;
|
||||
self.segment_state.set_segment_id(new_segment_id);
|
||||
self.store
|
||||
.create_segment(old_loc.session_id, new_segment_id, &[entry.clone()])?;
|
||||
self.segment_state.set_location(SegmentLocation {
|
||||
session_id: old_loc.session_id,
|
||||
segment_id: new_segment_id,
|
||||
});
|
||||
self.segment_state.set_entries_written(1);
|
||||
let session_start = entry;
|
||||
// Broadcast the SegmentStart through the sink. This atomically
|
||||
|
|
@ -2367,7 +2409,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
// Read the segment log to get the current entry count. This is
|
||||
// the boundary for the source.range end_entry. Called once per
|
||||
// extract, on a small local file.
|
||||
let entries_now = self.store.read_all(self.segment_id())?.len();
|
||||
let entries_now = self
|
||||
.store
|
||||
.read_all(self.session_id(), self.segment_id())?
|
||||
.len();
|
||||
if entries_now == 0 {
|
||||
return Ok(ExtractDecision::Skipped);
|
||||
}
|
||||
|
|
@ -2738,8 +2783,9 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
// Segment creation is deferred to the first run (see
|
||||
// `ensure_segment_head`) so the SegmentStart entry can capture
|
||||
// the rendered system prompt, not the raw template source. The
|
||||
// segment_id is allocated here so the pod-registry registration
|
||||
// can record it from the start.
|
||||
// session_id + segment_id are allocated here so the pod-registry
|
||||
// registration can record them from the start.
|
||||
let session_id = session_store::new_session_id();
|
||||
let segment_id = session_store::new_segment_id();
|
||||
|
||||
// Register this Pod in the machine-wide pod-registry
|
||||
|
|
@ -2765,7 +2811,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
segment_state: SegmentState::new(segment_id, 0),
|
||||
segment_state: SegmentState::new(session_id, segment_id, 0),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -2820,6 +2866,9 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
let mut common = prepare_pod_common(&manifest, &loader, /* parse_template */ true)?;
|
||||
let skill_shadows = std::mem::take(&mut common.skill_shadows);
|
||||
|
||||
// A spawned child starts its own conversation, so it mints a
|
||||
// fresh Session rather than joining the spawner's.
|
||||
let session_id = session_store::new_session_id();
|
||||
let segment_id = session_store::new_segment_id();
|
||||
let scope_allocation = pod_registry::adopt_allocation(
|
||||
manifest.pod.name.clone(),
|
||||
|
|
@ -2835,7 +2884,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
segment_state: SegmentState::new(segment_id, 0),
|
||||
segment_state: SegmentState::new(session_id, segment_id, 0),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -2892,6 +2941,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
/// session keeps a stable cache prefix even when the manifest's
|
||||
/// instruction template would render differently today.
|
||||
pub async fn restore_from_manifest(
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
manifest: PodManifest,
|
||||
store: St,
|
||||
|
|
@ -2900,7 +2950,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
// Read raw entries once so we can both reconstruct state and
|
||||
// seed the broadcast sink's mirror with the same prefix that
|
||||
// sits on disk.
|
||||
let raw_entries = store.read_all(segment_id)?;
|
||||
let raw_entries = store.read_all(session_id, segment_id)?;
|
||||
let state = session_store::collect_state(&raw_entries);
|
||||
if state.entries_count == 0 {
|
||||
return Err(PodError::SegmentEmpty { segment_id });
|
||||
|
|
@ -2974,7 +3024,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
segment_state: SegmentState::new(segment_id, state.entries_count),
|
||||
segment_state: SegmentState::new(session_id, segment_id, state.entries_count),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
|
|||
|
|
@ -203,6 +203,7 @@ mod tests {
|
|||
fn session_start() -> LogEntry {
|
||||
LogEntry::SegmentStart {
|
||||
ts: now_millis(),
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@
|
|||
use std::sync::{LazyLock, Mutex};
|
||||
|
||||
use pod::{Pod, PodError};
|
||||
use session_store::{FsStore, SegmentId, StoreError};
|
||||
use session_store::{FsStore, StoreError};
|
||||
|
||||
const MINIMAL_MANIFEST_TOML: &str = r#"
|
||||
[pod]
|
||||
|
|
@ -32,7 +32,7 @@ permission = "write"
|
|||
static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_from_manifest_rejects_unknown_session() {
|
||||
async fn restore_from_manifest_rejects_unknown_segment() {
|
||||
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
|
|
@ -42,66 +42,76 @@ async fn restore_from_manifest_rejects_unknown_session() {
|
|||
// A freshly-minted (session, segment) id pair with no jsonl file at all → store returns
|
||||
// NotFound, which `Pod::restore_from_manifest` surfaces verbatim
|
||||
// as `PodError::Store`.
|
||||
let unknown = session_store::new_segment_id();
|
||||
let unknown_sid = session_store::new_session_id();
|
||||
let unknown_seg = session_store::new_segment_id();
|
||||
let result = Pod::restore_from_manifest(
|
||||
unknown_sid,
|
||||
unknown_seg,
|
||||
manifest,
|
||||
store,
|
||||
pod::PromptLoader::builtins_only(),
|
||||
)
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Err(PodError::Store(StoreError::NotFound(id))) => assert_eq!(id, unknown_seg),
|
||||
Err(other) => panic!("expected Store(NotFound), got {other:?}"),
|
||||
Ok(_) => panic!("expected unknown segment to fail"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_from_manifest_rejects_empty_segment_log() {
|
||||
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
|
||||
|
||||
// Pre-create an empty `<sid>/<segid>.jsonl` so `read_all` succeeds
|
||||
// with no entries. `collect_state` returns `entries_count = 0`,
|
||||
// which `restore_from_manifest` rejects with `SegmentEmpty` *before*
|
||||
// it gets as far as building the LLM client.
|
||||
let sid = session_store::new_session_id();
|
||||
let segid = session_store::new_segment_id();
|
||||
let dir = store_tmp.path().join(sid.to_string());
|
||||
std::fs::create_dir_all(&dir).unwrap();
|
||||
std::fs::write(dir.join(format!("{segid}.jsonl")), b"").unwrap();
|
||||
|
||||
let result =
|
||||
Pod::restore_from_manifest(unknown, manifest, store, pod::PromptLoader::builtins_only())
|
||||
Pod::restore_from_manifest(sid, segid, manifest, store, pod::PromptLoader::builtins_only())
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Err(PodError::Store(StoreError::NotFound(id))) => assert_eq!(id, unknown),
|
||||
Err(other) => panic!("expected Store(NotFound), got {other:?}"),
|
||||
Ok(_) => panic!("expected unknown session to fail"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_from_manifest_rejects_empty_session_log() {
|
||||
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
|
||||
|
||||
// Pre-create an empty `<id>.jsonl` so `read_all` succeeds with no
|
||||
// entries. `collect_state` returns `entries_count = 0`, which
|
||||
// `restore_from_manifest` rejects with `SegmentEmpty` *before* it
|
||||
// gets as far as building the LLM client — so the test does not
|
||||
// need credentials or a runtime sandbox.
|
||||
let id: SegmentId = session_store::new_segment_id();
|
||||
let path = store_tmp.path().join(format!("{id}.jsonl"));
|
||||
std::fs::write(&path, b"").unwrap();
|
||||
|
||||
let result =
|
||||
Pod::restore_from_manifest(id, manifest, store, pod::PromptLoader::builtins_only()).await;
|
||||
|
||||
match result {
|
||||
Err(PodError::SegmentEmpty { segment_id }) => assert_eq!(segment_id, id),
|
||||
Err(PodError::SegmentEmpty { segment_id }) => assert_eq!(segment_id, segid),
|
||||
Err(other) => panic!("expected SegmentEmpty, got {other:?}"),
|
||||
Ok(_) => panic!("expected empty session log to fail"),
|
||||
Ok(_) => panic!("expected empty segment log to fail"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn restore_from_manifest_rejects_session_without_scope_snapshot() {
|
||||
async fn restore_from_manifest_rejects_segment_without_scope_snapshot() {
|
||||
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
|
||||
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
|
||||
|
||||
let id = session_store::new_segment_id();
|
||||
let sid = session_store::new_session_id();
|
||||
let segid = session_store::new_segment_id();
|
||||
let state = session_store::SegmentStartState {
|
||||
system_prompt: None,
|
||||
config: &Default::default(),
|
||||
history: &[],
|
||||
};
|
||||
session_store::create_segment_with_id(&store, id, state).unwrap();
|
||||
session_store::create_segment_with_ids(&store, sid, segid, state).unwrap();
|
||||
|
||||
let result =
|
||||
Pod::restore_from_manifest(id, manifest, store, pod::PromptLoader::builtins_only()).await;
|
||||
Pod::restore_from_manifest(sid, segid, manifest, store, pod::PromptLoader::builtins_only())
|
||||
.await;
|
||||
|
||||
match result {
|
||||
Err(PodError::SegmentScopeMissing { segment_id }) => assert_eq!(segment_id, id),
|
||||
Err(PodError::SegmentScopeMissing { segment_id }) => assert_eq!(segment_id, segid),
|
||||
Err(other) => panic!("expected SegmentScopeMissing, got {other:?}"),
|
||||
Ok(_) => panic!("expected missing scope snapshot to fail"),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve
|
|||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
||||
use session_metrics::{DOMAIN, Metric, metrics_from_extensions};
|
||||
use session_store::{FsStore, LogEntry, SegmentId, Store, StoreError, TraceEntry};
|
||||
use session_store::{FsStore, LogEntry, SegmentId, SessionId, Store, StoreError, TraceEntry};
|
||||
|
||||
use pod::{Pod, PodManifest};
|
||||
|
||||
|
|
@ -200,6 +200,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() {
|
|||
text_response_with_cache("done", 1234, 50),
|
||||
]);
|
||||
let (mut pod, _store_tmp, _pwd_tmp) = make_pod(manifest_toml(1, 1), client, "big_tool").await;
|
||||
let session_id = pod.session_id();
|
||||
let segment_id = pod.segment_id();
|
||||
// Cloning the store handle to read the segment log back after the
|
||||
// runs complete — the Pod retains its own copy.
|
||||
|
|
@ -208,7 +209,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() {
|
|||
pod.run_text("first").await.unwrap();
|
||||
pod.run_text("second").await.unwrap();
|
||||
|
||||
let state = session_store::restore(&store, segment_id).unwrap();
|
||||
let state = session_store::restore(&store, session_id, segment_id).unwrap();
|
||||
let metrics = metrics_from_extensions(&state.extensions);
|
||||
|
||||
// Run 1 has 2 LLM iterations (tool loop), each evaluates prune with
|
||||
|
|
@ -288,13 +289,14 @@ async fn prune_metrics_record_below_min_savings_skip() {
|
|||
]);
|
||||
let (mut pod, _store_tmp, _pwd_tmp) =
|
||||
make_pod(manifest_toml(1, u64::MAX), client, "big_tool").await;
|
||||
let session_id = pod.session_id();
|
||||
let segment_id = pod.segment_id();
|
||||
let store = pod.store().clone();
|
||||
|
||||
pod.run_text("first").await.unwrap();
|
||||
pod.run_text("second").await.unwrap();
|
||||
|
||||
let state = session_store::restore(&store, segment_id).unwrap();
|
||||
let state = session_store::restore(&store, session_id, segment_id).unwrap();
|
||||
let metrics = metrics_from_extensions(&state.extensions);
|
||||
let below = metrics
|
||||
.iter()
|
||||
|
|
@ -327,31 +329,60 @@ struct MetricFailingStore {
|
|||
}
|
||||
|
||||
impl Store for MetricFailingStore {
|
||||
fn append(&self, id: SegmentId, entry: &LogEntry) -> Result<(), StoreError> {
|
||||
fn append(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
entry: &LogEntry,
|
||||
) -> Result<(), StoreError> {
|
||||
if let LogEntry::Extension { domain, .. } = entry {
|
||||
if domain == DOMAIN {
|
||||
return Err(StoreError::Io(std::io::Error::other("synthetic failure")));
|
||||
}
|
||||
}
|
||||
self.inner.append(id, entry)
|
||||
self.inner.append(session_id, segment_id, entry)
|
||||
}
|
||||
fn read_all(&self, id: SegmentId) -> Result<Vec<LogEntry>, StoreError> {
|
||||
self.inner.read_all(id)
|
||||
fn read_all(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> Result<Vec<LogEntry>, StoreError> {
|
||||
self.inner.read_all(session_id, segment_id)
|
||||
}
|
||||
fn list_segments(&self) -> Result<Vec<SegmentId>, StoreError> {
|
||||
self.inner.list_segments()
|
||||
fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError> {
|
||||
self.inner.list_sessions()
|
||||
}
|
||||
fn create_segment(&self, id: SegmentId, entries: &[LogEntry]) -> Result<(), StoreError> {
|
||||
self.inner.create_segment(id, entries)
|
||||
fn list_segments(&self, session_id: SessionId) -> Result<Vec<SegmentId>, StoreError> {
|
||||
self.inner.list_segments(session_id)
|
||||
}
|
||||
fn exists(&self, id: SegmentId) -> Result<bool, StoreError> {
|
||||
self.inner.exists(id)
|
||||
fn lookup_session_of(&self, segment_id: SegmentId) -> Result<Option<SessionId>, StoreError> {
|
||||
self.inner.lookup_session_of(segment_id)
|
||||
}
|
||||
fn read_entry_count(&self, id: SegmentId) -> Result<usize, StoreError> {
|
||||
self.inner.read_entry_count(id)
|
||||
fn create_segment(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
entries: &[LogEntry],
|
||||
) -> Result<(), StoreError> {
|
||||
self.inner.create_segment(session_id, segment_id, entries)
|
||||
}
|
||||
fn append_trace(&self, id: SegmentId, entry: &TraceEntry) -> Result<(), StoreError> {
|
||||
self.inner.append_trace(id, entry)
|
||||
fn exists(&self, session_id: SessionId, segment_id: SegmentId) -> Result<bool, StoreError> {
|
||||
self.inner.exists(session_id, segment_id)
|
||||
}
|
||||
fn read_entry_count(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> Result<usize, StoreError> {
|
||||
self.inner.read_entry_count(session_id, segment_id)
|
||||
}
|
||||
fn append_trace(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
entry: &TraceEntry,
|
||||
) -> Result<(), StoreError> {
|
||||
self.inner.append_trace(session_id, segment_id, entry)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -386,12 +417,13 @@ async fn metric_write_failure_emits_warn_alert_and_does_not_abort_run() {
|
|||
let alerter = pod::Alerter::new(tx);
|
||||
pod.attach_alerter(alerter);
|
||||
|
||||
let session_id = pod.session_id();
|
||||
let segment_id = pod.segment_id();
|
||||
// Run completes successfully despite metric failure.
|
||||
pod.run_text("hello").await.unwrap();
|
||||
|
||||
// No metrics ended up in the log (writes were rejected).
|
||||
let state = session_store::restore(&store, segment_id).unwrap();
|
||||
let state = session_store::restore(&store, session_id, segment_id).unwrap();
|
||||
let metrics = metrics_from_extensions(&state.extensions);
|
||||
assert!(metrics.is_empty(), "metrics must drop on write failure");
|
||||
|
||||
|
|
@ -446,10 +478,11 @@ permission = "write"
|
|||
let mut pod = Pod::new(manifest, worker, store.clone(), pwd, scope)
|
||||
.await
|
||||
.unwrap();
|
||||
let session_id = pod.session_id();
|
||||
let segment_id = pod.segment_id();
|
||||
pod.run_text("hello").await.unwrap();
|
||||
|
||||
let state = session_store::restore(&store, segment_id).unwrap();
|
||||
let state = session_store::restore(&store, session_id, segment_id).unwrap();
|
||||
let metrics = metrics_from_extensions(&state.extensions);
|
||||
assert!(
|
||||
metrics.is_empty(),
|
||||
|
|
|
|||
|
|
@ -182,7 +182,10 @@ async fn session_start_state_captures_rendered_prompt() {
|
|||
.unwrap();
|
||||
pod.run_text("hi").await.unwrap();
|
||||
|
||||
let entries = pod.store().read_all(pod.segment_id()).unwrap();
|
||||
let entries = pod
|
||||
.store()
|
||||
.read_all(pod.session_id(), pod.segment_id())
|
||||
.unwrap();
|
||||
let first = entries.first().expect("at least one entry");
|
||||
match first {
|
||||
LogEntry::SegmentStart { system_prompt, .. } => {
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session_store::{SegmentId, Store, StoreError, save_extension, segment_log};
|
||||
use session_store::{SegmentId, SessionId, Store, StoreError, save_extension, segment_log};
|
||||
|
||||
/// Domain tag used in `LogEntry::Extension` for all metrics records.
|
||||
pub const DOMAIN: &str = "metrics";
|
||||
|
|
@ -77,11 +77,12 @@ impl Metric {
|
|||
/// (メトリクスのために本体処理を止めるかは呼び出し側の判断)。
|
||||
pub fn record_metric(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
metric: &Metric,
|
||||
) -> Result<(), StoreError> {
|
||||
let payload = serde_json::to_value(metric).expect("Metric serialization cannot fail");
|
||||
save_extension(store, segment_id, DOMAIN, payload)
|
||||
save_extension(store, session_id, segment_id, DOMAIN, payload)
|
||||
}
|
||||
|
||||
/// `RestoredState.extensions` から metrics domain の payload を順に取り出し、
|
||||
|
|
|
|||
|
|
@ -1,13 +1,24 @@
|
|||
//! Filesystem-backed JSONL store.
|
||||
//!
|
||||
//! Layout:
|
||||
//! - Segment log: `{root}/{segment_id}.jsonl`
|
||||
//! - Event trace: `{root}/{segment_id}.trace.jsonl`
|
||||
//! - Segment log: `{root}/{session_id}/{segment_id}.jsonl`
|
||||
//! - Event trace: `{root}/{session_id}/{segment_id}.trace.jsonl`
|
||||
//!
|
||||
//! The per-Session directory makes `list_segments(session_id)` an O(dir)
|
||||
//! scan and gives the fork tree a visible grouping in the filesystem.
|
||||
//!
|
||||
//! Migration: this layout is incompatible with the pre-`session-grouping`
|
||||
//! flat `{root}/{segment_id}.jsonl` form. Project policy is no
|
||||
//! backward compatibility — discard `~/.insomnia/sessions/` (or whatever
|
||||
//! `root` resolved to) before running the new code. `list_sessions`
|
||||
//! ignores top-level files outside session directories, so leftover
|
||||
//! flat files do not corrupt new sessions, but they are no longer
|
||||
//! enumerable by the picker.
|
||||
|
||||
use crate::SegmentId;
|
||||
use crate::event_trace::TraceEntry;
|
||||
use crate::segment_log::LogEntry;
|
||||
use crate::store::{Store, StoreError};
|
||||
use crate::{SegmentId, SessionId};
|
||||
use std::fs;
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
|
@ -30,15 +41,24 @@ impl FsStore {
|
|||
Ok(Self { root })
|
||||
}
|
||||
|
||||
fn log_path(&self, id: SegmentId) -> PathBuf {
|
||||
self.root.join(format!("{id}.jsonl"))
|
||||
fn session_dir(&self, session_id: SessionId) -> PathBuf {
|
||||
self.root.join(session_id.to_string())
|
||||
}
|
||||
|
||||
fn trace_path(&self, id: SegmentId) -> PathBuf {
|
||||
self.root.join(format!("{id}.trace.jsonl"))
|
||||
fn log_path(&self, session_id: SessionId, segment_id: SegmentId) -> PathBuf {
|
||||
self.session_dir(session_id)
|
||||
.join(format!("{segment_id}.jsonl"))
|
||||
}
|
||||
|
||||
fn trace_path(&self, session_id: SessionId, segment_id: SegmentId) -> PathBuf {
|
||||
self.session_dir(session_id)
|
||||
.join(format!("{segment_id}.trace.jsonl"))
|
||||
}
|
||||
|
||||
fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> {
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent)?;
|
||||
}
|
||||
let mut file = fs::OpenOptions::new().create(true).append(true).open(path)?;
|
||||
file.write_all(line.as_bytes())?;
|
||||
file.write_all(b"\n")?;
|
||||
|
|
@ -65,23 +85,56 @@ impl FsStore {
|
|||
}
|
||||
|
||||
impl Store for FsStore {
|
||||
fn append(&self, id: SegmentId, entry: &LogEntry) -> Result<(), StoreError> {
|
||||
fn append(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
entry: &LogEntry,
|
||||
) -> Result<(), StoreError> {
|
||||
let line = serde_json::to_string(entry)?;
|
||||
self.append_line(&self.log_path(id), &line)
|
||||
self.append_line(&self.log_path(session_id, segment_id), &line)
|
||||
}
|
||||
|
||||
fn read_all(&self, id: SegmentId) -> Result<Vec<LogEntry>, StoreError> {
|
||||
let path = self.log_path(id);
|
||||
fn read_all(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> Result<Vec<LogEntry>, StoreError> {
|
||||
let path = self.log_path(session_id, segment_id);
|
||||
if !path.exists() {
|
||||
return Err(StoreError::NotFound(id));
|
||||
return Err(StoreError::NotFound(segment_id));
|
||||
}
|
||||
let content = fs::read_to_string(&path)?;
|
||||
Self::parse_jsonl(&content)
|
||||
}
|
||||
|
||||
fn list_segments(&self) -> Result<Vec<SegmentId>, StoreError> {
|
||||
let mut segments = Vec::new();
|
||||
fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError> {
|
||||
let mut sessions = Vec::new();
|
||||
if !self.root.exists() {
|
||||
return Ok(sessions);
|
||||
}
|
||||
for entry in fs::read_dir(&self.root)? {
|
||||
let entry = entry?;
|
||||
if !entry.file_type()?.is_dir() {
|
||||
continue;
|
||||
}
|
||||
if let Some(name) = entry.file_name().to_str() {
|
||||
if let Ok(id) = name.parse::<SessionId>() {
|
||||
sessions.push(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
sessions.sort_by(|a, b| b.cmp(a));
|
||||
Ok(sessions)
|
||||
}
|
||||
|
||||
fn list_segments(&self, session_id: SessionId) -> Result<Vec<SegmentId>, StoreError> {
|
||||
let dir = self.session_dir(session_id);
|
||||
let mut segments = Vec::new();
|
||||
if !dir.exists() {
|
||||
return Ok(segments);
|
||||
}
|
||||
for entry in fs::read_dir(&dir)? {
|
||||
let entry = entry?;
|
||||
let path = entry.path();
|
||||
// Only match .jsonl files, not .trace.jsonl
|
||||
|
|
@ -98,8 +151,36 @@ impl Store for FsStore {
|
|||
Ok(segments)
|
||||
}
|
||||
|
||||
fn create_segment(&self, id: SegmentId, entries: &[LogEntry]) -> Result<(), StoreError> {
|
||||
let path = self.log_path(id);
|
||||
fn lookup_session_of(&self, segment_id: SegmentId) -> Result<Option<SessionId>, StoreError> {
|
||||
if !self.root.exists() {
|
||||
return Ok(None);
|
||||
}
|
||||
let needle = format!("{segment_id}.jsonl");
|
||||
for entry in fs::read_dir(&self.root)? {
|
||||
let entry = entry?;
|
||||
if !entry.file_type()?.is_dir() {
|
||||
continue;
|
||||
}
|
||||
if entry.path().join(&needle).exists()
|
||||
&& let Some(name) = entry.file_name().to_str()
|
||||
&& let Ok(id) = name.parse::<SessionId>()
|
||||
{
|
||||
return Ok(Some(id));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn create_segment(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
entries: &[LogEntry],
|
||||
) -> Result<(), StoreError> {
|
||||
let path = self.log_path(session_id, segment_id);
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent)?;
|
||||
}
|
||||
let mut content = String::new();
|
||||
for entry in entries {
|
||||
content.push_str(&serde_json::to_string(entry)?);
|
||||
|
|
@ -109,21 +190,30 @@ impl Store for FsStore {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn exists(&self, id: SegmentId) -> Result<bool, StoreError> {
|
||||
Ok(self.log_path(id).exists())
|
||||
fn exists(&self, session_id: SessionId, segment_id: SegmentId) -> Result<bool, StoreError> {
|
||||
Ok(self.log_path(session_id, segment_id).exists())
|
||||
}
|
||||
|
||||
fn read_entry_count(&self, id: SegmentId) -> Result<usize, StoreError> {
|
||||
let path = self.log_path(id);
|
||||
fn read_entry_count(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> Result<usize, StoreError> {
|
||||
let path = self.log_path(session_id, segment_id);
|
||||
if !path.exists() {
|
||||
return Err(StoreError::NotFound(id));
|
||||
return Err(StoreError::NotFound(segment_id));
|
||||
}
|
||||
let content = fs::read_to_string(&path)?;
|
||||
Ok(content.lines().filter(|l| !l.trim().is_empty()).count())
|
||||
}
|
||||
|
||||
fn append_trace(&self, id: SegmentId, entry: &TraceEntry) -> Result<(), StoreError> {
|
||||
fn append_trace(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
entry: &TraceEntry,
|
||||
) -> Result<(), StoreError> {
|
||||
let line = serde_json::to_string(entry)?;
|
||||
self.append_line(&self.trace_path(id), &line)
|
||||
self.append_line(&self.trace_path(session_id, segment_id), &line)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,14 @@
|
|||
//! Segment persistence via append-only JSONL logs.
|
||||
//! Session persistence via append-only JSONL logs.
|
||||
//!
|
||||
//! # Architecture
|
||||
//!
|
||||
//! Sessions are recorded as a sequence of [`LogEntry`] values, one per line
|
||||
//! in a `.jsonl` file. Reading the log and collecting entries reconstructs
|
||||
//! the full Worker state — no separate snapshots or checkpoints needed.
|
||||
//! A [`Session`](SessionId) is a fork-tree of [`Segment`](SegmentId)s
|
||||
//! belonging to the same logical conversation. Each Segment is recorded
|
||||
//! as a sequence of [`LogEntry`] values, one per line in a `.jsonl`
|
||||
//! file. Reading a segment log and collecting entries reconstructs the
|
||||
//! Worker state at that segment — no separate snapshots or checkpoints
|
||||
//! needed. Compaction and fork operations mint a fresh Segment within
|
||||
//! the same Session.
|
||||
//!
|
||||
//! This crate provides free functions for persistence operations.
|
||||
//! The caller (typically Pod) holds the Worker directly and calls these
|
||||
|
|
@ -19,7 +23,7 @@
|
|||
//! use session_store::{create_segment, restore, save_delta, FsStore, SegmentStartState};
|
||||
//!
|
||||
//! let store = FsStore::new("./sessions")?;
|
||||
//! let segment_id = create_segment(&store, SegmentStartState {
|
||||
//! let (session_id, segment_id) = create_segment(&store, SegmentStartState {
|
||||
//! system_prompt: None,
|
||||
//! config: &config,
|
||||
//! history: &[],
|
||||
|
|
@ -41,9 +45,10 @@ pub use llm_worker::llm_client::types::{ContentPart, Item, Role};
|
|||
pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged};
|
||||
pub use segment::{
|
||||
SegmentStartState, append_entry, append_system_item, classify_history_item,
|
||||
create_compacted_segment, create_segment, create_segment_with_id, ensure_head_or_fork, fork,
|
||||
fork_at, restore, save_config_changed, save_delta, save_extension, save_pod_scope,
|
||||
save_run_completed, save_run_errored, save_turn_end, save_usage, save_user_input,
|
||||
create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork,
|
||||
fork_at, restore, restore_by_segment, save_config_changed, save_delta, save_extension,
|
||||
save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage,
|
||||
save_user_input,
|
||||
};
|
||||
pub use segment_log::{
|
||||
LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SegmentOrigin,
|
||||
|
|
@ -52,9 +57,21 @@ pub use segment_log::{
|
|||
pub use system_item::{SystemItem, render_pod_event};
|
||||
pub use store::{Store, StoreError};
|
||||
|
||||
/// Session identifier — the fork-tree root. UUID v7 (time-ordered).
|
||||
///
|
||||
/// All Segments belonging to the same Session share this ID. Compaction
|
||||
/// and fork operations create a new Segment within the same Session, so
|
||||
/// `WHERE session_id = ?` retrieves the full lineage.
|
||||
pub type SessionId = uuid::Uuid;
|
||||
|
||||
/// Segment identifier. UUID v7 (time-ordered, lexicographically sortable).
|
||||
pub type SegmentId = uuid::Uuid;
|
||||
|
||||
/// Generate a new session ID.
|
||||
pub fn new_session_id() -> SessionId {
|
||||
uuid::Uuid::now_v7()
|
||||
}
|
||||
|
||||
/// Generate a new segment ID.
|
||||
pub fn new_segment_id() -> SegmentId {
|
||||
uuid::Uuid::now_v7()
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
//! The caller (typically Pod) holds the Worker directly and calls these
|
||||
//! functions after state-mutating operations.
|
||||
|
||||
use crate::SegmentId;
|
||||
use crate::{SegmentId, SessionId};
|
||||
use crate::logged_item::{LoggedItem, to_logged};
|
||||
use crate::segment_log::{self, LogEntry, PodScopeSnapshot, SegmentOrigin};
|
||||
use crate::store::{Store, StoreError};
|
||||
|
|
@ -21,38 +21,43 @@ pub struct SegmentStartState<'a> {
|
|||
pub history: &'a [Item],
|
||||
}
|
||||
|
||||
/// Create a new segment, writing the initial `SegmentStart` entry.
|
||||
/// Create a new session + initial segment, writing the initial
|
||||
/// `SegmentStart` entry. Returns the freshly minted `(session_id, segment_id)`.
|
||||
pub fn create_segment(
|
||||
store: &impl Store,
|
||||
state: SegmentStartState<'_>,
|
||||
) -> Result<SegmentId, StoreError> {
|
||||
) -> Result<(SessionId, SegmentId), StoreError> {
|
||||
let session_id = crate::new_session_id();
|
||||
let segment_id = crate::new_segment_id();
|
||||
create_segment_with_id(store, segment_id, state)?;
|
||||
Ok(segment_id)
|
||||
create_segment_with_ids(store, session_id, segment_id, state)?;
|
||||
Ok((session_id, segment_id))
|
||||
}
|
||||
|
||||
/// Write a fresh `SegmentStart` entry using a pre-generated segment ID.
|
||||
/// Write a fresh `SegmentStart` entry using pre-generated IDs.
|
||||
///
|
||||
/// Used by callers that need to reserve a segment ID synchronously but
|
||||
/// defer the initial log append (e.g. Pod, which resolves a templated
|
||||
/// system prompt only at first turn).
|
||||
pub fn create_segment_with_id(
|
||||
/// Used by callers that need to reserve `(session_id, segment_id)`
|
||||
/// synchronously but defer the initial log append (e.g. Pod, which
|
||||
/// resolves a templated system prompt only at first turn).
|
||||
pub fn create_segment_with_ids(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
state: SegmentStartState<'_>,
|
||||
) -> Result<(), StoreError> {
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
session_id,
|
||||
system_prompt: state.system_prompt.map(String::from),
|
||||
config: state.config.clone(),
|
||||
history: to_logged(state.history),
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
};
|
||||
store.append(segment_id, &entry)
|
||||
store.append(session_id, segment_id, &entry)
|
||||
}
|
||||
|
||||
/// Create a compacted segment from an existing one.
|
||||
/// Create a compacted segment from an existing one. Inherits the source's
|
||||
/// `session_id` so the compacted lineage stays within the same Session.
|
||||
///
|
||||
/// Records `compacted_from` provenance linking back to the source segment
|
||||
/// at the turn boundary captured by `source_turn_count` (the most recent
|
||||
|
|
@ -60,12 +65,14 @@ pub fn create_segment_with_id(
|
|||
pub fn create_compacted_segment(
|
||||
store: &impl Store,
|
||||
state: SegmentStartState<'_>,
|
||||
source_session_id: SessionId,
|
||||
source_segment_id: SegmentId,
|
||||
source_turn_count: usize,
|
||||
) -> Result<SegmentId, StoreError> {
|
||||
let segment_id = crate::new_segment_id();
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
session_id: source_session_id,
|
||||
system_prompt: state.system_prompt.map(String::from),
|
||||
config: state.config.clone(),
|
||||
history: to_logged(state.history),
|
||||
|
|
@ -75,7 +82,7 @@ pub fn create_compacted_segment(
|
|||
at_turn_index: source_turn_count,
|
||||
}),
|
||||
};
|
||||
store.append(segment_id, &entry)?;
|
||||
store.append(source_session_id, segment_id, &entry)?;
|
||||
Ok(segment_id)
|
||||
}
|
||||
|
||||
|
|
@ -85,36 +92,54 @@ pub fn create_compacted_segment(
|
|||
/// applying it to a Worker.
|
||||
pub fn restore(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> Result<crate::segment_log::RestoredState, StoreError> {
|
||||
let entries = store.read_all(segment_id)?;
|
||||
let entries = store.read_all(session_id, segment_id)?;
|
||||
Ok(segment_log::collect_state(&entries))
|
||||
}
|
||||
|
||||
/// Restore segment state when only the segment ID is known. Uses
|
||||
/// [`Store::lookup_session_of`] to resolve the parent Session.
|
||||
///
|
||||
/// Shim for legacy entry points (`pod-cli --session <UUID>` etc.) that
|
||||
/// receive a Segment ID without a Session ID.
|
||||
pub fn restore_by_segment(
|
||||
store: &impl Store,
|
||||
segment_id: SegmentId,
|
||||
) -> Result<crate::segment_log::RestoredState, StoreError> {
|
||||
let session_id = store
|
||||
.lookup_session_of(segment_id)?
|
||||
.ok_or(StoreError::NotFound(segment_id))?;
|
||||
restore(store, session_id, segment_id)
|
||||
}
|
||||
|
||||
/// Check if the store's entry count still matches the writer's tally.
|
||||
/// If not, auto-fork into a new segment.
|
||||
/// If not, auto-fork into a new segment within the same Session.
|
||||
///
|
||||
/// Updates `segment_id` and `entries_written` in place when a fork occurs.
|
||||
pub fn ensure_head_or_fork(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: &mut SegmentId,
|
||||
entries_written: &mut usize,
|
||||
state: SegmentStartState<'_>,
|
||||
) -> Result<(), StoreError> {
|
||||
let store_count = store.read_entry_count(*segment_id)?;
|
||||
let store_count = store.read_entry_count(session_id, *segment_id)?;
|
||||
if store_count == *entries_written {
|
||||
return Ok(());
|
||||
}
|
||||
let fork_id = crate::new_segment_id();
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
session_id,
|
||||
system_prompt: state.system_prompt.map(String::from),
|
||||
config: state.config.clone(),
|
||||
history: to_logged(state.history),
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
};
|
||||
store.create_segment(fork_id, &[entry])?;
|
||||
store.create_segment(session_id, fork_id, &[entry])?;
|
||||
*segment_id = fork_id;
|
||||
*entries_written = 1;
|
||||
Ok(())
|
||||
|
|
@ -128,11 +153,13 @@ pub fn ensure_head_or_fork(
|
|||
/// [`Segment::flatten_to_text`].
|
||||
pub fn save_user_input(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
segments: Vec<Segment>,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
LogEntry::UserInput {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -151,6 +178,7 @@ pub fn save_user_input(
|
|||
/// `UserInput` entry.
|
||||
pub fn save_delta(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
new_items: &[Item],
|
||||
) -> Result<(), StoreError> {
|
||||
|
|
@ -165,7 +193,7 @@ pub fn save_delta(
|
|||
continue;
|
||||
}
|
||||
let entry = classify_history_item(item, ts);
|
||||
append_entry(store, segment_id, entry)?;
|
||||
append_entry(store, session_id, segment_id, entry)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -199,11 +227,13 @@ pub fn classify_history_item(item: &Item, ts: u64) -> LogEntry {
|
|||
/// commit shape used for assistant / tool result entries.
|
||||
pub fn append_system_item(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
item: SystemItem,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
LogEntry::SystemItem {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -215,11 +245,13 @@ pub fn append_system_item(
|
|||
/// Log a TurnEnd entry.
|
||||
pub fn save_turn_end(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
turn_count: usize,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
LogEntry::TurnEnd {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -231,12 +263,14 @@ pub fn save_turn_end(
|
|||
/// Log a `RunCompleted` entry — `run()` / `resume()` returned `Ok(WorkerResult)`.
|
||||
pub fn save_run_completed(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
result: WorkerResult,
|
||||
interrupted: bool,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
LogEntry::RunCompleted {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -252,12 +286,14 @@ pub fn save_run_completed(
|
|||
/// `to_string()` rendering as `message`.
|
||||
pub fn save_run_errored(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
message: String,
|
||||
interrupted: bool,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
LogEntry::RunErrored {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -275,6 +311,7 @@ pub fn save_run_errored(
|
|||
/// 済ませた値を渡す。
|
||||
pub fn save_usage(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
history_len: usize,
|
||||
input_total_tokens: u64,
|
||||
|
|
@ -284,6 +321,7 @@ pub fn save_usage(
|
|||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
LogEntry::LlmUsage {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -303,12 +341,14 @@ pub fn save_usage(
|
|||
/// Use `RestoredState.extensions` to read entries back at restore time.
|
||||
pub fn save_extension(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
domain: impl Into<String>,
|
||||
payload: serde_json::Value,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
LogEntry::Extension {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -321,12 +361,14 @@ pub fn save_extension(
|
|||
/// Log the Pod's latest runtime scope snapshot.
|
||||
pub fn save_pod_scope(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
snapshot: &PodScopeSnapshot,
|
||||
) -> Result<(), StoreError> {
|
||||
let payload = serde_json::to_value(snapshot)?;
|
||||
save_extension(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
segment_log::POD_SCOPE_EXTENSION_DOMAIN,
|
||||
payload,
|
||||
|
|
@ -336,11 +378,13 @@ pub fn save_pod_scope(
|
|||
/// Log a `ConfigChanged` entry.
|
||||
pub fn save_config_changed(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
config: &RequestConfig,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
LogEntry::ConfigChanged {
|
||||
ts: segment_log::now_millis(),
|
||||
|
|
@ -349,22 +393,33 @@ pub fn save_config_changed(
|
|||
)
|
||||
}
|
||||
|
||||
/// Fork the current state into a new segment.
|
||||
pub fn fork(store: &impl Store, state: SegmentStartState<'_>) -> Result<SegmentId, StoreError> {
|
||||
/// Fork the current state into a brand-new Session (no parent lineage).
|
||||
///
|
||||
/// Use this for "start a fresh conversation from this state" — the
|
||||
/// returned segment does not share `session_id` with any prior segment.
|
||||
/// In-Session forks (live auto-fork / past-turn fork) go through
|
||||
/// [`fork_at`] or [`ensure_head_or_fork`] instead.
|
||||
pub fn fork(
|
||||
store: &impl Store,
|
||||
state: SegmentStartState<'_>,
|
||||
) -> Result<(SessionId, SegmentId), StoreError> {
|
||||
let session_id = crate::new_session_id();
|
||||
let fork_id = crate::new_segment_id();
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
session_id,
|
||||
system_prompt: state.system_prompt.map(String::from),
|
||||
config: state.config.clone(),
|
||||
history: to_logged(state.history),
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
};
|
||||
store.create_segment(fork_id, &[entry])?;
|
||||
Ok(fork_id)
|
||||
store.create_segment(session_id, fork_id, &[entry])?;
|
||||
Ok((session_id, fork_id))
|
||||
}
|
||||
|
||||
/// Fork from a turn boundary in a stored segment log.
|
||||
/// Fork from a turn boundary in a stored segment log, keeping the new
|
||||
/// segment in the same Session as `source_id`.
|
||||
///
|
||||
/// `at_turn_index` is the `turn_count` of the most recent completed
|
||||
/// `TurnEnd` in the source segment that the fork should branch from.
|
||||
|
|
@ -372,10 +427,11 @@ pub fn fork(store: &impl Store, state: SegmentStartState<'_>) -> Result<SegmentI
|
|||
/// after it are not carried into the new segment.
|
||||
pub fn fork_at(
|
||||
store: &impl Store,
|
||||
source_session_id: SessionId,
|
||||
source_id: SegmentId,
|
||||
at_turn_index: usize,
|
||||
) -> Result<SegmentId, StoreError> {
|
||||
let entries = store.read_all(source_id)?;
|
||||
let entries = store.read_all(source_session_id, source_id)?;
|
||||
let cut = if at_turn_index == 0 {
|
||||
// Branch directly after the SegmentStart (or whatever opens the
|
||||
// segment), before any turn completes.
|
||||
|
|
@ -395,6 +451,7 @@ pub fn fork_at(
|
|||
let fork_id = crate::new_segment_id();
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: segment_log::now_millis(),
|
||||
session_id: source_session_id,
|
||||
system_prompt: state.system_prompt,
|
||||
config: state.config,
|
||||
history: to_logged(&state.history),
|
||||
|
|
@ -404,7 +461,7 @@ pub fn fork_at(
|
|||
}),
|
||||
compacted_from: None,
|
||||
};
|
||||
store.create_segment(fork_id, &[entry])?;
|
||||
store.create_segment(source_session_id, fork_id, &[entry])?;
|
||||
Ok(fork_id)
|
||||
}
|
||||
|
||||
|
|
@ -415,8 +472,9 @@ pub fn fork_at(
|
|||
/// it needs the same value for an in-memory mirror + broadcast).
|
||||
pub fn append_entry(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
entry: LogEntry,
|
||||
) -> Result<(), StoreError> {
|
||||
store.append(segment_id, &entry)
|
||||
store.append(session_id, segment_id, &entry)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,13 +37,19 @@ pub enum LogEntry {
|
|||
/// For forked segments, `history` contains the seed state from the parent.
|
||||
SegmentStart {
|
||||
ts: u64,
|
||||
/// Session this segment belongs to. Compaction / fork inherits
|
||||
/// the source segment's session_id; only fresh "new conversation"
|
||||
/// segments mint a new session_id.
|
||||
session_id: crate::SessionId,
|
||||
system_prompt: Option<String>,
|
||||
config: RequestConfig,
|
||||
history: Vec<LoggedItem>,
|
||||
/// Origin: forked from another segment at a specific turn boundary.
|
||||
/// Origin: forked from a sibling segment at a specific turn boundary.
|
||||
/// The referenced segment is guaranteed to share `session_id`.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
forked_from: Option<SegmentOrigin>,
|
||||
/// Origin: compacted from another segment at a specific turn boundary.
|
||||
/// Origin: compacted from a sibling segment at a specific turn boundary.
|
||||
/// The referenced segment is guaranteed to share `session_id`.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
compacted_from: Option<SegmentOrigin>,
|
||||
},
|
||||
|
|
@ -190,6 +196,10 @@ pub struct PodScopeSnapshot {
|
|||
/// State collected from log entries.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RestoredState {
|
||||
/// Session the replayed segment belongs to. Sourced from the
|
||||
/// `SegmentStart` entry; `None` only if the log was empty (in which
|
||||
/// case `entries_count == 0`).
|
||||
pub session_id: Option<crate::SessionId>,
|
||||
pub system_prompt: Option<String>,
|
||||
pub config: RequestConfig,
|
||||
pub history: Vec<Item>,
|
||||
|
|
@ -221,6 +231,7 @@ pub struct RestoredState {
|
|||
/// Replay a sequence of log entries to reconstruct worker state.
|
||||
pub fn collect_state(entries: &[LogEntry]) -> RestoredState {
|
||||
let mut state = RestoredState {
|
||||
session_id: None,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: Vec::new(),
|
||||
|
|
@ -238,11 +249,13 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState {
|
|||
|
||||
match entry {
|
||||
LogEntry::SegmentStart {
|
||||
session_id,
|
||||
system_prompt,
|
||||
config,
|
||||
history,
|
||||
..
|
||||
} => {
|
||||
state.session_id = Some(*session_id);
|
||||
state.system_prompt = system_prompt.clone();
|
||||
state.config = config.clone();
|
||||
state.history = history.iter().cloned().map(Item::from).collect();
|
||||
|
|
@ -354,6 +367,7 @@ mod tests {
|
|||
fn replay_segment_start_sets_initial_state() {
|
||||
let state = collect_state(&[LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: Some("You are helpful.".into()),
|
||||
config: RequestConfig::default().with_max_tokens(1024),
|
||||
history: vec![Item::user_message("seed").into()],
|
||||
|
|
@ -371,6 +385,7 @@ mod tests {
|
|||
let state = collect_state(&[
|
||||
LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
|
|
@ -405,6 +420,7 @@ mod tests {
|
|||
let state = collect_state(&[
|
||||
LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
|
|
@ -442,6 +458,7 @@ mod tests {
|
|||
let state = collect_state(&[
|
||||
LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
|
|
@ -461,6 +478,7 @@ mod tests {
|
|||
let state = collect_state(&[
|
||||
LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
|
|
@ -507,6 +525,7 @@ mod tests {
|
|||
let state = collect_state(&[
|
||||
LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
|
|
@ -578,6 +597,7 @@ mod tests {
|
|||
let state = collect_state(&[
|
||||
LogEntry::SegmentStart {
|
||||
ts: 0,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
|
|
@ -610,6 +630,7 @@ mod tests {
|
|||
let state = collect_state(&[
|
||||
LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
|
|
@ -693,6 +714,7 @@ mod tests {
|
|||
let state = collect_state(&[
|
||||
LogEntry::SegmentStart {
|
||||
ts: 1,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
//! Persistence backend abstraction.
|
||||
//!
|
||||
//! [`Store`] defines the sync interface for reading and writing segment logs.
|
||||
//! Implementations handle the physical storage (filesystem, database, etc.).
|
||||
//! [`Store`] defines the sync interface for reading and writing segment logs
|
||||
//! within a [`Session`](crate::SessionId). Implementations handle the
|
||||
//! physical storage (filesystem, database, etc.).
|
||||
//!
|
||||
//! Sync (rather than async) is intentional: a segment log append is a single
|
||||
//! `< 1 KiB` line on local fs and completes well below a millisecond. Going
|
||||
|
|
@ -10,9 +11,9 @@
|
|||
//! drain task. Keeping the store sync lets the worker callback, Pod commit
|
||||
//! paths, and `PodInterceptor` all share one direct `append_entry` call.
|
||||
|
||||
use crate::SegmentId;
|
||||
use crate::event_trace::TraceEntry;
|
||||
use crate::segment_log::LogEntry;
|
||||
use crate::{SegmentId, SessionId};
|
||||
|
||||
/// Errors from the persistence store.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
|
|
@ -33,33 +34,67 @@ pub enum StoreError {
|
|||
/// Sync persistence backend for segment logs.
|
||||
///
|
||||
/// All methods take `&self` — implementations should use interior mutability
|
||||
/// (e.g., append-mode file handles) when needed.
|
||||
/// (e.g., append-mode file handles) when needed. Most read/write methods
|
||||
/// take `(SessionId, SegmentId)` so segments can be physically grouped
|
||||
/// per Session on disk (or per session_id in a DB).
|
||||
pub trait Store: Send + Sync {
|
||||
/// Append a single log entry to the segment log.
|
||||
///
|
||||
/// One line per call. The kernel orders concurrent `O_APPEND` writes
|
||||
/// for lines < `PIPE_BUF`, so user-space serialization is unnecessary.
|
||||
fn append(&self, id: SegmentId, entry: &LogEntry) -> Result<(), StoreError>;
|
||||
fn append(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
entry: &LogEntry,
|
||||
) -> Result<(), StoreError>;
|
||||
|
||||
/// Read all log entries for a segment, in order.
|
||||
fn read_all(&self, id: SegmentId) -> Result<Vec<LogEntry>, StoreError>;
|
||||
fn read_all(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> Result<Vec<LogEntry>, StoreError>;
|
||||
|
||||
/// List all segment IDs, most recent first.
|
||||
fn list_segments(&self) -> Result<Vec<SegmentId>, StoreError>;
|
||||
/// List all session IDs, most recent first.
|
||||
fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError>;
|
||||
|
||||
/// Create a new segment with initial entries.
|
||||
fn create_segment(&self, id: SegmentId, entries: &[LogEntry]) -> Result<(), StoreError>;
|
||||
/// List segment IDs belonging to `session_id`, most recent first.
|
||||
fn list_segments(&self, session_id: SessionId) -> Result<Vec<SegmentId>, StoreError>;
|
||||
|
||||
/// Look up which session a given segment belongs to. Returns `None`
|
||||
/// when the segment is not known to any session. Implementations
|
||||
/// may scan storage; intended for shim entry points that receive a
|
||||
/// segment ID without its session ID (e.g. legacy `--session <UUID>`).
|
||||
fn lookup_session_of(&self, segment_id: SegmentId) -> Result<Option<SessionId>, StoreError>;
|
||||
|
||||
/// Create a new segment within `session_id`, with initial entries.
|
||||
fn create_segment(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
entries: &[LogEntry],
|
||||
) -> Result<(), StoreError>;
|
||||
|
||||
/// Check if a segment exists.
|
||||
fn exists(&self, id: SegmentId) -> Result<bool, StoreError>;
|
||||
fn exists(&self, session_id: SessionId, segment_id: SegmentId) -> Result<bool, StoreError>;
|
||||
|
||||
/// Count entries currently stored for a segment.
|
||||
///
|
||||
/// Used by `ensure_head_or_fork` to detect concurrent writers:
|
||||
/// if the on-disk count exceeds the writer's own append tally,
|
||||
/// another process has extended the log.
|
||||
fn read_entry_count(&self, id: SegmentId) -> Result<usize, StoreError>;
|
||||
fn read_entry_count(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> Result<usize, StoreError>;
|
||||
|
||||
/// Append a trace entry to the debug event trace file.
|
||||
fn append_trace(&self, id: SegmentId, entry: &TraceEntry) -> Result<(), StoreError>;
|
||||
fn append_trace(
|
||||
&self,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
entry: &TraceEntry,
|
||||
) -> Result<(), StoreError>;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,16 +1,32 @@
|
|||
use llm_worker::WorkerResult;
|
||||
use llm_worker::llm_client::types::{Item, RequestConfig};
|
||||
use session_store::{FsStore, LogEntry, Store, TraceEntry, collect_state, new_segment_id};
|
||||
use session_store::{
|
||||
FsStore, LogEntry, Store, TraceEntry, collect_state, new_segment_id, new_session_id,
|
||||
};
|
||||
|
||||
/// Test helper: builds a minimal `LogEntry::SegmentStart`.
///
/// Only `ts` and `session_id` come from the caller. The system prompt and the
/// fork/compaction links are `None`, the history is empty, and the request
/// config is `RequestConfig::default()`.
fn nil_session_start(ts: u64, session_id: uuid::Uuid) -> LogEntry {
|
||||
LogEntry::SegmentStart {
|
||||
ts,
|
||||
session_id,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn round_trip_write_and_read() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let id = new_segment_id();
|
||||
let sid = new_session_id();
|
||||
let segid = new_segment_id();
|
||||
|
||||
let entries = vec![
|
||||
LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
session_id: sid,
|
||||
system_prompt: Some("You are helpful.".into()),
|
||||
config: RequestConfig::default().with_max_tokens(1024),
|
||||
history: vec![],
|
||||
|
|
@ -37,13 +53,14 @@ fn round_trip_write_and_read() {
|
|||
];
|
||||
|
||||
for entry in &entries {
|
||||
store.append(id, entry).unwrap();
|
||||
store.append(sid, segid, entry).unwrap();
|
||||
}
|
||||
|
||||
let read_back = store.read_all(id).unwrap();
|
||||
let read_back = store.read_all(sid, segid).unwrap();
|
||||
assert_eq!(read_back.len(), entries.len());
|
||||
|
||||
let state = collect_state(&read_back);
|
||||
assert_eq!(state.session_id, Some(sid));
|
||||
assert_eq!(state.system_prompt.as_deref(), Some("You are helpful."));
|
||||
assert_eq!(state.config.max_tokens, Some(1024));
|
||||
assert_eq!(state.history.len(), 2);
|
||||
|
|
@ -53,13 +70,15 @@ fn round_trip_write_and_read() {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn create_session_writes_all_entries() {
|
||||
fn create_segment_writes_all_entries() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let id = new_segment_id();
|
||||
let sid = new_session_id();
|
||||
let segid = new_segment_id();
|
||||
|
||||
let entries = [LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
session_id: sid,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![
|
||||
|
|
@ -70,74 +89,65 @@ fn create_session_writes_all_entries() {
|
|||
compacted_from: None,
|
||||
}];
|
||||
|
||||
store.create_segment(id, &entries).unwrap();
|
||||
let read_back = store.read_all(id).unwrap();
|
||||
store.create_segment(sid, segid, &entries).unwrap();
|
||||
let read_back = store.read_all(sid, segid).unwrap();
|
||||
assert_eq!(read_back.len(), 1);
|
||||
|
||||
let state = collect_state(&read_back);
|
||||
assert_eq!(state.history.len(), 2);
|
||||
assert_eq!(state.session_id, Some(sid));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn list_sessions_returns_newest_first() {
|
||||
fn list_sessions_and_segments() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
|
||||
let id1 = new_segment_id();
|
||||
// Small delay to ensure different UUID v7 timestamps
|
||||
let sid_a = new_session_id();
|
||||
std::thread::sleep(std::time::Duration::from_millis(2));
|
||||
let id2 = new_segment_id();
|
||||
let sid_b = new_session_id();
|
||||
|
||||
let entry = LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
};
|
||||
let seg_a1 = new_segment_id();
|
||||
let seg_a2 = new_segment_id();
|
||||
let seg_b1 = new_segment_id();
|
||||
|
||||
store.append(id1, &entry).unwrap();
|
||||
store.append(id2, &entry).unwrap();
|
||||
store.append(sid_a, seg_a1, &nil_session_start(1, sid_a)).unwrap();
|
||||
store.append(sid_a, seg_a2, &nil_session_start(2, sid_a)).unwrap();
|
||||
store.append(sid_b, seg_b1, &nil_session_start(3, sid_b)).unwrap();
|
||||
|
||||
let sessions = store.list_segments().unwrap();
|
||||
assert_eq!(sessions.len(), 2);
|
||||
assert_eq!(sessions[0], id2); // newest first
|
||||
assert_eq!(sessions[1], id1);
|
||||
let sessions = store.list_sessions().unwrap();
|
||||
assert_eq!(sessions, vec![sid_b, sid_a]); // newest first
|
||||
|
||||
let segs_a = store.list_segments(sid_a).unwrap();
|
||||
assert!(segs_a.contains(&seg_a1) && segs_a.contains(&seg_a2));
|
||||
assert_eq!(segs_a.len(), 2);
|
||||
|
||||
let segs_b = store.list_segments(sid_b).unwrap();
|
||||
assert_eq!(segs_b, vec![seg_b1]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn exists_returns_correct_state() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let id = new_segment_id();
|
||||
let sid = new_session_id();
|
||||
let segid = new_segment_id();
|
||||
|
||||
assert!(!store.exists(id).unwrap());
|
||||
assert!(!store.exists(sid, segid).unwrap());
|
||||
|
||||
store
|
||||
.append(
|
||||
id,
|
||||
&LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
store.append(sid, segid, &nil_session_start(1000, sid)).unwrap();
|
||||
|
||||
assert!(store.exists(id).unwrap());
|
||||
assert!(store.exists(sid, segid).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn not_found_error_for_missing_session() {
|
||||
fn not_found_error_for_missing_segment() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let id = new_segment_id();
|
||||
let sid = new_session_id();
|
||||
let segid = new_segment_id();
|
||||
|
||||
let result = store.read_all(id);
|
||||
let result = store.read_all(sid, segid);
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
|
|
@ -145,21 +155,10 @@ fn not_found_error_for_missing_session() {
|
|||
fn trace_entries_in_separate_file() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let id = new_segment_id();
|
||||
let sid = new_session_id();
|
||||
let segid = new_segment_id();
|
||||
|
||||
store
|
||||
.append(
|
||||
id,
|
||||
&LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
store.append(sid, segid, &nil_session_start(1000, sid)).unwrap();
|
||||
|
||||
let trace = TraceEntry {
|
||||
ts: 1500,
|
||||
|
|
@ -168,14 +167,17 @@ fn trace_entries_in_separate_file() {
|
|||
llm_worker::llm_client::event::PingEvent { timestamp: None },
|
||||
),
|
||||
};
|
||||
store.append_trace(id, &trace).unwrap();
|
||||
store.append_trace(sid, segid, &trace).unwrap();
|
||||
|
||||
// Log should have 1 entry, unaffected by trace
|
||||
let log = store.read_all(id).unwrap();
|
||||
let log = store.read_all(sid, segid).unwrap();
|
||||
assert_eq!(log.len(), 1);
|
||||
|
||||
// Trace file should exist separately
|
||||
let trace_path = dir.path().join(format!("{id}.trace.jsonl"));
|
||||
let trace_path = dir
|
||||
.path()
|
||||
.join(sid.to_string())
|
||||
.join(format!("{segid}.trace.jsonl"));
|
||||
assert!(trace_path.exists());
|
||||
}
|
||||
|
||||
|
|
@ -183,11 +185,13 @@ fn trace_entries_in_separate_file() {
|
|||
fn read_entry_count_matches_append_tally() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let id = new_segment_id();
|
||||
let sid = new_session_id();
|
||||
let segid = new_segment_id();
|
||||
|
||||
let entries = [
|
||||
LogEntry::SegmentStart {
|
||||
ts: 1000,
|
||||
session_id: sid,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
|
|
@ -201,8 +205,22 @@ fn read_entry_count_matches_append_tally() {
|
|||
];
|
||||
|
||||
for entry in &entries {
|
||||
store.append(id, entry).unwrap();
|
||||
store.append(sid, segid, entry).unwrap();
|
||||
}
|
||||
|
||||
assert_eq!(store.read_entry_count(id).unwrap(), entries.len());
|
||||
assert_eq!(store.read_entry_count(sid, segid).unwrap(), entries.len());
|
||||
}
|
||||
|
||||
/// `lookup_session_of` reports `None` for a segment that has never been
/// written, and `Some(session)` once an entry has been appended to that
/// segment under the session.
#[test]
|
||||
fn lookup_session_of_finds_owning_session() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let sid = new_session_id();
|
||||
let segid = new_segment_id();
|
||||
|
||||
// Nothing has been appended yet, so the segment belongs to no session.
assert_eq!(store.lookup_session_of(segid).unwrap(), None);
|
||||
|
||||
// A single append under `sid` is enough to register the segment.
store.append(sid, segid, &nil_session_start(1, sid)).unwrap();
|
||||
|
||||
assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -95,14 +95,20 @@ fn make_store() -> (tempfile::TempDir, FsStore) {
|
|||
async fn run_and_persist(
|
||||
worker: Worker<MockLlmClient>,
|
||||
store: &FsStore,
|
||||
session_id: session_store::SessionId,
|
||||
segment_id: session_store::SegmentId,
|
||||
input: &str,
|
||||
) -> (Worker<MockLlmClient>, llm_worker::WorkerResult) {
|
||||
// Mirror Pod's run-entry contract: log the user input as segments
|
||||
// before the worker pushes its flattened user_message; save_delta
|
||||
// skips the resulting user_message item to avoid double-write.
|
||||
session_store::save_user_input(store, segment_id, vec![protocol::Segment::text(input)])
|
||||
.unwrap();
|
||||
session_store::save_user_input(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
vec![protocol::Segment::text(input)],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let history_before = worker.history().len();
|
||||
|
||||
|
|
@ -111,13 +117,14 @@ async fn run_and_persist(
|
|||
let worker = locked.unlock();
|
||||
|
||||
let new_items = &worker.history()[history_before..];
|
||||
session_store::save_delta(store, segment_id, new_items).unwrap();
|
||||
session_store::save_turn_end(store, segment_id, worker.turn_count()).unwrap();
|
||||
session_store::save_delta(store, session_id, segment_id, new_items).unwrap();
|
||||
session_store::save_turn_end(store, session_id, segment_id, worker.turn_count()).unwrap();
|
||||
|
||||
match &result {
|
||||
Ok(r) => {
|
||||
session_store::save_run_completed(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
r.clone(),
|
||||
worker.last_run_interrupted(),
|
||||
|
|
@ -127,6 +134,7 @@ async fn run_and_persist(
|
|||
Err(e) => {
|
||||
session_store::save_run_errored(
|
||||
store,
|
||||
session_id,
|
||||
segment_id,
|
||||
e.to_string(),
|
||||
worker.last_run_interrupted(),
|
||||
|
|
@ -149,7 +157,7 @@ async fn session_run_logs_entries() {
|
|||
let client = MockLlmClient::new(simple_text_events());
|
||||
let worker = Worker::new(client);
|
||||
|
||||
let sid = session_store::create_segment(
|
||||
let (sid, segid) = session_store::create_segment(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -159,10 +167,10 @@ async fn session_run_logs_entries() {
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, "Hi").await;
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, segid, "Hi").await;
|
||||
let _ = &worker;
|
||||
|
||||
let entries = store.read_all(sid).unwrap();
|
||||
let entries = store.read_all(sid, segid).unwrap();
|
||||
|
||||
// SegmentStart, UserInput, AssistantItems, TurnEnd, RunCompleted (at minimum)
|
||||
assert!(
|
||||
|
|
@ -194,7 +202,7 @@ async fn session_restore_round_trip() {
|
|||
let mut worker = Worker::new(client);
|
||||
worker.set_system_prompt("You are helpful.");
|
||||
|
||||
let sid = session_store::create_segment(
|
||||
let (sid, segid) = session_store::create_segment(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -204,18 +212,26 @@ async fn session_restore_round_trip() {
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, "Hi").await;
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, segid, "Hi").await;
|
||||
|
||||
let original_history_len = worker.history().len();
|
||||
let original_turn_count = worker.turn_count();
|
||||
|
||||
// Restore
|
||||
let state = session_store::restore(&store, sid).unwrap();
|
||||
let state = session_store::restore(&store, sid, segid).unwrap();
|
||||
|
||||
assert_eq!(state.session_id, Some(sid));
|
||||
assert_eq!(state.history.len(), original_history_len);
|
||||
assert_eq!(state.turn_count, original_turn_count);
|
||||
assert_eq!(state.system_prompt.as_deref(), Some("You are helpful."));
|
||||
assert_eq!(state.entries_count, store.read_entry_count(sid).unwrap());
|
||||
assert_eq!(
|
||||
state.entries_count,
|
||||
store.read_entry_count(sid, segid).unwrap()
|
||||
);
|
||||
|
||||
// Restoring through the segment-ID-only shim must report the same owning session.
|
||||
let by_segment = session_store::restore_by_segment(&store, segid).unwrap();
|
||||
assert_eq!(by_segment.session_id, Some(sid));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -225,7 +241,7 @@ async fn session_run_with_tool_call() {
|
|||
let mut worker = Worker::new(client);
|
||||
worker.register_tool(weather_tool_definition());
|
||||
|
||||
let sid = session_store::create_segment(
|
||||
let (sid, segid) = session_store::create_segment(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -235,9 +251,9 @@ async fn session_run_with_tool_call() {
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let (_worker, _) = run_and_persist(worker, &store, sid, "What's the weather?").await;
|
||||
let (_worker, _) = run_and_persist(worker, &store, sid, segid, "What's the weather?").await;
|
||||
|
||||
let entries = store.read_all(sid).unwrap();
|
||||
let entries = store.read_all(sid, segid).unwrap();
|
||||
|
||||
let has_tool_results = entries
|
||||
.iter()
|
||||
|
|
@ -260,7 +276,7 @@ async fn session_resume_after_pause() {
|
|||
worker.register_tool(weather_tool_definition());
|
||||
worker.set_interceptor(PausePolicy);
|
||||
|
||||
let sid = session_store::create_segment(
|
||||
let (sid, segid) = session_store::create_segment(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -270,11 +286,11 @@ async fn session_resume_after_pause() {
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let (_worker, result) = run_and_persist(worker, &store, sid, "Weather?").await;
|
||||
let (_worker, result) = run_and_persist(worker, &store, sid, segid, "Weather?").await;
|
||||
assert!(matches!(result, llm_worker::WorkerResult::Paused));
|
||||
|
||||
// Check RunCompleted is Paused
|
||||
let entries = store.read_all(sid).unwrap();
|
||||
let entries = store.read_all(sid, segid).unwrap();
|
||||
let has_paused = entries.iter().any(|e| {
|
||||
matches!(
|
||||
e,
|
||||
|
|
@ -287,18 +303,18 @@ async fn session_resume_after_pause() {
|
|||
assert!(has_paused, "should have Paused outcome");
|
||||
|
||||
// Restore state and verify
|
||||
let state = session_store::restore(&store, sid).unwrap();
|
||||
let state = session_store::restore(&store, sid, segid).unwrap();
|
||||
assert!(state.last_run_interrupted);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn session_fork_preserves_state() {
|
||||
async fn session_fork_creates_new_session() {
|
||||
let (_dir, store) = make_store();
|
||||
let client = MockLlmClient::new(simple_text_events());
|
||||
let mut worker = Worker::new(client);
|
||||
worker.set_system_prompt("System prompt");
|
||||
|
||||
let sid = session_store::create_segment(
|
||||
let (sid, segid) = session_store::create_segment(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -308,10 +324,10 @@ async fn session_fork_preserves_state() {
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, "Hello").await;
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, segid, "Hello").await;
|
||||
|
||||
let original_history_len = worker.history().len();
|
||||
let fork_id = session_store::fork(
|
||||
let (fork_sid, fork_segid) = session_store::fork(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -320,24 +336,26 @@ async fn session_fork_preserves_state() {
|
|||
},
|
||||
)
|
||||
.unwrap();
|
||||
assert_ne!(fork_sid, sid, "`fork` mints a fresh Session");
|
||||
|
||||
// Fork should have a SegmentStart with the current history
|
||||
let fork_entries = store.read_all(fork_id).unwrap();
|
||||
let fork_entries = store.read_all(fork_sid, fork_segid).unwrap();
|
||||
assert_eq!(fork_entries.len(), 1);
|
||||
assert!(matches!(&fork_entries[0], LogEntry::SegmentStart { .. }));
|
||||
|
||||
let fork_state = collect_state(&fork_entries);
|
||||
assert_eq!(fork_state.session_id, Some(fork_sid));
|
||||
assert_eq!(fork_state.history.len(), original_history_len);
|
||||
assert_eq!(fork_state.system_prompt.as_deref(), Some("System prompt"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn session_fork_at_truncates() {
|
||||
async fn session_fork_at_truncates_within_session() {
|
||||
let (_dir, store) = make_store();
|
||||
let client = MockLlmClient::new(simple_text_events());
|
||||
let worker = Worker::new(client);
|
||||
|
||||
let sid = session_store::create_segment(
|
||||
let (sid, segid) = session_store::create_segment(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -347,26 +365,33 @@ async fn session_fork_at_truncates() {
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, "Hello").await;
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, segid, "Hello").await;
|
||||
|
||||
let all_entries = store.read_all(sid).unwrap();
|
||||
let all_entries = store.read_all(sid, segid).unwrap();
|
||||
assert!(all_entries.len() > 2);
|
||||
|
||||
// Fork at turn 1 (one completed turn).
|
||||
let fork_id = session_store::fork_at(&store, sid, worker.turn_count()).unwrap();
|
||||
// Fork at turn 1 (one completed turn). Stays in same Session.
|
||||
let fork_segid = session_store::fork_at(&store, sid, segid, worker.turn_count()).unwrap();
|
||||
|
||||
let fork_entries = store.read_all(fork_id).unwrap();
|
||||
let fork_entries = store.read_all(sid, fork_segid).unwrap();
|
||||
assert_eq!(fork_entries.len(), 1); // Just the new SegmentStart
|
||||
|
||||
let fork_state = collect_state(&fork_entries);
|
||||
assert_eq!(fork_state.session_id, Some(sid), "fork_at inherits Session");
|
||||
|
||||
// History at fork point should match history right after the TurnEnd in
|
||||
// the source session.
|
||||
// the source segment.
|
||||
let turn_end_pos = all_entries
|
||||
.iter()
|
||||
.position(|e| matches!(e, LogEntry::TurnEnd { turn_count, .. } if *turn_count == worker.turn_count()))
|
||||
.expect("source session has the matching TurnEnd");
|
||||
.expect("source segment has the matching TurnEnd");
|
||||
let source_state_at_fork = collect_state(&all_entries[..=turn_end_pos]);
|
||||
assert_eq!(fork_state.history.len(), source_state_at_fork.history.len());
|
||||
|
||||
// list_segments should show both source and fork in the same Session.
|
||||
let segs = store.list_segments(sid).unwrap();
|
||||
assert!(segs.contains(&segid));
|
||||
assert!(segs.contains(&fork_segid));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -375,7 +400,7 @@ async fn session_config_changed_logged() {
|
|||
let client = MockLlmClient::new(vec![]);
|
||||
let mut worker = Worker::new(client);
|
||||
|
||||
let sid = session_store::create_segment(
|
||||
let (sid, segid) = session_store::create_segment(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -388,9 +413,9 @@ async fn session_config_changed_logged() {
|
|||
// Modify config and log it
|
||||
let new_config = RequestConfig::default().with_temperature(0.7);
|
||||
worker.set_request_config(new_config.clone());
|
||||
session_store::save_config_changed(&store, sid, &new_config).unwrap();
|
||||
session_store::save_config_changed(&store, sid, segid, &new_config).unwrap();
|
||||
|
||||
let entries = store.read_all(sid).unwrap();
|
||||
let entries = store.read_all(sid, segid).unwrap();
|
||||
let has_config_changed = entries.iter().any(|e| {
|
||||
matches!(
|
||||
e,
|
||||
|
|
@ -404,11 +429,11 @@ async fn session_config_changed_logged() {
|
|||
async fn session_auto_forks_on_conflict() {
|
||||
let (_dir, store) = make_store();
|
||||
|
||||
// Create a session
|
||||
// Create a segment
|
||||
let client_a = MockLlmClient::new(simple_text_events());
|
||||
let worker_a = Worker::new(client_a);
|
||||
|
||||
let original_sid = session_store::create_segment(
|
||||
let (sid, original_segid) = session_store::create_segment(
|
||||
&store,
|
||||
SegmentStartState {
|
||||
system_prompt: worker_a.get_system_prompt(),
|
||||
|
|
@ -417,20 +442,21 @@ async fn session_auto_forks_on_conflict() {
|
|||
},
|
||||
)
|
||||
.unwrap();
|
||||
let mut segment_id = original_sid;
|
||||
let mut segment_id = original_segid;
|
||||
// Writer tracked: just the SegmentStart we wrote.
|
||||
let mut entries_written: usize = 1;
|
||||
|
||||
// Simulate another Pod writing to the same session behind our back.
|
||||
// Simulate another Pod writing to the same segment behind our back.
|
||||
let extra_entry = LogEntry::UserInput {
|
||||
ts: 9999,
|
||||
segments: vec![protocol::Segment::text("Interloper")],
|
||||
};
|
||||
store.append(original_sid, &extra_entry).unwrap();
|
||||
store.append(sid, original_segid, &extra_entry).unwrap();
|
||||
|
||||
// Now the on-disk count exceeds our tally — ensure_head_or_fork should auto-fork.
|
||||
session_store::ensure_head_or_fork(
|
||||
&store,
|
||||
sid,
|
||||
&mut segment_id,
|
||||
&mut entries_written,
|
||||
SegmentStartState {
|
||||
|
|
@ -441,15 +467,17 @@ async fn session_auto_forks_on_conflict() {
|
|||
)
|
||||
.unwrap();
|
||||
|
||||
// segment_id should now be different
|
||||
assert_ne!(segment_id, original_sid);
|
||||
// segment_id should now be different but live in the same Session.
|
||||
assert_ne!(segment_id, original_segid);
|
||||
|
||||
// The fork session should exist and have entries
|
||||
let fork_entries = store.read_all(segment_id).unwrap();
|
||||
// The fork segment should exist and have entries
|
||||
let fork_entries = store.read_all(sid, segment_id).unwrap();
|
||||
assert!(!fork_entries.is_empty());
|
||||
let fork_state = collect_state(&fork_entries);
|
||||
assert_eq!(fork_state.session_id, Some(sid), "auto-fork inherits Session");
|
||||
|
||||
// Original session should still have the interloper entry
|
||||
let original_entries = store.read_all(original_sid).unwrap();
|
||||
// Original segment should still have the interloper entry
|
||||
let original_entries = store.read_all(sid, original_segid).unwrap();
|
||||
let has_interloper = original_entries
|
||||
.iter()
|
||||
.any(|e| matches!(e, LogEntry::UserInput { .. }));
|
||||
|
|
|
|||
|
|
@ -4,12 +4,12 @@
|
|||
//! `llm-worker` `Tool` infrastructure. Filesystem access is mediated by
|
||||
//! two orthogonal concerns:
|
||||
//!
|
||||
//! - [`ScopedFs`] — pod-lifetime, expresses the write-block boundary for
|
||||
//! the current scope. Derived from the manifest and shareable across
|
||||
//! sessions.
|
||||
//! - [`Tracker`] — session-lifetime, enforces the "read before edit"
|
||||
//! - [`ScopedFs`] — Pod-process lifetime, expresses the write-block
|
||||
//! boundary for the current scope. Derived from the manifest; not
|
||||
//! persisted across Pod restart.
|
||||
//! - [`Tracker`] — Pod-process lifetime, enforces the "read before edit"
|
||||
//! policy via content hashes and tracks the recency of touched files.
|
||||
//! Recreated fresh per session.
|
||||
//! Recreated fresh on each Pod start (including resume).
|
||||
//!
|
||||
//! The Pod layer owns both instances and passes them to
|
||||
//! [`builtin_tools`] when registering tools on a `Worker`.
|
||||
|
|
@ -42,11 +42,11 @@ pub use tracker::Tracker;
|
|||
pub use write::write_tool;
|
||||
|
||||
/// Register all builtin tools, wiring them to a shared `ScopedFs`
|
||||
/// (pod-lifetime) and `Tracker` (session-lifetime).
|
||||
/// (Pod-process lifetime) and `Tracker` (Pod-process lifetime).
|
||||
///
|
||||
/// All returned factories share the same tracker instance so that
|
||||
/// `Read` / `Write` / `Edit` see a consistent history across tool
|
||||
/// invocations within a single session.
|
||||
/// invocations within a single Pod run.
|
||||
///
|
||||
/// `bash_output_dir` is where the Bash tool spills long outputs. The
|
||||
/// caller is responsible for adding that path to the readable scope
|
||||
|
|
|
|||
|
|
@ -1,8 +1,9 @@
|
|||
//! Pod-lifetime TaskStore and builtin task tools.
|
||||
//! Session-lifetime TaskStore and builtin task tools.
|
||||
//!
|
||||
//! The store is Pod/session-lifetime state shared by the four Task* tools. It
|
||||
//! is reconstructed on resume by replaying TaskCreate / TaskUpdate tool-call
|
||||
//! arguments from persisted history.
|
||||
//! The store survives compaction and Pod restart — it is reconstructed
|
||||
//! on resume by replaying TaskCreate / TaskUpdate tool-call arguments
|
||||
//! from persisted history, so its effective lifetime is the
|
||||
//! [`session_store::SessionId`] (the conversation), not the Pod process.
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
|
|
|
|||
|
|
@ -18,11 +18,13 @@
|
|||
//!
|
||||
//! # Lifetime
|
||||
//!
|
||||
//! A `Tracker` is **session-scoped**: the Pod layer creates a fresh
|
||||
//! instance at the start of each agent session and discards it when the
|
||||
//! session ends. The `ScopedFs` write boundary, by contrast, is
|
||||
//! pod-lifetime (derived from the manifest). The two are orthogonal and
|
||||
//! the Pod wires them together when registering builtin tools.
|
||||
//! A `Tracker` is **Pod-process scoped**: the Pod layer creates a fresh
|
||||
//! instance at the start of each Pod run (including resume) and discards
|
||||
//! it when the process exits — it is not persisted, so a resumed
|
||||
//! conversation starts with an empty read/edit history. The `ScopedFs`
|
||||
//! write boundary is likewise Pod-process scoped (derived from the
|
||||
//! manifest). The two are orthogonal and the Pod wires them together
|
||||
//! when registering builtin tools.
|
||||
//!
|
||||
//! ```no_run
|
||||
//! # use std::path::PathBuf;
|
||||
|
|
|
|||
|
|
@ -1447,6 +1447,7 @@ mod completion_flow_tests {
|
|||
let mut app = App::new("test".into());
|
||||
let session_start = session_store::LogEntry::SegmentStart {
|
||||
ts: 1,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: Default::default(),
|
||||
history: vec![session_store::LoggedItem::from(
|
||||
|
|
|
|||
|
|
@ -209,11 +209,11 @@ async fn run_resume() -> Result<(), Box<dyn std::error::Error>> {
|
|||
// Phase 1: pick a session in its own inline viewport, dropping the
|
||||
// viewport before the name dialog opens so each phase gets fresh
|
||||
// vertical room.
|
||||
let id = match picker::run().await? {
|
||||
PickerOutcome::Picked(id) => id,
|
||||
let leaf_segment_id = match picker::run().await? {
|
||||
PickerOutcome::Picked { segment_id } => segment_id,
|
||||
PickerOutcome::Cancelled => return Ok(()),
|
||||
};
|
||||
run_spawn(Some(id)).await
|
||||
run_spawn(Some(leaf_segment_id)).await
|
||||
}
|
||||
|
||||
async fn run_spawn(resume_from: Option<SegmentId>) -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
|
|
|||
|
|
@ -20,7 +20,9 @@ use ratatui::style::{Color, Modifier, Style};
|
|||
use ratatui::text::{Line, Span};
|
||||
use ratatui::widgets::Paragraph;
|
||||
use ratatui::{Frame, TerminalOptions, Viewport};
|
||||
use session_store::{FsStore, LogEntry, LoggedContentPart, LoggedItem, SegmentId, Store};
|
||||
use session_store::{
|
||||
FsStore, LogEntry, LoggedContentPart, LoggedItem, SegmentId, SessionId, Store,
|
||||
};
|
||||
|
||||
const MAX_ROWS: usize = 10;
|
||||
const VIEWPORT_LINES: u16 = MAX_ROWS as u16 + 4;
|
||||
|
|
@ -60,38 +62,54 @@ impl From<session_store::StoreError> for PickerError {
|
|||
}
|
||||
|
||||
pub enum PickerOutcome {
|
||||
Picked(SegmentId),
|
||||
/// User picked a session; resume at its leaf segment. The pod-cli
|
||||
/// rehydrates `session_id` via `Store::lookup_session_of` so we only
|
||||
/// need to surface the segment here.
|
||||
Picked { segment_id: SegmentId },
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
/// One row in the picker view. Rendered from the session log so the
|
||||
/// user can recognise their session at a glance without parsing UUIDs.
|
||||
/// One row in the picker view. Rendered from the leaf segment of a
|
||||
/// Session so the user can recognise their conversation at a glance
|
||||
/// without parsing UUIDs.
|
||||
struct Row {
|
||||
id: SegmentId,
|
||||
session_id: SessionId,
|
||||
leaf_segment_id: SegmentId,
|
||||
/// Last user / assistant snippet, or a `[corrupt]` placeholder.
|
||||
preview: String,
|
||||
/// `Some(pod_name)` when a live Pod currently holds an allocation
|
||||
/// for this session in `pods.json`. Picking such a row launches
|
||||
/// `pod --session <UUID>` which will fail with `SegmentConflict` —
|
||||
/// the badge warns the user up-front.
|
||||
/// for this session's leaf segment in `pods.json`. Picking such a
|
||||
/// row launches `pod --session <UUID>` which will fail with
|
||||
/// `SegmentConflict` — the badge warns the user up-front.
|
||||
live_pod: Option<String>,
|
||||
}
|
||||
|
||||
pub async fn run() -> Result<PickerOutcome, PickerError> {
|
||||
let store = open_default_store()?;
|
||||
let ids = store.list_segments()?;
|
||||
if ids.is_empty() {
|
||||
let sessions = store.list_sessions()?;
|
||||
if sessions.is_empty() {
|
||||
return Err(PickerError::NoSessions);
|
||||
}
|
||||
let mut rows: Vec<Row> = Vec::with_capacity(MAX_ROWS);
|
||||
for id in ids.into_iter().take(MAX_ROWS) {
|
||||
let preview = build_preview(&store, id);
|
||||
for session_id in sessions.into_iter().take(MAX_ROWS) {
|
||||
let Some(leaf_segment_id) = store
|
||||
.list_segments(session_id)?
|
||||
.into_iter()
|
||||
.next()
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
let preview = build_preview(&store, session_id, leaf_segment_id);
|
||||
// Best-effort live check. A pods.json I/O hiccup downgrades
|
||||
// the row to "no badge" rather than killing the picker — the
|
||||
// user still gets to see the listing.
|
||||
let live_pod = lookup_segment(id).ok().flatten().map(|info| info.pod_name);
|
||||
let live_pod = lookup_segment(leaf_segment_id)
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|info| info.pod_name);
|
||||
rows.push(Row {
|
||||
id,
|
||||
session_id,
|
||||
leaf_segment_id,
|
||||
preview,
|
||||
live_pod,
|
||||
});
|
||||
|
|
@ -115,7 +133,9 @@ pub async fn run() -> Result<PickerOutcome, PickerError> {
|
|||
}
|
||||
Some(Action::Submit) => {
|
||||
close_viewport(&mut terminal)?;
|
||||
return Ok(PickerOutcome::Picked(rows[selected].id));
|
||||
return Ok(PickerOutcome::Picked {
|
||||
segment_id: rows[selected].leaf_segment_id,
|
||||
});
|
||||
}
|
||||
Some(Action::Cancel) => {
|
||||
close_viewport(&mut terminal)?;
|
||||
|
|
@ -158,8 +178,8 @@ fn open_default_store() -> Result<FsStore, PickerError> {
|
|||
Ok(FsStore::new(&dir)?)
|
||||
}
|
||||
|
||||
fn build_preview(store: &FsStore, id: SegmentId) -> String {
|
||||
match store.read_all(id) {
|
||||
fn build_preview(store: &FsStore, session_id: SessionId, segment_id: SegmentId) -> String {
|
||||
match store.read_all(session_id, segment_id) {
|
||||
Ok(entries) => last_message_preview(&entries).unwrap_or_else(|| "[empty]".to_string()),
|
||||
Err(_) => "[corrupt]".to_string(),
|
||||
}
|
||||
|
|
@ -177,6 +197,11 @@ fn last_message_preview(entries: &[LogEntry]) -> Option<String> {
|
|||
return Some(format!("user: {}", trim_one_line(&text, 60)));
|
||||
}
|
||||
}
|
||||
LogEntry::AssistantItem { item, .. } => {
|
||||
if let Some(text) = first_text_logged(item) {
|
||||
return Some(format!("assistant: {}", trim_one_line(&text, 60)));
|
||||
}
|
||||
}
|
||||
LogEntry::AssistantItems { items, .. } => {
|
||||
if let Some(text) = items.iter().find_map(first_text_logged) {
|
||||
return Some(format!("assistant: {}", trim_one_line(&text, 60)));
|
||||
|
|
@ -300,7 +325,7 @@ fn row_line(row: &Row, selected: bool) -> Line<'_> {
|
|||
};
|
||||
let mut spans = vec![
|
||||
Span::raw(marker),
|
||||
Span::styled(short_segment(row.id), id_style),
|
||||
Span::styled(short_segment(row.session_id), id_style),
|
||||
Span::raw(" "),
|
||||
];
|
||||
if let Some(ref pod_name) = row.live_pod {
|
||||
|
|
@ -313,7 +338,7 @@ fn row_line(row: &Row, selected: bool) -> Line<'_> {
|
|||
Line::from(spans)
|
||||
}
|
||||
|
||||
fn short_segment(id: SegmentId) -> String {
|
||||
fn short_segment(id: SessionId) -> String {
|
||||
let s = id.to_string();
|
||||
s.chars().take(8).collect()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -329,7 +329,7 @@ async fn load_resume_scope(segment_id: SegmentId) -> Result<ScopeConfig, SpawnEr
|
|||
)
|
||||
})?;
|
||||
let store = session_store::FsStore::new(&store_dir)?;
|
||||
let state = session_store::restore(&store, segment_id)?;
|
||||
let state = session_store::restore_by_segment(&store, segment_id)?;
|
||||
let snapshot = state
|
||||
.pod_scope
|
||||
.ok_or(SpawnError::MissingResumeScope { segment_id })?;
|
||||
|
|
|
|||
|
|
@ -1,48 +0,0 @@
|
|||
# session-store: Session(Segment 群の grouping)導入
|
||||
|
||||
## 背景
|
||||
|
||||
`segment-rename` で物理 append-only 単位を `Segment` に揃えた。続けて、ユーザー視点の「同じ会話の家系」を表す **`Session`** を導入する。
|
||||
|
||||
`Session` は fork tree 全体を 1 つにまとめる grouping で、compaction / fork は同 Session 内に新 Segment を生やす操作になる。これにより:
|
||||
|
||||
- Session 数が fork で爆発せず、`WHERE session_id = ?` だけで fork tree 全体が取れる。
|
||||
- resume の指定は `(SessionId, SegmentId)` の組で行える。
|
||||
- pod-persistent-state 側で Pod ↔ Session の対応関係を扱いやすくなる。
|
||||
|
||||
## 要件
|
||||
|
||||
- 新 `SessionId` 型を `session-store` に追加。Segment は `parent_session_id` を持つ。
|
||||
- Segment 生成パスでの session_id 決定:
|
||||
- new session: 新 `SessionId` を発行
|
||||
- compaction: 元 Segment と同 `SessionId` を継承
|
||||
- fork (live auto / 過去 fork いずれも): 元 Segment と同 `SessionId` を継承
|
||||
- `SessionOrigin` を以下のいずれかに整理:
|
||||
- `compacted_from { segment_id, at_turn_index }`
|
||||
- `forked_from { segment_id, at_turn_index }`
|
||||
- どちらも同 Session 内 segment への参照であることを型レベルで保証。
|
||||
- restore API を `(SessionId, SegmentId)` を取る形に。`SegmentId` のみを取る既存経路は内部で `SessionId` を解決する shim を一段噛ませる。
|
||||
- FsStore layout に Session 単位の index を追加(Session → Segment 列挙が `WHERE session_id = ?` 相当で引けること)。形式は `<data_dir>/sessions/<session_id>/<segment_id>.jsonl` というディレクトリ階層、または別ファイルの index のいずれかとし、どちらにするかは実装時に決定する。
|
||||
- Session 内の leaf Segment 列挙 API を提供(pod-name resume 等から使う)。
|
||||
|
||||
## 完了条件
|
||||
|
||||
- `SessionId` 型と Session 単位 metadata の永続表現が決まり、FsStore で読み書きできる。
|
||||
- compaction / fork が同 Session 内 Segment 増分として記録される。
|
||||
- `(SessionId, SegmentId)` での restore が動作し、leaf 以外を指定した read-only restore も実装上は可能。
|
||||
- 既存 session を Session 単位に grouping する migration 戦略が決まっている(プロジェクト方針として後方互換性は維持しないため、既存の sessions ディレクトリを破棄してよいかどうかをここで明示する)。
|
||||
- `cargo check --workspace` および全テストが通る。
|
||||
|
||||
## 範囲外
|
||||
|
||||
- live auto-fork の marker 形式(別チケット `live-fork-marker`)。
|
||||
- Pod 単位 metadata(Phase 2 一連のチケット)。
|
||||
- TUI からの Session/Segment 選択 UI。
|
||||
- DB backend 実装。
|
||||
|
||||
## 関連
|
||||
|
||||
- `tickets/segment-rename.md` (前提)
|
||||
- `tickets/live-fork-marker.md`
|
||||
- `tickets/pod-state-backend.md`
|
||||
- `crates/session-store/`
|
||||
Loading…
Reference in New Issue
Block a user