update: 書き込みの不要なasyncを削除

This commit is contained in:
Keisuke Hirata 2026-05-14 19:16:48 +09:00
parent 34c89f8739
commit 988495cfea
26 changed files with 615 additions and 688 deletions

1
Cargo.lock generated
View File

@ -2160,6 +2160,7 @@ dependencies = [
"manifest", "manifest",
"memory", "memory",
"minijinja", "minijinja",
"parking_lot",
"pod-registry", "pod-registry",
"protocol", "protocol",
"provider", "provider",

View File

@ -30,6 +30,7 @@ memory = { workspace = true }
workflow-crate = { package = "workflow", path = "../workflow" } workflow-crate = { package = "workflow", path = "../workflow" }
uuid = { workspace = true, features = ["v7"] } uuid = { workspace = true, features = ["v7"] }
session-metrics = { workspace = true } session-metrics = { workspace = true }
parking_lot = "0.12.5"
[dev-dependencies] [dev-dependencies]
dotenv = "0.15.0" dotenv = "0.15.0"

View File

@ -48,7 +48,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 2. Create a persistent store (temp dir for demo) // 2. Create a persistent store (temp dir for demo)
let tmp = tempfile::tempdir()?; let tmp = tempfile::tempdir()?;
let store = FsStore::new(tmp.path()).await?; let store = FsStore::new(tmp.path())?;
// 3. Build the Pod from the single-layer manifest TOML // 3. Build the Pod from the single-layer manifest TOML
let mut pod = Pod::from_manifest_toml(&toml, store).await?; let mut pod = Pod::from_manifest_toml(&toml, store).await?;

View File

@ -39,7 +39,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pwd = std::env::current_dir()?; let pwd = std::env::current_dir()?;
let toml = manifest_toml(&pwd); let toml = manifest_toml(&pwd);
let tmp = tempfile::tempdir()?; let tmp = tempfile::tempdir()?;
let store = FsStore::new(tmp.path()).await?; let store = FsStore::new(tmp.path())?;
let pod = pod::Pod::from_manifest_toml(&toml, store).await?; let pod = pod::Pod::from_manifest_toml(&toml, store).await?;
let runtime_tmp = tempfile::tempdir()?; let runtime_tmp = tempfile::tempdir()?;

View File

@ -6,14 +6,10 @@ use llm_worker::llm_client::client::LlmClient;
use session_store::Store; use session_store::Store;
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
use llm_worker::Item;
use session_store::LogEntry;
use session_store::session_log;
use crate::ipc::alerter::Alerter; use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::server::SocketServer; use crate::ipc::server::SocketServer;
use crate::pod::{LogCommand, LogDrainHandle, Pod, PodError, PodRunResult}; use crate::pod::{Pod, PodError, PodRunResult, SystemItemCommitter};
use crate::runtime::dir::RuntimeDir; use crate::runtime::dir::RuntimeDir;
use crate::session_log_sink::SessionLogSink; use crate::session_log_sink::SessionLogSink;
use crate::shared_state::PodSharedState; use crate::shared_state::PodSharedState;
@ -165,23 +161,21 @@ impl PodController {
}]) }])
.map_err(std::io::Error::other)?; .map_err(std::io::Error::other)?;
// === 1.5. Per-item history-commit drain task === // === 1.5. Direct writer wiring ===
// //
// Worker callbacks fire `on_history_append` for each assistant // Worker callbacks fire `on_history_append` for each assistant
// item / tool result / hook-injected item that lands in // item / tool result that lands in history. With the sync
// history. The drain task picks them up off an unbounded mpsc // writer in place, the callback commits each item directly
// and commits each as a typed `LogEntry` through the sink, // through a `LogWriterHandle` (no mpsc ferry, no drain task).
// serialised against the same `session_head` lock the Pod uses // The same handle is type-erased into a `SystemItemCommitter`
// for its own commits. This gives mid-turn snapshot visibility: // and handed to the interceptor for `SystemItem` commits, so
// a late-attaching client sees in-flight tool calls + completed // assistant / tool / system items all share one commit path.
// assistant blocks without waiting for the turn-end persist. let writer_for_system: Arc<dyn SystemItemCommitter> = Arc::new(pod.log_writer_handle());
let (log_cmd_tx, log_cmd_rx) = mpsc::unbounded_channel::<LogCommand>(); pod.attach_log_writer(writer_for_system);
let drain_ctx = pod.log_drain_handle(); pod.wire_history_persistence();
let _drain_task = tokio::spawn(run_log_drain(log_cmd_rx, drain_ctx));
pod.attach_log_cmd_tx(log_cmd_tx.clone());
// === 2. Worker event bridge wiring === // === 2. Worker event bridge wiring ===
wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter, log_cmd_tx); wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter);
// === 3. Tool registration (builtin / memory / spawn-orchestration) === // === 3. Tool registration (builtin / memory / spawn-orchestration) ===
let fs_for_view = register_pod_tools( let fs_for_view = register_pod_tools(
@ -263,29 +257,20 @@ impl PodController {
/// re-publishes a worker-level signal as a `protocol::Event` on `event_tx` /// re-publishes a worker-level signal as a `protocol::Event` on `event_tx`
/// so subscribers (TUI, socket clients) get a single typed stream. /// so subscribers (TUI, socket clients) get a single typed stream.
/// ///
/// Also wires `on_history_append` into the per-item drain channel so /// `Pod::wire_history_persistence` is called separately to wire the
/// every history append observed by the worker becomes a typed /// per-item history commit callback so every assistant / tool item
/// `LogEntry` commit (via the drain task). /// landing in `worker.history` becomes a singular `LogEntry::AssistantItem`
/// / `ToolResult` commit through the sync writer.
fn wire_event_bridges_on_worker<C, St>( fn wire_event_bridges_on_worker<C, St>(
pod: &mut Pod<C, St>, pod: &mut Pod<C, St>,
event_tx: &broadcast::Sender<Event>, event_tx: &broadcast::Sender<Event>,
alerter: &Alerter, alerter: &Alerter,
log_cmd_tx: mpsc::UnboundedSender<LogCommand>,
) where ) where
C: LlmClient + Clone + 'static, C: LlmClient + Clone + 'static,
St: Store + Clone + 'static, St: Store + Clone + 'static,
{ {
let worker = pod.worker_mut(); let worker = pod.worker_mut();
// Per-history-append → drain channel. Sends are infallible-by-design
// here (UnboundedSender never blocks); a closed receiver just means
// the controller is shutting down, in which case dropping the item
// is acceptable.
let drain_tx = log_cmd_tx.clone();
worker.on_history_append(move |item| {
let _ = drain_tx.send(LogCommand::Item(item.clone()));
});
let tx = event_tx.clone(); let tx = event_tx.clone();
worker.on_turn_start(move |turn| { worker.on_turn_start(move |turn| {
let _ = tx.send(Event::TurnStart { turn }); let _ = tx.send(Event::TurnStart { turn });
@ -397,105 +382,6 @@ fn wire_event_bridges_on_worker<C, St>(
// per-item commit channel is wired at the top of this function. // per-item commit channel is wired at the top of this function.
} }
/// Drain task: consumes `LogCommand::Item` and `LogCommand::Flush`
/// off the channel and commits each item as a typed `LogEntry` through
/// the supplied store + sink. Lives as long as the controller; exits
/// when the sender is dropped (controller shutdown).
async fn run_log_drain<St>(mut rx: mpsc::UnboundedReceiver<LogCommand>, ctx: LogDrainHandle<St>)
where
St: session_store::Store + Clone + Send + 'static,
{
while let Some(cmd) = rx.recv().await {
match cmd {
LogCommand::Item(item) => {
let Some(entry) = classify_history_item(item) else {
continue;
};
commit_via_drain(&ctx, entry).await;
}
LogCommand::SystemItems(items) => {
if items.is_empty() {
continue;
}
let entry = LogEntry::SystemItems {
ts: session_log::now_millis(),
items,
};
commit_via_drain(&ctx, entry).await;
}
LogCommand::Flush(ack) => {
let _ = ack.send(());
}
}
}
}
async fn commit_via_drain<St>(ctx: &LogDrainHandle<St>, entry: LogEntry)
where
St: session_store::Store + Clone + Send + 'static,
{
let mut head = ctx.session_head.lock().await;
match session_store::append_entry_with_hash(
&ctx.store,
head.session_id,
&mut head.head_hash,
entry.clone(),
)
.await
{
Ok(_) => {
// Publish under the same critical section view a
// `subscribe_with_snapshot` would observe.
ctx.sink.publish(entry);
}
Err(e) => {
tracing::warn!(error = %e, "drain: append_entry failed; entry dropped");
}
}
}
/// Map one LLM-driven worker-history append to its `LogEntry` form.
///
/// `None` is the skip signal for items that the drain must not commit:
/// - `user_message` items are committed by `Pod::run` up-front as
/// `LogEntry::UserInput { segments }`.
/// - `system_message` items are committed by `PodInterceptor` as part
/// of a `LogEntry::SystemItems` batch (with typed kind metadata)
/// before they reach the worker's history.
fn classify_history_item(item: Item) -> Option<LogEntry> {
let ts = session_log::now_millis();
if item.is_user_message() {
return None;
}
if matches!(
item,
Item::Message {
role: llm_worker::Role::System,
..
}
) {
return None;
}
if item.is_tool_result() {
return Some(LogEntry::ToolResults {
ts,
items: vec![session_store::LoggedItem::from(&item)],
});
}
if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() {
return Some(LogEntry::AssistantItems {
ts,
items: vec![session_store::LoggedItem::from(&item)],
});
}
// Defensive: anything else (future Item kinds) routes through
// AssistantItems rather than getting silently dropped.
Some(LogEntry::AssistantItems {
ts,
items: vec![session_store::LoggedItem::from(&item)],
})
}
/// Register the builtin file-manipulation tools, optional memory tools, /// Register the builtin file-manipulation tools, optional memory tools,
/// and the Pod-orchestration tools (SpawnPod + comm) on the Pod's /// and the Pod-orchestration tools (SpawnPod + comm) on the Pod's
/// Worker. Returns the `ScopedFs` clone used to attach a `PodFsView` to /// Worker. Returns the `ScopedFs` clone used to attach a `PodFsView` to

View File

@ -23,14 +23,13 @@ use tracing::warn;
use crate::compact::state::CompactState; use crate::compact::state::CompactState;
use session_store::SystemItem; use session_store::SystemItem;
use tokio::sync::mpsc;
use crate::hook::{ use crate::hook::{
AbortInfo, HookPromptAction, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, AbortInfo, HookPromptAction, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary,
ToolResultSummary, TurnEndInfo, ToolResultSummary, TurnEndInfo,
}; };
use crate::ipc::notify_buffer::{NotifyBuffer, build_system_item}; use crate::ipc::notify_buffer::{NotifyBuffer, build_system_item};
use crate::pod::LogCommand; use crate::pod::SystemItemCommitter;
use crate::prompt::catalog::PromptCatalog; use crate::prompt::catalog::PromptCatalog;
use llm_worker::token_counter::total_tokens; use llm_worker::token_counter::total_tokens;
@ -50,19 +49,20 @@ pub(crate) struct PodInterceptor {
/// so the LLM has a visible trigger for any reaction it commits. /// so the LLM has a visible trigger for any reaction it commits.
pending_notifies: NotifyBuffer, pending_notifies: NotifyBuffer,
/// Submit-scoped stash of resolver-produced typed system items. /// Submit-scoped stash of resolver-produced typed system items.
/// Drained inside `on_prompt_submit`, committed as a /// Drained inside `on_prompt_submit`, committed as
/// `LogEntry::SystemItems` through `log_cmd_tx`, and returned to /// `LogEntry::SystemItem` entries through `log_writer`, and
/// the worker as `Item::system_message` via /// returned to the worker as `Item::system_message` via
/// `PromptAction::ContinueWith`. Populated by `Pod::run` /// `PromptAction::ContinueWith`. Populated by `Pod::run`
/// immediately before handing off to the worker. /// immediately before handing off to the worker.
pending_attachments: Arc<Mutex<Vec<SystemItem>>>, pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
/// Prompt catalog used to render the injected notification wrapper. /// Prompt catalog used to render the injected notification wrapper.
prompts: Arc<PromptCatalog>, prompts: Arc<PromptCatalog>,
/// Sender into the Pod's history-drain task. The interceptor uses /// Type-erased commit handle. The interceptor uses it to commit
/// it to commit `LogCommand::SystemItems` batches before returning /// `LogEntry::SystemItem` entries directly (sync) before
/// the corresponding `Item::system_message`s up to the worker. /// returning the corresponding `Item::system_message`s up to the
/// `None` in tests / `Pod::new` paths where no drain is wired. /// worker. `None` in tests / `Pod::new` paths where no writer is
log_cmd_tx: Option<mpsc::UnboundedSender<LogCommand>>, /// attached.
log_writer: Option<Arc<dyn SystemItemCommitter>>,
/// Next turn index assigned by `on_prompt_submit`. /// Next turn index assigned by `on_prompt_submit`.
next_turn_index: AtomicUsize, next_turn_index: AtomicUsize,
/// Tool calls observed in the current turn (reset on each new prompt). /// Tool calls observed in the current turn (reset on each new prompt).
@ -77,7 +77,7 @@ impl PodInterceptor {
pending_notifies: NotifyBuffer, pending_notifies: NotifyBuffer,
pending_attachments: Arc<Mutex<Vec<SystemItem>>>, pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
prompts: Arc<PromptCatalog>, prompts: Arc<PromptCatalog>,
log_cmd_tx: Option<mpsc::UnboundedSender<LogCommand>>, log_writer: Option<Arc<dyn SystemItemCommitter>>,
) -> Self { ) -> Self {
Self { Self {
registry, registry,
@ -86,23 +86,24 @@ impl PodInterceptor {
pending_notifies, pending_notifies,
pending_attachments, pending_attachments,
prompts, prompts,
log_cmd_tx, log_writer,
next_turn_index: AtomicUsize::new(0), next_turn_index: AtomicUsize::new(0),
tool_calls_this_turn: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0),
} }
} }
/// Send a `LogCommand::SystemItems` batch down the drain channel /// Commit each `SystemItem` as its own `LogEntry::SystemItem`
/// (no-op if no drain is wired). The drain task commits the entry /// entry through the attached writer (no-op when no writer is
/// before the corresponding `Item::system_message`s reach the /// wired). Sync — writes complete before the matching
/// worker via `ContinueWith` / `pending_history_appends`, so the /// `Item::system_message`s reach the worker via
/// drain barrier in `persist_turn` covers system commits too. /// `ContinueWith` / `pending_history_appends`, so on-disk order
fn send_system_items(&self, items: Vec<SystemItem>) { /// matches worker-history order.
if items.is_empty() { fn commit_system_items(&self, items: &[SystemItem]) {
let Some(writer) = self.log_writer.as_ref() else {
return; return;
} };
if let Some(tx) = self.log_cmd_tx.as_ref() { for item in items {
let _ = tx.send(LogCommand::SystemItems(items)); writer.commit_system_item(item.clone());
} }
} }
@ -148,12 +149,12 @@ impl Interceptor for PodInterceptor {
PromptAction::Continue PromptAction::Continue
} else { } else {
// Commit the typed system items first, then hand the // Commit the typed system items first, then hand the
// matching `Item::system_message`s to the worker. The // matching `Item::system_message`s to the worker. Sync
// drain task processes the `SystemItems` command BEFORE // commits land BEFORE the worker pushes its
// any subsequent `Item` commands from `on_history_append`, // `Item::system_message`s, so on-disk order matches
// so on-disk order matches worker-history order. // worker-history order.
let items: Vec<Item> = extras.iter().map(SystemItem::to_history_item).collect(); let items: Vec<Item> = extras.iter().map(SystemItem::to_history_item).collect();
self.send_system_items(extras); self.commit_system_items(&extras);
PromptAction::ContinueWith(items) PromptAction::ContinueWith(items)
} }
} }
@ -175,7 +176,7 @@ impl Interceptor for PodInterceptor {
// A render failure here would starve the LLM of // A render failure here would starve the LLM of
// the notify text. Fall back to a raw item so the // the notify text. Fall back to a raw item so the
// trigger still lands in history; the entry will // trigger still lands in history; the entry will
// simply be skipped from the SystemItems batch. // simply be skipped from the SystemItem batch.
warn!(error = %e, "failed to render notify_wrapper; using raw message"); warn!(error = %e, "failed to render notify_wrapper; using raw message");
let fallback = match &entry { let fallback = match &entry {
super::notify_buffer::PendingNotify::Notify { message } => message.clone(), super::notify_buffer::PendingNotify::Notify { message } => message.clone(),
@ -187,7 +188,7 @@ impl Interceptor for PodInterceptor {
} }
} }
} }
self.send_system_items(system_items); self.commit_system_items(&system_items);
items items
} }

View File

@ -108,37 +108,22 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
session_store::LogEntry::SessionStart { .. } => { session_store::LogEntry::SessionStart { .. } => {
let value = serde_json::to_value(&entry) let value = serde_json::to_value(&entry)
.expect("LogEntry is Serialize"); .expect("LogEntry is Serialize");
vec![Event::SessionRotated { entry: value }] Some(Event::SessionRotated { entry: value })
} }
session_store::LogEntry::SystemItems { items, .. } => { session_store::LogEntry::SystemItem { item, .. } => {
// Fan out per-item so each `SystemItem` let value = serde_json::to_value(&item)
// arrives as its own `Event::SystemItem` .expect("SystemItem is Serialize");
// on the wire. Batching on disk is an Some(Event::SystemItem { item: value })
// implementation detail of the drain
// task; clients see them one at a time.
items
.into_iter()
.map(|si| {
let value = serde_json::to_value(&si)
.expect("SystemItem is Serialize");
Event::SystemItem { item: value }
})
.collect()
} }
// Defensive: should never reach here per // Defensive: should never reach here per
// `SessionLogSink::is_live_relevant`. // `SessionLogSink::is_live_relevant`.
_ => Vec::new(), _ => None,
}; };
let mut hit_error = false; if let Some(event) = outbound {
for event in outbound {
if writer.write(&event).await.is_err() { if writer.write(&event).await.is_err() {
hit_error = true;
break; break;
} }
} }
if hit_error {
break;
}
} }
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
// Slow client fell behind the broadcast buffer. // Slow client fell behind the broadcast buffer.

View File

@ -162,7 +162,7 @@ async fn main() -> ExitCode {
} }
}, },
}; };
let store = match FsStore::new(&store_dir).await { let store = match FsStore::new(&store_dir) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
eprintln!("error: failed to initialize store at {store_dir:?}: {e}"); eprintln!("error: failed to initialize store at {store_dir:?}: {e}");

View File

@ -1,13 +1,13 @@
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tokio::sync::Mutex as AsyncMutex;
use llm_worker::Item; use llm_worker::Item;
use llm_worker::llm_client::RequestConfig; use llm_worker::llm_client::RequestConfig;
use llm_worker::llm_client::client::LlmClient; use llm_worker::llm_client::client::LlmClient;
use llm_worker::state::Mutable; use llm_worker::state::Mutable;
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
use parking_lot::Mutex as SyncMutex;
use session_store::{ use session_store::{
EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, SystemItem, EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, SystemItem,
session_log, to_logged, session_log, to_logged,
@ -16,36 +16,6 @@ use tracing::{info, warn};
use crate::session_log_sink::SessionLogSink; use crate::session_log_sink::SessionLogSink;
/// Command sent to the per-Pod history-drain task.
///
/// - `Item`: one worker-history append observed via
/// `Worker::on_history_append`; the drain classifies it into
/// `LogEntry::AssistantItems` / `LogEntry::ToolResults` and commits
/// through the sink. `role:system` items are explicitly skipped
/// because they are committed up-front through `SystemItems`.
/// - `SystemItems`: typed agent-injected items committed as a single
/// `LogEntry::SystemItems` entry. Used by the interceptor when it
/// drains the notify buffer or pending attachments.
/// - `Flush(ack)`: barrier used by `persist_turn` to ensure every
/// queued command has been processed before the trailing `TurnEnd`
/// entry lands.
#[derive(Debug)]
pub enum LogCommand {
Item(Item),
SystemItems(Vec<SystemItem>),
Flush(tokio::sync::oneshot::Sender<()>),
}
/// State shared between Pod and the controller-spawned history-drain
/// task: store + session-head lock + broadcast sink. All three are
/// `Clone`able (the latter two as `Arc` clones, the store per its
/// `Clone` impl) so handing a copy to the drain task is cheap.
pub struct LogDrainHandle<St> {
pub store: St,
pub session_head: Arc<AsyncMutex<SessionHead>>,
pub sink: SessionLogSink,
}
use manifest::{ use manifest::{
Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError, Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError,
ScopeRule, SharedScope, WorkerManifest, ScopeRule, SharedScope, WorkerManifest,
@ -78,6 +48,70 @@ pub struct SessionHead {
pub head_hash: Option<EntryHash>, pub head_hash: Option<EntryHash>,
} }
/// Cheap-cloneable bundle of (store + session-head lock + sink) handed
/// to the worker callback and the interceptor so they can commit
/// `LogEntry` values directly without going through an mpsc ferry.
///
/// All three fields are `Clone` (the latter two as `Arc` clones, the
/// store per its `Clone` impl) so the handle itself is a flat triple of
/// cheap copies.
pub struct LogWriterHandle<St> {
pub store: St,
pub session_head: Arc<SyncMutex<SessionHead>>,
pub sink: SessionLogSink,
}
impl<St: Clone> Clone for LogWriterHandle<St> {
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
session_head: self.session_head.clone(),
sink: self.sink.clone(),
}
}
}
impl<St> LogWriterHandle<St>
where
St: Store + Clone,
{
/// Append `entry` to the log: disk write → in-memory mirror push →
/// broadcast — atomic w.r.t. `subscribe_with_snapshot` callers.
pub fn append_entry(&self, entry: LogEntry) -> Result<EntryHash, StoreError> {
let mut head = self.session_head.lock();
let hash = session_store::append_entry_with_hash(
&self.store,
head.session_id,
&mut head.head_hash,
entry.clone(),
)?;
self.sink.publish(entry);
Ok(hash)
}
}
/// Type-erased commit handle for the interceptor. Lets the
/// interceptor commit `SystemItem`s without being generic over the
/// concrete `Store` type.
pub trait SystemItemCommitter: Send + Sync {
fn commit_system_item(&self, item: SystemItem);
}
impl<St> SystemItemCommitter for LogWriterHandle<St>
where
St: Store + Clone + Send + Sync + 'static,
{
fn commit_system_item(&self, item: SystemItem) {
let entry = LogEntry::SystemItem {
ts: session_log::now_millis(),
item,
};
if let Err(err) = self.append_entry(entry) {
warn!(error = %err, "system item commit failed; dropping");
}
}
}
/// Pre-LLM-request hook that records `history.len()` at send time into a /// Pre-LLM-request hook that records `history.len()` at send time into a
/// shared `UsageTracker`. The on_usage callback later pairs this with the /// shared `UsageTracker`. The on_usage callback later pairs this with the
/// aggregated UsageEvent to produce one `UsageRecord` per LLM call. /// aggregated UsageEvent to produce one `UsageRecord` per LLM call.
@ -103,7 +137,7 @@ pub struct Pod<C: LlmClient, St: Store> {
worker: Option<Worker<C, Mutable>>, worker: Option<Worker<C, Mutable>>,
store: St, store: St,
session_id: SessionId, session_id: SessionId,
session_head: Arc<AsyncMutex<SessionHead>>, session_head: Arc<SyncMutex<SessionHead>>,
/// Absolute working directory of the Pod. /// Absolute working directory of the Pod.
pwd: PathBuf, pwd: PathBuf,
/// Shared, atomically-swappable view of the Pod's resolved scope. /// Shared, atomically-swappable view of the Pod's resolved scope.
@ -235,12 +269,21 @@ pub struct Pod<C: LlmClient, St: Store> {
/// clients see a `(snapshot, live)` stream consistent with what's /// clients see a `(snapshot, live)` stream consistent with what's
/// on disk. /// on disk.
sink: SessionLogSink, sink: SessionLogSink,
/// Sender into the controller-spawned history-drain task. /// `true` once `wire_history_persistence` has installed the
/// `None` when no controller has wired one (tests, low-level Pod /// `Worker::on_history_append` callback that commits each appended
/// usage). The drain task is the source of mid-turn `AssistantItems` /// item as a singular `LogEntry::AssistantItem` / `ToolResult`
/// / `ToolResults` / `HookInjectedItems` commits, fed by the /// directly through the writer. Tests that drive `Pod::new` without
/// `Worker::on_history_append` callback. /// going through the controller leave this `false`; `persist_turn`
log_cmd_tx: Option<tokio::sync::mpsc::UnboundedSender<LogCommand>>, /// then walks the post-`history_before` slice inline so entries
/// still land on disk.
history_persistence_wired: bool,
/// Type-erased commit handle wired by the controller (or by tests
/// via `attach_log_writer`). The interceptor uses it to commit
/// `SystemItem`s directly without being generic over `St`. `None`
/// in low-level test paths that bypass the controller — those
/// paths skip SystemItem disk commits but still see the rendered
/// `Item::system_message` in worker history.
log_writer: Option<Arc<dyn SystemItemCommitter>>,
} }
impl<C: LlmClient + 'static, St: Store + 'static> Pod<C, St> { impl<C: LlmClient + 'static, St: Store + 'static> Pod<C, St> {
@ -301,21 +344,66 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
// (it only reads `worker.history()`), so a fresh sink is // (it only reads `worker.history()`), so a fresh sink is
// fine — nothing observes its broadcast. // fine — nothing observes its broadcast.
sink: SessionLogSink::new(), sink: SessionLogSink::new(),
log_cmd_tx: None, history_persistence_wired: false,
log_writer: None,
} }
} }
/// Build a `LogDrainHandle` carrying everything the controller's /// Build a `LogWriterHandle` carrying everything the worker
/// drain task needs: store handle, the shared session-head lock, /// callback / interceptor needs to commit `LogEntry` values
/// and the broadcast sink. All three are cheap clones. /// directly: store handle, the shared session-head lock, and the
pub fn log_drain_handle(&self) -> LogDrainHandle<St> { /// broadcast sink. All three are cheap clones.
LogDrainHandle { pub fn log_writer_handle(&self) -> LogWriterHandle<St> {
LogWriterHandle {
store: self.store.clone(), store: self.store.clone(),
session_head: self.session_head.clone(), session_head: self.session_head.clone(),
sink: self.sink.clone(), sink: self.sink.clone(),
} }
} }
/// Attach a type-erased system-item commit handle. The controller
/// calls this once during spawn so the interceptor can commit
/// `SystemItem`s directly without owning a generic store handle.
/// Idempotent: subsequent calls overwrite the previous handle.
pub fn attach_log_writer(&mut self, writer: Arc<dyn SystemItemCommitter>) {
self.log_writer = Some(writer);
}
/// Wire `Worker::on_history_append` to commit each appended item
/// directly as a singular `LogEntry::AssistantItem` / `ToolResult`
/// through the writer. The controller calls this once per spawned
/// Pod after the worker is built; tests that drive `Pod::new` may
/// opt in to the same wiring or leave it off (in which case
/// `persist_turn`'s inline fallback writes entries at turn end).
///
/// `user_message` items are skipped because they are committed
/// up-front via `commit_entry(LogEntry::UserInput { segments })`.
/// `role:system` items are committed by `PodInterceptor` as typed
/// `LogEntry::SystemItem` entries before they reach the worker's
/// history (so this callback would otherwise double-write them).
pub fn wire_history_persistence(&mut self) {
let writer = self.log_writer_handle();
self.worker_mut().on_history_append(move |item| {
if item.is_user_message() {
return;
}
if matches!(
item,
Item::Message {
role: llm_worker::Role::System,
..
}
) {
return;
}
let entry = session_store::classify_history_item(item, session_log::now_millis());
if let Err(err) = writer.append_entry(entry) {
warn!(error = %err, "history append commit failed; dropping");
}
});
self.history_persistence_wired = true;
}
pub fn spawn_post_run_memory_jobs(&mut self) { pub fn spawn_post_run_memory_jobs(&mut self) {
// Drop a finished prior handle so we can spawn a fresh task. // Drop a finished prior handle so we can spawn a fresh task.
// If the prior task is still running, coalesce by skipping — // If the prior task is still running, coalesce by skipping —
@ -365,7 +453,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
worker: Some(worker), worker: Some(worker),
store, store,
session_id, session_id,
session_head: Arc::new(AsyncMutex::new(SessionHead { session_head: Arc::new(SyncMutex::new(SessionHead {
session_id, session_id,
head_hash: None, head_hash: None,
})), })),
@ -397,7 +485,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
memory_task: None, memory_task: None,
user_segments: Vec::new(), user_segments: Vec::new(),
sink: SessionLogSink::new(), sink: SessionLogSink::new(),
log_cmd_tx: None, history_persistence_wired: false,
log_writer: None,
}; };
pod.apply_permissions_from_manifest(); pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest(); pod.apply_prune_from_manifest();
@ -491,8 +580,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// process later exits while children keep their allocations, resume /// process later exits while children keep their allocations, resume
/// can restore the narrowed scope instead of reclaiming delegated /// can restore the narrowed scope instead of reclaiming delegated
/// writes. /// writes.
pub async fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> { pub fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> {
if self.session_head.lock().await.head_hash.is_none() { if self.session_head.lock().head_hash.is_none() {
return Ok(()); return Ok(());
} }
let snapshot = { let snapshot = {
@ -508,23 +597,21 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(),
payload, payload,
}) })
.await
.map(|_| ()) .map(|_| ())
} }
/// Append `entry` to the session log AND publish it through the /// Append `entry` to the session log AND publish it through the
/// broadcast sink. Holds the session-head async lock across the /// broadcast sink. Holds the session-head sync lock across the
/// disk write and the sink publish so subscribers see a gap-free /// disk write and the sink publish so subscribers see a gap-free
/// `(snapshot, live)` stream consistent with what's on disk. /// `(snapshot, live)` stream consistent with what's on disk.
pub(crate) async fn commit_entry(&self, entry: LogEntry) -> Result<EntryHash, StoreError> { pub(crate) fn commit_entry(&self, entry: LogEntry) -> Result<EntryHash, StoreError> {
let mut head = self.session_head.lock().await; let mut head = self.session_head.lock();
let hash = session_store::append_entry_with_hash( let hash = session_store::append_entry_with_hash(
&self.store, &self.store,
head.session_id, head.session_id,
&mut head.head_hash, &mut head.head_hash,
entry.clone(), entry.clone(),
) )?;
.await?;
self.sink.publish(entry); self.sink.publish(entry);
Ok(hash) Ok(hash)
} }
@ -536,15 +623,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.sink.clone() self.sink.clone()
} }
/// Wire a history-drain task. The controller calls this once per
/// Pod after the drain task is spawned; the matching mpsc receiver
/// drives per-item commits of assistant items / tool results /
/// hook-injected items committed by the worker via
/// `Worker::on_history_append`.
pub fn attach_log_cmd_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender<LogCommand>) {
self.log_cmd_tx = Some(tx);
}
/// Cloneable callback handed to dynamic-scope tools. It cannot append /// Cloneable callback handed to dynamic-scope tools. It cannot append
/// directly to the async store from a sync tool callback, so it records /// directly to the async store from a sync tool callback, so it records
/// the latest snapshot and the controller flushes it after the tool /// the latest snapshot and the controller flushes it after the tool
@ -556,7 +634,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
}) })
} }
async fn flush_pending_scope_snapshot(&mut self) -> Result<(), StoreError> { fn flush_pending_scope_snapshot(&mut self) -> Result<(), StoreError> {
let snapshot = self let snapshot = self
.pending_scope_snapshot .pending_scope_snapshot
.lock() .lock()
@ -568,8 +646,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
ts: session_log::now_millis(), ts: session_log::now_millis(),
domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(),
payload, payload,
}) })?;
.await?;
} }
Ok(()) Ok(())
} }
@ -731,14 +808,14 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// fail the surrounding turn. On failure the head hash stays put /// fail the surrounding turn. On failure the head hash stays put
/// (the entry is dropped) and a `Warn` alert + `tracing::warn!` are /// (the entry is dropped) and a `Warn` alert + `tracing::warn!` are
/// emitted so the failure isn't completely silent. /// emitted so the failure isn't completely silent.
async fn try_record_metric(&mut self, metric: &session_metrics::Metric) { fn try_record_metric(&mut self, metric: &session_metrics::Metric) {
let payload = serde_json::to_value(metric).expect("Metric is Serialize"); let payload = serde_json::to_value(metric).expect("Metric is Serialize");
let entry = LogEntry::Extension { let entry = LogEntry::Extension {
ts: session_log::now_millis(), ts: session_log::now_millis(),
domain: session_metrics::DOMAIN.into(), domain: session_metrics::DOMAIN.into(),
payload, payload,
}; };
if let Err(err) = self.commit_entry(entry).await { if let Err(err) = self.commit_entry(entry) {
warn!(name = %metric.name, error = %err, "failed to record session metric; dropping"); warn!(name = %metric.name, error = %err, "failed to record session metric; dropping");
self.alert( self.alert(
AlertLevel::Warn, AlertLevel::Warn,
@ -907,7 +984,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.pending_notifies.clone(), self.pending_notifies.clone(),
self.pending_attachments.clone(), self.pending_attachments.clone(),
self.prompts.clone(), self.prompts.clone(),
self.log_cmd_tx.clone(), self.log_writer.clone(),
); );
self.worker_mut().set_interceptor(interceptor); self.worker_mut().set_interceptor(interceptor);
self.interceptor_installed = true; self.interceptor_installed = true;
@ -1073,12 +1150,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// Persist the user input as typed segments before the worker // Persist the user input as typed segments before the worker
// pushes its flattened copy into history. save_delta deliberately // pushes its flattened copy into history. save_delta deliberately
// skips the resulting `is_user_message()` item to avoid double-write. // skips the resulting `is_user_message()` item to avoid double-write.
self.session_id = self.session_head.lock().await.session_id; self.session_id = self.session_head.lock().session_id;
self.commit_entry(LogEntry::UserInput { self.commit_entry(LogEntry::UserInput {
ts: session_log::now_millis(), ts: session_log::now_millis(),
segments: input.clone(), segments: input.clone(),
}) })
.await?; ?;
self.user_segments.push(input.clone()); self.user_segments.push(input.clone());
// Resolve `@<path>` refs, `#<slug>` Knowledge refs, and `/<slug>` // Resolve `@<path>` refs, `#<slug>` Knowledge refs, and `/<slug>`
@ -1447,7 +1524,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let w = self.worker.as_ref().unwrap(); let w = self.worker.as_ref().unwrap();
let prev_session_id; let prev_session_id;
let initial_state = { let initial_state = {
let head = self.session_head.lock().await; let head = self.session_head.lock();
prev_session_id = head.session_id; prev_session_id = head.session_id;
head.head_hash.is_none() head.head_hash.is_none()
}; };
@ -1460,17 +1537,17 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
forked_from: None, forked_from: None,
compacted_from: None, compacted_from: None,
}; };
self.commit_entry(initial).await?; self.commit_entry(initial)?;
self.persist_scope_snapshot().await?; self.persist_scope_snapshot()?;
return Ok(()); return Ok(());
} }
// Check store head + auto-fork if it drifted. // Check store head + auto-fork if it drifted.
let store_head = self let store_head = self
.store .store
.read_head_hash(prev_session_id) .read_head_hash(prev_session_id)
.await
.map_err(PodError::from)?; .map_err(PodError::from)?;
let mut head = self.session_head.lock().await; let mut head = self.session_head.lock();
if store_head == head.head_hash { if store_head == head.head_hash {
return Ok(()); return Ok(());
} }
@ -1494,7 +1571,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
}; };
self.store self.store
.create_session(fork_id, &[hashed]) .create_session(fork_id, &[hashed])
.await
.map_err(PodError::from)?; .map_err(PodError::from)?;
head.session_id = fork_id; head.session_id = fork_id;
head.head_hash = Some(hash); head.head_hash = Some(hash);
@ -1648,73 +1725,52 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// pass that replicates the legacy `save_delta` classification — // pass that replicates the legacy `save_delta` classification —
// those code paths don't fire `on_history_append`, so the items // those code paths don't fire `on_history_append`, so the items
// would otherwise be lost. // would otherwise be lost.
let _ = history_before; // referenced only by the fallback below. // Per-item commits for AssistantItem / ToolResult / SystemItem
self.session_id = self.session_head.lock().await.session_id; // entries are expected to have landed synchronously: the
if let Some(tx) = self.log_cmd_tx.as_ref() { // worker `on_history_append` callback (wired by the controller
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); // via `wire_history_persistence`) commits each appended item
if tx.send(LogCommand::Flush(ack_tx)).is_ok() { // directly through the writer, and the interceptor commits
let _ = ack_rx.await; // SystemItems up-front in `on_prompt_submit` /
} // `pending_history_appends` before returning the matching
} else { // `Item::system_message`s.
// Fallback path for tests / Pod::new: classify and commit //
// the post-`history_before` slice inline, matching the old // Low-level test paths that build `Pod::new` without wiring
// `save_delta` shape. // the callback fall through this branch: they classify the
// slice from `history_before` inline so the test's
// `restore`-style assertions still see entries on disk.
self.session_id = self.session_head.lock().session_id;
if !self.history_persistence_wired {
let new_items: Vec<Item> = self.worker.as_ref().unwrap().history()[history_before..] let new_items: Vec<Item> = self.worker.as_ref().unwrap().history()[history_before..]
.iter() .iter()
.cloned() .cloned()
.collect(); .collect();
let ts = session_log::now_millis(); let ts = session_log::now_millis();
let mut i = 0; for item in &new_items {
while i < new_items.len() {
let item = &new_items[i];
if item.is_user_message() { if item.is_user_message() {
i += 1; continue;
} else if item.is_tool_result() {
let start = i;
while i < new_items.len() && new_items[i].is_tool_result() {
i += 1;
}
let items = new_items[start..i]
.iter()
.map(session_store::LoggedItem::from)
.collect();
self.commit_entry(LogEntry::ToolResults { ts, items })
.await?;
} else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning()
{
let start = i;
while i < new_items.len()
&& (new_items[i].is_assistant_message()
|| new_items[i].is_tool_call()
|| new_items[i].is_reasoning())
{
i += 1;
}
let items = new_items[start..i]
.iter()
.map(session_store::LoggedItem::from)
.collect();
self.commit_entry(LogEntry::AssistantItems { ts, items })
.await?;
} else {
self.commit_entry(LogEntry::HookInjectedItems {
ts,
items: vec![session_store::LoggedItem::from(&new_items[i])],
})
.await?;
i += 1;
} }
if matches!(
item,
Item::Message {
role: llm_worker::Role::System,
..
}
) {
continue;
}
let entry = session_store::classify_history_item(item, ts);
self.commit_entry(entry)?;
} }
} }
self.flush_pending_scope_snapshot().await?; self.flush_pending_scope_snapshot()?;
let turn_count = self.worker.as_ref().unwrap().turn_count(); let turn_count = self.worker.as_ref().unwrap().turn_count();
self.commit_entry(LogEntry::TurnEnd { self.commit_entry(LogEntry::TurnEnd {
ts: session_log::now_millis(), ts: session_log::now_millis(),
turn_count, turn_count,
}) })
.await?; ?;
// Flush any sync-buffered metrics from this run first // Flush any sync-buffered metrics from this run first
// (currently `prune.fire` / `prune.skip` from the prune observer). // (currently `prune.fire` / `prune.skip` from the prune observer).
@ -1730,7 +1786,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// by this point, and `save_run_completed` still needs to land). // by this point, and `save_run_completed` still needs to land).
let pending_metrics = self.metrics_tracker.drain(); let pending_metrics = self.metrics_tracker.drain();
for metric in pending_metrics { for metric in pending_metrics {
self.try_record_metric(&metric).await; self.try_record_metric(&metric);
} }
// Persist any LLM Usage measurements collected during this run. // Persist any LLM Usage measurements collected during this run.
@ -1755,14 +1811,14 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
cache_write_tokens: record.cache_write_tokens, cache_write_tokens: record.cache_write_tokens,
output_tokens: record.output_tokens, output_tokens: record.output_tokens,
}) })
.await?; ?;
if let Some(id) = correlation_id { if let Some(id) = correlation_id {
let metric = session_metrics::Metric::now("prune.post_request") let metric = session_metrics::Metric::now("prune.post_request")
.with_correlation_id(&id) .with_correlation_id(&id)
.with_value(record.cache_read_tokens as f64) .with_value(record.cache_read_tokens as f64)
.with_dimension("cache_write_tokens", record.cache_write_tokens.to_string()) .with_dimension("cache_write_tokens", record.cache_write_tokens.to_string())
.with_dimension("history_len", record.history_len.to_string()); .with_dimension("history_len", record.history_len.to_string());
self.try_record_metric(&metric).await; self.try_record_metric(&metric);
} }
self.usage_history self.usage_history
.lock() .lock()
@ -1778,7 +1834,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
interrupted, interrupted,
result: r.clone(), result: r.clone(),
}) })
.await?; ?;
} }
Err(e) => { Err(e) => {
self.commit_entry(LogEntry::RunErrored { self.commit_entry(LogEntry::RunErrored {
@ -1786,7 +1842,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
interrupted, interrupted,
message: e.to_string(), message: e.to_string(),
}) })
.await?; ?;
} }
} }
@ -2020,7 +2076,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// `SessionStart { compacted_from }` and reset their view. // `SessionStart { compacted_from }` and reset their view.
let new_session_id = session_store::new_session_id(); let new_session_id = session_store::new_session_id();
let session_start = { let session_start = {
let mut head = self.session_head.lock().await; let mut head = self.session_head.lock();
let old_session_id = head.session_id; let old_session_id = head.session_id;
let old_head_hash = head let old_head_hash = head
.head_hash .head_hash
@ -2044,7 +2100,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
prev_hash: None, prev_hash: None,
entry: entry.clone(), entry: entry.clone(),
}; };
self.store.create_session(new_session_id, &[hashed]).await?; self.store.create_session(new_session_id, &[hashed])?;
head.session_id = new_session_id; head.session_id = new_session_id;
head.head_hash = Some(hash); head.head_hash = Some(hash);
self.session_id = new_session_id; self.session_id = new_session_id;
@ -2092,7 +2148,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.lock() .lock()
.expect("usage_history poisoned") .expect("usage_history poisoned")
.clear(); .clear();
self.persist_scope_snapshot().await?; self.persist_scope_snapshot()?;
// Reset extract pointer alongside usage_history: the compacted // Reset extract pointer alongside usage_history: the compacted
// session has a fresh log with no `LogEntry::Extension` entries // session has a fresh log with no `LogEntry::Extension` entries
// yet, so a cold restore here would set extract_pointer to None // yet, so a cold restore here would set extract_pointer to None
@ -2254,7 +2310,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// Read the session log to get the current entry count. This is // Read the session log to get the current entry count. This is
// the boundary for the source.range end_entry. Called once per // the boundary for the source.range end_entry. Called once per
// extract, on a small local file. // extract, on a small local file.
let entries_now = self.store.read_all(self.session_id).await?.len(); let entries_now = self.store.read_all(self.session_id)?.len();
if entries_now == 0 { if entries_now == 0 {
return Ok(ExtractDecision::Skipped); return Ok(ExtractDecision::Skipped);
} }
@ -2322,7 +2378,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
extract::ExtractedPayload::default() extract::ExtractedPayload::default()
}); });
let source_session_id = self.session_head.lock().await.session_id; let source_session_id = self.session_head.lock().session_id;
let staging_id = if payload.is_empty() { let staging_id = if payload.is_empty() {
String::new() String::new()
} else { } else {
@ -2347,8 +2403,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
domain: extract::EXTRACT_DOMAIN.into(), domain: extract::EXTRACT_DOMAIN.into(),
payload: payload_value, payload: payload_value,
}) })
.await?; ?;
self.session_id = self.session_head.lock().await.session_id; self.session_id = self.session_head.lock().session_id;
*self *self
.extract_pointer .extract_pointer
@ -2655,7 +2711,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
worker: Some(worker), worker: Some(worker),
store, store,
session_id, session_id,
session_head: Arc::new(AsyncMutex::new(SessionHead { session_head: Arc::new(SyncMutex::new(SessionHead {
session_id, session_id,
head_hash: None, head_hash: None,
})), })),
@ -2687,7 +2743,8 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
memory_task: None, memory_task: None,
user_segments: Vec::new(), user_segments: Vec::new(),
sink: SessionLogSink::new(), sink: SessionLogSink::new(),
log_cmd_tx: None, history_persistence_wired: false,
log_writer: None,
}; };
pod.apply_permissions_from_manifest(); pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest(); pod.apply_prune_from_manifest();
@ -2728,7 +2785,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
worker: Some(worker), worker: Some(worker),
store, store,
session_id, session_id,
session_head: Arc::new(AsyncMutex::new(SessionHead { session_head: Arc::new(SyncMutex::new(SessionHead {
session_id, session_id,
head_hash: None, head_hash: None,
})), })),
@ -2760,7 +2817,8 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
memory_task: None, memory_task: None,
user_segments: Vec::new(), user_segments: Vec::new(),
sink: SessionLogSink::new(), sink: SessionLogSink::new(),
log_cmd_tx: None, history_persistence_wired: false,
log_writer: None,
}; };
pod.apply_permissions_from_manifest(); pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest(); pod.apply_prune_from_manifest();
@ -2795,7 +2853,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
// Read raw entries once so we can both reconstruct state and // Read raw entries once so we can both reconstruct state and
// seed the broadcast sink's mirror with the same prefix that // seed the broadcast sink's mirror with the same prefix that
// sits on disk. // sits on disk.
let raw_entries = store.read_all(session_id).await?; let raw_entries = store.read_all(session_id)?;
let state = session_store::collect_state(&raw_entries); let state = session_store::collect_state(&raw_entries);
if state.head_hash.is_none() { if state.head_hash.is_none() {
return Err(PodError::SessionEmpty { session_id }); return Err(PodError::SessionEmpty { session_id });
@ -2870,7 +2928,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
worker: Some(worker), worker: Some(worker),
store, store,
session_id, session_id,
session_head: Arc::new(AsyncMutex::new(SessionHead { session_head: Arc::new(SyncMutex::new(SessionHead {
session_id, session_id,
head_hash: state.head_hash, head_hash: state.head_hash,
})), })),
@ -2907,7 +2965,8 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
// late-attaching client sees the full prefix without an // late-attaching client sees the full prefix without an
// extra round trip. // extra round trip.
sink: SessionLogSink::with_initial(mirror_entries), sink: SessionLogSink::with_initial(mirror_entries),
log_cmd_tx: None, history_persistence_wired: false,
log_writer: None,
}; };
pod.apply_permissions_from_manifest(); pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest(); pod.apply_prune_from_manifest();

View File

@ -7,7 +7,7 @@
//! Pod (which still owns the `Store` handle); the sink stays focused on //! Pod (which still owns the `Store` handle); the sink stays focused on
//! the wire-side fan-out. //! the wire-side fan-out.
//! //!
//! Atomicity contract (see ticket `tickets/pod-state-from-session-log.md`): //! Atomicity contract:
//! //!
//! 1. Pod writes the entry to disk via the `Store`. //! 1. Pod writes the entry to disk via the `Store`.
//! 2. Pod calls [`SessionLogSink::publish`] which acquires the mirror //! 2. Pod calls [`SessionLogSink::publish`] which acquires the mirror
@ -24,10 +24,11 @@
use std::sync::{Arc, Mutex as StdMutex}; use std::sync::{Arc, Mutex as StdMutex};
use parking_lot::{Mutex, MutexGuard};
use session_store::{ use session_store::{
EntryHash, HashedEntry, LogEntry, SessionId, SessionStartState, Store, StoreError, session_log, EntryHash, HashedEntry, LogEntry, SessionId, SessionStartState, Store, StoreError, session_log,
}; };
use tokio::sync::{Mutex as AsyncMutex, MutexGuard, broadcast}; use tokio::sync::broadcast;
/// Broadcast capacity for the live receiver. Slow subscribers that /// Broadcast capacity for the live receiver. Slow subscribers that
/// fall behind will see `RecvError::Lagged` and are expected to drop /// fall behind will see `RecvError::Lagged` and are expected to drop
@ -92,8 +93,8 @@ impl SessionLogSink {
/// Live broadcast fires only for entries that the streaming-event /// Live broadcast fires only for entries that the streaming-event
/// lane does not cover: /// lane does not cover:
/// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire. /// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire.
/// - `LogEntry::HookInjectedItems` → `Event::HookInjectedItems`. /// - `LogEntry::SystemItem` → `Event::SystemItem`.
/// Everything else (AssistantItems, ToolResults, UserInput, TurnEnd, /// Everything else (AssistantItem, ToolResult, UserInput, TurnEnd,
/// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is /// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is
/// reflected in the mirror so reconnect snapshots stay accurate, /// reflected in the mirror so reconnect snapshots stay accurate,
/// but is not sent live — the streaming events (TextDelta / /// but is not sent live — the streaming events (TextDelta /
@ -120,7 +121,7 @@ impl SessionLogSink {
fn is_live_relevant(entry: &LogEntry) -> bool { fn is_live_relevant(entry: &LogEntry) -> bool {
matches!( matches!(
entry, entry,
LogEntry::SessionStart { .. } | LogEntry::SystemItems { .. } LogEntry::SessionStart { .. } | LogEntry::SystemItem { .. }
) )
} }
@ -194,10 +195,9 @@ impl Default for SessionLogSink {
} }
/// Active session head for the Pod's persistent log: session id + /// Active session head for the Pod's persistent log: session id +
/// last-committed entry hash. Replaces the previous `SessionHead` /// last-committed entry hash. Bundled with the store + sink in a
/// struct local to `Pod`; bundled here so the writer can hand a /// `SessionLogWriter` so the worker callback / interceptor can share
/// cloneable handle to background tasks (e.g. the per-item drain /// one cheap `Clone` handle for direct sync appends.
/// task spawned by the controller).
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SessionHeadState { pub struct SessionHeadState {
pub session_id: SessionId, pub session_id: SessionId,
@ -209,20 +209,26 @@ pub struct SessionHeadState {
/// Bundles the (1) persistent store, (2) the in-memory session-head /// Bundles the (1) persistent store, (2) the in-memory session-head
/// state (id + hash), and (3) the broadcast sink. `append_entry` /// state (id + hash), and (3) the broadcast sink. `append_entry`
/// chains the hash on disk, advances the head, then publishes the /// chains the hash on disk, advances the head, then publishes the
/// entry through the sink — under a single async mutex so two writers /// entry through the sink — under a single sync mutex so two writers
/// cannot interleave the chain. /// cannot interleave the chain.
/// ///
/// All append paths run synchronously: a local-fs `<1 KiB` JSONL line
/// completes well below a millisecond, and going through an async
/// `tokio::fs` ferry would re-introduce the `LogCommand` / drain task
/// we removed. `parking_lot::Mutex` is safe to hold across the disk
/// write since the lock is never crossed by an `.await`.
///
/// `Clone` is a cheap `Arc` clone. The Pod keeps one writer for its /// `Clone` is a cheap `Arc` clone. The Pod keeps one writer for its
/// inline commits (UserInput, TurnEnd, Usage, RunCompleted/Errored, /// inline commits (UserInput, TurnEnd, Usage, RunCompleted/Errored,
/// scope snapshots, metrics) and hands clones to background tasks /// scope snapshots, metrics) and hands clones to every other commit
/// (e.g. the controller's per-item history drain task). /// site (worker callback, interceptor).
pub struct SessionLogWriter<St> { pub struct SessionLogWriter<St> {
inner: Arc<WriterInner<St>>, inner: Arc<WriterInner<St>>,
} }
struct WriterInner<St> { struct WriterInner<St> {
store: St, store: St,
head: AsyncMutex<SessionHeadState>, head: Mutex<SessionHeadState>,
sink: SessionLogSink, sink: SessionLogSink,
} }
@ -245,7 +251,7 @@ where
Self { Self {
inner: Arc::new(WriterInner { inner: Arc::new(WriterInner {
store, store,
head: AsyncMutex::new(SessionHeadState { head: Mutex::new(SessionHeadState {
session_id, session_id,
head_hash: None, head_hash: None,
}), }),
@ -267,7 +273,7 @@ where
Self { Self {
inner: Arc::new(WriterInner { inner: Arc::new(WriterInner {
store, store,
head: AsyncMutex::new(SessionHeadState { head: Mutex::new(SessionHeadState {
session_id, session_id,
head_hash, head_hash,
}), }),
@ -278,15 +284,14 @@ where
/// Append `entry` to the log: disk write → in-memory mirror push → /// Append `entry` to the log: disk write → in-memory mirror push →
/// broadcast — atomic w.r.t. `subscribe_with_snapshot` callers. /// broadcast — atomic w.r.t. `subscribe_with_snapshot` callers.
pub async fn append_entry(&self, entry: LogEntry) -> Result<EntryHash, StoreError> { pub fn append_entry(&self, entry: LogEntry) -> Result<EntryHash, StoreError> {
let mut head = self.inner.head.lock().await; let mut head = self.inner.head.lock();
let hash = session_store::append_entry_with_hash( let hash = session_store::append_entry_with_hash(
&self.inner.store, &self.inner.store,
head.session_id, head.session_id,
&mut head.head_hash, &mut head.head_hash,
entry.clone(), entry.clone(),
) )?;
.await?;
self.inner.sink.publish(entry); self.inner.sink.publish(entry);
Ok(hash) Ok(hash)
} }
@ -299,7 +304,7 @@ where
/// subscribers observe the swap as a freshly broadcast /// subscribers observe the swap as a freshly broadcast
/// `SessionStart` (with `compacted_from` set), which is their /// `SessionStart` (with `compacted_from` set), which is their
/// signal to reset their derived view. /// signal to reset their derived view.
pub async fn swap_session( pub fn swap_session(
&self, &self,
new_session_id: SessionId, new_session_id: SessionId,
initial: LogEntry, initial: LogEntry,
@ -310,11 +315,8 @@ where
prev_hash: None, prev_hash: None,
entry: initial.clone(), entry: initial.clone(),
}; };
self.inner self.inner.store.create_session(new_session_id, &[hashed])?;
.store let mut head = self.inner.head.lock();
.create_session(new_session_id, &[hashed])
.await?;
let mut head = self.inner.head.lock().await;
head.session_id = new_session_id; head.session_id = new_session_id;
head.head_hash = Some(hash.clone()); head.head_hash = Some(hash.clone());
self.inner.sink.reset_with_initial(initial); self.inner.sink.reset_with_initial(initial);
@ -324,12 +326,9 @@ where
/// If the store's head no longer matches our cached head, mint a /// If the store's head no longer matches our cached head, mint a
/// fresh session that forks from the current state and switch to /// fresh session that forks from the current state and switch to
/// it. Returns `true` when a fork happened. /// it. Returns `true` when a fork happened.
pub async fn ensure_head_or_fork( pub fn ensure_head_or_fork(&self, state: SessionStartState<'_>) -> Result<bool, StoreError> {
&self, let mut head = self.inner.head.lock();
state: SessionStartState<'_>, let store_head = self.inner.store.read_head_hash(head.session_id)?;
) -> Result<bool, StoreError> {
let mut head = self.inner.head.lock().await;
let store_head = self.inner.store.read_head_hash(head.session_id).await?;
if store_head == head.head_hash { if store_head == head.head_hash {
return Ok(false); return Ok(false);
} }
@ -348,7 +347,7 @@ where
prev_hash: None, prev_hash: None,
entry: entry.clone(), entry: entry.clone(),
}; };
self.inner.store.create_session(fork_id, &[hashed]).await?; self.inner.store.create_session(fork_id, &[hashed])?;
head.session_id = fork_id; head.session_id = fork_id;
head.head_hash = Some(hash); head.head_hash = Some(hash);
self.inner.sink.reset_with_initial(entry); self.inner.sink.reset_with_initial(entry);
@ -370,20 +369,19 @@ where
} }
/// Cheap snapshot of the current session id. /// Cheap snapshot of the current session id.
pub async fn current_session_id(&self) -> SessionId { pub fn current_session_id(&self) -> SessionId {
self.inner.head.lock().await.session_id self.inner.head.lock().session_id
} }
/// Cheap snapshot of the current head hash. /// Cheap snapshot of the current head hash.
pub async fn current_head_hash(&self) -> Option<EntryHash> { pub fn current_head_hash(&self) -> Option<EntryHash> {
self.inner.head.lock().await.head_hash.clone() self.inner.head.lock().head_hash.clone()
} }
/// Direct lock on the head. Used by paths that need to coordinate /// Direct lock on the head. Used by paths that need to coordinate
/// custom writes with the hash chain (currently /// custom writes with the hash chain.
/// `session_metrics::record_metric`). pub fn lock_head(&self) -> MutexGuard<'_, SessionHeadState> {
pub async fn lock_head(&self) -> MutexGuard<'_, SessionHeadState> { self.inner.head.lock()
self.inner.head.lock().await
} }
} }
@ -428,12 +426,12 @@ mod tests {
} }
fn notification_entry(text: &str) -> LogEntry { fn notification_entry(text: &str) -> LogEntry {
LogEntry::SystemItems { LogEntry::SystemItem {
ts: now_millis(), ts: now_millis(),
items: vec![session_store::SystemItem::Notification { item: session_store::SystemItem::Notification {
message: text.to_owned(), message: text.to_owned(),
body: format!("[Notification] {text}"), body: format!("[Notification] {text}"),
}], },
} }
} }
@ -449,11 +447,11 @@ mod tests {
sink.publish(turn_end(1)); sink.publish(turn_end(1));
assert!(rx.try_recv().is_err(), "TurnEnd must not be broadcast live"); assert!(rx.try_recv().is_err(), "TurnEnd must not be broadcast live");
// SystemItems is live-relevant. // SystemItem is live-relevant.
sink.publish(notification_entry("hi")); sink.publish(notification_entry("hi"));
match rx.try_recv() { match rx.try_recv() {
Ok(LogEntry::SystemItems { .. }) => {} Ok(LogEntry::SystemItem { .. }) => {}
other => panic!("expected SystemItems, got {other:?}"), other => panic!("expected SystemItem, got {other:?}"),
} }
// Mirror still grew with both entries (snapshot completeness). // Mirror still grew with both entries (snapshot completeness).
@ -470,7 +468,7 @@ mod tests {
assert_eq!(snapshot.len(), 1); assert_eq!(snapshot.len(), 1);
match rx.try_recv() { match rx.try_recv() {
Ok(LogEntry::SystemItems { .. }) => {} Ok(LogEntry::SystemItem { .. }) => {}
other => panic!("unexpected: {other:?}"), other => panic!("unexpected: {other:?}"),
} }
assert!(rx.try_recv().is_err()); assert!(rx.try_recv().is_err());

View File

@ -149,7 +149,7 @@ async fn make_pod_with_manifest(
let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap(); let store = FsStore::new(store_tmp.path()).unwrap();
std::mem::forget(store_tmp); std::mem::forget(store_tmp);
let pwd_tmp = tempfile::tempdir().unwrap(); let pwd_tmp = tempfile::tempdir().unwrap();

View File

@ -158,7 +158,7 @@ async fn make_pod_with(
let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap(); let store = FsStore::new(store_tmp.path()).unwrap();
std::mem::forget(store_tmp); std::mem::forget(store_tmp);
let scope = pod::Scope::writable(&pwd).unwrap(); let scope = pod::Scope::writable(&pwd).unwrap();

View File

@ -29,6 +29,12 @@ fn history_from_sink(handle: &PodHandle) -> Vec<Item> {
let text = protocol::Segment::flatten_to_text(&segments); let text = protocol::Segment::flatten_to_text(&segments);
items.push(Item::user_message(text)); items.push(Item::user_message(text));
} }
LogEntry::AssistantItem { item, .. } | LogEntry::ToolResult { item, .. } => {
items.push(Item::from(item));
}
LogEntry::SystemItem { item, .. } => {
items.push(item.to_history_item());
}
LogEntry::AssistantItems { items: i, .. } LogEntry::AssistantItems { items: i, .. }
| LogEntry::ToolResults { items: i, .. } | LogEntry::ToolResults { items: i, .. }
| LogEntry::HookInjectedItems { items: i, .. } => { | LogEntry::HookInjectedItems { items: i, .. } => {
@ -167,7 +173,7 @@ async fn make_pod_with_pwd_and_manifest(
) -> (Pod<MockClient, FsStore>, std::path::PathBuf) { ) -> (Pod<MockClient, FsStore>, std::path::PathBuf) {
let manifest = PodManifest::from_toml(manifest_toml).unwrap(); let manifest = PodManifest::from_toml(manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap(); let store = FsStore::new(store_tmp.path()).unwrap();
std::mem::forget(store_tmp); std::mem::forget(store_tmp);
// Separate tempdir to serve as the Pod's pwd/scope — these tests // Separate tempdir to serve as the Pod's pwd/scope — these tests
@ -773,12 +779,10 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
let (entries, _) = handle.sink.subscribe_with_snapshot(); let (entries, _) = handle.sink.subscribe_with_snapshot();
let saw_notify_in_mirror = entries.iter().any(|e| matches!( let saw_notify_in_mirror = entries.iter().any(|e| matches!(
e, e,
session_store::LogEntry::SystemItems { items, .. } session_store::LogEntry::SystemItem {
if items.iter().any(|si| matches!( item: session_store::SystemItem::Notification { message, .. },
si, ..
session_store::SystemItem::Notification { message, .. } } if message == "turn finished"
if message == "turn finished"
))
)); ));
assert!( assert!(
saw_notify_in_mirror, saw_notify_in_mirror,
@ -863,12 +867,13 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes
let (entries, _) = handle.sink.subscribe_with_snapshot(); let (entries, _) = handle.sink.subscribe_with_snapshot();
let saw_pod_event_in_mirror = entries.iter().any(|e| matches!( let saw_pod_event_in_mirror = entries.iter().any(|e| matches!(
e, e,
session_store::LogEntry::SystemItems { items, .. } session_store::LogEntry::SystemItem {
if items.iter().any(|si| matches!( item: session_store::SystemItem::PodEvent {
si, event: protocol::PodEvent::TurnEnded { pod_name },
session_store::SystemItem::PodEvent { event: protocol::PodEvent::TurnEnded { pod_name }, .. } ..
if pod_name == "child" },
)) ..
} if pod_name == "child"
)); ));
assert!( assert!(
saw_pod_event_in_mirror, saw_pod_event_in_mirror,

View File

@ -36,7 +36,7 @@ async fn restore_from_manifest_rejects_unknown_session() {
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap(); let store = FsStore::new(store_tmp.path()).unwrap();
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
// A freshly-minted id with no jsonl file at all → store returns // A freshly-minted id with no jsonl file at all → store returns
@ -59,7 +59,7 @@ async fn restore_from_manifest_rejects_empty_session_log() {
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap(); let store = FsStore::new(store_tmp.path()).unwrap();
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
// Pre-create an empty `<id>.jsonl` so `read_all` succeeds with no // Pre-create an empty `<id>.jsonl` so `read_all` succeeds with no
@ -86,7 +86,7 @@ async fn restore_from_manifest_rejects_session_without_scope_snapshot() {
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap(); let store = FsStore::new(store_tmp.path()).unwrap();
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
let id = session_store::new_session_id(); let id = session_store::new_session_id();
@ -95,9 +95,7 @@ async fn restore_from_manifest_rejects_session_without_scope_snapshot() {
config: &Default::default(), config: &Default::default(),
history: &[], history: &[],
}; };
session_store::create_session_with_id(&store, id, state) session_store::create_session_with_id(&store, id, state).unwrap();
.await
.unwrap();
let result = let result =
Pod::restore_from_manifest(id, manifest, store, pod::PromptLoader::builtins_only()).await; Pod::restore_from_manifest(id, manifest, store, pod::PromptLoader::builtins_only()).await;

View File

@ -174,7 +174,7 @@ async fn make_pod(
) { ) {
let manifest = PodManifest::from_toml(&manifest_toml).unwrap(); let manifest = PodManifest::from_toml(&manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap(); let store = FsStore::new(store_tmp.path()).unwrap();
let pwd_tmp = tempfile::tempdir().unwrap(); let pwd_tmp = tempfile::tempdir().unwrap();
let pwd = pwd_tmp.path().to_path_buf(); let pwd = pwd_tmp.path().to_path_buf();
let scope = pod::Scope::writable(&pwd).unwrap(); let scope = pod::Scope::writable(&pwd).unwrap();
@ -210,7 +210,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() {
pod.run_text("first").await.unwrap(); pod.run_text("first").await.unwrap();
pod.run_text("second").await.unwrap(); pod.run_text("second").await.unwrap();
let state = session_store::restore(&store, session_id).await.unwrap(); let state = session_store::restore(&store, session_id).unwrap();
let metrics = metrics_from_extensions(&state.extensions); let metrics = metrics_from_extensions(&state.extensions);
// Run 1 has 2 LLM iterations (tool loop), each evaluates prune with // Run 1 has 2 LLM iterations (tool loop), each evaluates prune with
@ -296,7 +296,7 @@ async fn prune_metrics_record_below_min_savings_skip() {
pod.run_text("first").await.unwrap(); pod.run_text("first").await.unwrap();
pod.run_text("second").await.unwrap(); pod.run_text("second").await.unwrap();
let state = session_store::restore(&store, session_id).await.unwrap(); let state = session_store::restore(&store, session_id).unwrap();
let metrics = metrics_from_extensions(&state.extensions); let metrics = metrics_from_extensions(&state.extensions);
let below = metrics let below = metrics
.iter() .iter()
@ -329,35 +329,35 @@ struct MetricFailingStore {
} }
impl Store for MetricFailingStore { impl Store for MetricFailingStore {
async fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> { fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> {
if let LogEntry::Extension { domain, .. } = &entry.entry { if let LogEntry::Extension { domain, .. } = &entry.entry {
if domain == DOMAIN { if domain == DOMAIN {
return Err(StoreError::Io(std::io::Error::other("synthetic failure"))); return Err(StoreError::Io(std::io::Error::other("synthetic failure")));
} }
} }
self.inner.append(id, entry).await self.inner.append(id, entry)
} }
async fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError> { fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError> {
self.inner.read_all(id).await self.inner.read_all(id)
} }
async fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError> { fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError> {
self.inner.list_sessions().await self.inner.list_sessions()
} }
async fn create_session( fn create_session(
&self, &self,
id: SessionId, id: SessionId,
entries: &[HashedEntry], entries: &[HashedEntry],
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
self.inner.create_session(id, entries).await self.inner.create_session(id, entries)
} }
async fn exists(&self, id: SessionId) -> Result<bool, StoreError> { fn exists(&self, id: SessionId) -> Result<bool, StoreError> {
self.inner.exists(id).await self.inner.exists(id)
} }
async fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError> { fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError> {
self.inner.read_head_hash(id).await self.inner.read_head_hash(id)
} }
async fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> {
self.inner.append_trace(id, entry).await self.inner.append_trace(id, entry)
} }
} }
@ -372,7 +372,7 @@ async fn metric_write_failure_emits_warn_alert_and_does_not_abort_run() {
let manifest_toml = manifest_toml(1, 1); let manifest_toml = manifest_toml(1, 1);
let manifest = PodManifest::from_toml(&manifest_toml).unwrap(); let manifest = PodManifest::from_toml(&manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let inner = FsStore::new(store_tmp.path()).await.unwrap(); let inner = FsStore::new(store_tmp.path()).unwrap();
let store = MetricFailingStore { inner }; let store = MetricFailingStore { inner };
let pwd_tmp = tempfile::tempdir().unwrap(); let pwd_tmp = tempfile::tempdir().unwrap();
let pwd = pwd_tmp.path().to_path_buf(); let pwd = pwd_tmp.path().to_path_buf();
@ -397,7 +397,7 @@ async fn metric_write_failure_emits_warn_alert_and_does_not_abort_run() {
pod.run_text("hello").await.unwrap(); pod.run_text("hello").await.unwrap();
// No metrics ended up in the log (writes were rejected). // No metrics ended up in the log (writes were rejected).
let state = session_store::restore(&store, session_id).await.unwrap(); let state = session_store::restore(&store, session_id).unwrap();
let metrics = metrics_from_extensions(&state.extensions); let metrics = metrics_from_extensions(&state.extensions);
assert!(metrics.is_empty(), "metrics must drop on write failure"); assert!(metrics.is_empty(), "metrics must drop on write failure");
@ -444,7 +444,7 @@ permission = "write"
let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]); let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]);
let manifest = PodManifest::from_toml(manifest_toml).unwrap(); let manifest = PodManifest::from_toml(manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap(); let store = FsStore::new(store_tmp.path()).unwrap();
let pwd_tmp = tempfile::tempdir().unwrap(); let pwd_tmp = tempfile::tempdir().unwrap();
let pwd = pwd_tmp.path().to_path_buf(); let pwd = pwd_tmp.path().to_path_buf();
let scope = pod::Scope::writable(&pwd).unwrap(); let scope = pod::Scope::writable(&pwd).unwrap();
@ -455,7 +455,7 @@ permission = "write"
let session_id = pod.session_id(); let session_id = pod.session_id();
pod.run_text("hello").await.unwrap(); pod.run_text("hello").await.unwrap();
let state = session_store::restore(&store, session_id).await.unwrap(); let state = session_store::restore(&store, session_id).unwrap();
let metrics = metrics_from_extensions(&state.extensions); let metrics = metrics_from_extensions(&state.extensions);
assert!( assert!(
metrics.is_empty(), metrics.is_empty(),

View File

@ -103,7 +103,7 @@ async fn make_pod_with_body(
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap(); let store = FsStore::new(store_tmp.path()).unwrap();
std::mem::forget(store_tmp); std::mem::forget(store_tmp);
let pwd_tmp = tempfile::tempdir().unwrap(); let pwd_tmp = tempfile::tempdir().unwrap();
@ -182,7 +182,7 @@ async fn session_start_state_captures_rendered_prompt() {
.unwrap(); .unwrap();
pod.run_text("hi").await.unwrap(); pod.run_text("hi").await.unwrap();
let entries = pod.store().read_all(pod.session_id()).await.unwrap(); let entries = pod.store().read_all(pod.session_id()).unwrap();
let first = entries.first().expect("at least one entry"); let first = entries.first().expect("at least one entry");
match &first.entry { match &first.entry {
LogEntry::SessionStart { system_prompt, .. } => { LogEntry::SessionStart { system_prompt, .. } => {

View File

@ -75,14 +75,14 @@ impl Metric {
/// ///
/// `save_extension` の薄い wrapper。書き込み失敗は呼び出し側に返す /// `save_extension` の薄い wrapper。書き込み失敗は呼び出し側に返す
/// (メトリクスのために本体処理を止めるかは呼び出し側の判断)。 /// (メトリクスのために本体処理を止めるかは呼び出し側の判断)。
pub async fn record_metric( pub fn record_metric(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
metric: &Metric, metric: &Metric,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
let payload = serde_json::to_value(metric).expect("Metric serialization cannot fail"); let payload = serde_json::to_value(metric).expect("Metric serialization cannot fail");
save_extension(store, session_id, head_hash, DOMAIN, payload).await save_extension(store, session_id, head_hash, DOMAIN, payload)
} }
/// `RestoredState.extensions` から metrics domain の payload を順に取り出し、 /// `RestoredState.extensions` から metrics domain の payload を順に取り出し、

View File

@ -8,9 +8,9 @@ use crate::SessionId;
use crate::event_trace::TraceEntry; use crate::event_trace::TraceEntry;
use crate::session_log::{EntryHash, HashedEntry}; use crate::session_log::{EntryHash, HashedEntry};
use crate::store::{Store, StoreError}; use crate::store::{Store, StoreError};
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tokio::fs;
use tokio::io::AsyncWriteExt;
/// Filesystem-backed JSONL store. /// Filesystem-backed JSONL store.
/// ///
@ -24,9 +24,9 @@ pub struct FsStore {
impl FsStore { impl FsStore {
/// Create a new `FsStore` rooted at the given directory. /// Create a new `FsStore` rooted at the given directory.
/// Creates the directory if it does not exist. /// Creates the directory if it does not exist.
pub async fn new(root: impl Into<PathBuf>) -> Result<Self, StoreError> { pub fn new(root: impl Into<PathBuf>) -> Result<Self, StoreError> {
let root = root.into(); let root = root.into();
fs::create_dir_all(&root).await?; fs::create_dir_all(&root)?;
Ok(Self { root }) Ok(Self { root })
} }
@ -38,15 +38,13 @@ impl FsStore {
self.root.join(format!("{id}.trace.jsonl")) self.root.join(format!("{id}.trace.jsonl"))
} }
async fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> { fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> {
let mut file = fs::OpenOptions::new() let mut file = fs::OpenOptions::new().create(true).append(true).open(path)?;
.create(true) file.write_all(line.as_bytes())?;
.append(true) file.write_all(b"\n")?;
.open(path) // Append-mode write is the durability boundary; an explicit
.await?; // `sync_all` here would multiply latency by ~10× for no gain
file.write_all(line.as_bytes()).await?; // since the kernel already orders concurrent `O_APPEND` writes.
file.write_all(b"\n").await?;
file.flush().await?;
Ok(()) Ok(())
} }
@ -67,24 +65,24 @@ impl FsStore {
} }
impl Store for FsStore { impl Store for FsStore {
async fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> { fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> {
let line = serde_json::to_string(entry)?; let line = serde_json::to_string(entry)?;
self.append_line(&self.log_path(id), &line).await self.append_line(&self.log_path(id), &line)
} }
async fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError> { fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError> {
let path = self.log_path(id); let path = self.log_path(id);
if !path.exists() { if !path.exists() {
return Err(StoreError::NotFound(id)); return Err(StoreError::NotFound(id));
} }
let content = fs::read_to_string(&path).await?; let content = fs::read_to_string(&path)?;
Self::parse_jsonl(&content) Self::parse_jsonl(&content)
} }
async fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError> { fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError> {
let mut sessions = Vec::new(); let mut sessions = Vec::new();
let mut dir = fs::read_dir(&self.root).await?; for entry in fs::read_dir(&self.root)? {
while let Some(entry) = dir.next_entry().await? { let entry = entry?;
let path = entry.path(); let path = entry.path();
// Only match .jsonl files, not .trace.jsonl // Only match .jsonl files, not .trace.jsonl
let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); let name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
@ -100,31 +98,27 @@ impl Store for FsStore {
Ok(sessions) Ok(sessions)
} }
async fn create_session( fn create_session(&self, id: SessionId, entries: &[HashedEntry]) -> Result<(), StoreError> {
&self,
id: SessionId,
entries: &[HashedEntry],
) -> Result<(), StoreError> {
let path = self.log_path(id); let path = self.log_path(id);
let mut content = String::new(); let mut content = String::new();
for entry in entries { for entry in entries {
content.push_str(&serde_json::to_string(entry)?); content.push_str(&serde_json::to_string(entry)?);
content.push('\n'); content.push('\n');
} }
fs::write(&path, content.as_bytes()).await?; fs::write(&path, content.as_bytes())?;
Ok(()) Ok(())
} }
async fn exists(&self, id: SessionId) -> Result<bool, StoreError> { fn exists(&self, id: SessionId) -> Result<bool, StoreError> {
Ok(self.log_path(id).exists()) Ok(self.log_path(id).exists())
} }
async fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError> { fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError> {
let path = self.log_path(id); let path = self.log_path(id);
if !path.exists() { if !path.exists() {
return Err(StoreError::NotFound(id)); return Err(StoreError::NotFound(id));
} }
let content = fs::read_to_string(&path).await?; let content = fs::read_to_string(&path)?;
let last_line = content.lines().rev().find(|l| !l.trim().is_empty()); let last_line = content.lines().rev().find(|l| !l.trim().is_empty());
match last_line { match last_line {
Some(line) => { Some(line) => {
@ -139,8 +133,8 @@ impl Store for FsStore {
} }
} }
async fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> {
let line = serde_json::to_string(entry)?; let line = serde_json::to_string(entry)?;
self.append_line(&self.trace_path(id), &line).await self.append_line(&self.trace_path(id), &line)
} }
} }

View File

@ -40,10 +40,11 @@ pub use llm_worker::UsageRecord;
pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use llm_worker::llm_client::types::{ContentPart, Item, Role};
pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged};
pub use session::{ pub use session::{
SessionStartState, append_entry, append_entry_with_hash, create_compacted_session, SessionStartState, append_entry, append_entry_with_hash, append_system_item,
create_session, create_session_with_id, ensure_head_or_fork, fork, fork_at, restore, classify_history_item, create_compacted_session, create_session, create_session_with_id,
save_config_changed, save_delta, save_extension, save_pod_scope, save_run_completed, ensure_head_or_fork, fork, fork_at, restore, save_config_changed, save_delta, save_extension,
save_run_errored, save_turn_end, save_usage, save_user_input, save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage,
save_user_input,
}; };
pub use session_log::{ pub use session_log::{
EntryHash, HashedEntry, LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, EntryHash, HashedEntry, LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState,

View File

@ -8,6 +8,7 @@ use crate::SessionId;
use crate::logged_item::{LoggedItem, to_logged}; use crate::logged_item::{LoggedItem, to_logged};
use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionOrigin}; use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionOrigin};
use crate::store::{Store, StoreError}; use crate::store::{Store, StoreError};
use crate::system_item::SystemItem;
use llm_worker::WorkerResult; use llm_worker::WorkerResult;
use llm_worker::llm_client::RequestConfig; use llm_worker::llm_client::RequestConfig;
use llm_worker::llm_client::types::Item; use llm_worker::llm_client::types::Item;
@ -23,12 +24,12 @@ pub struct SessionStartState<'a> {
/// Create a new session, writing the initial `SessionStart` entry. /// Create a new session, writing the initial `SessionStart` entry.
/// ///
/// Returns the new session ID and head hash. /// Returns the new session ID and head hash.
pub async fn create_session( pub fn create_session(
store: &impl Store, store: &impl Store,
state: SessionStartState<'_>, state: SessionStartState<'_>,
) -> Result<(SessionId, EntryHash), StoreError> { ) -> Result<(SessionId, EntryHash), StoreError> {
let session_id = crate::new_session_id(); let session_id = crate::new_session_id();
let hash = create_session_with_id(store, session_id, state).await?; let hash = create_session_with_id(store, session_id, state)?;
Ok((session_id, hash)) Ok((session_id, hash))
} }
@ -37,7 +38,7 @@ pub async fn create_session(
/// Used by callers that need to reserve a session ID synchronously but /// Used by callers that need to reserve a session ID synchronously but
/// defer the initial log append (e.g. Pod, which resolves a templated /// defer the initial log append (e.g. Pod, which resolves a templated
/// system prompt only at first turn). Returns the resulting head hash. /// system prompt only at first turn). Returns the resulting head hash.
pub async fn create_session_with_id( pub fn create_session_with_id(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
state: SessionStartState<'_>, state: SessionStartState<'_>,
@ -56,7 +57,7 @@ pub async fn create_session_with_id(
prev_hash: None, prev_hash: None,
entry, entry,
}; };
store.append(session_id, &hashed_entry).await?; store.append(session_id, &hashed_entry)?;
Ok(hash) Ok(hash)
} }
@ -64,7 +65,7 @@ pub async fn create_session_with_id(
/// ///
/// Records `compacted_from` provenance linking back to the source session. /// Records `compacted_from` provenance linking back to the source session.
/// Returns the new session ID and head hash. /// Returns the new session ID and head hash.
pub async fn create_compacted_session( pub fn create_compacted_session(
store: &impl Store, store: &impl Store,
state: SessionStartState<'_>, state: SessionStartState<'_>,
source_session_id: SessionId, source_session_id: SessionId,
@ -88,7 +89,7 @@ pub async fn create_compacted_session(
prev_hash: None, prev_hash: None,
entry, entry,
}; };
store.append(session_id, &hashed_entry).await?; store.append(session_id, &hashed_entry)?;
Ok((session_id, hash)) Ok((session_id, hash))
} }
@ -96,11 +97,11 @@ pub async fn create_compacted_session(
/// ///
/// Returns the reconstructed state. The caller is responsible for /// Returns the reconstructed state. The caller is responsible for
/// applying it to a Worker. /// applying it to a Worker.
pub async fn restore( pub fn restore(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
) -> Result<crate::session_log::RestoredState, StoreError> { ) -> Result<crate::session_log::RestoredState, StoreError> {
let entries = store.read_all(session_id).await?; let entries = store.read_all(session_id)?;
Ok(session_log::collect_state(&entries)) Ok(session_log::collect_state(&entries))
} }
@ -108,13 +109,13 @@ pub async fn restore(
/// If not, auto-fork into a new session. /// If not, auto-fork into a new session.
/// ///
/// Updates `session_id` and `head_hash` in place when a fork occurs. /// Updates `session_id` and `head_hash` in place when a fork occurs.
pub async fn ensure_head_or_fork( pub fn ensure_head_or_fork(
store: &impl Store, store: &impl Store,
session_id: &mut SessionId, session_id: &mut SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
state: SessionStartState<'_>, state: SessionStartState<'_>,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
let store_head = store.read_head_hash(*session_id).await?; let store_head = store.read_head_hash(*session_id)?;
if store_head == *head_hash { if store_head == *head_hash {
return Ok(()); return Ok(());
} }
@ -133,7 +134,7 @@ pub async fn ensure_head_or_fork(
prev_hash: None, prev_hash: None,
entry, entry,
}; };
store.create_session(fork_id, &[hashed_entry]).await?; store.create_session(fork_id, &[hashed_entry])?;
*session_id = fork_id; *session_id = fork_id;
*head_hash = Some(hash); *head_hash = Some(hash);
Ok(()) Ok(())
@ -145,7 +146,7 @@ pub async fn ensure_head_or_fork(
/// the worker pushes its flattened user message into history; replay /// the worker pushes its flattened user message into history; replay
/// derives the worker `Item::user_message` from these segments via /// derives the worker `Item::user_message` from these segments via
/// [`Segment::flatten_to_text`]. /// [`Segment::flatten_to_text`].
pub async fn save_user_input( pub fn save_user_input(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
@ -160,17 +161,17 @@ pub async fn save_user_input(
segments, segments,
}, },
) )
.await
} }
/// Log the history delta — new items added since the previous snapshot. /// Log the history delta — new items added since the previous snapshot.
/// ///
/// Classifies items into AssistantItems, ToolResults, and HookInjectedItems /// Classifies items into AssistantItem / ToolResult / HookInjectedItems
/// entries automatically. User messages are skipped because they are /// entries automatically (one entry per item). User messages are skipped
/// persisted upfront via [`save_user_input`] at submit time; the worker /// because they are persisted upfront via [`save_user_input`] at submit
/// pushes a flattened copy into its history that arrives here in /// time; the worker pushes a flattened copy into its history that
/// `new_items` and would otherwise produce a duplicate `UserInput` entry. /// arrives here in `new_items` and would otherwise produce a duplicate
pub async fn save_delta( /// `UserInput` entry.
pub fn save_delta(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
@ -181,66 +182,63 @@ pub async fn save_delta(
} }
let ts = session_log::now_millis(); let ts = session_log::now_millis();
let mut i = 0; for item in new_items {
while i < new_items.len() {
let item = &new_items[i];
if item.is_user_message() { if item.is_user_message() {
// Already persisted by save_user_input at submit time. // Already persisted by save_user_input at submit time.
i += 1; continue;
} else if item.is_tool_result() {
let start = i;
while i < new_items.len() && new_items[i].is_tool_result() {
i += 1;
}
append_entry(
store,
session_id,
head_hash,
LogEntry::ToolResults {
ts,
items: to_logged(&new_items[start..i]),
},
)
.await?;
} else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() {
let start = i;
while i < new_items.len()
&& (new_items[i].is_assistant_message()
|| new_items[i].is_tool_call()
|| new_items[i].is_reasoning())
{
i += 1;
}
append_entry(
store,
session_id,
head_hash,
LogEntry::AssistantItems {
ts,
items: to_logged(&new_items[start..i]),
},
)
.await?;
} else {
append_entry(
store,
session_id,
head_hash,
LogEntry::HookInjectedItems {
ts,
items: vec![LoggedItem::from(&new_items[i])],
},
)
.await?;
i += 1;
} }
let entry = classify_history_item(item, ts);
append_entry(store, session_id, head_hash, entry)?;
} }
Ok(()) Ok(())
} }
/// Map one history item to its singular `LogEntry` form. Used by the
/// fallback `save_delta` path and the controller's worker-callback
/// classifier so write classification lives in one place.
pub fn classify_history_item(item: &Item, ts: u64) -> LogEntry {
if item.is_tool_result() {
LogEntry::ToolResult {
ts,
item: LoggedItem::from(item),
}
} else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() {
LogEntry::AssistantItem {
ts,
item: LoggedItem::from(item),
}
} else {
// Defensive: anything else (future Item kinds) routes through
// AssistantItem rather than getting silently dropped.
LogEntry::AssistantItem {
ts,
item: LoggedItem::from(item),
}
}
}
/// Append a single typed system item as `LogEntry::SystemItem`. Helper
/// for the Pod-side interceptor commit path; mirrors the per-item
/// commit shape used for assistant / tool result entries.
pub fn append_system_item(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
item: SystemItem,
) -> Result<EntryHash, StoreError> {
append_entry_with_hash(
store,
session_id,
head_hash,
LogEntry::SystemItem {
ts: session_log::now_millis(),
item,
},
)
}
/// Log a TurnEnd entry. /// Log a TurnEnd entry.
pub async fn save_turn_end( pub fn save_turn_end(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
@ -255,11 +253,10 @@ pub async fn save_turn_end(
turn_count, turn_count,
}, },
) )
.await
} }
/// Log a `RunCompleted` entry — `run()` / `resume()` returned `Ok(WorkerResult)`. /// Log a `RunCompleted` entry — `run()` / `resume()` returned `Ok(WorkerResult)`.
pub async fn save_run_completed( pub fn save_run_completed(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
@ -276,14 +273,13 @@ pub async fn save_run_completed(
result, result,
}, },
) )
.await
} }
/// Log a `RunErrored` entry — `run()` / `resume()` returned `Err(WorkerError)`. /// Log a `RunErrored` entry — `run()` / `resume()` returned `Err(WorkerError)`.
/// ///
/// `WorkerError` is not `Serialize`, so the caller passes a lossy /// `WorkerError` is not `Serialize`, so the caller passes a lossy
/// `to_string()` rendering as `message`. /// `to_string()` rendering as `message`.
pub async fn save_run_errored( pub fn save_run_errored(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
@ -300,7 +296,6 @@ pub async fn save_run_errored(
message, message,
}, },
) )
.await
} }
/// Log an `LlmUsage` entry — 1 LLM リクエスト分の Usage スナップショット。 /// Log an `LlmUsage` entry — 1 LLM リクエスト分の Usage スナップショット。
@ -309,7 +304,7 @@ pub async fn save_run_errored(
/// その prefix をプロバイダが実測した占有量(プロンプト全長)で、 /// その prefix をプロバイダが実測した占有量(プロンプト全長)で、
/// プロバイダ別の正規化Anthropic では `input + cache_read + cache_creation`)を /// プロバイダ別の正規化Anthropic では `input + cache_read + cache_creation`)を
/// 済ませた値を渡す。 /// 済ませた値を渡す。
pub async fn save_usage( pub fn save_usage(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
@ -332,7 +327,6 @@ pub async fn save_usage(
output_tokens, output_tokens,
}, },
) )
.await
} }
/// Log an `Extension` entry — domain-tagged opaque payload. /// Log an `Extension` entry — domain-tagged opaque payload.
@ -340,7 +334,7 @@ pub async fn save_usage(
/// session-store treats `payload` as an unstructured `serde_json::Value`. /// session-store treats `payload` as an unstructured `serde_json::Value`.
/// Each domain is responsible for serializing into and folding out of it. /// Each domain is responsible for serializing into and folding out of it.
/// Use `RestoredState.extensions` to read entries back at restore time. /// Use `RestoredState.extensions` to read entries back at restore time.
pub async fn save_extension( pub fn save_extension(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
@ -357,11 +351,10 @@ pub async fn save_extension(
payload, payload,
}, },
) )
.await
} }
/// Log the Pod's latest runtime scope snapshot. /// Log the Pod's latest runtime scope snapshot.
pub async fn save_pod_scope( pub fn save_pod_scope(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
@ -375,11 +368,10 @@ pub async fn save_pod_scope(
session_log::POD_SCOPE_EXTENSION_DOMAIN, session_log::POD_SCOPE_EXTENSION_DOMAIN,
payload, payload,
) )
.await
} }
/// Log a `ConfigChanged` entry. /// Log a `ConfigChanged` entry.
pub async fn save_config_changed( pub fn save_config_changed(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
@ -394,14 +386,10 @@ pub async fn save_config_changed(
config: config.clone(), config: config.clone(),
}, },
) )
.await
} }
/// Fork the current state into a new session. /// Fork the current state into a new session.
pub async fn fork( pub fn fork(store: &impl Store, state: SessionStartState<'_>) -> Result<SessionId, StoreError> {
store: &impl Store,
state: SessionStartState<'_>,
) -> Result<SessionId, StoreError> {
let fork_id = crate::new_session_id(); let fork_id = crate::new_session_id();
let entry = LogEntry::SessionStart { let entry = LogEntry::SessionStart {
ts: session_log::now_millis(), ts: session_log::now_millis(),
@ -417,17 +405,17 @@ pub async fn fork(
prev_hash: None, prev_hash: None,
entry, entry,
}; };
store.create_session(fork_id, &[hashed_entry]).await?; store.create_session(fork_id, &[hashed_entry])?;
Ok(fork_id) Ok(fork_id)
} }
/// Fork from an arbitrary point in a stored session's log. /// Fork from an arbitrary point in a stored session's log.
pub async fn fork_at( pub fn fork_at(
store: &impl Store, store: &impl Store,
source_id: SessionId, source_id: SessionId,
at_hash: &EntryHash, at_hash: &EntryHash,
) -> Result<SessionId, StoreError> { ) -> Result<SessionId, StoreError> {
let entries = store.read_all(source_id).await?; let entries = store.read_all(source_id)?;
let cut = entries let cut = entries
.iter() .iter()
.position(|e| &e.hash == at_hash) .position(|e| &e.hash == at_hash)
@ -453,7 +441,7 @@ pub async fn fork_at(
prev_hash: None, prev_hash: None,
entry, entry,
}; };
store.create_session(fork_id, &[hashed_entry]).await?; store.create_session(fork_id, &[hashed_entry])?;
Ok(fork_id) Ok(fork_id)
} }
@ -462,13 +450,13 @@ pub async fn fork_at(
/// Lower-level dual of the `save_*` convenience wrappers in this module. /// Lower-level dual of the `save_*` convenience wrappers in this module.
/// Use when the caller already builds the typed entry itself (e.g. when /// Use when the caller already builds the typed entry itself (e.g. when
/// it needs the same value for an in-memory mirror + broadcast). /// it needs the same value for an in-memory mirror + broadcast).
pub async fn append_entry( pub fn append_entry(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
entry: LogEntry, entry: LogEntry,
) -> Result<(), StoreError> { ) -> Result<(), StoreError> {
append_entry_with_hash(store, session_id, head_hash, entry).await?; append_entry_with_hash(store, session_id, head_hash, entry)?;
Ok(()) Ok(())
} }
@ -476,7 +464,7 @@ pub async fn append_entry(
/// ///
/// Used by paths that need the hash for downstream broadcast or mirror /// Used by paths that need the hash for downstream broadcast or mirror
/// updates (e.g. the Pod's `SessionLogSink`). /// updates (e.g. the Pod's `SessionLogSink`).
pub async fn append_entry_with_hash( pub fn append_entry_with_hash(
store: &impl Store, store: &impl Store,
session_id: SessionId, session_id: SessionId,
head_hash: &mut Option<EntryHash>, head_hash: &mut Option<EntryHash>,
@ -488,7 +476,7 @@ pub async fn append_entry_with_hash(
prev_hash: head_hash.clone(), prev_hash: head_hash.clone(),
entry, entry,
}; };
store.append(session_id, &hashed_entry).await?; store.append(session_id, &hashed_entry)?;
*head_hash = Some(hash.clone()); *head_hash = Some(hash.clone());
Ok(hash) Ok(hash)
} }

View File

@ -120,24 +120,37 @@ pub enum LogEntry {
/// history; the worker layer never sees segments directly. /// history; the worker layer never sees segments directly.
UserInput { ts: u64, segments: Vec<Segment> }, UserInput { ts: u64, segments: Vec<Segment> },
/// Assistant response items added to history (worker.rs:1040-1041). /// One assistant-side item appended to history — assistant message,
/// reasoning, or tool call. Singular: one entry per history item so
/// the wire-side `Event::*` lane and on-disk LogEntry stay 1:1.
AssistantItem { ts: u64, item: LoggedItem },
/// One tool-execution result appended to history.
ToolResult { ts: u64, item: LoggedItem },
/// One typed agent-injected system item: notification, child-Pod
/// lifecycle event, `@<path>` / `#<slug>` / `/<slug>` resolution
/// payload. Each `SystemItem` carries kind metadata that the LLM
/// itself never sees (the LLM gets `Item::system_message` with the
/// item's denormalised `body`), but live clients and replay paths
/// dispatch on `kind` for typed rendering.
SystemItem { ts: u64, item: SystemItem },
/// Legacy plural form: kept **read-only** so old session logs still
/// open. New writes always use the singular `AssistantItem`. Items
/// are flattened on replay.
AssistantItems { ts: u64, items: Vec<LoggedItem> }, AssistantItems { ts: u64, items: Vec<LoggedItem> },
/// Tool execution results added to history (worker.rs:897-900, 1072-1076). /// Legacy plural form: kept **read-only**. New writes use the
/// singular `ToolResult`.
ToolResults { ts: u64, items: Vec<LoggedItem> }, ToolResults { ts: u64, items: Vec<LoggedItem> },
/// Typed agent-injected system items: notifications, child-Pod /// Legacy plural form: kept **read-only**. New writes use the
/// lifecycle events, `@<path>` / `#<slug>` / `/<slug>` resolution /// singular `SystemItem`.
/// payloads. Each `SystemItem` carries kind metadata that the LLM
/// itself never sees (the LLM gets `Item::system_message` with the
/// item's `history_text()`), but live clients and replay paths
/// dispatch on `kind` for typed rendering.
SystemItems { ts: u64, items: Vec<SystemItem> }, SystemItems { ts: u64, items: Vec<SystemItem> },
/// Legacy pre-`SystemItems` form. Deserialize-only — new writes /// Legacy pre-`SystemItem*` form. Deserialize-only. Items are
/// always use `SystemItems`. Items are flattened to /// flattened to `Item::system_message` on replay.
/// `Item::system_message` on replay, matching how the original
/// path worked.
HookInjectedItems { ts: u64, items: Vec<LoggedItem> }, HookInjectedItems { ts: u64, items: Vec<LoggedItem> },
/// Turn boundary. Records the turn count after increment. /// Turn boundary. Records the turn count after increment.
@ -282,6 +295,15 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState {
state.history.push(Item::user_message(text)); state.history.push(Item::user_message(text));
state.user_segments.push(segments.clone()); state.user_segments.push(segments.clone());
} }
LogEntry::AssistantItem { item, .. } => {
state.history.push(Item::from(item.clone()));
}
LogEntry::ToolResult { item, .. } => {
state.history.push(Item::from(item.clone()));
}
LogEntry::SystemItem { item, .. } => {
state.history.push(item.to_history_item());
}
LogEntry::AssistantItems { items, .. } => { LogEntry::AssistantItems { items, .. } => {
state.history.extend(items.iter().cloned().map(Item::from)); state.history.extend(items.iter().cloned().map(Item::from));
} }

View File

@ -1,12 +1,18 @@
//! Persistence backend abstraction. //! Persistence backend abstraction.
//! //!
//! [`Store`] defines the async interface for reading and writing session logs. //! [`Store`] defines the sync interface for reading and writing session logs.
//! Implementations handle the physical storage (filesystem, database, etc.). //! Implementations handle the physical storage (filesystem, database, etc.).
//!
//! Sync (rather than async) is intentional: a session log append is a single
//! `< 1 KiB` line on local fs and completes well below a millisecond. Going
//! through `tokio::fs` would force every caller — including `Worker`'s sync
//! `on_history_append` callback — to bridge sync → async via a channel +
//! drain task. Keeping the store sync lets the worker callback, Pod commit
//! paths, and `PodInterceptor` all share one direct `append_entry` call.
use crate::SessionId; use crate::SessionId;
use crate::event_trace::TraceEntry; use crate::event_trace::TraceEntry;
use crate::session_log::{EntryHash, HashedEntry}; use crate::session_log::{EntryHash, HashedEntry};
use std::future::Future;
/// Errors from the persistence store. /// Errors from the persistence store.
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
@ -24,49 +30,31 @@ pub enum StoreError {
Corrupt { line: usize, message: String }, Corrupt { line: usize, message: String },
} }
/// Async persistence backend for session logs. /// Sync persistence backend for session logs.
/// ///
/// All methods take `&self` — implementations should use interior mutability /// All methods take `&self` — implementations should use interior mutability
/// (e.g., append-mode file handles) when needed. /// (e.g., append-mode file handles) when needed.
pub trait Store: Send + Sync { pub trait Store: Send + Sync {
/// Append a single hashed entry to the session log. /// Append a single hashed entry to the session log.
fn append( fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError>;
&self,
id: SessionId,
entry: &HashedEntry,
) -> impl Future<Output = Result<(), StoreError>> + Send;
/// Read all hashed entries for a session, in order. /// Read all hashed entries for a session, in order.
fn read_all( fn read_all(&self, id: SessionId) -> Result<Vec<HashedEntry>, StoreError>;
&self,
id: SessionId,
) -> impl Future<Output = Result<Vec<HashedEntry>, StoreError>> + Send;
/// List all session IDs, most recent first. /// List all session IDs, most recent first.
fn list_sessions(&self) -> impl Future<Output = Result<Vec<SessionId>, StoreError>> + Send; fn list_sessions(&self) -> Result<Vec<SessionId>, StoreError>;
/// Create a new session with initial entries. /// Create a new session with initial entries.
fn create_session( fn create_session(&self, id: SessionId, entries: &[HashedEntry]) -> Result<(), StoreError>;
&self,
id: SessionId,
entries: &[HashedEntry],
) -> impl Future<Output = Result<(), StoreError>> + Send;
/// Check if a session exists. /// Check if a session exists.
fn exists(&self, id: SessionId) -> impl Future<Output = Result<bool, StoreError>> + Send; fn exists(&self, id: SessionId) -> Result<bool, StoreError>;
/// Read the hash of the last entry in a session (the head). /// Read the hash of the last entry in a session (the head).
/// ///
/// Returns `None` if the session is empty. /// Returns `None` if the session is empty.
fn read_head_hash( fn read_head_hash(&self, id: SessionId) -> Result<Option<EntryHash>, StoreError>;
&self,
id: SessionId,
) -> impl Future<Output = Result<Option<EntryHash>, StoreError>> + Send;
/// Append a trace entry to the debug event trace file. /// Append a trace entry to the debug event trace file.
fn append_trace( fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError>;
&self,
id: SessionId,
entry: &TraceEntry,
) -> impl Future<Output = Result<(), StoreError>> + Send;
} }

View File

@ -4,10 +4,10 @@ use session_store::{
FsStore, LogEntry, Store, TraceEntry, build_chain, collect_state, new_session_id, FsStore, LogEntry, Store, TraceEntry, build_chain, collect_state, new_session_id,
}; };
#[tokio::test] #[test]
async fn round_trip_write_and_read() { fn round_trip_write_and_read() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).await.unwrap(); let store = FsStore::new(dir.path()).unwrap();
let id = new_session_id(); let id = new_session_id();
let raw = vec![ let raw = vec![
@ -23,9 +23,9 @@ async fn round_trip_write_and_read() {
ts: 2000, ts: 2000,
segments: vec![protocol::Segment::text("Hello")], segments: vec![protocol::Segment::text("Hello")],
}, },
LogEntry::AssistantItems { LogEntry::AssistantItem {
ts: 3000, ts: 3000,
items: vec![Item::assistant_message("Hi there!").into()], item: Item::assistant_message("Hi there!").into(),
}, },
LogEntry::TurnEnd { LogEntry::TurnEnd {
ts: 3100, ts: 3100,
@ -41,11 +41,11 @@ async fn round_trip_write_and_read() {
// Write entries one by one // Write entries one by one
for entry in &entries { for entry in &entries {
store.append(id, entry).await.unwrap(); store.append(id, entry).unwrap();
} }
// Read back // Read back
let read_back = store.read_all(id).await.unwrap(); let read_back = store.read_all(id).unwrap();
assert_eq!(read_back.len(), entries.len()); assert_eq!(read_back.len(), entries.len());
// Verify hashes survived round-trip // Verify hashes survived round-trip
@ -64,10 +64,10 @@ async fn round_trip_write_and_read() {
assert!(state.head_hash.is_some()); assert!(state.head_hash.is_some());
} }
#[tokio::test] #[test]
async fn create_session_writes_all_entries() { fn create_session_writes_all_entries() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).await.unwrap(); let store = FsStore::new(dir.path()).unwrap();
let id = new_session_id(); let id = new_session_id();
let entries = build_chain(&[LogEntry::SessionStart { let entries = build_chain(&[LogEntry::SessionStart {
@ -82,22 +82,22 @@ async fn create_session_writes_all_entries() {
compacted_from: None, compacted_from: None,
}]); }]);
store.create_session(id, &entries).await.unwrap(); store.create_session(id, &entries).unwrap();
let read_back = store.read_all(id).await.unwrap(); let read_back = store.read_all(id).unwrap();
assert_eq!(read_back.len(), 1); assert_eq!(read_back.len(), 1);
let state = collect_state(&read_back); let state = collect_state(&read_back);
assert_eq!(state.history.len(), 2); assert_eq!(state.history.len(), 2);
} }
#[tokio::test] #[test]
async fn list_sessions_returns_newest_first() { fn list_sessions_returns_newest_first() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).await.unwrap(); let store = FsStore::new(dir.path()).unwrap();
let id1 = new_session_id(); let id1 = new_session_id();
// Small delay to ensure different UUID v7 timestamps // Small delay to ensure different UUID v7 timestamps
tokio::time::sleep(std::time::Duration::from_millis(2)).await; std::thread::sleep(std::time::Duration::from_millis(2));
let id2 = new_session_id(); let id2 = new_session_id();
let entries1 = build_chain(&[LogEntry::SessionStart { let entries1 = build_chain(&[LogEntry::SessionStart {
@ -117,22 +117,22 @@ async fn list_sessions_returns_newest_first() {
compacted_from: None, compacted_from: None,
}]); }]);
store.append(id1, &entries1[0]).await.unwrap(); store.append(id1, &entries1[0]).unwrap();
store.append(id2, &entries2[0]).await.unwrap(); store.append(id2, &entries2[0]).unwrap();
let sessions = store.list_sessions().await.unwrap(); let sessions = store.list_sessions().unwrap();
assert_eq!(sessions.len(), 2); assert_eq!(sessions.len(), 2);
assert_eq!(sessions[0], id2); // newest first assert_eq!(sessions[0], id2); // newest first
assert_eq!(sessions[1], id1); assert_eq!(sessions[1], id1);
} }
#[tokio::test] #[test]
async fn exists_returns_correct_state() { fn exists_returns_correct_state() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).await.unwrap(); let store = FsStore::new(dir.path()).unwrap();
let id = new_session_id(); let id = new_session_id();
assert!(!store.exists(id).await.unwrap()); assert!(!store.exists(id).unwrap());
let entries = build_chain(&[LogEntry::SessionStart { let entries = build_chain(&[LogEntry::SessionStart {
ts: 1000, ts: 1000,
@ -142,25 +142,25 @@ async fn exists_returns_correct_state() {
forked_from: None, forked_from: None,
compacted_from: None, compacted_from: None,
}]); }]);
store.append(id, &entries[0]).await.unwrap(); store.append(id, &entries[0]).unwrap();
assert!(store.exists(id).await.unwrap()); assert!(store.exists(id).unwrap());
} }
#[tokio::test] #[test]
async fn not_found_error_for_missing_session() { fn not_found_error_for_missing_session() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).await.unwrap(); let store = FsStore::new(dir.path()).unwrap();
let id = new_session_id(); let id = new_session_id();
let result = store.read_all(id).await; let result = store.read_all(id);
assert!(result.is_err()); assert!(result.is_err());
} }
#[tokio::test] #[test]
async fn trace_entries_in_separate_file() { fn trace_entries_in_separate_file() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).await.unwrap(); let store = FsStore::new(dir.path()).unwrap();
let id = new_session_id(); let id = new_session_id();
// Write a log entry // Write a log entry
@ -172,7 +172,7 @@ async fn trace_entries_in_separate_file() {
forked_from: None, forked_from: None,
compacted_from: None, compacted_from: None,
}]); }]);
store.append(id, &entries[0]).await.unwrap(); store.append(id, &entries[0]).unwrap();
// Write a trace entry // Write a trace entry
let trace = TraceEntry { let trace = TraceEntry {
@ -182,10 +182,10 @@ async fn trace_entries_in_separate_file() {
llm_worker::llm_client::event::PingEvent { timestamp: None }, llm_worker::llm_client::event::PingEvent { timestamp: None },
), ),
}; };
store.append_trace(id, &trace).await.unwrap(); store.append_trace(id, &trace).unwrap();
// Log should have 1 entry, unaffected by trace // Log should have 1 entry, unaffected by trace
let log = store.read_all(id).await.unwrap(); let log = store.read_all(id).unwrap();
assert_eq!(log.len(), 1); assert_eq!(log.len(), 1);
// Trace file should exist separately // Trace file should exist separately
@ -193,10 +193,10 @@ async fn trace_entries_in_separate_file() {
assert!(trace_path.exists()); assert!(trace_path.exists());
} }
#[tokio::test] #[test]
async fn read_head_hash_returns_last_entry_hash() { fn read_head_hash_returns_last_entry_hash() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).await.unwrap(); let store = FsStore::new(dir.path()).unwrap();
let id = new_session_id(); let id = new_session_id();
let entries = build_chain(&[ let entries = build_chain(&[
@ -215,9 +215,9 @@ async fn read_head_hash_returns_last_entry_hash() {
]); ]);
for entry in &entries { for entry in &entries {
store.append(id, entry).await.unwrap(); store.append(id, entry).unwrap();
} }
let head = store.read_head_hash(id).await.unwrap(); let head = store.read_head_hash(id).unwrap();
assert_eq!(head.as_ref(), Some(&entries[1].hash)); assert_eq!(head.as_ref(), Some(&entries[1].hash));
} }

View File

@ -84,9 +84,9 @@ impl Interceptor for PausePolicy {
} }
} }
async fn make_store() -> (tempfile::TempDir, FsStore) { fn make_store() -> (tempfile::TempDir, FsStore) {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).await.unwrap(); let store = FsStore::new(dir.path()).unwrap();
(dir, store) (dir, store)
} }
@ -108,7 +108,7 @@ async fn run_and_persist(
head_hash, head_hash,
vec![protocol::Segment::text(input)], vec![protocol::Segment::text(input)],
) )
.await
.unwrap(); .unwrap();
let history_before = worker.history().len(); let history_before = worker.history().len();
@ -119,10 +119,10 @@ async fn run_and_persist(
let new_items = &worker.history()[history_before..]; let new_items = &worker.history()[history_before..];
session_store::save_delta(store, session_id, head_hash, new_items) session_store::save_delta(store, session_id, head_hash, new_items)
.await
.unwrap(); .unwrap();
session_store::save_turn_end(store, session_id, head_hash, worker.turn_count()) session_store::save_turn_end(store, session_id, head_hash, worker.turn_count())
.await
.unwrap(); .unwrap();
match &result { match &result {
@ -134,7 +134,7 @@ async fn run_and_persist(
r.clone(), r.clone(),
worker.last_run_interrupted(), worker.last_run_interrupted(),
) )
.await
.unwrap(); .unwrap();
} }
Err(e) => { Err(e) => {
@ -145,7 +145,7 @@ async fn run_and_persist(
e.to_string(), e.to_string(),
worker.last_run_interrupted(), worker.last_run_interrupted(),
) )
.await
.unwrap(); .unwrap();
} }
} }
@ -160,7 +160,7 @@ async fn run_and_persist(
#[tokio::test] #[tokio::test]
async fn session_run_logs_entries() { async fn session_run_logs_entries() {
let (_dir, store) = make_store().await; let (_dir, store) = make_store();
let client = MockLlmClient::new(simple_text_events()); let client = MockLlmClient::new(simple_text_events());
let worker = Worker::new(client); let worker = Worker::new(client);
@ -172,14 +172,14 @@ async fn session_run_logs_entries() {
history: worker.history(), history: worker.history(),
}, },
) )
.await
.unwrap(); .unwrap();
let mut head_hash = Some(head_hash); let mut head_hash = Some(head_hash);
let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await; let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await;
let _ = &worker; let _ = &worker;
let entries = store.read_all(sid).await.unwrap(); let entries = store.read_all(sid).unwrap();
// SessionStart, UserInput, AssistantItems, TurnEnd, RunCompleted (at minimum) // SessionStart, UserInput, AssistantItems, TurnEnd, RunCompleted (at minimum)
assert!( assert!(
@ -217,7 +217,7 @@ async fn session_run_logs_entries() {
#[tokio::test] #[tokio::test]
async fn session_restore_round_trip() { async fn session_restore_round_trip() {
let (_dir, store) = make_store().await; let (_dir, store) = make_store();
let client = MockLlmClient::new(simple_text_events()); let client = MockLlmClient::new(simple_text_events());
let mut worker = Worker::new(client); let mut worker = Worker::new(client);
worker.set_system_prompt("You are helpful."); worker.set_system_prompt("You are helpful.");
@ -230,7 +230,7 @@ async fn session_restore_round_trip() {
history: worker.history(), history: worker.history(),
}, },
) )
.await
.unwrap(); .unwrap();
let mut head_hash = Some(head_hash); let mut head_hash = Some(head_hash);
@ -240,7 +240,7 @@ async fn session_restore_round_trip() {
let original_turn_count = worker.turn_count(); let original_turn_count = worker.turn_count();
// Restore // Restore
let state = session_store::restore(&store, sid).await.unwrap(); let state = session_store::restore(&store, sid).unwrap();
assert_eq!(state.history.len(), original_history_len); assert_eq!(state.history.len(), original_history_len);
assert_eq!(state.turn_count, original_turn_count); assert_eq!(state.turn_count, original_turn_count);
@ -250,7 +250,7 @@ async fn session_restore_round_trip() {
#[tokio::test] #[tokio::test]
async fn session_run_with_tool_call() { async fn session_run_with_tool_call() {
let (_dir, store) = make_store().await; let (_dir, store) = make_store();
let client = MockLlmClient::with_responses(tool_call_events()); let client = MockLlmClient::with_responses(tool_call_events());
let mut worker = Worker::new(client); let mut worker = Worker::new(client);
worker.register_tool(weather_tool_definition()); worker.register_tool(weather_tool_definition());
@ -263,29 +263,29 @@ async fn session_run_with_tool_call() {
history: worker.history(), history: worker.history(),
}, },
) )
.await
.unwrap(); .unwrap();
let mut head_hash = Some(head_hash); let mut head_hash = Some(head_hash);
let (_worker, _) = let (_worker, _) =
run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await; run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await;
let entries = store.read_all(sid).await.unwrap(); let entries = store.read_all(sid).unwrap();
let has_tool_results = entries let has_tool_results = entries
.iter() .iter()
.any(|e| matches!(&e.entry, LogEntry::ToolResults { .. })); .any(|e| matches!(&e.entry, LogEntry::ToolResult { .. }));
assert!(has_tool_results, "should have ToolResults entry"); assert!(has_tool_results, "should have ToolResult entry");
let has_assistant = entries let has_assistant = entries
.iter() .iter()
.any(|e| matches!(&e.entry, LogEntry::AssistantItems { .. })); .any(|e| matches!(&e.entry, LogEntry::AssistantItem { .. }));
assert!(has_assistant, "should have AssistantItems entry"); assert!(has_assistant, "should have AssistantItem entry");
} }
#[tokio::test] #[tokio::test]
async fn session_resume_after_pause() { async fn session_resume_after_pause() {
let (_dir, store) = make_store().await; let (_dir, store) = make_store();
// First run: tool call with pause policy → Paused // First run: tool call with pause policy → Paused
let client = MockLlmClient::with_responses(tool_call_events()); let client = MockLlmClient::with_responses(tool_call_events());
@ -301,7 +301,7 @@ async fn session_resume_after_pause() {
history: worker.history(), history: worker.history(),
}, },
) )
.await
.unwrap(); .unwrap();
let mut head_hash = Some(head_hash); let mut head_hash = Some(head_hash);
@ -309,7 +309,7 @@ async fn session_resume_after_pause() {
assert!(matches!(result, llm_worker::WorkerResult::Paused)); assert!(matches!(result, llm_worker::WorkerResult::Paused));
// Check RunCompleted is Paused // Check RunCompleted is Paused
let entries = store.read_all(sid).await.unwrap(); let entries = store.read_all(sid).unwrap();
let has_paused = entries.iter().any(|e| { let has_paused = entries.iter().any(|e| {
matches!( matches!(
&e.entry, &e.entry,
@ -322,13 +322,13 @@ async fn session_resume_after_pause() {
assert!(has_paused, "should have Paused outcome"); assert!(has_paused, "should have Paused outcome");
// Restore state and verify // Restore state and verify
let state = session_store::restore(&store, sid).await.unwrap(); let state = session_store::restore(&store, sid).unwrap();
assert!(state.last_run_interrupted); assert!(state.last_run_interrupted);
} }
#[tokio::test] #[tokio::test]
async fn session_fork_preserves_state() { async fn session_fork_preserves_state() {
let (_dir, store) = make_store().await; let (_dir, store) = make_store();
let client = MockLlmClient::new(simple_text_events()); let client = MockLlmClient::new(simple_text_events());
let mut worker = Worker::new(client); let mut worker = Worker::new(client);
worker.set_system_prompt("System prompt"); worker.set_system_prompt("System prompt");
@ -341,7 +341,7 @@ async fn session_fork_preserves_state() {
history: worker.history(), history: worker.history(),
}, },
) )
.await
.unwrap(); .unwrap();
let mut head_hash = Some(head_hash); let mut head_hash = Some(head_hash);
@ -356,11 +356,11 @@ async fn session_fork_preserves_state() {
history: worker.history(), history: worker.history(),
}, },
) )
.await
.unwrap(); .unwrap();
// Fork should have a SessionStart with the current history // Fork should have a SessionStart with the current history
let fork_entries = store.read_all(fork_id).await.unwrap(); let fork_entries = store.read_all(fork_id).unwrap();
assert_eq!(fork_entries.len(), 1); assert_eq!(fork_entries.len(), 1);
assert!(matches!( assert!(matches!(
&fork_entries[0].entry, &fork_entries[0].entry,
@ -374,7 +374,7 @@ async fn session_fork_preserves_state() {
#[tokio::test] #[tokio::test]
async fn session_fork_at_truncates() { async fn session_fork_at_truncates() {
let (_dir, store) = make_store().await; let (_dir, store) = make_store();
let client = MockLlmClient::new(simple_text_events()); let client = MockLlmClient::new(simple_text_events());
let worker = Worker::new(client); let worker = Worker::new(client);
@ -386,20 +386,20 @@ async fn session_fork_at_truncates() {
history: worker.history(), history: worker.history(),
}, },
) )
.await
.unwrap(); .unwrap();
let mut head_hash = Some(head_hash); let mut head_hash = Some(head_hash);
let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await; let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await;
let all_entries = store.read_all(sid).await.unwrap(); let all_entries = store.read_all(sid).unwrap();
assert!(all_entries.len() > 2); assert!(all_entries.len() > 2);
// Fork at the hash of the 2nd entry (SessionStart + UserInput) // Fork at the hash of the 2nd entry (SessionStart + UserInput)
let at_hash = &all_entries[1].hash; let at_hash = &all_entries[1].hash;
let fork_id = session_store::fork_at(&store, sid, at_hash).await.unwrap(); let fork_id = session_store::fork_at(&store, sid, at_hash).unwrap();
let fork_entries = store.read_all(fork_id).await.unwrap(); let fork_entries = store.read_all(fork_id).unwrap();
assert_eq!(fork_entries.len(), 1); // Just the new SessionStart assert_eq!(fork_entries.len(), 1); // Just the new SessionStart
let fork_state = collect_state(&fork_entries); let fork_state = collect_state(&fork_entries);
@ -413,7 +413,7 @@ async fn session_fork_at_truncates() {
#[tokio::test] #[tokio::test]
async fn session_config_changed_logged() { async fn session_config_changed_logged() {
let (_dir, store) = make_store().await; let (_dir, store) = make_store();
let client = MockLlmClient::new(vec![]); let client = MockLlmClient::new(vec![]);
let mut worker = Worker::new(client); let mut worker = Worker::new(client);
@ -425,7 +425,7 @@ async fn session_config_changed_logged() {
history: worker.history(), history: worker.history(),
}, },
) )
.await
.unwrap(); .unwrap();
let mut head_hash = Some(head_hash); let mut head_hash = Some(head_hash);
@ -433,10 +433,10 @@ async fn session_config_changed_logged() {
let new_config = RequestConfig::default().with_temperature(0.7); let new_config = RequestConfig::default().with_temperature(0.7);
worker.set_request_config(new_config.clone()); worker.set_request_config(new_config.clone());
session_store::save_config_changed(&store, sid, &mut head_hash, &new_config) session_store::save_config_changed(&store, sid, &mut head_hash, &new_config)
.await
.unwrap(); .unwrap();
let entries = store.read_all(sid).await.unwrap(); let entries = store.read_all(sid).unwrap();
let has_config_changed = entries.iter().any(|e| { let has_config_changed = entries.iter().any(|e| {
matches!( matches!(
&e.entry, &e.entry,
@ -448,7 +448,7 @@ async fn session_config_changed_logged() {
#[tokio::test] #[tokio::test]
async fn session_auto_forks_on_conflict() { async fn session_auto_forks_on_conflict() {
let (_dir, store) = make_store().await; let (_dir, store) = make_store();
// Create a session // Create a session
let client_a = MockLlmClient::new(simple_text_events()); let client_a = MockLlmClient::new(simple_text_events());
@ -462,7 +462,7 @@ async fn session_auto_forks_on_conflict() {
history: worker_a.history(), history: worker_a.history(),
}, },
) )
.await
.unwrap(); .unwrap();
let mut session_id = original_sid; let mut session_id = original_sid;
let mut head_hash = Some(head_hash); let mut head_hash = Some(head_hash);
@ -472,14 +472,14 @@ async fn session_auto_forks_on_conflict() {
ts: 9999, ts: 9999,
segments: vec![protocol::Segment::text("Interloper")], segments: vec![protocol::Segment::text("Interloper")],
}; };
let current_head = store.read_head_hash(original_sid).await.unwrap(); let current_head = store.read_head_hash(original_sid).unwrap();
let hash = session_store::compute_hash(current_head.as_ref(), &extra_entry); let hash = session_store::compute_hash(current_head.as_ref(), &extra_entry);
let hashed = session_store::HashedEntry { let hashed = session_store::HashedEntry {
hash, hash,
prev_hash: current_head, prev_hash: current_head,
entry: extra_entry, entry: extra_entry,
}; };
store.append(original_sid, &hashed).await.unwrap(); store.append(original_sid, &hashed).unwrap();
// Now head_hash is stale — ensure_head_or_fork should auto-fork // Now head_hash is stale — ensure_head_or_fork should auto-fork
session_store::ensure_head_or_fork( session_store::ensure_head_or_fork(
@ -492,18 +492,18 @@ async fn session_auto_forks_on_conflict() {
history: worker_a.history(), history: worker_a.history(),
}, },
) )
.await
.unwrap(); .unwrap();
// session_id should now be different // session_id should now be different
assert_ne!(session_id, original_sid); assert_ne!(session_id, original_sid);
// The fork session should exist and have entries // The fork session should exist and have entries
let fork_entries = store.read_all(session_id).await.unwrap(); let fork_entries = store.read_all(session_id).unwrap();
assert!(!fork_entries.is_empty()); assert!(!fork_entries.is_empty());
// Original session should still have the interloper entry // Original session should still have the interloper entry
let original_entries = store.read_all(original_sid).await.unwrap(); let original_entries = store.read_all(original_sid).unwrap();
let has_interloper = original_entries let has_interloper = original_entries
.iter() .iter()
.any(|e| matches!(&e.entry, LogEntry::UserInput { .. })); .any(|e| matches!(&e.entry, LogEntry::UserInput { .. }));

View File

@ -80,14 +80,14 @@ struct Row {
} }
pub async fn run() -> Result<PickerOutcome, PickerError> { pub async fn run() -> Result<PickerOutcome, PickerError> {
let store = open_default_store().await?; let store = open_default_store()?;
let ids = store.list_sessions().await?; let ids = store.list_sessions()?;
if ids.is_empty() { if ids.is_empty() {
return Err(PickerError::NoSessions); return Err(PickerError::NoSessions);
} }
let mut rows: Vec<Row> = Vec::with_capacity(MAX_ROWS); let mut rows: Vec<Row> = Vec::with_capacity(MAX_ROWS);
for id in ids.into_iter().take(MAX_ROWS) { for id in ids.into_iter().take(MAX_ROWS) {
let preview = build_preview(&store, id).await; let preview = build_preview(&store, id);
// Best-effort live check. A pods.json I/O hiccup downgrades // Best-effort live check. A pods.json I/O hiccup downgrades
// the row to "no badge" rather than killing the picker — the // the row to "no badge" rather than killing the picker — the
// user still gets to see the listing. // user still gets to see the listing.
@ -149,7 +149,7 @@ fn close_viewport(terminal: &mut Terminal<CrosstermBackend<io::Stdout>>) -> io::
Ok(()) Ok(())
} }
async fn open_default_store() -> Result<FsStore, PickerError> { fn open_default_store() -> Result<FsStore, PickerError> {
let dir = manifest::paths::sessions_dir().ok_or_else(|| { let dir = manifest::paths::sessions_dir().ok_or_else(|| {
PickerError::Io(io::Error::new( PickerError::Io(io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
@ -157,11 +157,11 @@ async fn open_default_store() -> Result<FsStore, PickerError> {
(set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)", (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)",
)) ))
})?; })?;
Ok(FsStore::new(&dir).await?) Ok(FsStore::new(&dir)?)
} }
async fn build_preview(store: &FsStore, id: SessionId) -> String { fn build_preview(store: &FsStore, id: SessionId) -> String {
match store.read_all(id).await { match store.read_all(id) {
Ok(entries) => last_message_preview(&entries).unwrap_or_else(|| "[empty]".to_string()), Ok(entries) => last_message_preview(&entries).unwrap_or_else(|| "[empty]".to_string()),
Err(_) => "[corrupt]".to_string(), Err(_) => "[corrupt]".to_string(),
} }

View File

@ -328,8 +328,8 @@ async fn load_resume_scope(session_id: SessionId) -> Result<ScopeConfig, SpawnEr
"could not resolve sessions directory (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)", "could not resolve sessions directory (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)",
) )
})?; })?;
let store = session_store::FsStore::new(&store_dir).await?; let store = session_store::FsStore::new(&store_dir)?;
let state = session_store::restore(&store, session_id).await?; let state = session_store::restore(&store, session_id)?;
let snapshot = state let snapshot = state
.pod_scope .pod_scope
.ok_or(SpawnError::MissingResumeScope { session_id })?; .ok_or(SpawnError::MissingResumeScope { session_id })?;