update: entry hash chain と session_head mutex を撤廃
- HashedEntry / EntryHash / compute_hash / build_chain 撤去、JSONL は 1 行 1 LogEntry - SessionOrigin.at_hash → at_turn_index (TurnEnd 由来) に置換 - Pod 側 SessionHead mutex を ArcSwap<SessionId> + AtomicUsize の SessionState に置換 - ensure_head_or_fork は store の entry count と writer の append tally で判定 - session-store から sha2 / hex 依存、pod から parking_lot 依存を削除
This commit is contained in:
parent
3d091acacd
commit
90e83bf2ae
4
Cargo.lock
generated
4
Cargo.lock
generated
|
|
@ -2148,6 +2148,7 @@ checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6"
|
|||
name = "pod"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"clap",
|
||||
|
|
@ -2160,7 +2161,6 @@ dependencies = [
|
|||
"manifest",
|
||||
"memory",
|
||||
"minijinja",
|
||||
"parking_lot",
|
||||
"pod-registry",
|
||||
"protocol",
|
||||
"provider",
|
||||
|
|
@ -2977,12 +2977,10 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"async-trait",
|
||||
"futures",
|
||||
"hex",
|
||||
"llm-worker",
|
||||
"protocol",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2 0.11.0",
|
||||
"tempfile",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ llm-worker-macros = { path = "crates/llm-worker-macros", version = "0.2" }
|
|||
manifest = { path = "crates/manifest" }
|
||||
lint-common = { path = "crates/lint-common" }
|
||||
memory = { path = "crates/memory" }
|
||||
workflow = { path = "crates/workflow" }
|
||||
pod-registry = { path = "crates/pod-registry" }
|
||||
protocol = { path = "crates/protocol" }
|
||||
provider = { path = "crates/provider" }
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ use super::EXTRACT_DOMAIN;
|
|||
/// として 1 回ずつ書かれ、最新の 1 件が現行 pointer として有効になる。
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct ExtractPointerPayload {
|
||||
/// 直近 extract が処理した最後の session-store HashedEntry の index。
|
||||
/// 直近 extract が処理した最後の session-store LogEntry の index。
|
||||
/// 次回の `source.range.start` はこの値 + 1。
|
||||
pub processed_through_entry: usize,
|
||||
/// 直近 extract 時点の `history.len()`。次回入力は
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ memory = { workspace = true }
|
|||
workflow-crate = { package = "workflow", path = "../workflow" }
|
||||
uuid = { workspace = true, features = ["v7"] }
|
||||
session-metrics = { workspace = true }
|
||||
parking_lot = "0.12.5"
|
||||
arc-swap = "1.9.1"
|
||||
|
||||
[dev-dependencies]
|
||||
dotenv = "0.15.0"
|
||||
|
|
|
|||
|
|
@ -1,16 +1,15 @@
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use llm_worker::Item;
|
||||
use llm_worker::llm_client::RequestConfig;
|
||||
use llm_worker::llm_client::client::LlmClient;
|
||||
use llm_worker::state::Mutable;
|
||||
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
|
||||
use parking_lot::Mutex as SyncMutex;
|
||||
use session_store::{
|
||||
EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, SystemItem,
|
||||
session_log, to_logged,
|
||||
LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, SystemItem, session_log, to_logged,
|
||||
};
|
||||
use tracing::{info, warn};
|
||||
|
||||
|
|
@ -43,22 +42,59 @@ use protocol::{AlertLevel, AlertSource, Event, Segment};
|
|||
use tokio::sync::broadcast;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
pub struct SessionHead {
|
||||
pub session_id: SessionId,
|
||||
pub head_hash: Option<EntryHash>,
|
||||
/// Lock-free shared session pointer.
|
||||
///
|
||||
/// Holds the current `(session_id, entries_written)` pair so that the
|
||||
/// Pod and every `LogWriterHandle` clone see a consistent view through
|
||||
/// `Arc`-shared lock-free reads. `session_id` is wrapped in `ArcSwap`
|
||||
/// so fork (a rare, run-start-only event) can atomically swap it
|
||||
/// without taking a mutex on the append hot path. `entries_written` is
|
||||
/// an `AtomicUsize` bumped on every successful append; the writer's
|
||||
/// tally is compared against the store's on-disk count to detect
|
||||
/// concurrent writers in `ensure_session_head`.
|
||||
pub struct SessionState {
|
||||
session_id: ArcSwap<SessionId>,
|
||||
entries_written: AtomicUsize,
|
||||
}
|
||||
|
||||
/// Cheap-cloneable bundle of (store + session-head lock + sink) handed
|
||||
/// to the worker callback and the interceptor so they can commit
|
||||
/// `LogEntry` values directly without going through an mpsc ferry.
|
||||
///
|
||||
/// All three fields are `Clone` (the latter two as `Arc` clones, the
|
||||
/// store per its `Clone` impl) so the handle itself is a flat triple of
|
||||
/// cheap copies.
|
||||
impl SessionState {
|
||||
pub fn new(session_id: SessionId, entries_written: usize) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
session_id: ArcSwap::from_pointee(session_id),
|
||||
entries_written: AtomicUsize::new(entries_written),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn session_id(&self) -> SessionId {
|
||||
**self.session_id.load()
|
||||
}
|
||||
|
||||
pub fn set_session_id(&self, id: SessionId) {
|
||||
self.session_id.store(Arc::new(id));
|
||||
}
|
||||
|
||||
pub fn entries_written(&self) -> usize {
|
||||
self.entries_written.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub fn set_entries_written(&self, n: usize) {
|
||||
self.entries_written.store(n, Ordering::Release);
|
||||
}
|
||||
|
||||
fn increment_entries(&self) {
|
||||
self.entries_written.fetch_add(1, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
/// Cheap-cloneable bundle of (store + shared session pointer + sink)
|
||||
/// handed to the worker callback and the interceptor so they can
|
||||
/// commit `LogEntry` values directly without going through an mpsc
|
||||
/// ferry. All fields are `Clone` (`store` per its `Clone` impl,
|
||||
/// `state` and `sink` as `Arc` clones).
|
||||
#[derive(Clone)]
|
||||
pub struct LogWriterHandle<St: Clone> {
|
||||
pub store: St,
|
||||
pub session_head: Arc<SyncMutex<SessionHead>>,
|
||||
pub state: Arc<SessionState>,
|
||||
pub sink: SessionLogSink,
|
||||
}
|
||||
|
||||
|
|
@ -66,18 +102,16 @@ impl<St> LogWriterHandle<St>
|
|||
where
|
||||
St: Store + Clone,
|
||||
{
|
||||
/// Append `entry` to the log: disk write → in-memory mirror push →
|
||||
/// broadcast — atomic w.r.t. `subscribe_with_snapshot` callers.
|
||||
pub fn append_entry(&self, entry: LogEntry) -> Result<EntryHash, StoreError> {
|
||||
let mut head = self.session_head.lock();
|
||||
let hash = session_store::append_entry_with_hash(
|
||||
&self.store,
|
||||
head.session_id,
|
||||
&mut head.head_hash,
|
||||
entry.clone(),
|
||||
)?;
|
||||
/// Append `entry` to the log: disk write → counter bump → in-memory
|
||||
/// mirror push → broadcast. The kernel orders concurrent `O_APPEND`
|
||||
/// writes for `< PIPE_BUF` lines, so no user-space serialization is
|
||||
/// needed across appenders.
|
||||
pub fn append_entry(&self, entry: LogEntry) -> Result<(), StoreError> {
|
||||
let session_id = self.state.session_id();
|
||||
self.store.append(session_id, &entry)?;
|
||||
self.state.increment_entries();
|
||||
self.sink.publish(entry);
|
||||
Ok(hash)
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -127,8 +161,10 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
/// Always `Some` outside of `run()`/`resume()`.
|
||||
worker: Option<Worker<C, Mutable>>,
|
||||
store: St,
|
||||
session_id: SessionId,
|
||||
session_head: Arc<SyncMutex<SessionHead>>,
|
||||
/// Shared session pointer. Source of truth for the Pod's current
|
||||
/// `session_id` and append tally. `self.session_id()` is a thin
|
||||
/// wrapper over `session_state.session_id()`.
|
||||
session_state: Arc<SessionState>,
|
||||
/// Absolute working directory of the Pod.
|
||||
pwd: PathBuf,
|
||||
/// Shared, atomically-swappable view of the Pod's resolved scope.
|
||||
|
|
@ -302,8 +338,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
|||
manifest: self.manifest.clone(),
|
||||
worker: Some(worker),
|
||||
store: self.store.clone(),
|
||||
session_id: self.session_id,
|
||||
session_head: self.session_head.clone(),
|
||||
session_state: self.session_state.clone(),
|
||||
pwd: self.pwd.clone(),
|
||||
scope: self.scope.clone(),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -342,12 +377,12 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
|||
|
||||
/// Build a `LogWriterHandle` carrying everything the worker
|
||||
/// callback / interceptor needs to commit `LogEntry` values
|
||||
/// directly: store handle, the shared session-head lock, and the
|
||||
/// directly: store handle, the shared session pointer, and the
|
||||
/// broadcast sink. All three are cheap clones.
|
||||
pub fn log_writer_handle(&self) -> LogWriterHandle<St> {
|
||||
LogWriterHandle {
|
||||
store: self.store.clone(),
|
||||
session_head: self.session_head.clone(),
|
||||
state: self.session_state.clone(),
|
||||
sink: self.sink.clone(),
|
||||
}
|
||||
}
|
||||
|
|
@ -443,11 +478,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
session_id,
|
||||
session_head: Arc::new(SyncMutex::new(SessionHead {
|
||||
session_id,
|
||||
head_hash: None,
|
||||
})),
|
||||
session_state: SessionState::new(session_id, 0),
|
||||
pwd,
|
||||
scope: SharedScope::new(scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -511,9 +542,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
&self.prompts
|
||||
}
|
||||
|
||||
/// The session ID used for persistence.
|
||||
/// The session ID used for persistence. Read lock-free from the
|
||||
/// shared session pointer so fork-time swaps are observed immediately.
|
||||
pub fn session_id(&self) -> SessionId {
|
||||
self.session_id
|
||||
self.session_state.session_id()
|
||||
}
|
||||
|
||||
/// The Pod's manifest.
|
||||
|
|
@ -567,12 +599,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
}
|
||||
|
||||
/// Snapshot the current runtime scope in the session log. The entry
|
||||
/// is intentionally appended as soon as a session head exists: if the
|
||||
/// is intentionally appended as soon as a session log exists: if the
|
||||
/// process later exits while children keep their allocations, resume
|
||||
/// can restore the narrowed scope instead of reclaiming delegated
|
||||
/// writes.
|
||||
pub fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> {
|
||||
if self.session_head.lock().head_hash.is_none() {
|
||||
if self.session_state.entries_written() == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
let snapshot = {
|
||||
|
|
@ -588,23 +620,18 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(),
|
||||
payload,
|
||||
})
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
/// Append `entry` to the session log AND publish it through the
|
||||
/// broadcast sink. Holds the session-head sync lock across the
|
||||
/// disk write and the sink publish so subscribers see a gap-free
|
||||
/// `(snapshot, live)` stream consistent with what's on disk.
|
||||
pub(crate) fn commit_entry(&self, entry: LogEntry) -> Result<EntryHash, StoreError> {
|
||||
let mut head = self.session_head.lock();
|
||||
let hash = session_store::append_entry_with_hash(
|
||||
&self.store,
|
||||
head.session_id,
|
||||
&mut head.head_hash,
|
||||
entry.clone(),
|
||||
)?;
|
||||
/// broadcast sink. No user-space serialization is needed across
|
||||
/// concurrent appenders — the kernel orders `O_APPEND` writes for
|
||||
/// lines smaller than `PIPE_BUF`.
|
||||
pub(crate) fn commit_entry(&self, entry: LogEntry) -> Result<(), StoreError> {
|
||||
let session_id = self.session_state.session_id();
|
||||
self.store.append(session_id, &entry)?;
|
||||
self.session_state.increment_entries();
|
||||
self.sink.publish(entry);
|
||||
Ok(hash)
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Cloneable sink handle. Exposed to the controller so the IPC
|
||||
|
|
@ -1160,7 +1187,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
|
||||
// IDLE → active marker. Commits first so the next UserInput entry
|
||||
// is contained inside this Invoke range. See `tickets/invoke-turn-llmcall-semantics.md`.
|
||||
self.session_id = self.session_head.lock().session_id;
|
||||
self.commit_entry(LogEntry::Invoke {
|
||||
ts: session_log::now_millis(),
|
||||
trigger: protocol::InvokeKind::UserSend,
|
||||
|
|
@ -1350,7 +1376,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
return;
|
||||
};
|
||||
if let Err(err) =
|
||||
memory::append_use_event(layout, self.session_id.to_string(), source, records)
|
||||
memory::append_use_event(layout, self.session_id().to_string(), source, records)
|
||||
{
|
||||
warn!(error = %err, "failed to append memory usage event");
|
||||
}
|
||||
|
|
@ -1361,7 +1387,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
return;
|
||||
};
|
||||
if let Err(err) =
|
||||
memory::append_resident_exposure_event(layout, self.session_id.to_string(), records)
|
||||
memory::append_resident_exposure_event(layout, self.session_id().to_string(), records)
|
||||
{
|
||||
warn!(error = %err, "failed to append resident exposure event");
|
||||
}
|
||||
|
|
@ -1551,7 +1577,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
// IDLE → active marker for the buffered notification / pod-event
|
||||
// drain. The trailing SystemItem entries (drained by the
|
||||
// PodInterceptor) carry the actual payload.
|
||||
self.session_id = self.session_head.lock().session_id;
|
||||
self.commit_entry(LogEntry::Invoke {
|
||||
ts: session_log::now_millis(),
|
||||
trigger: kind,
|
||||
|
|
@ -1582,23 +1607,20 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
self.handle_worker_result(result, history_before).await
|
||||
}
|
||||
|
||||
/// Ensure the session exists and its head still matches ours.
|
||||
/// Ensure the session exists and the writer's tally still matches
|
||||
/// the on-disk entry count.
|
||||
///
|
||||
/// On the first call for a Pod built via `from_manifest`, the session
|
||||
/// has not been written to the store yet — this is when we append the
|
||||
/// initial `SessionStart` entry, carrying the system prompt that
|
||||
/// `ensure_system_prompt_materialized` has just rendered. Subsequent
|
||||
/// calls fall through to `ensure_head_or_fork`, which auto-forks when
|
||||
/// another writer has advanced the store head behind our back.
|
||||
/// calls fall through to entry-count comparison, which auto-forks
|
||||
/// when another writer has appended behind our back.
|
||||
fn ensure_session_head(&mut self) -> Result<(), PodError> {
|
||||
let w = self.worker.as_ref().unwrap();
|
||||
let prev_session_id;
|
||||
let initial_state = {
|
||||
let head = self.session_head.lock();
|
||||
prev_session_id = head.session_id;
|
||||
head.head_hash.is_none()
|
||||
};
|
||||
if initial_state {
|
||||
let prev_session_id = self.session_state.session_id();
|
||||
let entries_written = self.session_state.entries_written();
|
||||
if entries_written == 0 {
|
||||
let initial = LogEntry::SessionStart {
|
||||
ts: session_log::now_millis(),
|
||||
system_prompt: w.get_system_prompt().map(String::from),
|
||||
|
|
@ -1611,13 +1633,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
self.persist_scope_snapshot()?;
|
||||
return Ok(());
|
||||
}
|
||||
// Check store head + auto-fork if it drifted.
|
||||
let store_head = self
|
||||
// Check store count + auto-fork if it drifted.
|
||||
let store_count = self
|
||||
.store
|
||||
.read_head_hash(prev_session_id)
|
||||
.read_entry_count(prev_session_id)
|
||||
.map_err(PodError::from)?;
|
||||
let mut head = self.session_head.lock();
|
||||
if store_head == head.head_hash {
|
||||
if store_count == entries_written {
|
||||
return Ok(());
|
||||
}
|
||||
// Fork: mint a fresh session and switch to it. The new
|
||||
|
|
@ -1632,20 +1653,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
forked_from: None,
|
||||
compacted_from: None,
|
||||
};
|
||||
let hash = session_log::compute_hash(None, &entry);
|
||||
let hashed = HashedEntry {
|
||||
hash: hash.clone(),
|
||||
prev_hash: None,
|
||||
entry: entry.clone(),
|
||||
};
|
||||
self.store
|
||||
.create_session(fork_id, &[hashed])
|
||||
.create_session(fork_id, &[entry.clone()])
|
||||
.map_err(PodError::from)?;
|
||||
head.session_id = fork_id;
|
||||
head.head_hash = Some(hash);
|
||||
self.session_id = fork_id;
|
||||
self.session_state.set_session_id(fork_id);
|
||||
self.session_state.set_entries_written(1);
|
||||
self.sink.reset_with_initial(entry);
|
||||
drop(head);
|
||||
if self.scope_allocation.is_some() {
|
||||
pod_registry::update_session(&self.manifest.pod.name, fork_id)?;
|
||||
}
|
||||
|
|
@ -1796,7 +1809,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
// the callback fall through this branch: they classify the
|
||||
// slice from `history_before` inline so the test's
|
||||
// `restore`-style assertions still see entries on disk.
|
||||
self.session_id = self.session_head.lock().session_id;
|
||||
if !self.history_persistence_wired {
|
||||
let new_items: Vec<Item> = self.worker.as_ref().unwrap().history()[history_before..]
|
||||
.iter()
|
||||
|
|
@ -1989,7 +2001,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
.compact_system()
|
||||
.map_err(PodError::PromptCatalog)?;
|
||||
let mut summary_worker = Worker::new(summary_client).system_prompt(summary_system_prompt);
|
||||
summary_worker.set_cache_key(Some(self.session_id.to_string()));
|
||||
summary_worker.set_cache_key(Some(self.session_id().to_string()));
|
||||
|
||||
// Occupancy-based input-token meter + interceptor. The tracker pairs
|
||||
// each pre-request history length with the following UsageEvent, then
|
||||
|
|
@ -2133,37 +2145,24 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
// the broadcast sink so existing subscribers see the new
|
||||
// `SessionStart { compacted_from }` and reset their view.
|
||||
let new_session_id = session_store::new_session_id();
|
||||
let session_start = {
|
||||
let mut head = self.session_head.lock();
|
||||
let old_session_id = head.session_id;
|
||||
let old_head_hash = head
|
||||
.head_hash
|
||||
.clone()
|
||||
.expect("head_hash should be set after at least one entry");
|
||||
let w = self.worker.as_ref().unwrap();
|
||||
let entry = LogEntry::SessionStart {
|
||||
ts: session_log::now_millis(),
|
||||
system_prompt: w.get_system_prompt().map(String::from),
|
||||
config: w.request_config().clone(),
|
||||
history: to_logged(&new_history),
|
||||
forked_from: None,
|
||||
compacted_from: Some(session_store::SessionOrigin {
|
||||
session_id: old_session_id,
|
||||
at_hash: old_head_hash,
|
||||
}),
|
||||
};
|
||||
let hash = session_log::compute_hash(None, &entry);
|
||||
let hashed = HashedEntry {
|
||||
hash: hash.clone(),
|
||||
prev_hash: None,
|
||||
entry: entry.clone(),
|
||||
};
|
||||
self.store.create_session(new_session_id, &[hashed])?;
|
||||
head.session_id = new_session_id;
|
||||
head.head_hash = Some(hash);
|
||||
self.session_id = new_session_id;
|
||||
entry
|
||||
let old_session_id = self.session_state.session_id();
|
||||
let source_turn_count = self.worker.as_ref().unwrap().turn_count();
|
||||
let w = self.worker.as_ref().unwrap();
|
||||
let entry = LogEntry::SessionStart {
|
||||
ts: session_log::now_millis(),
|
||||
system_prompt: w.get_system_prompt().map(String::from),
|
||||
config: w.request_config().clone(),
|
||||
history: to_logged(&new_history),
|
||||
forked_from: None,
|
||||
compacted_from: Some(session_store::SessionOrigin {
|
||||
session_id: old_session_id,
|
||||
at_turn_index: source_turn_count,
|
||||
}),
|
||||
};
|
||||
self.store.create_session(new_session_id, &[entry.clone()])?;
|
||||
self.session_state.set_session_id(new_session_id);
|
||||
self.session_state.set_entries_written(1);
|
||||
let session_start = entry;
|
||||
// Broadcast the SessionStart through the sink. This atomically
|
||||
// resets the mirror to `[SessionStart]` so any subscriber
|
||||
// querying after this point sees the post-compaction prefix.
|
||||
|
|
@ -2368,7 +2367,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
// Read the session log to get the current entry count. This is
|
||||
// the boundary for the source.range end_entry. Called once per
|
||||
// extract, on a small local file.
|
||||
let entries_now = self.store.read_all(self.session_id)?.len();
|
||||
let entries_now = self.store.read_all(self.session_id())?.len();
|
||||
if entries_now == 0 {
|
||||
return Ok(ExtractDecision::Skipped);
|
||||
}
|
||||
|
|
@ -2400,7 +2399,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
.memory_extract_system(memory_language)
|
||||
.map_err(PodError::PromptCatalog)?;
|
||||
let mut extract_worker = Worker::new(client).system_prompt(extract_system_prompt);
|
||||
extract_worker.set_cache_key(Some(self.session_id.to_string()));
|
||||
extract_worker.set_cache_key(Some(self.session_id().to_string()));
|
||||
|
||||
// Occupancy-based input-token meter + interceptor. The tracker pairs
|
||||
// each pre-request history length with the following UsageEvent, then
|
||||
|
|
@ -2436,7 +2435,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
extract::ExtractedPayload::default()
|
||||
});
|
||||
|
||||
let source_session_id = self.session_head.lock().session_id;
|
||||
let source_session_id = self.session_state.session_id();
|
||||
let staging_id = if payload.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
|
|
@ -2460,9 +2459,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
ts: session_log::now_millis(),
|
||||
domain: extract::EXTRACT_DOMAIN.into(),
|
||||
payload: payload_value,
|
||||
})
|
||||
?;
|
||||
self.session_id = self.session_head.lock().session_id;
|
||||
})?;
|
||||
|
||||
*self
|
||||
.extract_pointer
|
||||
|
|
@ -2601,7 +2598,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
}
|
||||
};
|
||||
let mut worker = Worker::new(client).system_prompt(consolidation_system_prompt);
|
||||
worker.set_cache_key(Some(self.session_id.to_string()));
|
||||
worker.set_cache_key(Some(self.session_id().to_string()));
|
||||
|
||||
// Memory tools are self-contained — they bypass ScopedFs and write
|
||||
// directly under the workspace via WorkspaceLayout. Resident
|
||||
|
|
@ -2613,7 +2610,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
let query_cfg = memory::tool::QueryConfig::from(memory_cfg);
|
||||
worker.register_tool(memory::tool::read_tool_with_usage(
|
||||
layout.clone(),
|
||||
self.session_id.to_string(),
|
||||
self.session_id().to_string(),
|
||||
));
|
||||
worker.register_tool(memory::tool::write_tool(layout.clone()));
|
||||
worker.register_tool(memory::tool::edit_tool(layout.clone()));
|
||||
|
|
@ -2768,11 +2765,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
session_id,
|
||||
session_head: Arc::new(SyncMutex::new(SessionHead {
|
||||
session_id,
|
||||
head_hash: None,
|
||||
})),
|
||||
session_state: SessionState::new(session_id, 0),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -2842,11 +2835,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
session_id,
|
||||
session_head: Arc::new(SyncMutex::new(SessionHead {
|
||||
session_id,
|
||||
head_hash: None,
|
||||
})),
|
||||
session_state: SessionState::new(session_id, 0),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -2913,10 +2902,10 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
// sits on disk.
|
||||
let raw_entries = store.read_all(session_id)?;
|
||||
let state = session_store::collect_state(&raw_entries);
|
||||
if state.head_hash.is_none() {
|
||||
if state.entries_count == 0 {
|
||||
return Err(PodError::SessionEmpty { session_id });
|
||||
}
|
||||
let mirror_entries: Vec<LogEntry> = raw_entries.iter().map(|e| e.entry.clone()).collect();
|
||||
let mirror_entries: Vec<LogEntry> = raw_entries.clone();
|
||||
let scope_snapshot = state
|
||||
.pod_scope
|
||||
.clone()
|
||||
|
|
@ -2985,11 +2974,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
manifest,
|
||||
worker: Some(worker),
|
||||
store,
|
||||
session_id,
|
||||
session_head: Arc::new(SyncMutex::new(SessionHead {
|
||||
session_id,
|
||||
head_hash: state.head_hash,
|
||||
})),
|
||||
session_state: SessionState::new(session_id, state.entries_count),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
|
|||
|
|
@ -26,9 +26,7 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve
|
|||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
||||
use session_metrics::{DOMAIN, Metric, metrics_from_extensions};
|
||||
use session_store::{
|
||||
EntryHash, FsStore, HashedEntry, LogEntry, SessionId, Store, StoreError, TraceEntry,
|
||||
};
|
||||
use session_store::{FsStore, LogEntry, SessionId, Store, StoreError, TraceEntry};
|
||||
|
||||
use pod::{Pod, PodManifest};
|
||||
|
||||
|
|
@ -329,32 +327,28 @@ struct MetricFailingStore {
|
|||
}
|
||||
|
||||
impl Store for MetricFailingStore {
|
||||
fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> {
|
||||
if let LogEntry::Extension { domain, .. } = &entry.entry {
|
||||
fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError> {
|
||||
if let LogEntry::Extension { domain, .. } = entry {
|
||||
if domain == DOMAIN {
|
||||
return Err(StoreError::Io(std::io::Error::other("synthetic failure")));
|
||||
}
|
||||
}
|
||||
self.inner.append(id, entry)
|
||||
}
|
||||
fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError> {
|
||||
fn read_all(&self, id: SessionId) -> Result<Vec<LogEntry>, StoreError> {
|
||||
self.inner.read_all(id)
|
||||
}
|
||||
fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError> {
|
||||
self.inner.list_sessions()
|
||||
}
|
||||
fn create_session(
|
||||
&self,
|
||||
id: SessionId,
|
||||
entries: &[HashedEntry],
|
||||
) -> Result<(), StoreError> {
|
||||
fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> Result<(), StoreError> {
|
||||
self.inner.create_session(id, entries)
|
||||
}
|
||||
fn exists(&self, id: SessionId) -> Result<bool, StoreError> {
|
||||
self.inner.exists(id)
|
||||
}
|
||||
fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError> {
|
||||
self.inner.read_head_hash(id)
|
||||
fn read_entry_count(&self, id: SessionId) -> Result<usize, StoreError> {
|
||||
self.inner.read_entry_count(id)
|
||||
}
|
||||
fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> {
|
||||
self.inner.append_trace(id, entry)
|
||||
|
|
|
|||
|
|
@ -184,7 +184,7 @@ async fn session_start_state_captures_rendered_prompt() {
|
|||
|
||||
let entries = pod.store().read_all(pod.session_id()).unwrap();
|
||||
let first = entries.first().expect("at least one entry");
|
||||
match &first.entry {
|
||||
match first {
|
||||
LogEntry::SessionStart { system_prompt, .. } => {
|
||||
let sp = system_prompt.as_deref().expect("system prompt set");
|
||||
assert!(sp.starts_with("hello cwd="));
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@
|
|||
use std::collections::BTreeMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session_store::{EntryHash, SessionId, Store, StoreError, save_extension, session_log};
|
||||
use session_store::{SessionId, Store, StoreError, save_extension, session_log};
|
||||
|
||||
/// Domain tag used in `LogEntry::Extension` for all metrics records.
|
||||
pub const DOMAIN: &str = "metrics";
|
||||
|
|
@ -78,11 +78,10 @@ impl Metric {
|
|||
pub fn record_metric(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
metric: &Metric,
|
||||
) -> Result<(), StoreError> {
|
||||
let payload = serde_json::to_value(metric).expect("Metric serialization cannot fail");
|
||||
save_extension(store, session_id, head_hash, DOMAIN, payload)
|
||||
save_extension(store, session_id, DOMAIN, payload)
|
||||
}
|
||||
|
||||
/// `RestoredState.extensions` から metrics domain の payload を順に取り出し、
|
||||
|
|
|
|||
|
|
@ -11,8 +11,6 @@ serde = { workspace = true, features = ["derive"] }
|
|||
serde_json = { workspace = true }
|
||||
uuid = { workspace = true, features = ["v7", "serde"] }
|
||||
thiserror = { workspace = true }
|
||||
sha2 = { workspace = true }
|
||||
hex = "0.4.3"
|
||||
protocol = { workspace = true }
|
||||
tracing.workspace = true
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
use crate::SessionId;
|
||||
use crate::event_trace::TraceEntry;
|
||||
use crate::session_log::{EntryHash, HashedEntry};
|
||||
use crate::session_log::LogEntry;
|
||||
use crate::store::{Store, StoreError};
|
||||
use std::fs;
|
||||
use std::io::Write;
|
||||
|
|
@ -65,12 +65,12 @@ impl FsStore {
|
|||
}
|
||||
|
||||
impl Store for FsStore {
|
||||
fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> {
|
||||
fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError> {
|
||||
let line = serde_json::to_string(entry)?;
|
||||
self.append_line(&self.log_path(id), &line)
|
||||
}
|
||||
|
||||
fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError> {
|
||||
fn read_all(&self, id: SessionId) -> Result<Vec<LogEntry>, StoreError> {
|
||||
let path = self.log_path(id);
|
||||
if !path.exists() {
|
||||
return Err(StoreError::NotFound(id));
|
||||
|
|
@ -98,7 +98,7 @@ impl Store for FsStore {
|
|||
Ok(sessions)
|
||||
}
|
||||
|
||||
fn create_session(&self, id: SessionId, entries: &[HashedEntry]) -> Result<(), StoreError> {
|
||||
fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> Result<(), StoreError> {
|
||||
let path = self.log_path(id);
|
||||
let mut content = String::new();
|
||||
for entry in entries {
|
||||
|
|
@ -113,24 +113,13 @@ impl Store for FsStore {
|
|||
Ok(self.log_path(id).exists())
|
||||
}
|
||||
|
||||
fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError> {
|
||||
fn read_entry_count(&self, id: SessionId) -> Result<usize, StoreError> {
|
||||
let path = self.log_path(id);
|
||||
if !path.exists() {
|
||||
return Err(StoreError::NotFound(id));
|
||||
}
|
||||
let content = fs::read_to_string(&path)?;
|
||||
let last_line = content.lines().rev().find(|l| !l.trim().is_empty());
|
||||
match last_line {
|
||||
Some(line) => {
|
||||
let entry: HashedEntry =
|
||||
serde_json::from_str(line).map_err(|e| StoreError::Corrupt {
|
||||
line: content.lines().count(),
|
||||
message: e.to_string(),
|
||||
})?;
|
||||
Ok(Some(entry.hash))
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
Ok(content.lines().filter(|l| !l.trim().is_empty()).count())
|
||||
}
|
||||
|
||||
fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> {
|
||||
|
|
|
|||
|
|
@ -40,15 +40,14 @@ pub use llm_worker::UsageRecord;
|
|||
pub use llm_worker::llm_client::types::{ContentPart, Item, Role};
|
||||
pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged};
|
||||
pub use session::{
|
||||
SessionStartState, append_entry, append_entry_with_hash, append_system_item,
|
||||
classify_history_item, create_compacted_session, create_session, create_session_with_id,
|
||||
ensure_head_or_fork, fork, fork_at, restore, save_config_changed, save_delta, save_extension,
|
||||
save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage,
|
||||
save_user_input,
|
||||
SessionStartState, append_entry, append_system_item, classify_history_item,
|
||||
create_compacted_session, create_session, create_session_with_id, ensure_head_or_fork, fork,
|
||||
fork_at, restore, save_config_changed, save_delta, save_extension, save_pod_scope,
|
||||
save_run_completed, save_run_errored, save_turn_end, save_usage, save_user_input,
|
||||
};
|
||||
pub use session_log::{
|
||||
EntryHash, HashedEntry, LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState,
|
||||
SessionOrigin, build_chain, collect_state, compute_hash,
|
||||
LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SessionOrigin,
|
||||
collect_state,
|
||||
};
|
||||
pub use system_item::{SystemItem, render_pod_event};
|
||||
pub use store::{Store, StoreError};
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
|
||||
use crate::SessionId;
|
||||
use crate::logged_item::{LoggedItem, to_logged};
|
||||
use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionOrigin};
|
||||
use crate::session_log::{self, LogEntry, PodScopeSnapshot, SessionOrigin};
|
||||
use crate::store::{Store, StoreError};
|
||||
use crate::system_item::SystemItem;
|
||||
use llm_worker::WorkerResult;
|
||||
|
|
@ -22,27 +22,25 @@ pub struct SessionStartState<'a> {
|
|||
}
|
||||
|
||||
/// Create a new session, writing the initial `SessionStart` entry.
|
||||
///
|
||||
/// Returns the new session ID and head hash.
|
||||
pub fn create_session(
|
||||
store: &impl Store,
|
||||
state: SessionStartState<'_>,
|
||||
) -> Result<(SessionId, EntryHash), StoreError> {
|
||||
) -> Result<SessionId, StoreError> {
|
||||
let session_id = crate::new_session_id();
|
||||
let hash = create_session_with_id(store, session_id, state)?;
|
||||
Ok((session_id, hash))
|
||||
create_session_with_id(store, session_id, state)?;
|
||||
Ok(session_id)
|
||||
}
|
||||
|
||||
/// Write a fresh `SessionStart` entry using a pre-generated session ID.
|
||||
///
|
||||
/// Used by callers that need to reserve a session ID synchronously but
|
||||
/// defer the initial log append (e.g. Pod, which resolves a templated
|
||||
/// system prompt only at first turn). Returns the resulting head hash.
|
||||
/// system prompt only at first turn).
|
||||
pub fn create_session_with_id(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
state: SessionStartState<'_>,
|
||||
) -> Result<EntryHash, StoreError> {
|
||||
) -> Result<(), StoreError> {
|
||||
let entry = LogEntry::SessionStart {
|
||||
ts: session_log::now_millis(),
|
||||
system_prompt: state.system_prompt.map(String::from),
|
||||
|
|
@ -51,26 +49,20 @@ pub fn create_session_with_id(
|
|||
forked_from: None,
|
||||
compacted_from: None,
|
||||
};
|
||||
let hash = session_log::compute_hash(None, &entry);
|
||||
let hashed_entry = HashedEntry {
|
||||
hash: hash.clone(),
|
||||
prev_hash: None,
|
||||
entry,
|
||||
};
|
||||
store.append(session_id, &hashed_entry)?;
|
||||
Ok(hash)
|
||||
store.append(session_id, &entry)
|
||||
}
|
||||
|
||||
/// Create a compacted session from an existing one.
|
||||
///
|
||||
/// Records `compacted_from` provenance linking back to the source session.
|
||||
/// Returns the new session ID and head hash.
|
||||
/// Records `compacted_from` provenance linking back to the source session
|
||||
/// at the turn boundary captured by `source_turn_count` (the most recent
|
||||
/// completed turn in the source).
|
||||
pub fn create_compacted_session(
|
||||
store: &impl Store,
|
||||
state: SessionStartState<'_>,
|
||||
source_session_id: SessionId,
|
||||
source_head_hash: EntryHash,
|
||||
) -> Result<(SessionId, EntryHash), StoreError> {
|
||||
source_turn_count: usize,
|
||||
) -> Result<SessionId, StoreError> {
|
||||
let session_id = crate::new_session_id();
|
||||
let entry = LogEntry::SessionStart {
|
||||
ts: session_log::now_millis(),
|
||||
|
|
@ -80,17 +72,11 @@ pub fn create_compacted_session(
|
|||
forked_from: None,
|
||||
compacted_from: Some(SessionOrigin {
|
||||
session_id: source_session_id,
|
||||
at_hash: source_head_hash,
|
||||
at_turn_index: source_turn_count,
|
||||
}),
|
||||
};
|
||||
let hash = session_log::compute_hash(None, &entry);
|
||||
let hashed_entry = HashedEntry {
|
||||
hash: hash.clone(),
|
||||
prev_hash: None,
|
||||
entry,
|
||||
};
|
||||
store.append(session_id, &hashed_entry)?;
|
||||
Ok((session_id, hash))
|
||||
store.append(session_id, &entry)?;
|
||||
Ok(session_id)
|
||||
}
|
||||
|
||||
/// Restore session state from a stored log.
|
||||
|
|
@ -105,18 +91,18 @@ pub fn restore(
|
|||
Ok(session_log::collect_state(&entries))
|
||||
}
|
||||
|
||||
/// Check if the store's head still matches the expected head hash.
|
||||
/// Check if the store's entry count still matches the writer's tally.
|
||||
/// If not, auto-fork into a new session.
|
||||
///
|
||||
/// Updates `session_id` and `head_hash` in place when a fork occurs.
|
||||
/// Updates `session_id` and `entries_written` in place when a fork occurs.
|
||||
pub fn ensure_head_or_fork(
|
||||
store: &impl Store,
|
||||
session_id: &mut SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
entries_written: &mut usize,
|
||||
state: SessionStartState<'_>,
|
||||
) -> Result<(), StoreError> {
|
||||
let store_head = store.read_head_hash(*session_id)?;
|
||||
if store_head == *head_hash {
|
||||
let store_count = store.read_entry_count(*session_id)?;
|
||||
if store_count == *entries_written {
|
||||
return Ok(());
|
||||
}
|
||||
let fork_id = crate::new_session_id();
|
||||
|
|
@ -128,15 +114,9 @@ pub fn ensure_head_or_fork(
|
|||
forked_from: None,
|
||||
compacted_from: None,
|
||||
};
|
||||
let hash = session_log::compute_hash(None, &entry);
|
||||
let hashed_entry = HashedEntry {
|
||||
hash: hash.clone(),
|
||||
prev_hash: None,
|
||||
entry,
|
||||
};
|
||||
store.create_session(fork_id, &[hashed_entry])?;
|
||||
store.create_session(fork_id, &[entry])?;
|
||||
*session_id = fork_id;
|
||||
*head_hash = Some(hash);
|
||||
*entries_written = 1;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -149,13 +129,11 @@ pub fn ensure_head_or_fork(
|
|||
pub fn save_user_input(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
segments: Vec<Segment>,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
LogEntry::UserInput {
|
||||
ts: session_log::now_millis(),
|
||||
segments,
|
||||
|
|
@ -174,7 +152,6 @@ pub fn save_user_input(
|
|||
pub fn save_delta(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
new_items: &[Item],
|
||||
) -> Result<(), StoreError> {
|
||||
if new_items.is_empty() {
|
||||
|
|
@ -188,7 +165,7 @@ pub fn save_delta(
|
|||
continue;
|
||||
}
|
||||
let entry = classify_history_item(item, ts);
|
||||
append_entry(store, session_id, head_hash, entry)?;
|
||||
append_entry(store, session_id, entry)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -223,13 +200,11 @@ pub fn classify_history_item(item: &Item, ts: u64) -> LogEntry {
|
|||
pub fn append_system_item(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
item: SystemItem,
|
||||
) -> Result<EntryHash, StoreError> {
|
||||
append_entry_with_hash(
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
LogEntry::SystemItem {
|
||||
ts: session_log::now_millis(),
|
||||
item,
|
||||
|
|
@ -241,13 +216,11 @@ pub fn append_system_item(
|
|||
pub fn save_turn_end(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
turn_count: usize,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
LogEntry::TurnEnd {
|
||||
ts: session_log::now_millis(),
|
||||
turn_count,
|
||||
|
|
@ -259,14 +232,12 @@ pub fn save_turn_end(
|
|||
pub fn save_run_completed(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
result: WorkerResult,
|
||||
interrupted: bool,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
LogEntry::RunCompleted {
|
||||
ts: session_log::now_millis(),
|
||||
interrupted,
|
||||
|
|
@ -282,14 +253,12 @@ pub fn save_run_completed(
|
|||
pub fn save_run_errored(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
message: String,
|
||||
interrupted: bool,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
LogEntry::RunErrored {
|
||||
ts: session_log::now_millis(),
|
||||
interrupted,
|
||||
|
|
@ -307,7 +276,6 @@ pub fn save_run_errored(
|
|||
pub fn save_usage(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
history_len: usize,
|
||||
input_total_tokens: u64,
|
||||
cache_read_tokens: u64,
|
||||
|
|
@ -317,7 +285,6 @@ pub fn save_usage(
|
|||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
LogEntry::LlmUsage {
|
||||
ts: session_log::now_millis(),
|
||||
history_len,
|
||||
|
|
@ -337,14 +304,12 @@ pub fn save_usage(
|
|||
pub fn save_extension(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
domain: impl Into<String>,
|
||||
payload: serde_json::Value,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
LogEntry::Extension {
|
||||
ts: session_log::now_millis(),
|
||||
domain: domain.into(),
|
||||
|
|
@ -357,14 +322,12 @@ pub fn save_extension(
|
|||
pub fn save_pod_scope(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
snapshot: &PodScopeSnapshot,
|
||||
) -> Result<(), StoreError> {
|
||||
let payload = serde_json::to_value(snapshot)?;
|
||||
save_extension(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
session_log::POD_SCOPE_EXTENSION_DOMAIN,
|
||||
payload,
|
||||
)
|
||||
|
|
@ -374,13 +337,11 @@ pub fn save_pod_scope(
|
|||
pub fn save_config_changed(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
config: &RequestConfig,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
LogEntry::ConfigChanged {
|
||||
ts: session_log::now_millis(),
|
||||
config: config.clone(),
|
||||
|
|
@ -399,28 +360,36 @@ pub fn fork(store: &impl Store, state: SessionStartState<'_>) -> Result<SessionI
|
|||
forked_from: None,
|
||||
compacted_from: None,
|
||||
};
|
||||
let hash = session_log::compute_hash(None, &entry);
|
||||
let hashed_entry = HashedEntry {
|
||||
hash,
|
||||
prev_hash: None,
|
||||
entry,
|
||||
};
|
||||
store.create_session(fork_id, &[hashed_entry])?;
|
||||
store.create_session(fork_id, &[entry])?;
|
||||
Ok(fork_id)
|
||||
}
|
||||
|
||||
/// Fork from an arbitrary point in a stored session's log.
|
||||
/// Fork from a turn boundary in a stored session's log.
|
||||
///
|
||||
/// `at_turn_index` is the `turn_count` of the most recent completed
|
||||
/// `TurnEnd` in the source segment that the fork should branch from.
|
||||
/// Replay collects state up to and including that `TurnEnd`; entries
|
||||
/// after it are not carried into the new segment.
|
||||
pub fn fork_at(
|
||||
store: &impl Store,
|
||||
source_id: SessionId,
|
||||
at_hash: &EntryHash,
|
||||
at_turn_index: usize,
|
||||
) -> Result<SessionId, StoreError> {
|
||||
let entries = store.read_all(source_id)?;
|
||||
let cut = entries
|
||||
.iter()
|
||||
.position(|e| &e.hash == at_hash)
|
||||
.map(|i| i + 1)
|
||||
.unwrap_or(entries.len());
|
||||
let cut = if at_turn_index == 0 {
|
||||
// Branch directly after the SessionStart (or whatever opens the
|
||||
// segment), before any turn completes.
|
||||
entries
|
||||
.iter()
|
||||
.position(|e| !matches!(e, LogEntry::SessionStart { .. }))
|
||||
.unwrap_or(entries.len())
|
||||
} else {
|
||||
entries
|
||||
.iter()
|
||||
.position(|e| matches!(e, LogEntry::TurnEnd { turn_count, .. } if *turn_count == at_turn_index))
|
||||
.map(|i| i + 1)
|
||||
.unwrap_or(entries.len())
|
||||
};
|
||||
let state = session_log::collect_state(&entries[..cut]);
|
||||
|
||||
let fork_id = crate::new_session_id();
|
||||
|
|
@ -429,23 +398,17 @@ pub fn fork_at(
|
|||
system_prompt: state.system_prompt,
|
||||
config: state.config,
|
||||
history: to_logged(&state.history),
|
||||
forked_from: Some(session_log::SessionOrigin {
|
||||
forked_from: Some(SessionOrigin {
|
||||
session_id: source_id,
|
||||
at_hash: at_hash.clone(),
|
||||
at_turn_index,
|
||||
}),
|
||||
compacted_from: None,
|
||||
};
|
||||
let hash = session_log::compute_hash(None, &entry);
|
||||
let hashed_entry = HashedEntry {
|
||||
hash,
|
||||
prev_hash: None,
|
||||
entry,
|
||||
};
|
||||
store.create_session(fork_id, &[hashed_entry])?;
|
||||
store.create_session(fork_id, &[entry])?;
|
||||
Ok(fork_id)
|
||||
}
|
||||
|
||||
/// Append a single `LogEntry`, chaining the hash and updating `head_hash`.
|
||||
/// Append a single `LogEntry`.
|
||||
///
|
||||
/// Lower-level dual of the `save_*` convenience wrappers in this module.
|
||||
/// Use when the caller already builds the typed entry itself (e.g. when
|
||||
|
|
@ -453,30 +416,7 @@ pub fn fork_at(
|
|||
pub fn append_entry(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
entry: LogEntry,
|
||||
) -> Result<(), StoreError> {
|
||||
append_entry_with_hash(store, session_id, head_hash, entry)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Same as [`append_entry`] but returns the freshly computed entry hash.
|
||||
///
|
||||
/// Used by paths that need the hash for downstream broadcast or mirror
|
||||
/// updates (e.g. the Pod's `SessionLogSink`).
|
||||
pub fn append_entry_with_hash(
|
||||
store: &impl Store,
|
||||
session_id: SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
entry: LogEntry,
|
||||
) -> Result<EntryHash, StoreError> {
|
||||
let hash = session_log::compute_hash(head_hash.as_ref(), &entry);
|
||||
let hashed_entry = HashedEntry {
|
||||
hash: hash.clone(),
|
||||
prev_hash: head_hash.clone(),
|
||||
entry,
|
||||
};
|
||||
store.append(session_id, &hashed_entry)?;
|
||||
*head_hash = Some(hash.clone());
|
||||
Ok(hash)
|
||||
store.append(session_id, &entry)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,89 +4,18 @@
|
|||
//! serialized as one line in a `.jsonl` file. Reading all entries and
|
||||
//! collecting them via [`collect_state`] reconstructs the full [`Worker`] state.
|
||||
//!
|
||||
//! Entries are chained via [`EntryHash`]: each [`HashedEntry`] records the hash
|
||||
//! of the previous entry, forming a tamper-evident append-only chain. This
|
||||
//! enables safe fork detection when multiple writers share a session.
|
||||
//! The on-disk format is one `LogEntry` per line — entries are positionally
|
||||
//! ordered. Fork lineage references between segments use turn-number indices
|
||||
//! (`SessionOrigin.at_turn_index`) rather than per-entry hashes.
|
||||
|
||||
use llm_worker::llm_client::types::{Item, RequestConfig};
|
||||
use llm_worker::{UsageRecord, WorkerResult};
|
||||
use protocol::{InvokeKind, ScopeRule, Segment};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
||||
use crate::logged_item::LoggedItem;
|
||||
use crate::system_item::SystemItem;
|
||||
|
||||
/// SHA-256 hash identifying a specific log entry in the chain.
|
||||
///
|
||||
/// Computed as `sha256(prev_hash_bytes || canonical_json(entry))`.
|
||||
/// Displayed and serialized as a lowercase hex string.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct EntryHash([u8; 32]);
|
||||
|
||||
impl EntryHash {
|
||||
pub fn as_bytes(&self) -> &[u8; 32] {
|
||||
&self.0
|
||||
}
|
||||
|
||||
pub fn to_hex(&self) -> String {
|
||||
hex::encode(self.0)
|
||||
}
|
||||
|
||||
pub fn from_hex(s: &str) -> Result<Self, hex::FromHexError> {
|
||||
let mut buf = [0u8; 32];
|
||||
hex::decode_to_slice(s, &mut buf)?;
|
||||
Ok(Self(buf))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for EntryHash {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&self.to_hex())
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for EntryHash {
|
||||
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
serializer.serialize_str(&self.to_hex())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for EntryHash {
|
||||
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
|
||||
let s = String::deserialize(deserializer)?;
|
||||
Self::from_hex(&s).map_err(serde::de::Error::custom)
|
||||
}
|
||||
}
|
||||
|
||||
/// Compute the hash for a log entry given its predecessor's hash.
|
||||
pub fn compute_hash(prev: Option<&EntryHash>, entry: &LogEntry) -> EntryHash {
|
||||
let mut hasher = Sha256::new();
|
||||
|
||||
// Feed prev_hash bytes (32 zero bytes if None).
|
||||
match prev {
|
||||
Some(h) => hasher.update(h.as_bytes()),
|
||||
None => hasher.update([0u8; 32]),
|
||||
}
|
||||
|
||||
// Canonical JSON of the entry.
|
||||
let json = serde_json::to_string(entry).expect("LogEntry serialization cannot fail");
|
||||
hasher.update(json.as_bytes());
|
||||
|
||||
EntryHash(hasher.finalize().into())
|
||||
}
|
||||
|
||||
/// A [`LogEntry`] with hash-chain metadata.
|
||||
///
|
||||
/// This is the unit persisted to JSONL — one line per `HashedEntry`.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct HashedEntry {
|
||||
pub hash: EntryHash,
|
||||
pub prev_hash: Option<EntryHash>,
|
||||
#[serde(flatten)]
|
||||
pub entry: LogEntry,
|
||||
}
|
||||
|
||||
/// A single session log entry, serialized as one JSONL line.
|
||||
///
|
||||
/// Variants correspond to specific mutation points in `Worker`:
|
||||
|
|
@ -110,10 +39,10 @@ pub enum LogEntry {
|
|||
system_prompt: Option<String>,
|
||||
config: RequestConfig,
|
||||
history: Vec<LoggedItem>,
|
||||
/// Origin: forked from another session at a specific entry.
|
||||
/// Origin: forked from another session at a specific turn boundary.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
forked_from: Option<SessionOrigin>,
|
||||
/// Origin: compacted from another session at a specific entry.
|
||||
/// Origin: compacted from another session at a specific turn boundary.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
compacted_from: Option<SessionOrigin>,
|
||||
},
|
||||
|
|
@ -235,13 +164,16 @@ pub enum LogEntry {
|
|||
},
|
||||
}
|
||||
|
||||
/// Provenance reference to a parent session.
|
||||
/// Provenance reference to a parent segment.
|
||||
///
|
||||
/// `at_turn_index` is the `turn_count` value of the most recent
|
||||
/// `TurnEnd` entry preceding the split point in the source segment.
|
||||
/// A value of `0` means the split happened before any turn completed
|
||||
/// (e.g. immediately after `SessionStart`).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct SessionOrigin {
|
||||
/// Session ID of the source session.
|
||||
pub session_id: crate::SessionId,
|
||||
/// Hash of the entry in the source session at the point of fork/compact.
|
||||
pub at_hash: EntryHash,
|
||||
pub at_turn_index: usize,
|
||||
}
|
||||
|
||||
/// Domain used by Pod to persist its latest effective runtime scope.
|
||||
|
|
@ -262,8 +194,10 @@ pub struct RestoredState {
|
|||
pub history: Vec<Item>,
|
||||
pub turn_count: usize,
|
||||
pub last_run_interrupted: bool,
|
||||
/// Hash of the last entry in the chain (None if empty).
|
||||
pub head_hash: Option<EntryHash>,
|
||||
/// Number of entries replayed. `0` means the session log was empty.
|
||||
/// Writers track their own append count via the same counter so
|
||||
/// `ensure_head_or_fork` can compare it with the on-disk count.
|
||||
pub entries_count: usize,
|
||||
/// LLM リクエストごとの Usage スナップショット時系列。
|
||||
/// `LogEntry::LlmUsage` を replay して時系列順に積まれる。
|
||||
/// 任意位置のトークン数推定に使う。
|
||||
|
|
@ -283,25 +217,25 @@ pub struct RestoredState {
|
|||
pub user_segments: Vec<Vec<Segment>>,
|
||||
}
|
||||
|
||||
/// Replay a sequence of hashed entries to reconstruct worker state.
|
||||
pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
|
||||
/// Replay a sequence of log entries to reconstruct worker state.
|
||||
pub fn collect_state(entries: &[LogEntry]) -> RestoredState {
|
||||
let mut state = RestoredState {
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: Vec::new(),
|
||||
turn_count: 0,
|
||||
last_run_interrupted: false,
|
||||
head_hash: None,
|
||||
entries_count: 0,
|
||||
usage_history: Vec::new(),
|
||||
extensions: Vec::new(),
|
||||
pod_scope: None,
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
|
||||
for hashed in entries {
|
||||
state.head_hash = Some(hashed.hash.clone());
|
||||
for entry in entries {
|
||||
state.entries_count += 1;
|
||||
|
||||
match &hashed.entry {
|
||||
match entry {
|
||||
LogEntry::SessionStart {
|
||||
system_prompt,
|
||||
config,
|
||||
|
|
@ -403,26 +337,6 @@ pub fn now_millis() -> u64 {
|
|||
.as_millis() as u64
|
||||
}
|
||||
|
||||
/// Build a hash chain from plain `LogEntry` values.
|
||||
///
|
||||
/// Useful for tests and for seeding new sessions from a list of entries.
|
||||
pub fn build_chain(entries: &[LogEntry]) -> Vec<HashedEntry> {
|
||||
let mut chain = Vec::with_capacity(entries.len());
|
||||
let mut prev: Option<EntryHash> = None;
|
||||
|
||||
for entry in entries {
|
||||
let hash = compute_hash(prev.as_ref(), entry);
|
||||
chain.push(HashedEntry {
|
||||
hash: hash.clone(),
|
||||
prev_hash: prev,
|
||||
entry: entry.clone(),
|
||||
});
|
||||
prev = Some(hash);
|
||||
}
|
||||
|
||||
chain
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
@ -432,12 +346,12 @@ mod tests {
|
|||
let state = collect_state(&[]);
|
||||
assert!(state.history.is_empty());
|
||||
assert_eq!(state.turn_count, 0);
|
||||
assert!(state.head_hash.is_none());
|
||||
assert_eq!(state.entries_count, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replay_session_start_sets_initial_state() {
|
||||
let entries = build_chain(&[LogEntry::SessionStart {
|
||||
let state = collect_state(&[LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: Some("You are helpful.".into()),
|
||||
config: RequestConfig::default().with_max_tokens(1024),
|
||||
|
|
@ -445,16 +359,15 @@ mod tests {
|
|||
forked_from: None,
|
||||
compacted_from: None,
|
||||
}]);
|
||||
let state = collect_state(&entries);
|
||||
assert_eq!(state.system_prompt.as_deref(), Some("You are helpful."));
|
||||
assert_eq!(state.config.max_tokens, Some(1024));
|
||||
assert_eq!(state.history.len(), 1);
|
||||
assert!(state.head_hash.is_some());
|
||||
assert_eq!(state.entries_count, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replay_full_turn() {
|
||||
let entries = build_chain(&[
|
||||
let state = collect_state(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
|
|
@ -481,7 +394,6 @@ mod tests {
|
|||
result: WorkerResult::Finished,
|
||||
},
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
assert_eq!(state.history.len(), 2);
|
||||
assert_eq!(state.turn_count, 1);
|
||||
assert!(!state.last_run_interrupted);
|
||||
|
|
@ -489,7 +401,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn replay_with_tool_calls() {
|
||||
let entries = build_chain(&[
|
||||
let state = collect_state(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
|
|
@ -519,7 +431,6 @@ mod tests {
|
|||
turn_count: 1,
|
||||
},
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
assert_eq!(state.history.len(), 4);
|
||||
assert!(state.history[1].is_tool_call());
|
||||
assert!(state.history[2].is_tool_result());
|
||||
|
|
@ -527,7 +438,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn replay_config_changed() {
|
||||
let entries = build_chain(&[
|
||||
let state = collect_state(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
|
|
@ -541,50 +452,12 @@ mod tests {
|
|||
config: RequestConfig::default().with_temperature(0.5),
|
||||
},
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
assert_eq!(state.config.temperature, Some(0.5));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hash_chain_is_deterministic() {
|
||||
let raw = vec![
|
||||
LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
LogEntry::UserInput {
|
||||
ts: 2000,
|
||||
segments: vec![Segment::text("Hello")],
|
||||
},
|
||||
];
|
||||
let chain_a = build_chain(&raw);
|
||||
let chain_b = build_chain(&raw);
|
||||
assert_eq!(chain_a[0].hash, chain_b[0].hash);
|
||||
assert_eq!(chain_a[1].hash, chain_b[1].hash);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn different_content_produces_different_hash() {
|
||||
let entry_a = LogEntry::UserInput {
|
||||
ts: 1000,
|
||||
segments: vec![Segment::text("Hello")],
|
||||
};
|
||||
let entry_b = LogEntry::UserInput {
|
||||
ts: 1000,
|
||||
segments: vec![Segment::text("World")],
|
||||
};
|
||||
let hash_a = compute_hash(None, &entry_a);
|
||||
let hash_b = compute_hash(None, &entry_b);
|
||||
assert_ne!(hash_a, hash_b);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replay_llm_usage_appends_to_usage_history() {
|
||||
let entries = build_chain(&[
|
||||
let state = collect_state(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
|
|
@ -618,7 +491,6 @@ mod tests {
|
|||
output_tokens: 5,
|
||||
},
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
// history は LlmUsage で変化しない
|
||||
assert_eq!(state.history.len(), 2);
|
||||
// usage_history は時系列順
|
||||
|
|
@ -631,8 +503,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn replay_without_llm_usage_keeps_usage_history_empty() {
|
||||
// 既存ログ互換: LlmUsage entry が無くても collect_state は壊れない
|
||||
let entries = build_chain(&[
|
||||
let state = collect_state(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
|
|
@ -646,7 +517,6 @@ mod tests {
|
|||
segments: vec![Segment::text("hi")],
|
||||
},
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
assert!(state.usage_history.is_empty());
|
||||
}
|
||||
|
||||
|
|
@ -704,7 +574,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn replay_invoke_marker_does_not_mutate_state() {
|
||||
let entries = build_chain(&[
|
||||
let state = collect_state(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 0,
|
||||
system_prompt: None,
|
||||
|
|
@ -730,14 +600,13 @@ mod tests {
|
|||
trigger: InvokeKind::Notify,
|
||||
},
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
assert_eq!(state.history.len(), 1);
|
||||
assert_eq!(state.turn_count, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replay_extension_collects_domain_payload_pairs() {
|
||||
let entries = build_chain(&[
|
||||
let state = collect_state(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
|
|
@ -762,7 +631,6 @@ mod tests {
|
|||
payload: serde_json::json!({ "x": 1 }),
|
||||
},
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
// 順序保持で全件積まれる。fold は呼び出し側の責務。
|
||||
assert_eq!(state.extensions.len(), 3);
|
||||
assert_eq!(state.extensions[0].0, "memory.extract");
|
||||
|
|
@ -794,22 +662,6 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hash_hex_round_trip() {
|
||||
let entry = LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
};
|
||||
let hash = compute_hash(None, &entry);
|
||||
let hex = hash.to_hex();
|
||||
let parsed = EntryHash::from_hex(&hex).unwrap();
|
||||
assert_eq!(hash, parsed);
|
||||
}
|
||||
|
||||
/// Mixed segments survive a JSON round-trip through `LogEntry::UserInput`,
|
||||
/// and `collect_state` derives `Item::user_message` from the flattened
|
||||
/// text while preserving the original segments separately. This covers
|
||||
|
|
@ -834,10 +686,10 @@ mod tests {
|
|||
ts: 4242,
|
||||
segments: segments.clone(),
|
||||
};
|
||||
// Hash + JSON round-trip preserves the variant byte-for-byte.
|
||||
// JSON round-trip preserves the variant byte-for-byte.
|
||||
let json = serde_json::to_string(&entry).unwrap();
|
||||
let parsed: LogEntry = serde_json::from_str(&json).unwrap();
|
||||
let entries = build_chain(&[
|
||||
let state = collect_state(&[
|
||||
LogEntry::SessionStart {
|
||||
ts: 1,
|
||||
system_prompt: None,
|
||||
|
|
@ -848,7 +700,6 @@ mod tests {
|
|||
},
|
||||
parsed,
|
||||
]);
|
||||
let state = collect_state(&entries);
|
||||
// Worker history gets a flattened user_message item.
|
||||
assert_eq!(state.history.len(), 1);
|
||||
match &state.history[0] {
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@
|
|||
|
||||
use crate::SessionId;
|
||||
use crate::event_trace::TraceEntry;
|
||||
use crate::session_log::{EntryHash, HashedEntry};
|
||||
use crate::session_log::LogEntry;
|
||||
|
||||
/// Errors from the persistence store.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
|
|
@ -35,25 +35,30 @@ pub enum StoreError {
|
|||
/// All methods take `&self` — implementations should use interior mutability
|
||||
/// (e.g., append-mode file handles) when needed.
|
||||
pub trait Store: Send + Sync {
|
||||
/// Append a single hashed entry to the session log.
|
||||
fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError>;
|
||||
/// Append a single log entry to the session log.
|
||||
///
|
||||
/// One line per call. The kernel orders concurrent `O_APPEND` writes
|
||||
/// for lines < `PIPE_BUF`, so user-space serialization is unnecessary.
|
||||
fn append(&self, id: SessionId, entry: &LogEntry) -> Result<(), StoreError>;
|
||||
|
||||
/// Read all hashed entries for a session, in order.
|
||||
fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError>;
|
||||
/// Read all log entries for a session, in order.
|
||||
fn read_all(&self, id: SessionId) -> Result<Vec<LogEntry>, StoreError>;
|
||||
|
||||
/// List all session IDs, most recent first.
|
||||
fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError>;
|
||||
|
||||
/// Create a new session with initial entries.
|
||||
fn create_session(&self, id: SessionId, entries: &[HashedEntry]) -> Result<(), StoreError>;
|
||||
fn create_session(&self, id: SessionId, entries: &[LogEntry]) -> Result<(), StoreError>;
|
||||
|
||||
/// Check if a session exists.
|
||||
fn exists(&self, id: SessionId) -> Result<bool, StoreError>;
|
||||
|
||||
/// Read the hash of the last entry in a session (the head).
|
||||
/// Count entries currently stored for a session.
|
||||
///
|
||||
/// Returns `None` if the session is empty.
|
||||
fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError>;
|
||||
/// Used by `ensure_head_or_fork` to detect concurrent writers:
|
||||
/// if the on-disk count exceeds the writer's own append tally,
|
||||
/// another process has extended the log.
|
||||
fn read_entry_count(&self, id: SessionId) -> Result<usize, StoreError>;
|
||||
|
||||
/// Append a trace entry to the debug event trace file.
|
||||
fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError>;
|
||||
|
|
|
|||
|
|
@ -1,8 +1,6 @@
|
|||
use llm_worker::WorkerResult;
|
||||
use llm_worker::llm_client::types::{Item, RequestConfig};
|
||||
use session_store::{
|
||||
FsStore, LogEntry, Store, TraceEntry, build_chain, collect_state, new_session_id,
|
||||
};
|
||||
use session_store::{FsStore, LogEntry, Store, TraceEntry, collect_state, new_session_id};
|
||||
|
||||
#[test]
|
||||
fn round_trip_write_and_read() {
|
||||
|
|
@ -10,7 +8,7 @@ fn round_trip_write_and_read() {
|
|||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let id = new_session_id();
|
||||
|
||||
let raw = vec![
|
||||
let entries = vec![
|
||||
LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: Some("You are helpful.".into()),
|
||||
|
|
@ -37,31 +35,21 @@ fn round_trip_write_and_read() {
|
|||
result: WorkerResult::Finished,
|
||||
},
|
||||
];
|
||||
let entries = build_chain(&raw);
|
||||
|
||||
// Write entries one by one
|
||||
for entry in &entries {
|
||||
store.append(id, entry).unwrap();
|
||||
}
|
||||
|
||||
// Read back
|
||||
let read_back = store.read_all(id).unwrap();
|
||||
assert_eq!(read_back.len(), entries.len());
|
||||
|
||||
// Verify hashes survived round-trip
|
||||
for (orig, read) in entries.iter().zip(read_back.iter()) {
|
||||
assert_eq!(orig.hash, read.hash);
|
||||
assert_eq!(orig.prev_hash, read.prev_hash);
|
||||
}
|
||||
|
||||
// Replay and verify state
|
||||
let state = collect_state(&read_back);
|
||||
assert_eq!(state.system_prompt.as_deref(), Some("You are helpful."));
|
||||
assert_eq!(state.config.max_tokens, Some(1024));
|
||||
assert_eq!(state.history.len(), 2);
|
||||
assert_eq!(state.turn_count, 1);
|
||||
assert!(!state.last_run_interrupted);
|
||||
assert!(state.head_hash.is_some());
|
||||
assert_eq!(state.entries_count, entries.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -70,7 +58,7 @@ fn create_session_writes_all_entries() {
|
|||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let id = new_session_id();
|
||||
|
||||
let entries = build_chain(&[LogEntry::SessionStart {
|
||||
let entries = [LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
|
|
@ -80,7 +68,7 @@ fn create_session_writes_all_entries() {
|
|||
],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
}]);
|
||||
}];
|
||||
|
||||
store.create_session(id, &entries).unwrap();
|
||||
let read_back = store.read_all(id).unwrap();
|
||||
|
|
@ -100,25 +88,17 @@ fn list_sessions_returns_newest_first() {
|
|||
std::thread::sleep(std::time::Duration::from_millis(2));
|
||||
let id2 = new_session_id();
|
||||
|
||||
let entries1 = build_chain(&[LogEntry::SessionStart {
|
||||
let entry = LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
}]);
|
||||
let entries2 = build_chain(&[LogEntry::SessionStart {
|
||||
ts: 1001,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
}]);
|
||||
};
|
||||
|
||||
store.append(id1, &entries1[0]).unwrap();
|
||||
store.append(id2, &entries2[0]).unwrap();
|
||||
store.append(id1, &entry).unwrap();
|
||||
store.append(id2, &entry).unwrap();
|
||||
|
||||
let sessions = store.list_sessions().unwrap();
|
||||
assert_eq!(sessions.len(), 2);
|
||||
|
|
@ -134,15 +114,19 @@ fn exists_returns_correct_state() {
|
|||
|
||||
assert!(!store.exists(id).unwrap());
|
||||
|
||||
let entries = build_chain(&[LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
}]);
|
||||
store.append(id, &entries[0]).unwrap();
|
||||
store
|
||||
.append(
|
||||
id,
|
||||
&LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert!(store.exists(id).unwrap());
|
||||
}
|
||||
|
|
@ -163,18 +147,20 @@ fn trace_entries_in_separate_file() {
|
|||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let id = new_session_id();
|
||||
|
||||
// Write a log entry
|
||||
let entries = build_chain(&[LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
}]);
|
||||
store.append(id, &entries[0]).unwrap();
|
||||
store
|
||||
.append(
|
||||
id,
|
||||
&LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// Write a trace entry
|
||||
let trace = TraceEntry {
|
||||
ts: 1500,
|
||||
turn: 0,
|
||||
|
|
@ -194,12 +180,12 @@ fn trace_entries_in_separate_file() {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn read_head_hash_returns_last_entry_hash() {
|
||||
fn read_entry_count_matches_append_tally() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let id = new_session_id();
|
||||
|
||||
let entries = build_chain(&[
|
||||
let entries = [
|
||||
LogEntry::SessionStart {
|
||||
ts: 1000,
|
||||
system_prompt: None,
|
||||
|
|
@ -212,12 +198,11 @@ fn read_head_hash_returns_last_entry_hash() {
|
|||
ts: 2000,
|
||||
segments: vec![protocol::Segment::text("Hello")],
|
||||
},
|
||||
]);
|
||||
];
|
||||
|
||||
for entry in &entries {
|
||||
store.append(id, entry).unwrap();
|
||||
}
|
||||
|
||||
let head = store.read_head_hash(id).unwrap();
|
||||
assert_eq!(head.as_ref(), Some(&entries[1].hash));
|
||||
assert_eq!(store.read_entry_count(id).unwrap(), entries.len());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use llm_worker::interceptor::{Interceptor, TurnEndAction};
|
|||
use llm_worker::llm_client::event::{Event, ResponseStatus, StatusEvent};
|
||||
use llm_worker::llm_client::types::{Item, RequestConfig};
|
||||
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
||||
use session_store::{EntryHash, FsStore, LogEntry, SessionStartState, Store, collect_state};
|
||||
use session_store::{FsStore, LogEntry, SessionStartState, Store, collect_state};
|
||||
|
||||
// =============================================================================
|
||||
// Helpers
|
||||
|
|
@ -96,20 +96,13 @@ async fn run_and_persist(
|
|||
worker: Worker<MockLlmClient>,
|
||||
store: &FsStore,
|
||||
session_id: session_store::SessionId,
|
||||
head_hash: &mut Option<EntryHash>,
|
||||
input: &str,
|
||||
) -> (Worker<MockLlmClient>, llm_worker::WorkerResult) {
|
||||
// Mirror Pod's run-entry contract: log the user input as segments
|
||||
// before the worker pushes its flattened user_message; save_delta
|
||||
// skips the resulting user_message item to avoid double-write.
|
||||
session_store::save_user_input(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
vec![protocol::Segment::text(input)],
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
session_store::save_user_input(store, session_id, vec![protocol::Segment::text(input)])
|
||||
.unwrap();
|
||||
|
||||
let history_before = worker.history().len();
|
||||
|
||||
|
|
@ -118,34 +111,26 @@ async fn run_and_persist(
|
|||
let worker = locked.unlock();
|
||||
|
||||
let new_items = &worker.history()[history_before..];
|
||||
session_store::save_delta(store, session_id, head_hash, new_items)
|
||||
|
||||
.unwrap();
|
||||
session_store::save_turn_end(store, session_id, head_hash, worker.turn_count())
|
||||
|
||||
.unwrap();
|
||||
session_store::save_delta(store, session_id, new_items).unwrap();
|
||||
session_store::save_turn_end(store, session_id, worker.turn_count()).unwrap();
|
||||
|
||||
match &result {
|
||||
Ok(r) => {
|
||||
session_store::save_run_completed(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
r.clone(),
|
||||
worker.last_run_interrupted(),
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
}
|
||||
Err(e) => {
|
||||
session_store::save_run_errored(
|
||||
store,
|
||||
session_id,
|
||||
head_hash,
|
||||
e.to_string(),
|
||||
worker.last_run_interrupted(),
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
|
@ -164,7 +149,7 @@ async fn session_run_logs_entries() {
|
|||
let client = MockLlmClient::new(simple_text_events());
|
||||
let worker = Worker::new(client);
|
||||
|
||||
let (sid, head_hash) = session_store::create_session(
|
||||
let sid = session_store::create_session(
|
||||
&store,
|
||||
SessionStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -172,11 +157,9 @@ async fn session_run_logs_entries() {
|
|||
history: worker.history(),
|
||||
},
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
|
||||
let mut head_hash = Some(head_hash);
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await;
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, "Hi").await;
|
||||
let _ = &worker;
|
||||
|
||||
let entries = store.read_all(sid).unwrap();
|
||||
|
|
@ -189,12 +172,12 @@ async fn session_run_logs_entries() {
|
|||
);
|
||||
|
||||
// First entry is SessionStart
|
||||
assert!(matches!(&entries[0].entry, LogEntry::SessionStart { .. }));
|
||||
assert!(matches!(&entries[0], LogEntry::SessionStart { .. }));
|
||||
|
||||
// Has a RunCompleted with Finished
|
||||
let has_finished = entries.iter().any(|e| {
|
||||
matches!(
|
||||
&e.entry,
|
||||
e,
|
||||
LogEntry::RunCompleted {
|
||||
result: llm_worker::WorkerResult::Finished,
|
||||
..
|
||||
|
|
@ -202,17 +185,6 @@ async fn session_run_logs_entries() {
|
|||
)
|
||||
});
|
||||
assert!(has_finished, "should have a Finished outcome");
|
||||
|
||||
// Verify hash chain integrity
|
||||
assert!(entries[0].prev_hash.is_none());
|
||||
for i in 1..entries.len() {
|
||||
assert_eq!(
|
||||
entries[i].prev_hash.as_ref(),
|
||||
Some(&entries[i - 1].hash),
|
||||
"hash chain broken at entry {}",
|
||||
i
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -222,7 +194,7 @@ async fn session_restore_round_trip() {
|
|||
let mut worker = Worker::new(client);
|
||||
worker.set_system_prompt("You are helpful.");
|
||||
|
||||
let (sid, head_hash) = session_store::create_session(
|
||||
let sid = session_store::create_session(
|
||||
&store,
|
||||
SessionStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -230,11 +202,9 @@ async fn session_restore_round_trip() {
|
|||
history: worker.history(),
|
||||
},
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
let mut head_hash = Some(head_hash);
|
||||
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await;
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, "Hi").await;
|
||||
|
||||
let original_history_len = worker.history().len();
|
||||
let original_turn_count = worker.turn_count();
|
||||
|
|
@ -245,7 +215,7 @@ async fn session_restore_round_trip() {
|
|||
assert_eq!(state.history.len(), original_history_len);
|
||||
assert_eq!(state.turn_count, original_turn_count);
|
||||
assert_eq!(state.system_prompt.as_deref(), Some("You are helpful."));
|
||||
assert_eq!(state.head_hash, head_hash);
|
||||
assert_eq!(state.entries_count, store.read_entry_count(sid).unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -255,7 +225,7 @@ async fn session_run_with_tool_call() {
|
|||
let mut worker = Worker::new(client);
|
||||
worker.register_tool(weather_tool_definition());
|
||||
|
||||
let (sid, head_hash) = session_store::create_session(
|
||||
let sid = session_store::create_session(
|
||||
&store,
|
||||
SessionStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -263,23 +233,20 @@ async fn session_run_with_tool_call() {
|
|||
history: worker.history(),
|
||||
},
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
let mut head_hash = Some(head_hash);
|
||||
|
||||
let (_worker, _) =
|
||||
run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await;
|
||||
let (_worker, _) = run_and_persist(worker, &store, sid, "What's the weather?").await;
|
||||
|
||||
let entries = store.read_all(sid).unwrap();
|
||||
|
||||
let has_tool_results = entries
|
||||
.iter()
|
||||
.any(|e| matches!(&e.entry, LogEntry::ToolResult { .. }));
|
||||
.any(|e| matches!(e, LogEntry::ToolResult { .. }));
|
||||
assert!(has_tool_results, "should have ToolResult entry");
|
||||
|
||||
let has_assistant = entries
|
||||
.iter()
|
||||
.any(|e| matches!(&e.entry, LogEntry::AssistantItem { .. }));
|
||||
.any(|e| matches!(e, LogEntry::AssistantItem { .. }));
|
||||
assert!(has_assistant, "should have AssistantItem entry");
|
||||
}
|
||||
|
||||
|
|
@ -293,7 +260,7 @@ async fn session_resume_after_pause() {
|
|||
worker.register_tool(weather_tool_definition());
|
||||
worker.set_interceptor(PausePolicy);
|
||||
|
||||
let (sid, head_hash) = session_store::create_session(
|
||||
let sid = session_store::create_session(
|
||||
&store,
|
||||
SessionStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -301,18 +268,16 @@ async fn session_resume_after_pause() {
|
|||
history: worker.history(),
|
||||
},
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
let mut head_hash = Some(head_hash);
|
||||
|
||||
let (_worker, result) = run_and_persist(worker, &store, sid, &mut head_hash, "Weather?").await;
|
||||
let (_worker, result) = run_and_persist(worker, &store, sid, "Weather?").await;
|
||||
assert!(matches!(result, llm_worker::WorkerResult::Paused));
|
||||
|
||||
// Check RunCompleted is Paused
|
||||
let entries = store.read_all(sid).unwrap();
|
||||
let has_paused = entries.iter().any(|e| {
|
||||
matches!(
|
||||
&e.entry,
|
||||
e,
|
||||
LogEntry::RunCompleted {
|
||||
result: llm_worker::WorkerResult::Paused,
|
||||
..
|
||||
|
|
@ -333,7 +298,7 @@ async fn session_fork_preserves_state() {
|
|||
let mut worker = Worker::new(client);
|
||||
worker.set_system_prompt("System prompt");
|
||||
|
||||
let (sid, head_hash) = session_store::create_session(
|
||||
let sid = session_store::create_session(
|
||||
&store,
|
||||
SessionStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -341,11 +306,9 @@ async fn session_fork_preserves_state() {
|
|||
history: worker.history(),
|
||||
},
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
let mut head_hash = Some(head_hash);
|
||||
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await;
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, "Hello").await;
|
||||
|
||||
let original_history_len = worker.history().len();
|
||||
let fork_id = session_store::fork(
|
||||
|
|
@ -356,16 +319,12 @@ async fn session_fork_preserves_state() {
|
|||
history: worker.history(),
|
||||
},
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
|
||||
// Fork should have a SessionStart with the current history
|
||||
let fork_entries = store.read_all(fork_id).unwrap();
|
||||
assert_eq!(fork_entries.len(), 1);
|
||||
assert!(matches!(
|
||||
&fork_entries[0].entry,
|
||||
LogEntry::SessionStart { .. }
|
||||
));
|
||||
assert!(matches!(&fork_entries[0], LogEntry::SessionStart { .. }));
|
||||
|
||||
let fork_state = collect_state(&fork_entries);
|
||||
assert_eq!(fork_state.history.len(), original_history_len);
|
||||
|
|
@ -378,7 +337,7 @@ async fn session_fork_at_truncates() {
|
|||
let client = MockLlmClient::new(simple_text_events());
|
||||
let worker = Worker::new(client);
|
||||
|
||||
let (sid, head_hash) = session_store::create_session(
|
||||
let sid = session_store::create_session(
|
||||
&store,
|
||||
SessionStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -386,29 +345,28 @@ async fn session_fork_at_truncates() {
|
|||
history: worker.history(),
|
||||
},
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
let mut head_hash = Some(head_hash);
|
||||
|
||||
let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await;
|
||||
let (worker, _) = run_and_persist(worker, &store, sid, "Hello").await;
|
||||
|
||||
let all_entries = store.read_all(sid).unwrap();
|
||||
assert!(all_entries.len() > 2);
|
||||
|
||||
// Fork at the hash of the 2nd entry (SessionStart + UserInput)
|
||||
let at_hash = &all_entries[1].hash;
|
||||
let fork_id = session_store::fork_at(&store, sid, at_hash).unwrap();
|
||||
// Fork at turn 1 (one completed turn).
|
||||
let fork_id = session_store::fork_at(&store, sid, worker.turn_count()).unwrap();
|
||||
|
||||
let fork_entries = store.read_all(fork_id).unwrap();
|
||||
assert_eq!(fork_entries.len(), 1); // Just the new SessionStart
|
||||
|
||||
let fork_state = collect_state(&fork_entries);
|
||||
// Should have the state from replaying only the first 2 entries
|
||||
let original_truncated_state = collect_state(&all_entries[..2]);
|
||||
assert_eq!(
|
||||
fork_state.history.len(),
|
||||
original_truncated_state.history.len()
|
||||
);
|
||||
// History at fork point should match history right after the TurnEnd in
|
||||
// the source session.
|
||||
let turn_end_pos = all_entries
|
||||
.iter()
|
||||
.position(|e| matches!(e, LogEntry::TurnEnd { turn_count, .. } if *turn_count == worker.turn_count()))
|
||||
.expect("source session has the matching TurnEnd");
|
||||
let source_state_at_fork = collect_state(&all_entries[..=turn_end_pos]);
|
||||
assert_eq!(fork_state.history.len(), source_state_at_fork.history.len());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -417,7 +375,7 @@ async fn session_config_changed_logged() {
|
|||
let client = MockLlmClient::new(vec![]);
|
||||
let mut worker = Worker::new(client);
|
||||
|
||||
let (sid, head_hash) = session_store::create_session(
|
||||
let sid = session_store::create_session(
|
||||
&store,
|
||||
SessionStartState {
|
||||
system_prompt: worker.get_system_prompt(),
|
||||
|
|
@ -425,21 +383,17 @@ async fn session_config_changed_logged() {
|
|||
history: worker.history(),
|
||||
},
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
let mut head_hash = Some(head_hash);
|
||||
|
||||
// Modify config and log it
|
||||
let new_config = RequestConfig::default().with_temperature(0.7);
|
||||
worker.set_request_config(new_config.clone());
|
||||
session_store::save_config_changed(&store, sid, &mut head_hash, &new_config)
|
||||
|
||||
.unwrap();
|
||||
session_store::save_config_changed(&store, sid, &new_config).unwrap();
|
||||
|
||||
let entries = store.read_all(sid).unwrap();
|
||||
let has_config_changed = entries.iter().any(|e| {
|
||||
matches!(
|
||||
&e.entry,
|
||||
e,
|
||||
LogEntry::ConfigChanged { config, .. } if config.temperature == Some(0.7)
|
||||
)
|
||||
});
|
||||
|
|
@ -454,7 +408,7 @@ async fn session_auto_forks_on_conflict() {
|
|||
let client_a = MockLlmClient::new(simple_text_events());
|
||||
let worker_a = Worker::new(client_a);
|
||||
|
||||
let (original_sid, head_hash) = session_store::create_session(
|
||||
let original_sid = session_store::create_session(
|
||||
&store,
|
||||
SessionStartState {
|
||||
system_prompt: worker_a.get_system_prompt(),
|
||||
|
|
@ -462,37 +416,29 @@ async fn session_auto_forks_on_conflict() {
|
|||
history: worker_a.history(),
|
||||
},
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
let mut session_id = original_sid;
|
||||
let mut head_hash = Some(head_hash);
|
||||
// Writer tracked: just the SessionStart we wrote.
|
||||
let mut entries_written: usize = 1;
|
||||
|
||||
// Simulate another Pod writing to the same session behind our back
|
||||
// Simulate another Pod writing to the same session behind our back.
|
||||
let extra_entry = LogEntry::UserInput {
|
||||
ts: 9999,
|
||||
segments: vec![protocol::Segment::text("Interloper")],
|
||||
};
|
||||
let current_head = store.read_head_hash(original_sid).unwrap();
|
||||
let hash = session_store::compute_hash(current_head.as_ref(), &extra_entry);
|
||||
let hashed = session_store::HashedEntry {
|
||||
hash,
|
||||
prev_hash: current_head,
|
||||
entry: extra_entry,
|
||||
};
|
||||
store.append(original_sid, &hashed).unwrap();
|
||||
store.append(original_sid, &extra_entry).unwrap();
|
||||
|
||||
// Now head_hash is stale — ensure_head_or_fork should auto-fork
|
||||
// Now the on-disk count exceeds our tally — ensure_head_or_fork should auto-fork.
|
||||
session_store::ensure_head_or_fork(
|
||||
&store,
|
||||
&mut session_id,
|
||||
&mut head_hash,
|
||||
&mut entries_written,
|
||||
SessionStartState {
|
||||
system_prompt: worker_a.get_system_prompt(),
|
||||
config: worker_a.request_config(),
|
||||
history: worker_a.history(),
|
||||
},
|
||||
)
|
||||
|
||||
.unwrap();
|
||||
|
||||
// session_id should now be different
|
||||
|
|
@ -506,6 +452,6 @@ async fn session_auto_forks_on_conflict() {
|
|||
let original_entries = store.read_all(original_sid).unwrap();
|
||||
let has_interloper = original_entries
|
||||
.iter()
|
||||
.any(|e| matches!(&e.entry, LogEntry::UserInput { .. }));
|
||||
.any(|e| matches!(e, LogEntry::UserInput { .. }));
|
||||
assert!(has_interloper);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,9 +20,7 @@ use ratatui::style::{Color, Modifier, Style};
|
|||
use ratatui::text::{Line, Span};
|
||||
use ratatui::widgets::Paragraph;
|
||||
use ratatui::{Frame, TerminalOptions, Viewport};
|
||||
use session_store::{
|
||||
FsStore, HashedEntry, LogEntry, LoggedContentPart, LoggedItem, SessionId, Store,
|
||||
};
|
||||
use session_store::{FsStore, LogEntry, LoggedContentPart, LoggedItem, SessionId, Store};
|
||||
|
||||
const MAX_ROWS: usize = 10;
|
||||
const VIEWPORT_LINES: u16 = MAX_ROWS as u16 + 4;
|
||||
|
|
@ -170,9 +168,9 @@ fn build_preview(store: &FsStore, id: SessionId) -> String {
|
|||
/// Walk the log from the tail looking for the most recent user-message
|
||||
/// or assistant-message entry, then render its first text fragment in
|
||||
/// a single line.
|
||||
fn last_message_preview(entries: &[HashedEntry]) -> Option<String> {
|
||||
for hashed in entries.iter().rev() {
|
||||
match &hashed.entry {
|
||||
fn last_message_preview(entries: &[LogEntry]) -> Option<String> {
|
||||
for entry in entries.iter().rev() {
|
||||
match entry {
|
||||
LogEntry::UserInput { segments, .. } => {
|
||||
let text = protocol::Segment::flatten_to_text(segments);
|
||||
if !text.is_empty() {
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user