From 7520dcad875a387095696186ec06e635049f7e8f Mon Sep 17 00:00:00 2001 From: Hare Date: Thu, 14 May 2026 19:16:48 +0900 Subject: [PATCH] =?UTF-8?q?update:=20=E6=9B=B8=E3=81=8D=E8=BE=BC=E3=81=BF?= =?UTF-8?q?=E3=81=AE=E4=B8=8D=E8=A6=81=E3=81=AAasync=E3=82=92=E5=89=8A?= =?UTF-8?q?=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 1 + crates/pod/Cargo.toml | 1 + crates/pod/examples/pod_cli.rs | 2 +- crates/pod/examples/pod_protocol.rs | 2 +- crates/pod/src/controller.rs | 146 +------- crates/pod/src/ipc/interceptor.rs | 59 +-- crates/pod/src/ipc/server.rs | 29 +- crates/pod/src/main.rs | 2 +- crates/pod/src/pod.rs | 353 ++++++++++-------- crates/pod/src/session_log_sink.rs | 92 +++-- crates/pod/tests/compact_events_test.rs | 2 +- crates/pod/tests/consolidation_test.rs | 2 +- crates/pod/tests/controller_test.rs | 31 +- crates/pod/tests/restore_test.rs | 10 +- crates/pod/tests/session_metrics_test.rs | 42 +-- .../pod/tests/system_prompt_template_test.rs | 4 +- crates/session-metrics/src/lib.rs | 4 +- crates/session-store/src/fs_store.rs | 56 ++- crates/session-store/src/lib.rs | 9 +- crates/session-store/src/session.rs | 180 +++++---- crates/session-store/src/session_log.rs | 46 ++- crates/session-store/src/store.rs | 44 +-- crates/session-store/tests/fs_store_test.rs | 80 ++-- crates/session-store/tests/session_test.rs | 88 ++--- crates/tui/src/picker.rs | 14 +- crates/tui/src/spawn.rs | 4 +- 26 files changed, 615 insertions(+), 688 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14877057..0a05ebcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2160,6 +2160,7 @@ dependencies = [ "manifest", "memory", "minijinja", + "parking_lot", "pod-registry", "protocol", "provider", diff --git a/crates/pod/Cargo.toml b/crates/pod/Cargo.toml index a655c25c..2f663d62 100644 --- a/crates/pod/Cargo.toml +++ b/crates/pod/Cargo.toml @@ -30,6 +30,7 @@ memory = { workspace = true } workflow-crate = { package = "workflow", path = "../workflow" } uuid = { workspace = true, features = ["v7"] } session-metrics = { workspace = true } +parking_lot = "0.12.5" [dev-dependencies] dotenv = "0.15.0" diff --git a/crates/pod/examples/pod_cli.rs b/crates/pod/examples/pod_cli.rs index b647b657..414ed613 100644 --- a/crates/pod/examples/pod_cli.rs +++ b/crates/pod/examples/pod_cli.rs @@ -48,7 +48,7 @@ async fn main() -> Result<(), Box> { // 2. Create a persistent store (temp dir for demo) let tmp = tempfile::tempdir()?; - let store = FsStore::new(tmp.path()).await?; + let store = FsStore::new(tmp.path())?; // 3. Build the Pod from the single-layer manifest TOML let mut pod = Pod::from_manifest_toml(&toml, store).await?; diff --git a/crates/pod/examples/pod_protocol.rs b/crates/pod/examples/pod_protocol.rs index 3039a09b..7d6d6347 100644 --- a/crates/pod/examples/pod_protocol.rs +++ b/crates/pod/examples/pod_protocol.rs @@ -39,7 +39,7 @@ async fn main() -> Result<(), Box> { let pwd = std::env::current_dir()?; let toml = manifest_toml(&pwd); let tmp = tempfile::tempdir()?; - let store = FsStore::new(tmp.path()).await?; + let store = FsStore::new(tmp.path())?; let pod = pod::Pod::from_manifest_toml(&toml, store).await?; let runtime_tmp = tempfile::tempdir()?; diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index c5b65e94..d92753ca 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -6,14 +6,10 @@ use llm_worker::llm_client::client::LlmClient; use session_store::Store; use tokio::sync::{broadcast, mpsc, oneshot}; -use llm_worker::Item; -use session_store::LogEntry; -use session_store::session_log; - use crate::ipc::alerter::Alerter; use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::server::SocketServer; -use crate::pod::{LogCommand, LogDrainHandle, Pod, PodError, PodRunResult}; +use crate::pod::{Pod, PodError, PodRunResult, SystemItemCommitter}; use crate::runtime::dir::RuntimeDir; use crate::session_log_sink::SessionLogSink; use crate::shared_state::PodSharedState; @@ -165,23 +161,21 @@ impl PodController { }]) .map_err(std::io::Error::other)?; - // === 1.5. Per-item history-commit drain task === + // === 1.5. Direct writer wiring === // // Worker callbacks fire `on_history_append` for each assistant - // item / tool result / hook-injected item that lands in - // history. The drain task picks them up off an unbounded mpsc - // and commits each as a typed `LogEntry` through the sink, - // serialised against the same `session_head` lock the Pod uses - // for its own commits. This gives mid-turn snapshot visibility: - // a late-attaching client sees in-flight tool calls + completed - // assistant blocks without waiting for the turn-end persist. - let (log_cmd_tx, log_cmd_rx) = mpsc::unbounded_channel::(); - let drain_ctx = pod.log_drain_handle(); - let _drain_task = tokio::spawn(run_log_drain(log_cmd_rx, drain_ctx)); - pod.attach_log_cmd_tx(log_cmd_tx.clone()); + // item / tool result that lands in history. With the sync + // writer in place, the callback commits each item directly + // through a `LogWriterHandle` (no mpsc ferry, no drain task). + // The same handle is type-erased into a `SystemItemCommitter` + // and handed to the interceptor for `SystemItem` commits, so + // assistant / tool / system items all share one commit path. + let writer_for_system: Arc = Arc::new(pod.log_writer_handle()); + pod.attach_log_writer(writer_for_system); + pod.wire_history_persistence(); // === 2. Worker event bridge wiring === - wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter, log_cmd_tx); + wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter); // === 3. Tool registration (builtin / memory / spawn-orchestration) === let fs_for_view = register_pod_tools( @@ -263,29 +257,20 @@ impl PodController { /// re-publishes a worker-level signal as a `protocol::Event` on `event_tx` /// so subscribers (TUI, socket clients) get a single typed stream. /// -/// Also wires `on_history_append` into the per-item drain channel so -/// every history append observed by the worker becomes a typed -/// `LogEntry` commit (via the drain task). +/// `Pod::wire_history_persistence` is called separately to wire the +/// per-item history commit callback so every assistant / tool item +/// landing in `worker.history` becomes a singular `LogEntry::AssistantItem` +/// / `ToolResult` commit through the sync writer. fn wire_event_bridges_on_worker( pod: &mut Pod, event_tx: &broadcast::Sender, alerter: &Alerter, - log_cmd_tx: mpsc::UnboundedSender, ) where C: LlmClient + Clone + 'static, St: Store + Clone + 'static, { let worker = pod.worker_mut(); - // Per-history-append → drain channel. Sends are infallible-by-design - // here (UnboundedSender never blocks); a closed receiver just means - // the controller is shutting down, in which case dropping the item - // is acceptable. - let drain_tx = log_cmd_tx.clone(); - worker.on_history_append(move |item| { - let _ = drain_tx.send(LogCommand::Item(item.clone())); - }); - let tx = event_tx.clone(); worker.on_turn_start(move |turn| { let _ = tx.send(Event::TurnStart { turn }); @@ -397,105 +382,6 @@ fn wire_event_bridges_on_worker( // per-item commit channel is wired at the top of this function. } -/// Drain task: consumes `LogCommand::Item` and `LogCommand::Flush` -/// off the channel and commits each item as a typed `LogEntry` through -/// the supplied store + sink. Lives as long as the controller; exits -/// when the sender is dropped (controller shutdown). -async fn run_log_drain(mut rx: mpsc::UnboundedReceiver, ctx: LogDrainHandle) -where - St: session_store::Store + Clone + Send + 'static, -{ - while let Some(cmd) = rx.recv().await { - match cmd { - LogCommand::Item(item) => { - let Some(entry) = classify_history_item(item) else { - continue; - }; - commit_via_drain(&ctx, entry).await; - } - LogCommand::SystemItems(items) => { - if items.is_empty() { - continue; - } - let entry = LogEntry::SystemItems { - ts: session_log::now_millis(), - items, - }; - commit_via_drain(&ctx, entry).await; - } - LogCommand::Flush(ack) => { - let _ = ack.send(()); - } - } - } -} - -async fn commit_via_drain(ctx: &LogDrainHandle, entry: LogEntry) -where - St: session_store::Store + Clone + Send + 'static, -{ - let mut head = ctx.session_head.lock().await; - match session_store::append_entry_with_hash( - &ctx.store, - head.session_id, - &mut head.head_hash, - entry.clone(), - ) - .await - { - Ok(_) => { - // Publish under the same critical section view a - // `subscribe_with_snapshot` would observe. - ctx.sink.publish(entry); - } - Err(e) => { - tracing::warn!(error = %e, "drain: append_entry failed; entry dropped"); - } - } -} - -/// Map one LLM-driven worker-history append to its `LogEntry` form. -/// -/// `None` is the skip signal for items that the drain must not commit: -/// - `user_message` items are committed by `Pod::run` up-front as -/// `LogEntry::UserInput { segments }`. -/// - `system_message` items are committed by `PodInterceptor` as part -/// of a `LogEntry::SystemItems` batch (with typed kind metadata) -/// before they reach the worker's history. -fn classify_history_item(item: Item) -> Option { - let ts = session_log::now_millis(); - if item.is_user_message() { - return None; - } - if matches!( - item, - Item::Message { - role: llm_worker::Role::System, - .. - } - ) { - return None; - } - if item.is_tool_result() { - return Some(LogEntry::ToolResults { - ts, - items: vec![session_store::LoggedItem::from(&item)], - }); - } - if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() { - return Some(LogEntry::AssistantItems { - ts, - items: vec![session_store::LoggedItem::from(&item)], - }); - } - // Defensive: anything else (future Item kinds) routes through - // AssistantItems rather than getting silently dropped. - Some(LogEntry::AssistantItems { - ts, - items: vec![session_store::LoggedItem::from(&item)], - }) -} - /// Register the builtin file-manipulation tools, optional memory tools, /// and the Pod-orchestration tools (SpawnPod + comm) on the Pod's /// Worker. Returns the `ScopedFs` clone used to attach a `PodFsView` to diff --git a/crates/pod/src/ipc/interceptor.rs b/crates/pod/src/ipc/interceptor.rs index 3403468f..2a0b8e14 100644 --- a/crates/pod/src/ipc/interceptor.rs +++ b/crates/pod/src/ipc/interceptor.rs @@ -23,14 +23,13 @@ use tracing::warn; use crate::compact::state::CompactState; use session_store::SystemItem; -use tokio::sync::mpsc; use crate::hook::{ AbortInfo, HookPromptAction, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, ToolResultSummary, TurnEndInfo, }; use crate::ipc::notify_buffer::{NotifyBuffer, build_system_item}; -use crate::pod::LogCommand; +use crate::pod::SystemItemCommitter; use crate::prompt::catalog::PromptCatalog; use llm_worker::token_counter::total_tokens; @@ -50,19 +49,20 @@ pub(crate) struct PodInterceptor { /// so the LLM has a visible trigger for any reaction it commits. pending_notifies: NotifyBuffer, /// Submit-scoped stash of resolver-produced typed system items. - /// Drained inside `on_prompt_submit`, committed as a - /// `LogEntry::SystemItems` through `log_cmd_tx`, and returned to - /// the worker as `Item::system_message` via + /// Drained inside `on_prompt_submit`, committed as + /// `LogEntry::SystemItem` entries through `log_writer`, and + /// returned to the worker as `Item::system_message` via /// `PromptAction::ContinueWith`. Populated by `Pod::run` /// immediately before handing off to the worker. pending_attachments: Arc>>, /// Prompt catalog used to render the injected notification wrapper. prompts: Arc, - /// Sender into the Pod's history-drain task. The interceptor uses - /// it to commit `LogCommand::SystemItems` batches before returning - /// the corresponding `Item::system_message`s up to the worker. - /// `None` in tests / `Pod::new` paths where no drain is wired. - log_cmd_tx: Option>, + /// Type-erased commit handle. The interceptor uses it to commit + /// `LogEntry::SystemItem` entries directly (sync) before + /// returning the corresponding `Item::system_message`s up to the + /// worker. `None` in tests / `Pod::new` paths where no writer is + /// attached. + log_writer: Option>, /// Next turn index assigned by `on_prompt_submit`. next_turn_index: AtomicUsize, /// Tool calls observed in the current turn (reset on each new prompt). @@ -77,7 +77,7 @@ impl PodInterceptor { pending_notifies: NotifyBuffer, pending_attachments: Arc>>, prompts: Arc, - log_cmd_tx: Option>, + log_writer: Option>, ) -> Self { Self { registry, @@ -86,23 +86,24 @@ impl PodInterceptor { pending_notifies, pending_attachments, prompts, - log_cmd_tx, + log_writer, next_turn_index: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0), } } - /// Send a `LogCommand::SystemItems` batch down the drain channel - /// (no-op if no drain is wired). The drain task commits the entry - /// before the corresponding `Item::system_message`s reach the - /// worker via `ContinueWith` / `pending_history_appends`, so the - /// drain barrier in `persist_turn` covers system commits too. - fn send_system_items(&self, items: Vec) { - if items.is_empty() { + /// Commit each `SystemItem` as its own `LogEntry::SystemItem` + /// entry through the attached writer (no-op when no writer is + /// wired). Sync — writes complete before the matching + /// `Item::system_message`s reach the worker via + /// `ContinueWith` / `pending_history_appends`, so on-disk order + /// matches worker-history order. + fn commit_system_items(&self, items: &[SystemItem]) { + let Some(writer) = self.log_writer.as_ref() else { return; - } - if let Some(tx) = self.log_cmd_tx.as_ref() { - let _ = tx.send(LogCommand::SystemItems(items)); + }; + for item in items { + writer.commit_system_item(item.clone()); } } @@ -148,12 +149,12 @@ impl Interceptor for PodInterceptor { PromptAction::Continue } else { // Commit the typed system items first, then hand the - // matching `Item::system_message`s to the worker. The - // drain task processes the `SystemItems` command BEFORE - // any subsequent `Item` commands from `on_history_append`, - // so on-disk order matches worker-history order. + // matching `Item::system_message`s to the worker. Sync + // commits land BEFORE the worker pushes its + // `Item::system_message`s, so on-disk order matches + // worker-history order. let items: Vec = extras.iter().map(SystemItem::to_history_item).collect(); - self.send_system_items(extras); + self.commit_system_items(&extras); PromptAction::ContinueWith(items) } } @@ -175,7 +176,7 @@ impl Interceptor for PodInterceptor { // A render failure here would starve the LLM of // the notify text. Fall back to a raw item so the // trigger still lands in history; the entry will - // simply be skipped from the SystemItems batch. + // simply be skipped from the SystemItem batch. warn!(error = %e, "failed to render notify_wrapper; using raw message"); let fallback = match &entry { super::notify_buffer::PendingNotify::Notify { message } => message.clone(), @@ -187,7 +188,7 @@ impl Interceptor for PodInterceptor { } } } - self.send_system_items(system_items); + self.commit_system_items(&system_items); items } diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index 3f2a03e4..a6a2dd64 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -108,37 +108,22 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { session_store::LogEntry::SessionStart { .. } => { let value = serde_json::to_value(&entry) .expect("LogEntry is Serialize"); - vec![Event::SessionRotated { entry: value }] + Some(Event::SessionRotated { entry: value }) } - session_store::LogEntry::SystemItems { items, .. } => { - // Fan out per-item so each `SystemItem` - // arrives as its own `Event::SystemItem` - // on the wire. Batching on disk is an - // implementation detail of the drain - // task; clients see them one at a time. - items - .into_iter() - .map(|si| { - let value = serde_json::to_value(&si) - .expect("SystemItem is Serialize"); - Event::SystemItem { item: value } - }) - .collect() + session_store::LogEntry::SystemItem { item, .. } => { + let value = serde_json::to_value(&item) + .expect("SystemItem is Serialize"); + Some(Event::SystemItem { item: value }) } // Defensive: should never reach here per // `SessionLogSink::is_live_relevant`. - _ => Vec::new(), + _ => None, }; - let mut hit_error = false; - for event in outbound { + if let Some(event) = outbound { if writer.write(&event).await.is_err() { - hit_error = true; break; } } - if hit_error { - break; - } } Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { // Slow client fell behind the broadcast buffer. diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index f5e4d5fb..bd98894a 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -162,7 +162,7 @@ async fn main() -> ExitCode { } }, }; - let store = match FsStore::new(&store_dir).await { + let store = match FsStore::new(&store_dir) { Ok(s) => s, Err(e) => { eprintln!("error: failed to initialize store at {store_dir:?}: {e}"); diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index c4e35c64..ea576002 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1,13 +1,13 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; -use tokio::sync::Mutex as AsyncMutex; use llm_worker::Item; use llm_worker::llm_client::RequestConfig; use llm_worker::llm_client::client::LlmClient; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; +use parking_lot::Mutex as SyncMutex; use session_store::{ EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, SystemItem, session_log, to_logged, @@ -16,36 +16,6 @@ use tracing::{info, warn}; use crate::session_log_sink::SessionLogSink; -/// Command sent to the per-Pod history-drain task. -/// -/// - `Item`: one worker-history append observed via -/// `Worker::on_history_append`; the drain classifies it into -/// `LogEntry::AssistantItems` / `LogEntry::ToolResults` and commits -/// through the sink. `role:system` items are explicitly skipped -/// because they are committed up-front through `SystemItems`. -/// - `SystemItems`: typed agent-injected items committed as a single -/// `LogEntry::SystemItems` entry. Used by the interceptor when it -/// drains the notify buffer or pending attachments. -/// - `Flush(ack)`: barrier used by `persist_turn` to ensure every -/// queued command has been processed before the trailing `TurnEnd` -/// entry lands. -#[derive(Debug)] -pub enum LogCommand { - Item(Item), - SystemItems(Vec), - Flush(tokio::sync::oneshot::Sender<()>), -} - -/// State shared between Pod and the controller-spawned history-drain -/// task: store + session-head lock + broadcast sink. All three are -/// `Clone`able (the latter two as `Arc` clones, the store per its -/// `Clone` impl) so handing a copy to the drain task is cheap. -pub struct LogDrainHandle { - pub store: St, - pub session_head: Arc>, - pub sink: SessionLogSink, -} - use manifest::{ Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError, ScopeRule, SharedScope, WorkerManifest, @@ -78,6 +48,70 @@ pub struct SessionHead { pub head_hash: Option, } +/// Cheap-cloneable bundle of (store + session-head lock + sink) handed +/// to the worker callback and the interceptor so they can commit +/// `LogEntry` values directly without going through an mpsc ferry. +/// +/// All three fields are `Clone` (the latter two as `Arc` clones, the +/// store per its `Clone` impl) so the handle itself is a flat triple of +/// cheap copies. +pub struct LogWriterHandle { + pub store: St, + pub session_head: Arc>, + pub sink: SessionLogSink, +} + +impl Clone for LogWriterHandle { + fn clone(&self) -> Self { + Self { + store: self.store.clone(), + session_head: self.session_head.clone(), + sink: self.sink.clone(), + } + } +} + +impl LogWriterHandle +where + St: Store + Clone, +{ + /// Append `entry` to the log: disk write → in-memory mirror push → + /// broadcast — atomic w.r.t. `subscribe_with_snapshot` callers. + pub fn append_entry(&self, entry: LogEntry) -> Result { + let mut head = self.session_head.lock(); + let hash = session_store::append_entry_with_hash( + &self.store, + head.session_id, + &mut head.head_hash, + entry.clone(), + )?; + self.sink.publish(entry); + Ok(hash) + } +} + +/// Type-erased commit handle for the interceptor. Lets the +/// interceptor commit `SystemItem`s without being generic over the +/// concrete `Store` type. +pub trait SystemItemCommitter: Send + Sync { + fn commit_system_item(&self, item: SystemItem); +} + +impl SystemItemCommitter for LogWriterHandle +where + St: Store + Clone + Send + Sync + 'static, +{ + fn commit_system_item(&self, item: SystemItem) { + let entry = LogEntry::SystemItem { + ts: session_log::now_millis(), + item, + }; + if let Err(err) = self.append_entry(entry) { + warn!(error = %err, "system item commit failed; dropping"); + } + } +} + /// Pre-LLM-request hook that records `history.len()` at send time into a /// shared `UsageTracker`. The on_usage callback later pairs this with the /// aggregated UsageEvent to produce one `UsageRecord` per LLM call. @@ -103,7 +137,7 @@ pub struct Pod { worker: Option>, store: St, session_id: SessionId, - session_head: Arc>, + session_head: Arc>, /// Absolute working directory of the Pod. pwd: PathBuf, /// Shared, atomically-swappable view of the Pod's resolved scope. @@ -235,12 +269,21 @@ pub struct Pod { /// clients see a `(snapshot, live)` stream consistent with what's /// on disk. sink: SessionLogSink, - /// Sender into the controller-spawned history-drain task. - /// `None` when no controller has wired one (tests, low-level Pod - /// usage). The drain task is the source of mid-turn `AssistantItems` - /// / `ToolResults` / `HookInjectedItems` commits, fed by the - /// `Worker::on_history_append` callback. - log_cmd_tx: Option>, + /// `true` once `wire_history_persistence` has installed the + /// `Worker::on_history_append` callback that commits each appended + /// item as a singular `LogEntry::AssistantItem` / `ToolResult` + /// directly through the writer. Tests that drive `Pod::new` without + /// going through the controller leave this `false`; `persist_turn` + /// then walks the post-`history_before` slice inline so entries + /// still land on disk. + history_persistence_wired: bool, + /// Type-erased commit handle wired by the controller (or by tests + /// via `attach_log_writer`). The interceptor uses it to commit + /// `SystemItem`s directly without being generic over `St`. `None` + /// in low-level test paths that bypass the controller — those + /// paths skip SystemItem disk commits but still see the rendered + /// `Item::system_message` in worker history. + log_writer: Option>, } impl Pod { @@ -301,21 +344,66 @@ impl Pod { // (it only reads `worker.history()`), so a fresh sink is // fine — nothing observes its broadcast. sink: SessionLogSink::new(), - log_cmd_tx: None, + history_persistence_wired: false, + log_writer: None, } } - /// Build a `LogDrainHandle` carrying everything the controller's - /// drain task needs: store handle, the shared session-head lock, - /// and the broadcast sink. All three are cheap clones. - pub fn log_drain_handle(&self) -> LogDrainHandle { - LogDrainHandle { + /// Build a `LogWriterHandle` carrying everything the worker + /// callback / interceptor needs to commit `LogEntry` values + /// directly: store handle, the shared session-head lock, and the + /// broadcast sink. All three are cheap clones. + pub fn log_writer_handle(&self) -> LogWriterHandle { + LogWriterHandle { store: self.store.clone(), session_head: self.session_head.clone(), sink: self.sink.clone(), } } + /// Attach a type-erased system-item commit handle. The controller + /// calls this once during spawn so the interceptor can commit + /// `SystemItem`s directly without owning a generic store handle. + /// Idempotent: subsequent calls overwrite the previous handle. + pub fn attach_log_writer(&mut self, writer: Arc) { + self.log_writer = Some(writer); + } + + /// Wire `Worker::on_history_append` to commit each appended item + /// directly as a singular `LogEntry::AssistantItem` / `ToolResult` + /// through the writer. The controller calls this once per spawned + /// Pod after the worker is built; tests that drive `Pod::new` may + /// opt in to the same wiring or leave it off (in which case + /// `persist_turn`'s inline fallback writes entries at turn end). + /// + /// `user_message` items are skipped because they are committed + /// up-front via `commit_entry(LogEntry::UserInput { segments })`. + /// `role:system` items are committed by `PodInterceptor` as typed + /// `LogEntry::SystemItem` entries before they reach the worker's + /// history (so this callback would otherwise double-write them). + pub fn wire_history_persistence(&mut self) { + let writer = self.log_writer_handle(); + self.worker_mut().on_history_append(move |item| { + if item.is_user_message() { + return; + } + if matches!( + item, + Item::Message { + role: llm_worker::Role::System, + .. + } + ) { + return; + } + let entry = session_store::classify_history_item(item, session_log::now_millis()); + if let Err(err) = writer.append_entry(entry) { + warn!(error = %err, "history append commit failed; dropping"); + } + }); + self.history_persistence_wired = true; + } + pub fn spawn_post_run_memory_jobs(&mut self) { // Drop a finished prior handle so we can spawn a fresh task. // If the prior task is still running, coalesce by skipping — @@ -365,7 +453,7 @@ impl Pod { worker: Some(worker), store, session_id, - session_head: Arc::new(AsyncMutex::new(SessionHead { + session_head: Arc::new(SyncMutex::new(SessionHead { session_id, head_hash: None, })), @@ -397,7 +485,8 @@ impl Pod { memory_task: None, user_segments: Vec::new(), sink: SessionLogSink::new(), - log_cmd_tx: None, + history_persistence_wired: false, + log_writer: None, }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); @@ -491,8 +580,8 @@ impl Pod { /// process later exits while children keep their allocations, resume /// can restore the narrowed scope instead of reclaiming delegated /// writes. - pub async fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> { - if self.session_head.lock().await.head_hash.is_none() { + pub fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> { + if self.session_head.lock().head_hash.is_none() { return Ok(()); } let snapshot = { @@ -508,23 +597,21 @@ impl Pod { domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), payload, }) - .await .map(|_| ()) } /// Append `entry` to the session log AND publish it through the - /// broadcast sink. Holds the session-head async lock across the + /// broadcast sink. Holds the session-head sync lock across the /// disk write and the sink publish so subscribers see a gap-free /// `(snapshot, live)` stream consistent with what's on disk. - pub(crate) async fn commit_entry(&self, entry: LogEntry) -> Result { - let mut head = self.session_head.lock().await; + pub(crate) fn commit_entry(&self, entry: LogEntry) -> Result { + let mut head = self.session_head.lock(); let hash = session_store::append_entry_with_hash( &self.store, head.session_id, &mut head.head_hash, entry.clone(), - ) - .await?; + )?; self.sink.publish(entry); Ok(hash) } @@ -536,15 +623,6 @@ impl Pod { self.sink.clone() } - /// Wire a history-drain task. The controller calls this once per - /// Pod after the drain task is spawned; the matching mpsc receiver - /// drives per-item commits of assistant items / tool results / - /// hook-injected items committed by the worker via - /// `Worker::on_history_append`. - pub fn attach_log_cmd_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender) { - self.log_cmd_tx = Some(tx); - } - /// Cloneable callback handed to dynamic-scope tools. It cannot append /// directly to the async store from a sync tool callback, so it records /// the latest snapshot and the controller flushes it after the tool @@ -556,7 +634,7 @@ impl Pod { }) } - async fn flush_pending_scope_snapshot(&mut self) -> Result<(), StoreError> { + fn flush_pending_scope_snapshot(&mut self) -> Result<(), StoreError> { let snapshot = self .pending_scope_snapshot .lock() @@ -568,8 +646,7 @@ impl Pod { ts: session_log::now_millis(), domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), payload, - }) - .await?; + })?; } Ok(()) } @@ -731,14 +808,14 @@ impl Pod { /// fail the surrounding turn. On failure the head hash stays put /// (the entry is dropped) and a `Warn` alert + `tracing::warn!` are /// emitted so the failure isn't completely silent. - async fn try_record_metric(&mut self, metric: &session_metrics::Metric) { + fn try_record_metric(&mut self, metric: &session_metrics::Metric) { let payload = serde_json::to_value(metric).expect("Metric is Serialize"); let entry = LogEntry::Extension { ts: session_log::now_millis(), domain: session_metrics::DOMAIN.into(), payload, }; - if let Err(err) = self.commit_entry(entry).await { + if let Err(err) = self.commit_entry(entry) { warn!(name = %metric.name, error = %err, "failed to record session metric; dropping"); self.alert( AlertLevel::Warn, @@ -907,7 +984,7 @@ impl Pod { self.pending_notifies.clone(), self.pending_attachments.clone(), self.prompts.clone(), - self.log_cmd_tx.clone(), + self.log_writer.clone(), ); self.worker_mut().set_interceptor(interceptor); self.interceptor_installed = true; @@ -1073,12 +1150,12 @@ impl Pod { // Persist the user input as typed segments before the worker // pushes its flattened copy into history. save_delta deliberately // skips the resulting `is_user_message()` item to avoid double-write. - self.session_id = self.session_head.lock().await.session_id; + self.session_id = self.session_head.lock().session_id; self.commit_entry(LogEntry::UserInput { ts: session_log::now_millis(), segments: input.clone(), }) - .await?; + ?; self.user_segments.push(input.clone()); // Resolve `@` refs, `#` Knowledge refs, and `/` @@ -1447,7 +1524,7 @@ impl Pod { let w = self.worker.as_ref().unwrap(); let prev_session_id; let initial_state = { - let head = self.session_head.lock().await; + let head = self.session_head.lock(); prev_session_id = head.session_id; head.head_hash.is_none() }; @@ -1460,17 +1537,17 @@ impl Pod { forked_from: None, compacted_from: None, }; - self.commit_entry(initial).await?; - self.persist_scope_snapshot().await?; + self.commit_entry(initial)?; + self.persist_scope_snapshot()?; return Ok(()); } // Check store head + auto-fork if it drifted. let store_head = self .store .read_head_hash(prev_session_id) - .await + .map_err(PodError::from)?; - let mut head = self.session_head.lock().await; + let mut head = self.session_head.lock(); if store_head == head.head_hash { return Ok(()); } @@ -1494,7 +1571,7 @@ impl Pod { }; self.store .create_session(fork_id, &[hashed]) - .await + .map_err(PodError::from)?; head.session_id = fork_id; head.head_hash = Some(hash); @@ -1648,73 +1725,52 @@ impl Pod { // pass that replicates the legacy `save_delta` classification — // those code paths don't fire `on_history_append`, so the items // would otherwise be lost. - let _ = history_before; // referenced only by the fallback below. - self.session_id = self.session_head.lock().await.session_id; - if let Some(tx) = self.log_cmd_tx.as_ref() { - let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); - if tx.send(LogCommand::Flush(ack_tx)).is_ok() { - let _ = ack_rx.await; - } - } else { - // Fallback path for tests / Pod::new: classify and commit - // the post-`history_before` slice inline, matching the old - // `save_delta` shape. + // Per-item commits for AssistantItem / ToolResult / SystemItem + // entries are expected to have landed synchronously: the + // worker `on_history_append` callback (wired by the controller + // via `wire_history_persistence`) commits each appended item + // directly through the writer, and the interceptor commits + // SystemItems up-front in `on_prompt_submit` / + // `pending_history_appends` before returning the matching + // `Item::system_message`s. + // + // Low-level test paths that build `Pod::new` without wiring + // the callback fall through this branch: they classify the + // slice from `history_before` inline so the test's + // `restore`-style assertions still see entries on disk. + self.session_id = self.session_head.lock().session_id; + if !self.history_persistence_wired { let new_items: Vec = self.worker.as_ref().unwrap().history()[history_before..] .iter() .cloned() .collect(); let ts = session_log::now_millis(); - let mut i = 0; - while i < new_items.len() { - let item = &new_items[i]; + for item in &new_items { if item.is_user_message() { - i += 1; - } else if item.is_tool_result() { - let start = i; - while i < new_items.len() && new_items[i].is_tool_result() { - i += 1; - } - let items = new_items[start..i] - .iter() - .map(session_store::LoggedItem::from) - .collect(); - self.commit_entry(LogEntry::ToolResults { ts, items }) - .await?; - } else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() - { - let start = i; - while i < new_items.len() - && (new_items[i].is_assistant_message() - || new_items[i].is_tool_call() - || new_items[i].is_reasoning()) - { - i += 1; - } - let items = new_items[start..i] - .iter() - .map(session_store::LoggedItem::from) - .collect(); - self.commit_entry(LogEntry::AssistantItems { ts, items }) - .await?; - } else { - self.commit_entry(LogEntry::HookInjectedItems { - ts, - items: vec![session_store::LoggedItem::from(&new_items[i])], - }) - .await?; - i += 1; + continue; } + if matches!( + item, + Item::Message { + role: llm_worker::Role::System, + .. + } + ) { + continue; + } + let entry = session_store::classify_history_item(item, ts); + self.commit_entry(entry)?; } } - self.flush_pending_scope_snapshot().await?; + self.flush_pending_scope_snapshot()?; let turn_count = self.worker.as_ref().unwrap().turn_count(); self.commit_entry(LogEntry::TurnEnd { ts: session_log::now_millis(), turn_count, }) - .await?; + ?; // Flush any sync-buffered metrics from this run first // (currently `prune.fire` / `prune.skip` from the prune observer). @@ -1730,7 +1786,7 @@ impl Pod { // by this point, and `save_run_completed` still needs to land). let pending_metrics = self.metrics_tracker.drain(); for metric in pending_metrics { - self.try_record_metric(&metric).await; + self.try_record_metric(&metric); } // Persist any LLM Usage measurements collected during this run. @@ -1755,14 +1811,14 @@ impl Pod { cache_write_tokens: record.cache_write_tokens, output_tokens: record.output_tokens, }) - .await?; + ?; if let Some(id) = correlation_id { let metric = session_metrics::Metric::now("prune.post_request") .with_correlation_id(&id) .with_value(record.cache_read_tokens as f64) .with_dimension("cache_write_tokens", record.cache_write_tokens.to_string()) .with_dimension("history_len", record.history_len.to_string()); - self.try_record_metric(&metric).await; + self.try_record_metric(&metric); } self.usage_history .lock() @@ -1778,7 +1834,7 @@ impl Pod { interrupted, result: r.clone(), }) - .await?; + ?; } Err(e) => { self.commit_entry(LogEntry::RunErrored { @@ -1786,7 +1842,7 @@ impl Pod { interrupted, message: e.to_string(), }) - .await?; + ?; } } @@ -2020,7 +2076,7 @@ impl Pod { // `SessionStart { compacted_from }` and reset their view. let new_session_id = session_store::new_session_id(); let session_start = { - let mut head = self.session_head.lock().await; + let mut head = self.session_head.lock(); let old_session_id = head.session_id; let old_head_hash = head .head_hash @@ -2044,7 +2100,7 @@ impl Pod { prev_hash: None, entry: entry.clone(), }; - self.store.create_session(new_session_id, &[hashed]).await?; + self.store.create_session(new_session_id, &[hashed])?; head.session_id = new_session_id; head.head_hash = Some(hash); self.session_id = new_session_id; @@ -2092,7 +2148,7 @@ impl Pod { .lock() .expect("usage_history poisoned") .clear(); - self.persist_scope_snapshot().await?; + self.persist_scope_snapshot()?; // Reset extract pointer alongside usage_history: the compacted // session has a fresh log with no `LogEntry::Extension` entries // yet, so a cold restore here would set extract_pointer to None @@ -2254,7 +2310,7 @@ impl Pod { // Read the session log to get the current entry count. This is // the boundary for the source.range end_entry. Called once per // extract, on a small local file. - let entries_now = self.store.read_all(self.session_id).await?.len(); + let entries_now = self.store.read_all(self.session_id)?.len(); if entries_now == 0 { return Ok(ExtractDecision::Skipped); } @@ -2322,7 +2378,7 @@ impl Pod { extract::ExtractedPayload::default() }); - let source_session_id = self.session_head.lock().await.session_id; + let source_session_id = self.session_head.lock().session_id; let staging_id = if payload.is_empty() { String::new() } else { @@ -2347,8 +2403,8 @@ impl Pod { domain: extract::EXTRACT_DOMAIN.into(), payload: payload_value, }) - .await?; - self.session_id = self.session_head.lock().await.session_id; + ?; + self.session_id = self.session_head.lock().session_id; *self .extract_pointer @@ -2655,7 +2711,7 @@ impl Pod, St> { worker: Some(worker), store, session_id, - session_head: Arc::new(AsyncMutex::new(SessionHead { + session_head: Arc::new(SyncMutex::new(SessionHead { session_id, head_hash: None, })), @@ -2687,7 +2743,8 @@ impl Pod, St> { memory_task: None, user_segments: Vec::new(), sink: SessionLogSink::new(), - log_cmd_tx: None, + history_persistence_wired: false, + log_writer: None, }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); @@ -2728,7 +2785,7 @@ impl Pod, St> { worker: Some(worker), store, session_id, - session_head: Arc::new(AsyncMutex::new(SessionHead { + session_head: Arc::new(SyncMutex::new(SessionHead { session_id, head_hash: None, })), @@ -2760,7 +2817,8 @@ impl Pod, St> { memory_task: None, user_segments: Vec::new(), sink: SessionLogSink::new(), - log_cmd_tx: None, + history_persistence_wired: false, + log_writer: None, }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); @@ -2795,7 +2853,7 @@ impl Pod, St> { // Read raw entries once so we can both reconstruct state and // seed the broadcast sink's mirror with the same prefix that // sits on disk. - let raw_entries = store.read_all(session_id).await?; + let raw_entries = store.read_all(session_id)?; let state = session_store::collect_state(&raw_entries); if state.head_hash.is_none() { return Err(PodError::SessionEmpty { session_id }); @@ -2870,7 +2928,7 @@ impl Pod, St> { worker: Some(worker), store, session_id, - session_head: Arc::new(AsyncMutex::new(SessionHead { + session_head: Arc::new(SyncMutex::new(SessionHead { session_id, head_hash: state.head_hash, })), @@ -2907,7 +2965,8 @@ impl Pod, St> { // late-attaching client sees the full prefix without an // extra round trip. sink: SessionLogSink::with_initial(mirror_entries), - log_cmd_tx: None, + history_persistence_wired: false, + log_writer: None, }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); diff --git a/crates/pod/src/session_log_sink.rs b/crates/pod/src/session_log_sink.rs index 1f72deaf..15c47666 100644 --- a/crates/pod/src/session_log_sink.rs +++ b/crates/pod/src/session_log_sink.rs @@ -7,7 +7,7 @@ //! Pod (which still owns the `Store` handle); the sink stays focused on //! the wire-side fan-out. //! -//! Atomicity contract (see ticket `tickets/pod-state-from-session-log.md`): +//! Atomicity contract: //! //! 1. Pod writes the entry to disk via the `Store`. //! 2. Pod calls [`SessionLogSink::publish`] which acquires the mirror @@ -24,10 +24,11 @@ use std::sync::{Arc, Mutex as StdMutex}; +use parking_lot::{Mutex, MutexGuard}; use session_store::{ EntryHash, HashedEntry, LogEntry, SessionId, SessionStartState, Store, StoreError, session_log, }; -use tokio::sync::{Mutex as AsyncMutex, MutexGuard, broadcast}; +use tokio::sync::broadcast; /// Broadcast capacity for the live receiver. Slow subscribers that /// fall behind will see `RecvError::Lagged` and are expected to drop @@ -92,8 +93,8 @@ impl SessionLogSink { /// Live broadcast fires only for entries that the streaming-event /// lane does not cover: /// - `LogEntry::SessionStart` → `Event::SessionRotated` on the wire. - /// - `LogEntry::HookInjectedItems` → `Event::HookInjectedItems`. - /// Everything else (AssistantItems, ToolResults, UserInput, TurnEnd, + /// - `LogEntry::SystemItem` → `Event::SystemItem`. + /// Everything else (AssistantItem, ToolResult, UserInput, TurnEnd, /// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is /// reflected in the mirror so reconnect snapshots stay accurate, /// but is not sent live — the streaming events (TextDelta / @@ -120,7 +121,7 @@ impl SessionLogSink { fn is_live_relevant(entry: &LogEntry) -> bool { matches!( entry, - LogEntry::SessionStart { .. } | LogEntry::SystemItems { .. } + LogEntry::SessionStart { .. } | LogEntry::SystemItem { .. } ) } @@ -194,10 +195,9 @@ impl Default for SessionLogSink { } /// Active session head for the Pod's persistent log: session id + -/// last-committed entry hash. Replaces the previous `SessionHead` -/// struct local to `Pod`; bundled here so the writer can hand a -/// cloneable handle to background tasks (e.g. the per-item drain -/// task spawned by the controller). +/// last-committed entry hash. Bundled with the store + sink in a +/// `SessionLogWriter` so the worker callback / interceptor can share +/// one cheap `Clone` handle for direct sync appends. #[derive(Debug, Clone)] pub struct SessionHeadState { pub session_id: SessionId, @@ -209,20 +209,26 @@ pub struct SessionHeadState { /// Bundles the (1) persistent store, (2) the in-memory session-head /// state (id + hash), and (3) the broadcast sink. `append_entry` /// chains the hash on disk, advances the head, then publishes the -/// entry through the sink — under a single async mutex so two writers +/// entry through the sink — under a single sync mutex so two writers /// cannot interleave the chain. /// +/// All append paths run synchronously: a local-fs `<1 KiB` JSONL line +/// completes well below a millisecond, and going through an async +/// `tokio::fs` ferry would re-introduce the `LogCommand` / drain task +/// we removed. `parking_lot::Mutex` is safe to hold across the disk +/// write since the lock is never crossed by an `.await`. +/// /// `Clone` is a cheap `Arc` clone. The Pod keeps one writer for its /// inline commits (UserInput, TurnEnd, Usage, RunCompleted/Errored, -/// scope snapshots, metrics) and hands clones to background tasks -/// (e.g. the controller's per-item history drain task). +/// scope snapshots, metrics) and hands clones to every other commit +/// site (worker callback, interceptor). pub struct SessionLogWriter { inner: Arc>, } struct WriterInner { store: St, - head: AsyncMutex, + head: Mutex, sink: SessionLogSink, } @@ -245,7 +251,7 @@ where Self { inner: Arc::new(WriterInner { store, - head: AsyncMutex::new(SessionHeadState { + head: Mutex::new(SessionHeadState { session_id, head_hash: None, }), @@ -267,7 +273,7 @@ where Self { inner: Arc::new(WriterInner { store, - head: AsyncMutex::new(SessionHeadState { + head: Mutex::new(SessionHeadState { session_id, head_hash, }), @@ -278,15 +284,14 @@ where /// Append `entry` to the log: disk write → in-memory mirror push → /// broadcast — atomic w.r.t. `subscribe_with_snapshot` callers. - pub async fn append_entry(&self, entry: LogEntry) -> Result { - let mut head = self.inner.head.lock().await; + pub fn append_entry(&self, entry: LogEntry) -> Result { + let mut head = self.inner.head.lock(); let hash = session_store::append_entry_with_hash( &self.inner.store, head.session_id, &mut head.head_hash, entry.clone(), - ) - .await?; + )?; self.inner.sink.publish(entry); Ok(hash) } @@ -299,7 +304,7 @@ where /// subscribers observe the swap as a freshly broadcast /// `SessionStart` (with `compacted_from` set), which is their /// signal to reset their derived view. - pub async fn swap_session( + pub fn swap_session( &self, new_session_id: SessionId, initial: LogEntry, @@ -310,11 +315,8 @@ where prev_hash: None, entry: initial.clone(), }; - self.inner - .store - .create_session(new_session_id, &[hashed]) - .await?; - let mut head = self.inner.head.lock().await; + self.inner.store.create_session(new_session_id, &[hashed])?; + let mut head = self.inner.head.lock(); head.session_id = new_session_id; head.head_hash = Some(hash.clone()); self.inner.sink.reset_with_initial(initial); @@ -324,12 +326,9 @@ where /// If the store's head no longer matches our cached head, mint a /// fresh session that forks from the current state and switch to /// it. Returns `true` when a fork happened. - pub async fn ensure_head_or_fork( - &self, - state: SessionStartState<'_>, - ) -> Result { - let mut head = self.inner.head.lock().await; - let store_head = self.inner.store.read_head_hash(head.session_id).await?; + pub fn ensure_head_or_fork(&self, state: SessionStartState<'_>) -> Result { + let mut head = self.inner.head.lock(); + let store_head = self.inner.store.read_head_hash(head.session_id)?; if store_head == head.head_hash { return Ok(false); } @@ -348,7 +347,7 @@ where prev_hash: None, entry: entry.clone(), }; - self.inner.store.create_session(fork_id, &[hashed]).await?; + self.inner.store.create_session(fork_id, &[hashed])?; head.session_id = fork_id; head.head_hash = Some(hash); self.inner.sink.reset_with_initial(entry); @@ -370,20 +369,19 @@ where } /// Cheap snapshot of the current session id. - pub async fn current_session_id(&self) -> SessionId { - self.inner.head.lock().await.session_id + pub fn current_session_id(&self) -> SessionId { + self.inner.head.lock().session_id } /// Cheap snapshot of the current head hash. - pub async fn current_head_hash(&self) -> Option { - self.inner.head.lock().await.head_hash.clone() + pub fn current_head_hash(&self) -> Option { + self.inner.head.lock().head_hash.clone() } /// Direct lock on the head. Used by paths that need to coordinate - /// custom writes with the hash chain (currently - /// `session_metrics::record_metric`). - pub async fn lock_head(&self) -> MutexGuard<'_, SessionHeadState> { - self.inner.head.lock().await + /// custom writes with the hash chain. + pub fn lock_head(&self) -> MutexGuard<'_, SessionHeadState> { + self.inner.head.lock() } } @@ -428,12 +426,12 @@ mod tests { } fn notification_entry(text: &str) -> LogEntry { - LogEntry::SystemItems { + LogEntry::SystemItem { ts: now_millis(), - items: vec![session_store::SystemItem::Notification { + item: session_store::SystemItem::Notification { message: text.to_owned(), body: format!("[Notification] {text}"), - }], + }, } } @@ -449,11 +447,11 @@ mod tests { sink.publish(turn_end(1)); assert!(rx.try_recv().is_err(), "TurnEnd must not be broadcast live"); - // SystemItems is live-relevant. + // SystemItem is live-relevant. sink.publish(notification_entry("hi")); match rx.try_recv() { - Ok(LogEntry::SystemItems { .. }) => {} - other => panic!("expected SystemItems, got {other:?}"), + Ok(LogEntry::SystemItem { .. }) => {} + other => panic!("expected SystemItem, got {other:?}"), } // Mirror still grew with both entries (snapshot completeness). @@ -470,7 +468,7 @@ mod tests { assert_eq!(snapshot.len(), 1); match rx.try_recv() { - Ok(LogEntry::SystemItems { .. }) => {} + Ok(LogEntry::SystemItem { .. }) => {} other => panic!("unexpected: {other:?}"), } assert!(rx.try_recv().is_err()); diff --git a/crates/pod/tests/compact_events_test.rs b/crates/pod/tests/compact_events_test.rs index 7b827938..5611816c 100644 --- a/crates/pod/tests/compact_events_test.rs +++ b/crates/pod/tests/compact_events_test.rs @@ -149,7 +149,7 @@ async fn make_pod_with_manifest( let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).await.unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); std::mem::forget(store_tmp); let pwd_tmp = tempfile::tempdir().unwrap(); diff --git a/crates/pod/tests/consolidation_test.rs b/crates/pod/tests/consolidation_test.rs index ae09161d..7c11c335 100644 --- a/crates/pod/tests/consolidation_test.rs +++ b/crates/pod/tests/consolidation_test.rs @@ -158,7 +158,7 @@ async fn make_pod_with( let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).await.unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); std::mem::forget(store_tmp); let scope = pod::Scope::writable(&pwd).unwrap(); diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index b4859641..a9cdbbb8 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -29,6 +29,12 @@ fn history_from_sink(handle: &PodHandle) -> Vec { let text = protocol::Segment::flatten_to_text(&segments); items.push(Item::user_message(text)); } + LogEntry::AssistantItem { item, .. } | LogEntry::ToolResult { item, .. } => { + items.push(Item::from(item)); + } + LogEntry::SystemItem { item, .. } => { + items.push(item.to_history_item()); + } LogEntry::AssistantItems { items: i, .. } | LogEntry::ToolResults { items: i, .. } | LogEntry::HookInjectedItems { items: i, .. } => { @@ -167,7 +173,7 @@ async fn make_pod_with_pwd_and_manifest( ) -> (Pod, std::path::PathBuf) { let manifest = PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).await.unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); std::mem::forget(store_tmp); // Separate tempdir to serve as the Pod's pwd/scope — these tests @@ -773,12 +779,10 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() { let (entries, _) = handle.sink.subscribe_with_snapshot(); let saw_notify_in_mirror = entries.iter().any(|e| matches!( e, - session_store::LogEntry::SystemItems { items, .. } - if items.iter().any(|si| matches!( - si, - session_store::SystemItem::Notification { message, .. } - if message == "turn finished" - )) + session_store::LogEntry::SystemItem { + item: session_store::SystemItem::Notification { message, .. }, + .. + } if message == "turn finished" )); assert!( saw_notify_in_mirror, @@ -863,12 +867,13 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes let (entries, _) = handle.sink.subscribe_with_snapshot(); let saw_pod_event_in_mirror = entries.iter().any(|e| matches!( e, - session_store::LogEntry::SystemItems { items, .. } - if items.iter().any(|si| matches!( - si, - session_store::SystemItem::PodEvent { event: protocol::PodEvent::TurnEnded { pod_name }, .. } - if pod_name == "child" - )) + session_store::LogEntry::SystemItem { + item: session_store::SystemItem::PodEvent { + event: protocol::PodEvent::TurnEnded { pod_name }, + .. + }, + .. + } if pod_name == "child" )); assert!( saw_pod_event_in_mirror, diff --git a/crates/pod/tests/restore_test.rs b/crates/pod/tests/restore_test.rs index 8cf74192..5283c9cf 100644 --- a/crates/pod/tests/restore_test.rs +++ b/crates/pod/tests/restore_test.rs @@ -36,7 +36,7 @@ async fn restore_from_manifest_rejects_unknown_session() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).await.unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); // A freshly-minted id with no jsonl file at all → store returns @@ -59,7 +59,7 @@ async fn restore_from_manifest_rejects_empty_session_log() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).await.unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); // Pre-create an empty `.jsonl` so `read_all` succeeds with no @@ -86,7 +86,7 @@ async fn restore_from_manifest_rejects_session_without_scope_snapshot() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).await.unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let id = session_store::new_session_id(); @@ -95,9 +95,7 @@ async fn restore_from_manifest_rejects_session_without_scope_snapshot() { config: &Default::default(), history: &[], }; - session_store::create_session_with_id(&store, id, state) - .await - .unwrap(); + session_store::create_session_with_id(&store, id, state).unwrap(); let result = Pod::restore_from_manifest(id, manifest, store, pod::PromptLoader::builtins_only()).await; diff --git a/crates/pod/tests/session_metrics_test.rs b/crates/pod/tests/session_metrics_test.rs index c396385a..9e1a4d90 100644 --- a/crates/pod/tests/session_metrics_test.rs +++ b/crates/pod/tests/session_metrics_test.rs @@ -174,7 +174,7 @@ async fn make_pod( ) { let manifest = PodManifest::from_toml(&manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).await.unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); let pwd_tmp = tempfile::tempdir().unwrap(); let pwd = pwd_tmp.path().to_path_buf(); let scope = pod::Scope::writable(&pwd).unwrap(); @@ -210,7 +210,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() { pod.run_text("first").await.unwrap(); pod.run_text("second").await.unwrap(); - let state = session_store::restore(&store, session_id).await.unwrap(); + let state = session_store::restore(&store, session_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); // Run 1 has 2 LLM iterations (tool loop), each evaluates prune with @@ -296,7 +296,7 @@ async fn prune_metrics_record_below_min_savings_skip() { pod.run_text("first").await.unwrap(); pod.run_text("second").await.unwrap(); - let state = session_store::restore(&store, session_id).await.unwrap(); + let state = session_store::restore(&store, session_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); let below = metrics .iter() @@ -329,35 +329,35 @@ struct MetricFailingStore { } impl Store for MetricFailingStore { - async fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> { + fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> { if let LogEntry::Extension { domain, .. } = &entry.entry { if domain == DOMAIN { return Err(StoreError::Io(std::io::Error::other("synthetic failure"))); } } - self.inner.append(id, entry).await + self.inner.append(id, entry) } - async fn read_all(&self, id: SessionId) -> Result, StoreError> { - self.inner.read_all(id).await + fn read_all(&self, id: SessionId) -> Result, StoreError> { + self.inner.read_all(id) } - async fn list_sessions(&self) -> Result, StoreError> { - self.inner.list_sessions().await + fn list_sessions(&self) -> Result, StoreError> { + self.inner.list_sessions() } - async fn create_session( + fn create_session( &self, id: SessionId, entries: &[HashedEntry], ) -> Result<(), StoreError> { - self.inner.create_session(id, entries).await + self.inner.create_session(id, entries) } - async fn exists(&self, id: SessionId) -> Result { - self.inner.exists(id).await + fn exists(&self, id: SessionId) -> Result { + self.inner.exists(id) } - async fn read_head_hash(&self, id: SessionId) -> Result, StoreError> { - self.inner.read_head_hash(id).await + fn read_head_hash(&self, id: SessionId) -> Result, StoreError> { + self.inner.read_head_hash(id) } - async fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { - self.inner.append_trace(id, entry).await + fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { + self.inner.append_trace(id, entry) } } @@ -372,7 +372,7 @@ async fn metric_write_failure_emits_warn_alert_and_does_not_abort_run() { let manifest_toml = manifest_toml(1, 1); let manifest = PodManifest::from_toml(&manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let inner = FsStore::new(store_tmp.path()).await.unwrap(); + let inner = FsStore::new(store_tmp.path()).unwrap(); let store = MetricFailingStore { inner }; let pwd_tmp = tempfile::tempdir().unwrap(); let pwd = pwd_tmp.path().to_path_buf(); @@ -397,7 +397,7 @@ async fn metric_write_failure_emits_warn_alert_and_does_not_abort_run() { pod.run_text("hello").await.unwrap(); // No metrics ended up in the log (writes were rejected). - let state = session_store::restore(&store, session_id).await.unwrap(); + let state = session_store::restore(&store, session_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); assert!(metrics.is_empty(), "metrics must drop on write failure"); @@ -444,7 +444,7 @@ permission = "write" let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]); let manifest = PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).await.unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); let pwd_tmp = tempfile::tempdir().unwrap(); let pwd = pwd_tmp.path().to_path_buf(); let scope = pod::Scope::writable(&pwd).unwrap(); @@ -455,7 +455,7 @@ permission = "write" let session_id = pod.session_id(); pod.run_text("hello").await.unwrap(); - let state = session_store::restore(&store, session_id).await.unwrap(); + let state = session_store::restore(&store, session_id).unwrap(); let metrics = metrics_from_extensions(&state.extensions); assert!( metrics.is_empty(), diff --git a/crates/pod/tests/system_prompt_template_test.rs b/crates/pod/tests/system_prompt_template_test.rs index 5cf3af8c..04789f79 100644 --- a/crates/pod/tests/system_prompt_template_test.rs +++ b/crates/pod/tests/system_prompt_template_test.rs @@ -103,7 +103,7 @@ async fn make_pod_with_body( let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).await.unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); std::mem::forget(store_tmp); let pwd_tmp = tempfile::tempdir().unwrap(); @@ -182,7 +182,7 @@ async fn session_start_state_captures_rendered_prompt() { .unwrap(); pod.run_text("hi").await.unwrap(); - let entries = pod.store().read_all(pod.session_id()).await.unwrap(); + let entries = pod.store().read_all(pod.session_id()).unwrap(); let first = entries.first().expect("at least one entry"); match &first.entry { LogEntry::SessionStart { system_prompt, .. } => { diff --git a/crates/session-metrics/src/lib.rs b/crates/session-metrics/src/lib.rs index 3b209125..6fd8d481 100644 --- a/crates/session-metrics/src/lib.rs +++ b/crates/session-metrics/src/lib.rs @@ -75,14 +75,14 @@ impl Metric { /// /// `save_extension` の薄い wrapper。書き込み失敗は呼び出し側に返す /// (メトリクスのために本体処理を止めるかは呼び出し側の判断)。 -pub async fn record_metric( +pub fn record_metric( store: &impl Store, session_id: SessionId, head_hash: &mut Option, metric: &Metric, ) -> Result<(), StoreError> { let payload = serde_json::to_value(metric).expect("Metric serialization cannot fail"); - save_extension(store, session_id, head_hash, DOMAIN, payload).await + save_extension(store, session_id, head_hash, DOMAIN, payload) } /// `RestoredState.extensions` から metrics domain の payload を順に取り出し、 diff --git a/crates/session-store/src/fs_store.rs b/crates/session-store/src/fs_store.rs index bdf7dc91..f3e07b0e 100644 --- a/crates/session-store/src/fs_store.rs +++ b/crates/session-store/src/fs_store.rs @@ -8,9 +8,9 @@ use crate::SessionId; use crate::event_trace::TraceEntry; use crate::session_log::{EntryHash, HashedEntry}; use crate::store::{Store, StoreError}; +use std::fs; +use std::io::Write; use std::path::{Path, PathBuf}; -use tokio::fs; -use tokio::io::AsyncWriteExt; /// Filesystem-backed JSONL store. /// @@ -24,9 +24,9 @@ pub struct FsStore { impl FsStore { /// Create a new `FsStore` rooted at the given directory. /// Creates the directory if it does not exist. - pub async fn new(root: impl Into) -> Result { + pub fn new(root: impl Into) -> Result { let root = root.into(); - fs::create_dir_all(&root).await?; + fs::create_dir_all(&root)?; Ok(Self { root }) } @@ -38,15 +38,13 @@ impl FsStore { self.root.join(format!("{id}.trace.jsonl")) } - async fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> { - let mut file = fs::OpenOptions::new() - .create(true) - .append(true) - .open(path) - .await?; - file.write_all(line.as_bytes()).await?; - file.write_all(b"\n").await?; - file.flush().await?; + fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> { + let mut file = fs::OpenOptions::new().create(true).append(true).open(path)?; + file.write_all(line.as_bytes())?; + file.write_all(b"\n")?; + // Append-mode write is the durability boundary; an explicit + // `sync_all` here would multiply latency by ~10× for no gain + // since the kernel already orders concurrent `O_APPEND` writes. Ok(()) } @@ -67,24 +65,24 @@ impl FsStore { } impl Store for FsStore { - async fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> { + fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError> { let line = serde_json::to_string(entry)?; - self.append_line(&self.log_path(id), &line).await + self.append_line(&self.log_path(id), &line) } - async fn read_all(&self, id: SessionId) -> Result, StoreError> { + fn read_all(&self, id: SessionId) -> Result, StoreError> { let path = self.log_path(id); if !path.exists() { return Err(StoreError::NotFound(id)); } - let content = fs::read_to_string(&path).await?; + let content = fs::read_to_string(&path)?; Self::parse_jsonl(&content) } - async fn list_sessions(&self) -> Result, StoreError> { + fn list_sessions(&self) -> Result, StoreError> { let mut sessions = Vec::new(); - let mut dir = fs::read_dir(&self.root).await?; - while let Some(entry) = dir.next_entry().await? { + for entry in fs::read_dir(&self.root)? { + let entry = entry?; let path = entry.path(); // Only match .jsonl files, not .trace.jsonl let name = path.file_name().and_then(|n| n.to_str()).unwrap_or(""); @@ -100,31 +98,27 @@ impl Store for FsStore { Ok(sessions) } - async fn create_session( - &self, - id: SessionId, - entries: &[HashedEntry], - ) -> Result<(), StoreError> { + fn create_session(&self, id: SessionId, entries: &[HashedEntry]) -> Result<(), StoreError> { let path = self.log_path(id); let mut content = String::new(); for entry in entries { content.push_str(&serde_json::to_string(entry)?); content.push('\n'); } - fs::write(&path, content.as_bytes()).await?; + fs::write(&path, content.as_bytes())?; Ok(()) } - async fn exists(&self, id: SessionId) -> Result { + fn exists(&self, id: SessionId) -> Result { Ok(self.log_path(id).exists()) } - async fn read_head_hash(&self, id: SessionId) -> Result, StoreError> { + fn read_head_hash(&self, id: SessionId) -> Result, StoreError> { let path = self.log_path(id); if !path.exists() { return Err(StoreError::NotFound(id)); } - let content = fs::read_to_string(&path).await?; + let content = fs::read_to_string(&path)?; let last_line = content.lines().rev().find(|l| !l.trim().is_empty()); match last_line { Some(line) => { @@ -139,8 +133,8 @@ impl Store for FsStore { } } - async fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { + fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError> { let line = serde_json::to_string(entry)?; - self.append_line(&self.trace_path(id), &line).await + self.append_line(&self.trace_path(id), &line) } } diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index 62f0d872..a8cfcb0c 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -40,10 +40,11 @@ pub use llm_worker::UsageRecord; pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; pub use session::{ - SessionStartState, append_entry, append_entry_with_hash, create_compacted_session, - create_session, create_session_with_id, ensure_head_or_fork, fork, fork_at, restore, - save_config_changed, save_delta, save_extension, save_pod_scope, save_run_completed, - save_run_errored, save_turn_end, save_usage, save_user_input, + SessionStartState, append_entry, append_entry_with_hash, append_system_item, + classify_history_item, create_compacted_session, create_session, create_session_with_id, + ensure_head_or_fork, fork, fork_at, restore, save_config_changed, save_delta, save_extension, + save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage, + save_user_input, }; pub use session_log::{ EntryHash, HashedEntry, LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, diff --git a/crates/session-store/src/session.rs b/crates/session-store/src/session.rs index 8351b031..2c472335 100644 --- a/crates/session-store/src/session.rs +++ b/crates/session-store/src/session.rs @@ -8,6 +8,7 @@ use crate::SessionId; use crate::logged_item::{LoggedItem, to_logged}; use crate::session_log::{self, EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionOrigin}; use crate::store::{Store, StoreError}; +use crate::system_item::SystemItem; use llm_worker::WorkerResult; use llm_worker::llm_client::RequestConfig; use llm_worker::llm_client::types::Item; @@ -23,12 +24,12 @@ pub struct SessionStartState<'a> { /// Create a new session, writing the initial `SessionStart` entry. /// /// Returns the new session ID and head hash. -pub async fn create_session( +pub fn create_session( store: &impl Store, state: SessionStartState<'_>, ) -> Result<(SessionId, EntryHash), StoreError> { let session_id = crate::new_session_id(); - let hash = create_session_with_id(store, session_id, state).await?; + let hash = create_session_with_id(store, session_id, state)?; Ok((session_id, hash)) } @@ -37,7 +38,7 @@ pub async fn create_session( /// Used by callers that need to reserve a session ID synchronously but /// defer the initial log append (e.g. Pod, which resolves a templated /// system prompt only at first turn). Returns the resulting head hash. -pub async fn create_session_with_id( +pub fn create_session_with_id( store: &impl Store, session_id: SessionId, state: SessionStartState<'_>, @@ -56,7 +57,7 @@ pub async fn create_session_with_id( prev_hash: None, entry, }; - store.append(session_id, &hashed_entry).await?; + store.append(session_id, &hashed_entry)?; Ok(hash) } @@ -64,7 +65,7 @@ pub async fn create_session_with_id( /// /// Records `compacted_from` provenance linking back to the source session. /// Returns the new session ID and head hash. -pub async fn create_compacted_session( +pub fn create_compacted_session( store: &impl Store, state: SessionStartState<'_>, source_session_id: SessionId, @@ -88,7 +89,7 @@ pub async fn create_compacted_session( prev_hash: None, entry, }; - store.append(session_id, &hashed_entry).await?; + store.append(session_id, &hashed_entry)?; Ok((session_id, hash)) } @@ -96,11 +97,11 @@ pub async fn create_compacted_session( /// /// Returns the reconstructed state. The caller is responsible for /// applying it to a Worker. -pub async fn restore( +pub fn restore( store: &impl Store, session_id: SessionId, ) -> Result { - let entries = store.read_all(session_id).await?; + let entries = store.read_all(session_id)?; Ok(session_log::collect_state(&entries)) } @@ -108,13 +109,13 @@ pub async fn restore( /// If not, auto-fork into a new session. /// /// Updates `session_id` and `head_hash` in place when a fork occurs. -pub async fn ensure_head_or_fork( +pub fn ensure_head_or_fork( store: &impl Store, session_id: &mut SessionId, head_hash: &mut Option, state: SessionStartState<'_>, ) -> Result<(), StoreError> { - let store_head = store.read_head_hash(*session_id).await?; + let store_head = store.read_head_hash(*session_id)?; if store_head == *head_hash { return Ok(()); } @@ -133,7 +134,7 @@ pub async fn ensure_head_or_fork( prev_hash: None, entry, }; - store.create_session(fork_id, &[hashed_entry]).await?; + store.create_session(fork_id, &[hashed_entry])?; *session_id = fork_id; *head_hash = Some(hash); Ok(()) @@ -145,7 +146,7 @@ pub async fn ensure_head_or_fork( /// the worker pushes its flattened user message into history; replay /// derives the worker `Item::user_message` from these segments via /// [`Segment::flatten_to_text`]. -pub async fn save_user_input( +pub fn save_user_input( store: &impl Store, session_id: SessionId, head_hash: &mut Option, @@ -160,17 +161,17 @@ pub async fn save_user_input( segments, }, ) - .await } /// Log the history delta — new items added since the previous snapshot. /// -/// Classifies items into AssistantItems, ToolResults, and HookInjectedItems -/// entries automatically. User messages are skipped because they are -/// persisted upfront via [`save_user_input`] at submit time; the worker -/// pushes a flattened copy into its history that arrives here in -/// `new_items` and would otherwise produce a duplicate `UserInput` entry. -pub async fn save_delta( +/// Classifies items into AssistantItem / ToolResult / HookInjectedItems +/// entries automatically (one entry per item). User messages are skipped +/// because they are persisted upfront via [`save_user_input`] at submit +/// time; the worker pushes a flattened copy into its history that +/// arrives here in `new_items` and would otherwise produce a duplicate +/// `UserInput` entry. +pub fn save_delta( store: &impl Store, session_id: SessionId, head_hash: &mut Option, @@ -181,66 +182,63 @@ pub async fn save_delta( } let ts = session_log::now_millis(); - let mut i = 0; - - while i < new_items.len() { - let item = &new_items[i]; + for item in new_items { if item.is_user_message() { // Already persisted by save_user_input at submit time. - i += 1; - } else if item.is_tool_result() { - let start = i; - while i < new_items.len() && new_items[i].is_tool_result() { - i += 1; - } - append_entry( - store, - session_id, - head_hash, - LogEntry::ToolResults { - ts, - items: to_logged(&new_items[start..i]), - }, - ) - .await?; - } else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() { - let start = i; - while i < new_items.len() - && (new_items[i].is_assistant_message() - || new_items[i].is_tool_call() - || new_items[i].is_reasoning()) - { - i += 1; - } - append_entry( - store, - session_id, - head_hash, - LogEntry::AssistantItems { - ts, - items: to_logged(&new_items[start..i]), - }, - ) - .await?; - } else { - append_entry( - store, - session_id, - head_hash, - LogEntry::HookInjectedItems { - ts, - items: vec![LoggedItem::from(&new_items[i])], - }, - ) - .await?; - i += 1; + continue; } + let entry = classify_history_item(item, ts); + append_entry(store, session_id, head_hash, entry)?; } Ok(()) } +/// Map one history item to its singular `LogEntry` form. Used by the +/// fallback `save_delta` path and the controller's worker-callback +/// classifier so write classification lives in one place. +pub fn classify_history_item(item: &Item, ts: u64) -> LogEntry { + if item.is_tool_result() { + LogEntry::ToolResult { + ts, + item: LoggedItem::from(item), + } + } else if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() { + LogEntry::AssistantItem { + ts, + item: LoggedItem::from(item), + } + } else { + // Defensive: anything else (future Item kinds) routes through + // AssistantItem rather than getting silently dropped. + LogEntry::AssistantItem { + ts, + item: LoggedItem::from(item), + } + } +} + +/// Append a single typed system item as `LogEntry::SystemItem`. Helper +/// for the Pod-side interceptor commit path; mirrors the per-item +/// commit shape used for assistant / tool result entries. +pub fn append_system_item( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, + item: SystemItem, +) -> Result { + append_entry_with_hash( + store, + session_id, + head_hash, + LogEntry::SystemItem { + ts: session_log::now_millis(), + item, + }, + ) +} + /// Log a TurnEnd entry. -pub async fn save_turn_end( +pub fn save_turn_end( store: &impl Store, session_id: SessionId, head_hash: &mut Option, @@ -255,11 +253,10 @@ pub async fn save_turn_end( turn_count, }, ) - .await } /// Log a `RunCompleted` entry — `run()` / `resume()` returned `Ok(WorkerResult)`. -pub async fn save_run_completed( +pub fn save_run_completed( store: &impl Store, session_id: SessionId, head_hash: &mut Option, @@ -276,14 +273,13 @@ pub async fn save_run_completed( result, }, ) - .await } /// Log a `RunErrored` entry — `run()` / `resume()` returned `Err(WorkerError)`. /// /// `WorkerError` is not `Serialize`, so the caller passes a lossy /// `to_string()` rendering as `message`. -pub async fn save_run_errored( +pub fn save_run_errored( store: &impl Store, session_id: SessionId, head_hash: &mut Option, @@ -300,7 +296,6 @@ pub async fn save_run_errored( message, }, ) - .await } /// Log an `LlmUsage` entry — 1 LLM リクエスト分の Usage スナップショット。 @@ -309,7 +304,7 @@ pub async fn save_run_errored( /// その prefix をプロバイダが実測した占有量(プロンプト全長)で、 /// プロバイダ別の正規化(Anthropic では `input + cache_read + cache_creation`)を /// 済ませた値を渡す。 -pub async fn save_usage( +pub fn save_usage( store: &impl Store, session_id: SessionId, head_hash: &mut Option, @@ -332,7 +327,6 @@ pub async fn save_usage( output_tokens, }, ) - .await } /// Log an `Extension` entry — domain-tagged opaque payload. @@ -340,7 +334,7 @@ pub async fn save_usage( /// session-store treats `payload` as an unstructured `serde_json::Value`. /// Each domain is responsible for serializing into and folding out of it. /// Use `RestoredState.extensions` to read entries back at restore time. -pub async fn save_extension( +pub fn save_extension( store: &impl Store, session_id: SessionId, head_hash: &mut Option, @@ -357,11 +351,10 @@ pub async fn save_extension( payload, }, ) - .await } /// Log the Pod's latest runtime scope snapshot. -pub async fn save_pod_scope( +pub fn save_pod_scope( store: &impl Store, session_id: SessionId, head_hash: &mut Option, @@ -375,11 +368,10 @@ pub async fn save_pod_scope( session_log::POD_SCOPE_EXTENSION_DOMAIN, payload, ) - .await } /// Log a `ConfigChanged` entry. -pub async fn save_config_changed( +pub fn save_config_changed( store: &impl Store, session_id: SessionId, head_hash: &mut Option, @@ -394,14 +386,10 @@ pub async fn save_config_changed( config: config.clone(), }, ) - .await } /// Fork the current state into a new session. -pub async fn fork( - store: &impl Store, - state: SessionStartState<'_>, -) -> Result { +pub fn fork(store: &impl Store, state: SessionStartState<'_>) -> Result { let fork_id = crate::new_session_id(); let entry = LogEntry::SessionStart { ts: session_log::now_millis(), @@ -417,17 +405,17 @@ pub async fn fork( prev_hash: None, entry, }; - store.create_session(fork_id, &[hashed_entry]).await?; + store.create_session(fork_id, &[hashed_entry])?; Ok(fork_id) } /// Fork from an arbitrary point in a stored session's log. -pub async fn fork_at( +pub fn fork_at( store: &impl Store, source_id: SessionId, at_hash: &EntryHash, ) -> Result { - let entries = store.read_all(source_id).await?; + let entries = store.read_all(source_id)?; let cut = entries .iter() .position(|e| &e.hash == at_hash) @@ -453,7 +441,7 @@ pub async fn fork_at( prev_hash: None, entry, }; - store.create_session(fork_id, &[hashed_entry]).await?; + store.create_session(fork_id, &[hashed_entry])?; Ok(fork_id) } @@ -462,13 +450,13 @@ pub async fn fork_at( /// Lower-level dual of the `save_*` convenience wrappers in this module. /// Use when the caller already builds the typed entry itself (e.g. when /// it needs the same value for an in-memory mirror + broadcast). -pub async fn append_entry( +pub fn append_entry( store: &impl Store, session_id: SessionId, head_hash: &mut Option, entry: LogEntry, ) -> Result<(), StoreError> { - append_entry_with_hash(store, session_id, head_hash, entry).await?; + append_entry_with_hash(store, session_id, head_hash, entry)?; Ok(()) } @@ -476,7 +464,7 @@ pub async fn append_entry( /// /// Used by paths that need the hash for downstream broadcast or mirror /// updates (e.g. the Pod's `SessionLogSink`). -pub async fn append_entry_with_hash( +pub fn append_entry_with_hash( store: &impl Store, session_id: SessionId, head_hash: &mut Option, @@ -488,7 +476,7 @@ pub async fn append_entry_with_hash( prev_hash: head_hash.clone(), entry, }; - store.append(session_id, &hashed_entry).await?; + store.append(session_id, &hashed_entry)?; *head_hash = Some(hash.clone()); Ok(hash) } diff --git a/crates/session-store/src/session_log.rs b/crates/session-store/src/session_log.rs index 056ef115..920cb483 100644 --- a/crates/session-store/src/session_log.rs +++ b/crates/session-store/src/session_log.rs @@ -120,24 +120,37 @@ pub enum LogEntry { /// history; the worker layer never sees segments directly. UserInput { ts: u64, segments: Vec }, - /// Assistant response items added to history (worker.rs:1040-1041). + /// One assistant-side item appended to history — assistant message, + /// reasoning, or tool call. Singular: one entry per history item so + /// the wire-side `Event::*` lane and on-disk LogEntry stay 1:1. + AssistantItem { ts: u64, item: LoggedItem }, + + /// One tool-execution result appended to history. + ToolResult { ts: u64, item: LoggedItem }, + + /// One typed agent-injected system item: notification, child-Pod + /// lifecycle event, `@` / `#` / `/` resolution + /// payload. Each `SystemItem` carries kind metadata that the LLM + /// itself never sees (the LLM gets `Item::system_message` with the + /// item's denormalised `body`), but live clients and replay paths + /// dispatch on `kind` for typed rendering. + SystemItem { ts: u64, item: SystemItem }, + + /// Legacy plural form: kept **read-only** so old session logs still + /// open. New writes always use the singular `AssistantItem`. Items + /// are flattened on replay. AssistantItems { ts: u64, items: Vec }, - /// Tool execution results added to history (worker.rs:897-900, 1072-1076). + /// Legacy plural form: kept **read-only**. New writes use the + /// singular `ToolResult`. ToolResults { ts: u64, items: Vec }, - /// Typed agent-injected system items: notifications, child-Pod - /// lifecycle events, `@` / `#` / `/` resolution - /// payloads. Each `SystemItem` carries kind metadata that the LLM - /// itself never sees (the LLM gets `Item::system_message` with the - /// item's `history_text()`), but live clients and replay paths - /// dispatch on `kind` for typed rendering. + /// Legacy plural form: kept **read-only**. New writes use the + /// singular `SystemItem`. SystemItems { ts: u64, items: Vec }, - /// Legacy pre-`SystemItems` form. Deserialize-only — new writes - /// always use `SystemItems`. Items are flattened to - /// `Item::system_message` on replay, matching how the original - /// path worked. + /// Legacy pre-`SystemItem*` form. Deserialize-only. Items are + /// flattened to `Item::system_message` on replay. HookInjectedItems { ts: u64, items: Vec }, /// Turn boundary. Records the turn count after increment. @@ -282,6 +295,15 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState { state.history.push(Item::user_message(text)); state.user_segments.push(segments.clone()); } + LogEntry::AssistantItem { item, .. } => { + state.history.push(Item::from(item.clone())); + } + LogEntry::ToolResult { item, .. } => { + state.history.push(Item::from(item.clone())); + } + LogEntry::SystemItem { item, .. } => { + state.history.push(item.to_history_item()); + } LogEntry::AssistantItems { items, .. } => { state.history.extend(items.iter().cloned().map(Item::from)); } diff --git a/crates/session-store/src/store.rs b/crates/session-store/src/store.rs index 74a8066e..6906ce0d 100644 --- a/crates/session-store/src/store.rs +++ b/crates/session-store/src/store.rs @@ -1,12 +1,18 @@ //! Persistence backend abstraction. //! -//! [`Store`] defines the async interface for reading and writing session logs. +//! [`Store`] defines the sync interface for reading and writing session logs. //! Implementations handle the physical storage (filesystem, database, etc.). +//! +//! Sync (rather than async) is intentional: a session log append is a single +//! `< 1 KiB` line on local fs and completes well below a millisecond. Going +//! through `tokio::fs` would force every caller — including `Worker`'s sync +//! `on_history_append` callback — to bridge sync → async via a channel + +//! drain task. Keeping the store sync lets the worker callback, Pod commit +//! paths, and `PodInterceptor` all share one direct `append_entry` call. use crate::SessionId; use crate::event_trace::TraceEntry; use crate::session_log::{EntryHash, HashedEntry}; -use std::future::Future; /// Errors from the persistence store. #[derive(Debug, thiserror::Error)] @@ -24,49 +30,31 @@ pub enum StoreError { Corrupt { line: usize, message: String }, } -/// Async persistence backend for session logs. +/// Sync persistence backend for session logs. /// /// All methods take `&self` — implementations should use interior mutability /// (e.g., append-mode file handles) when needed. pub trait Store: Send + Sync { /// Append a single hashed entry to the session log. - fn append( - &self, - id: SessionId, - entry: &HashedEntry, - ) -> impl Future> + Send; + fn append(&self, id: SessionId, entry: &HashedEntry) -> Result<(), StoreError>; /// Read all hashed entries for a session, in order. - fn read_all( - &self, - id: SessionId, - ) -> impl Future, StoreError>> + Send; + fn read_all(&self, id: SessionId) -> Result, StoreError>; /// List all session IDs, most recent first. - fn list_sessions(&self) -> impl Future, StoreError>> + Send; + fn list_sessions(&self) -> Result, StoreError>; /// Create a new session with initial entries. - fn create_session( - &self, - id: SessionId, - entries: &[HashedEntry], - ) -> impl Future> + Send; + fn create_session(&self, id: SessionId, entries: &[HashedEntry]) -> Result<(), StoreError>; /// Check if a session exists. - fn exists(&self, id: SessionId) -> impl Future> + Send; + fn exists(&self, id: SessionId) -> Result; /// Read the hash of the last entry in a session (the head). /// /// Returns `None` if the session is empty. - fn read_head_hash( - &self, - id: SessionId, - ) -> impl Future, StoreError>> + Send; + fn read_head_hash(&self, id: SessionId) -> Result, StoreError>; /// Append a trace entry to the debug event trace file. - fn append_trace( - &self, - id: SessionId, - entry: &TraceEntry, - ) -> impl Future> + Send; + fn append_trace(&self, id: SessionId, entry: &TraceEntry) -> Result<(), StoreError>; } diff --git a/crates/session-store/tests/fs_store_test.rs b/crates/session-store/tests/fs_store_test.rs index 58ff02bb..52a9dee9 100644 --- a/crates/session-store/tests/fs_store_test.rs +++ b/crates/session-store/tests/fs_store_test.rs @@ -4,10 +4,10 @@ use session_store::{ FsStore, LogEntry, Store, TraceEntry, build_chain, collect_state, new_session_id, }; -#[tokio::test] -async fn round_trip_write_and_read() { +#[test] +fn round_trip_write_and_read() { let dir = tempfile::tempdir().unwrap(); - let store = FsStore::new(dir.path()).await.unwrap(); + let store = FsStore::new(dir.path()).unwrap(); let id = new_session_id(); let raw = vec![ @@ -23,9 +23,9 @@ async fn round_trip_write_and_read() { ts: 2000, segments: vec![protocol::Segment::text("Hello")], }, - LogEntry::AssistantItems { + LogEntry::AssistantItem { ts: 3000, - items: vec![Item::assistant_message("Hi there!").into()], + item: Item::assistant_message("Hi there!").into(), }, LogEntry::TurnEnd { ts: 3100, @@ -41,11 +41,11 @@ async fn round_trip_write_and_read() { // Write entries one by one for entry in &entries { - store.append(id, entry).await.unwrap(); + store.append(id, entry).unwrap(); } // Read back - let read_back = store.read_all(id).await.unwrap(); + let read_back = store.read_all(id).unwrap(); assert_eq!(read_back.len(), entries.len()); // Verify hashes survived round-trip @@ -64,10 +64,10 @@ async fn round_trip_write_and_read() { assert!(state.head_hash.is_some()); } -#[tokio::test] -async fn create_session_writes_all_entries() { +#[test] +fn create_session_writes_all_entries() { let dir = tempfile::tempdir().unwrap(); - let store = FsStore::new(dir.path()).await.unwrap(); + let store = FsStore::new(dir.path()).unwrap(); let id = new_session_id(); let entries = build_chain(&[LogEntry::SessionStart { @@ -82,22 +82,22 @@ async fn create_session_writes_all_entries() { compacted_from: None, }]); - store.create_session(id, &entries).await.unwrap(); - let read_back = store.read_all(id).await.unwrap(); + store.create_session(id, &entries).unwrap(); + let read_back = store.read_all(id).unwrap(); assert_eq!(read_back.len(), 1); let state = collect_state(&read_back); assert_eq!(state.history.len(), 2); } -#[tokio::test] -async fn list_sessions_returns_newest_first() { +#[test] +fn list_sessions_returns_newest_first() { let dir = tempfile::tempdir().unwrap(); - let store = FsStore::new(dir.path()).await.unwrap(); + let store = FsStore::new(dir.path()).unwrap(); let id1 = new_session_id(); // Small delay to ensure different UUID v7 timestamps - tokio::time::sleep(std::time::Duration::from_millis(2)).await; + std::thread::sleep(std::time::Duration::from_millis(2)); let id2 = new_session_id(); let entries1 = build_chain(&[LogEntry::SessionStart { @@ -117,22 +117,22 @@ async fn list_sessions_returns_newest_first() { compacted_from: None, }]); - store.append(id1, &entries1[0]).await.unwrap(); - store.append(id2, &entries2[0]).await.unwrap(); + store.append(id1, &entries1[0]).unwrap(); + store.append(id2, &entries2[0]).unwrap(); - let sessions = store.list_sessions().await.unwrap(); + let sessions = store.list_sessions().unwrap(); assert_eq!(sessions.len(), 2); assert_eq!(sessions[0], id2); // newest first assert_eq!(sessions[1], id1); } -#[tokio::test] -async fn exists_returns_correct_state() { +#[test] +fn exists_returns_correct_state() { let dir = tempfile::tempdir().unwrap(); - let store = FsStore::new(dir.path()).await.unwrap(); + let store = FsStore::new(dir.path()).unwrap(); let id = new_session_id(); - assert!(!store.exists(id).await.unwrap()); + assert!(!store.exists(id).unwrap()); let entries = build_chain(&[LogEntry::SessionStart { ts: 1000, @@ -142,25 +142,25 @@ async fn exists_returns_correct_state() { forked_from: None, compacted_from: None, }]); - store.append(id, &entries[0]).await.unwrap(); + store.append(id, &entries[0]).unwrap(); - assert!(store.exists(id).await.unwrap()); + assert!(store.exists(id).unwrap()); } -#[tokio::test] -async fn not_found_error_for_missing_session() { +#[test] +fn not_found_error_for_missing_session() { let dir = tempfile::tempdir().unwrap(); - let store = FsStore::new(dir.path()).await.unwrap(); + let store = FsStore::new(dir.path()).unwrap(); let id = new_session_id(); - let result = store.read_all(id).await; + let result = store.read_all(id); assert!(result.is_err()); } -#[tokio::test] -async fn trace_entries_in_separate_file() { +#[test] +fn trace_entries_in_separate_file() { let dir = tempfile::tempdir().unwrap(); - let store = FsStore::new(dir.path()).await.unwrap(); + let store = FsStore::new(dir.path()).unwrap(); let id = new_session_id(); // Write a log entry @@ -172,7 +172,7 @@ async fn trace_entries_in_separate_file() { forked_from: None, compacted_from: None, }]); - store.append(id, &entries[0]).await.unwrap(); + store.append(id, &entries[0]).unwrap(); // Write a trace entry let trace = TraceEntry { @@ -182,10 +182,10 @@ async fn trace_entries_in_separate_file() { llm_worker::llm_client::event::PingEvent { timestamp: None }, ), }; - store.append_trace(id, &trace).await.unwrap(); + store.append_trace(id, &trace).unwrap(); // Log should have 1 entry, unaffected by trace - let log = store.read_all(id).await.unwrap(); + let log = store.read_all(id).unwrap(); assert_eq!(log.len(), 1); // Trace file should exist separately @@ -193,10 +193,10 @@ async fn trace_entries_in_separate_file() { assert!(trace_path.exists()); } -#[tokio::test] -async fn read_head_hash_returns_last_entry_hash() { +#[test] +fn read_head_hash_returns_last_entry_hash() { let dir = tempfile::tempdir().unwrap(); - let store = FsStore::new(dir.path()).await.unwrap(); + let store = FsStore::new(dir.path()).unwrap(); let id = new_session_id(); let entries = build_chain(&[ @@ -215,9 +215,9 @@ async fn read_head_hash_returns_last_entry_hash() { ]); for entry in &entries { - store.append(id, entry).await.unwrap(); + store.append(id, entry).unwrap(); } - let head = store.read_head_hash(id).await.unwrap(); + let head = store.read_head_hash(id).unwrap(); assert_eq!(head.as_ref(), Some(&entries[1].hash)); } diff --git a/crates/session-store/tests/session_test.rs b/crates/session-store/tests/session_test.rs index f821d577..070f14d2 100644 --- a/crates/session-store/tests/session_test.rs +++ b/crates/session-store/tests/session_test.rs @@ -84,9 +84,9 @@ impl Interceptor for PausePolicy { } } -async fn make_store() -> (tempfile::TempDir, FsStore) { +fn make_store() -> (tempfile::TempDir, FsStore) { let dir = tempfile::tempdir().unwrap(); - let store = FsStore::new(dir.path()).await.unwrap(); + let store = FsStore::new(dir.path()).unwrap(); (dir, store) } @@ -108,7 +108,7 @@ async fn run_and_persist( head_hash, vec![protocol::Segment::text(input)], ) - .await + .unwrap(); let history_before = worker.history().len(); @@ -119,10 +119,10 @@ async fn run_and_persist( let new_items = &worker.history()[history_before..]; session_store::save_delta(store, session_id, head_hash, new_items) - .await + .unwrap(); session_store::save_turn_end(store, session_id, head_hash, worker.turn_count()) - .await + .unwrap(); match &result { @@ -134,7 +134,7 @@ async fn run_and_persist( r.clone(), worker.last_run_interrupted(), ) - .await + .unwrap(); } Err(e) => { @@ -145,7 +145,7 @@ async fn run_and_persist( e.to_string(), worker.last_run_interrupted(), ) - .await + .unwrap(); } } @@ -160,7 +160,7 @@ async fn run_and_persist( #[tokio::test] async fn session_run_logs_entries() { - let (_dir, store) = make_store().await; + let (_dir, store) = make_store(); let client = MockLlmClient::new(simple_text_events()); let worker = Worker::new(client); @@ -172,14 +172,14 @@ async fn session_run_logs_entries() { history: worker.history(), }, ) - .await + .unwrap(); let mut head_hash = Some(head_hash); let (worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hi").await; let _ = &worker; - let entries = store.read_all(sid).await.unwrap(); + let entries = store.read_all(sid).unwrap(); // SessionStart, UserInput, AssistantItems, TurnEnd, RunCompleted (at minimum) assert!( @@ -217,7 +217,7 @@ async fn session_run_logs_entries() { #[tokio::test] async fn session_restore_round_trip() { - let (_dir, store) = make_store().await; + let (_dir, store) = make_store(); let client = MockLlmClient::new(simple_text_events()); let mut worker = Worker::new(client); worker.set_system_prompt("You are helpful."); @@ -230,7 +230,7 @@ async fn session_restore_round_trip() { history: worker.history(), }, ) - .await + .unwrap(); let mut head_hash = Some(head_hash); @@ -240,7 +240,7 @@ async fn session_restore_round_trip() { let original_turn_count = worker.turn_count(); // Restore - let state = session_store::restore(&store, sid).await.unwrap(); + let state = session_store::restore(&store, sid).unwrap(); assert_eq!(state.history.len(), original_history_len); assert_eq!(state.turn_count, original_turn_count); @@ -250,7 +250,7 @@ async fn session_restore_round_trip() { #[tokio::test] async fn session_run_with_tool_call() { - let (_dir, store) = make_store().await; + let (_dir, store) = make_store(); let client = MockLlmClient::with_responses(tool_call_events()); let mut worker = Worker::new(client); worker.register_tool(weather_tool_definition()); @@ -263,29 +263,29 @@ async fn session_run_with_tool_call() { history: worker.history(), }, ) - .await + .unwrap(); let mut head_hash = Some(head_hash); let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "What's the weather?").await; - let entries = store.read_all(sid).await.unwrap(); + let entries = store.read_all(sid).unwrap(); let has_tool_results = entries .iter() - .any(|e| matches!(&e.entry, LogEntry::ToolResults { .. })); - assert!(has_tool_results, "should have ToolResults entry"); + .any(|e| matches!(&e.entry, LogEntry::ToolResult { .. })); + assert!(has_tool_results, "should have ToolResult entry"); let has_assistant = entries .iter() - .any(|e| matches!(&e.entry, LogEntry::AssistantItems { .. })); - assert!(has_assistant, "should have AssistantItems entry"); + .any(|e| matches!(&e.entry, LogEntry::AssistantItem { .. })); + assert!(has_assistant, "should have AssistantItem entry"); } #[tokio::test] async fn session_resume_after_pause() { - let (_dir, store) = make_store().await; + let (_dir, store) = make_store(); // First run: tool call with pause policy → Paused let client = MockLlmClient::with_responses(tool_call_events()); @@ -301,7 +301,7 @@ async fn session_resume_after_pause() { history: worker.history(), }, ) - .await + .unwrap(); let mut head_hash = Some(head_hash); @@ -309,7 +309,7 @@ async fn session_resume_after_pause() { assert!(matches!(result, llm_worker::WorkerResult::Paused)); // Check RunCompleted is Paused - let entries = store.read_all(sid).await.unwrap(); + let entries = store.read_all(sid).unwrap(); let has_paused = entries.iter().any(|e| { matches!( &e.entry, @@ -322,13 +322,13 @@ async fn session_resume_after_pause() { assert!(has_paused, "should have Paused outcome"); // Restore state and verify - let state = session_store::restore(&store, sid).await.unwrap(); + let state = session_store::restore(&store, sid).unwrap(); assert!(state.last_run_interrupted); } #[tokio::test] async fn session_fork_preserves_state() { - let (_dir, store) = make_store().await; + let (_dir, store) = make_store(); let client = MockLlmClient::new(simple_text_events()); let mut worker = Worker::new(client); worker.set_system_prompt("System prompt"); @@ -341,7 +341,7 @@ async fn session_fork_preserves_state() { history: worker.history(), }, ) - .await + .unwrap(); let mut head_hash = Some(head_hash); @@ -356,11 +356,11 @@ async fn session_fork_preserves_state() { history: worker.history(), }, ) - .await + .unwrap(); // Fork should have a SessionStart with the current history - let fork_entries = store.read_all(fork_id).await.unwrap(); + let fork_entries = store.read_all(fork_id).unwrap(); assert_eq!(fork_entries.len(), 1); assert!(matches!( &fork_entries[0].entry, @@ -374,7 +374,7 @@ async fn session_fork_preserves_state() { #[tokio::test] async fn session_fork_at_truncates() { - let (_dir, store) = make_store().await; + let (_dir, store) = make_store(); let client = MockLlmClient::new(simple_text_events()); let worker = Worker::new(client); @@ -386,20 +386,20 @@ async fn session_fork_at_truncates() { history: worker.history(), }, ) - .await + .unwrap(); let mut head_hash = Some(head_hash); let (_worker, _) = run_and_persist(worker, &store, sid, &mut head_hash, "Hello").await; - let all_entries = store.read_all(sid).await.unwrap(); + let all_entries = store.read_all(sid).unwrap(); assert!(all_entries.len() > 2); // Fork at the hash of the 2nd entry (SessionStart + UserInput) let at_hash = &all_entries[1].hash; - let fork_id = session_store::fork_at(&store, sid, at_hash).await.unwrap(); + let fork_id = session_store::fork_at(&store, sid, at_hash).unwrap(); - let fork_entries = store.read_all(fork_id).await.unwrap(); + let fork_entries = store.read_all(fork_id).unwrap(); assert_eq!(fork_entries.len(), 1); // Just the new SessionStart let fork_state = collect_state(&fork_entries); @@ -413,7 +413,7 @@ async fn session_fork_at_truncates() { #[tokio::test] async fn session_config_changed_logged() { - let (_dir, store) = make_store().await; + let (_dir, store) = make_store(); let client = MockLlmClient::new(vec![]); let mut worker = Worker::new(client); @@ -425,7 +425,7 @@ async fn session_config_changed_logged() { history: worker.history(), }, ) - .await + .unwrap(); let mut head_hash = Some(head_hash); @@ -433,10 +433,10 @@ async fn session_config_changed_logged() { let new_config = RequestConfig::default().with_temperature(0.7); worker.set_request_config(new_config.clone()); session_store::save_config_changed(&store, sid, &mut head_hash, &new_config) - .await + .unwrap(); - let entries = store.read_all(sid).await.unwrap(); + let entries = store.read_all(sid).unwrap(); let has_config_changed = entries.iter().any(|e| { matches!( &e.entry, @@ -448,7 +448,7 @@ async fn session_config_changed_logged() { #[tokio::test] async fn session_auto_forks_on_conflict() { - let (_dir, store) = make_store().await; + let (_dir, store) = make_store(); // Create a session let client_a = MockLlmClient::new(simple_text_events()); @@ -462,7 +462,7 @@ async fn session_auto_forks_on_conflict() { history: worker_a.history(), }, ) - .await + .unwrap(); let mut session_id = original_sid; let mut head_hash = Some(head_hash); @@ -472,14 +472,14 @@ async fn session_auto_forks_on_conflict() { ts: 9999, segments: vec![protocol::Segment::text("Interloper")], }; - let current_head = store.read_head_hash(original_sid).await.unwrap(); + let current_head = store.read_head_hash(original_sid).unwrap(); let hash = session_store::compute_hash(current_head.as_ref(), &extra_entry); let hashed = session_store::HashedEntry { hash, prev_hash: current_head, entry: extra_entry, }; - store.append(original_sid, &hashed).await.unwrap(); + store.append(original_sid, &hashed).unwrap(); // Now head_hash is stale — ensure_head_or_fork should auto-fork session_store::ensure_head_or_fork( @@ -492,18 +492,18 @@ async fn session_auto_forks_on_conflict() { history: worker_a.history(), }, ) - .await + .unwrap(); // session_id should now be different assert_ne!(session_id, original_sid); // The fork session should exist and have entries - let fork_entries = store.read_all(session_id).await.unwrap(); + let fork_entries = store.read_all(session_id).unwrap(); assert!(!fork_entries.is_empty()); // Original session should still have the interloper entry - let original_entries = store.read_all(original_sid).await.unwrap(); + let original_entries = store.read_all(original_sid).unwrap(); let has_interloper = original_entries .iter() .any(|e| matches!(&e.entry, LogEntry::UserInput { .. })); diff --git a/crates/tui/src/picker.rs b/crates/tui/src/picker.rs index f020921c..5839af48 100644 --- a/crates/tui/src/picker.rs +++ b/crates/tui/src/picker.rs @@ -80,14 +80,14 @@ struct Row { } pub async fn run() -> Result { - let store = open_default_store().await?; - let ids = store.list_sessions().await?; + let store = open_default_store()?; + let ids = store.list_sessions()?; if ids.is_empty() { return Err(PickerError::NoSessions); } let mut rows: Vec = Vec::with_capacity(MAX_ROWS); for id in ids.into_iter().take(MAX_ROWS) { - let preview = build_preview(&store, id).await; + let preview = build_preview(&store, id); // Best-effort live check. A pods.json I/O hiccup downgrades // the row to "no badge" rather than killing the picker — the // user still gets to see the listing. @@ -149,7 +149,7 @@ fn close_viewport(terminal: &mut Terminal>) -> io:: Ok(()) } -async fn open_default_store() -> Result { +fn open_default_store() -> Result { let dir = manifest::paths::sessions_dir().ok_or_else(|| { PickerError::Io(io::Error::new( io::ErrorKind::NotFound, @@ -157,11 +157,11 @@ async fn open_default_store() -> Result { (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)", )) })?; - Ok(FsStore::new(&dir).await?) + Ok(FsStore::new(&dir)?) } -async fn build_preview(store: &FsStore, id: SessionId) -> String { - match store.read_all(id).await { +fn build_preview(store: &FsStore, id: SessionId) -> String { + match store.read_all(id) { Ok(entries) => last_message_preview(&entries).unwrap_or_else(|| "[empty]".to_string()), Err(_) => "[corrupt]".to_string(), } diff --git a/crates/tui/src/spawn.rs b/crates/tui/src/spawn.rs index 41aa67ad..0654c7f8 100644 --- a/crates/tui/src/spawn.rs +++ b/crates/tui/src/spawn.rs @@ -328,8 +328,8 @@ async fn load_resume_scope(session_id: SessionId) -> Result