From 0f76142993be9832d2805d01f63f62543a48ddad Mon Sep 17 00:00:00 2001 From: Hare Date: Thu, 14 May 2026 03:27:49 +0900 Subject: [PATCH] =?UTF-8?q?refactor:=20Pod=E3=81=AE=E3=83=A1=E3=82=A4?= =?UTF-8?q?=E3=83=B3=E3=83=AB=E3=83=BC=E3=83=97=E3=81=AE=E3=83=AA=E3=83=95?= =?UTF-8?q?=E3=82=A1=E3=82=AF=E3=82=BF=E3=83=AA=E3=83=B3=E3=82=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 1 + TODO.md | 1 - crates/llm-worker/src/worker.rs | 19 +- crates/pod/examples/pod_protocol.rs | 5 +- crates/pod/src/controller.rs | 153 +++++-- crates/pod/src/ipc/server.rs | 91 ++--- crates/pod/src/lib.rs | 2 + crates/pod/src/pod.rs | 503 ++++++++++++++++-------- crates/pod/src/runtime/dir.rs | 18 - crates/pod/src/session_log_sink.rs | 474 ++++++++++++++++++++++ crates/pod/src/shared_state.rs | 84 +--- crates/pod/src/spawn/comm_tools.rs | 63 +-- crates/pod/tests/compact_events_test.rs | 54 ++- crates/pod/tests/controller_test.rs | 160 ++++++-- crates/pod/tests/pod_comm_tools_test.rs | 59 ++- crates/protocol/src/lib.rs | 95 +++-- crates/session-store/src/lib.rs | 8 +- crates/session-store/src/session.rs | 27 +- crates/tui/Cargo.toml | 1 + crates/tui/src/app.rs | 197 +++++++--- crates/tui/src/main.rs | 5 +- tickets/pod-state-from-session-log.md | 55 --- 22 files changed, 1463 insertions(+), 612 deletions(-) create mode 100644 crates/pod/src/session_log_sink.rs delete mode 100644 tickets/pod-state-from-session-log.md diff --git a/Cargo.lock b/Cargo.lock index 08327049..14877057 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3645,6 +3645,7 @@ version = "0.1.0" dependencies = [ "client", "crossterm 0.28.1", + "llm-worker", "manifest", "pod-registry", "protocol", diff --git a/TODO.md b/TODO.md index b687f91e..0b538aa6 100644 --- a/TODO.md +++ b/TODO.md @@ -8,7 +8,6 @@ - Pod: 任意ターンからの Fork(複数ターン巻き戻しを汎用化) → [tickets/pod-session-fork.md](tickets/pod-session-fork.md) - Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md) - Pod: セッションログをバックエンドにした Pod 単位の永続化 → [tickets/pod-persistent-state.md](tickets/pod-persistent-state.md) -- Pod: 状態と socket 配信を session log 正本に統合 → [tickets/pod-state-from-session-log.md](tickets/pod-state-from-session-log.md) - 永続化層のセマンティック整理 → [tickets/persistence-semantics.md](tickets/persistence-semantics.md) - Exchange / Turn / Call セマンティクス整理 → [tickets/exchange-turn-call-semantics.md](tickets/exchange-turn-call-semantics.md) - llm-worker のエラー耐性 diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 8c98c210..a8602e13 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -1013,13 +1013,16 @@ impl Worker { } self.turn_count += 1; - // Collect and commit assistant items + // Collect and commit assistant items. Routed through + // `extend_history_with_callbacks` so observers (e.g. the + // Pod-side per-item session-log committer) see each item + // as it lands. let reasoning_items = self.reasoning_item_collector.take_collected(); let text_blocks = self.text_block_collector.take_collected(); let tool_calls = self.tool_call_collector.take_collected(); let assistant_items = self.build_assistant_items(&reasoning_items, &text_blocks, &tool_calls); - self.history.extend(assistant_items); + self.extend_history_with_callbacks(assistant_items); if tool_calls.is_empty() { match self.interceptor.on_turn_end(&self.history).await { @@ -1134,14 +1137,18 @@ impl Worker { Ok(Some(WorkerResult::Paused)) } Ok(ToolExecutionResult::Completed(results)) => { - for result in results { - self.history.push(Item::tool_result_item( + // Route per-result pushes through the callback path so + // observers (e.g. the Pod-side per-item session-log + // committer) see each tool result as it lands. + let items = results.into_iter().map(|result| { + Item::tool_result_item( &result.tool_use_id, &result.summary, result.content, result.is_error, - )); - } + ) + }); + self.extend_history_with_callbacks(items); Ok(None) } Err(err) => { diff --git a/crates/pod/examples/pod_protocol.rs b/crates/pod/examples/pod_protocol.rs index 6e4a1801..3039a09b 100644 --- a/crates/pod/examples/pod_protocol.rs +++ b/crates/pod/examples/pod_protocol.rs @@ -104,10 +104,7 @@ async fn main() -> Result<(), Box> { "\n[shared_state] final: {}", handle.shared_state.status_json() ); - println!( - "[history] {} bytes", - handle.shared_state.history_json().len() - ); + println!("[session log] {} entries", handle.sink.len()); drop(handle); let _ = listener.await; diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 07e39061..1cf822c5 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -3,15 +3,19 @@ use std::sync::Arc; use llm_worker::WorkerError; use llm_worker::llm_client::client::LlmClient; -use llm_worker::llm_client::types::{Item, Role}; use session_store::Store; use tokio::sync::{broadcast, mpsc, oneshot}; +use llm_worker::Item; +use session_store::LogEntry; +use session_store::session_log; + use crate::ipc::alerter::Alerter; use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::server::SocketServer; -use crate::pod::{Pod, PodError, PodRunResult}; +use crate::pod::{LogCommand, LogDrainHandle, Pod, PodError, PodRunResult}; use crate::runtime::dir::RuntimeDir; +use crate::session_log_sink::SessionLogSink; use crate::shared_state::PodSharedState; use crate::spawn::comm_tools::{ list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, @@ -22,16 +26,6 @@ use protocol::{ AlertLevel, AlertSource, ErrorCode, Event, Method, PodStatus, RunResult, Segment, TurnResult, }; -fn is_system_message_item(item: &Item) -> bool { - matches!( - item, - Item::Message { - role: Role::System, - .. - } - ) -} - // --------------------------------------------------------------------------- // PodHandle — client-facing, Clone-able // --------------------------------------------------------------------------- @@ -43,6 +37,10 @@ pub struct PodHandle { pub shared_state: Arc, pub runtime_dir: Arc, pub alerter: Alerter, + /// Session-log mirror + broadcast handle. The IPC server snapshots + /// it on every new connection (Event::Snapshot) and forwards + /// subsequent commits (Event::Entry) on the receiver. + pub sink: SessionLogSink, } impl PodHandle { @@ -86,11 +84,11 @@ async fn finish_controller_run( C: LlmClient + Clone + 'static, St: Store + Clone + 'static, { - let items = pod.worker().history().to_vec(); - shared_state.update_history(items); - shared_state.set_user_segments(pod.user_segments().to_vec()); + // history / user_segments are no longer mirrored on PodSharedState — + // clients reconstruct them from `Event::Snapshot` + live + // `Event::Entry` deliveries driven by the session-log sink. We + // only flip the status and kick post-run memory jobs here. set_controller_status(shared_state, runtime_dir, event_tx, new_status).await; - let _ = runtime_dir.write_history(shared_state).await; pod.spawn_post_run_memory_jobs(); } @@ -167,8 +165,23 @@ impl PodController { }]) .map_err(std::io::Error::other)?; + // === 1.5. Per-item history-commit drain task === + // + // Worker callbacks fire `on_history_append` for each assistant + // item / tool result / hook-injected item that lands in + // history. The drain task picks them up off an unbounded mpsc + // and commits each as a typed `LogEntry` through the sink, + // serialised against the same `session_head` lock the Pod uses + // for its own commits. This gives mid-turn snapshot visibility: + // a late-attaching client sees in-flight tool calls + completed + // assistant blocks without waiting for the turn-end persist. + let (log_cmd_tx, log_cmd_rx) = mpsc::unbounded_channel::(); + let drain_ctx = pod.log_drain_handle(); + let _drain_task = tokio::spawn(run_log_drain(log_cmd_rx, drain_ctx)); + pod.attach_log_cmd_tx(log_cmd_tx.clone()); + // === 2. Worker event bridge wiring === - wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter); + wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter, log_cmd_tx); // === 3. Tool registration (builtin / memory / spawn-orchestration) === let fs_for_view = register_pod_tools( @@ -193,8 +206,6 @@ impl PodController { manifest_toml.clone(), greeting, )); - shared_state.update_history(pod.worker().history().to_vec()); - shared_state.set_user_segments(pod.user_segments().to_vec()); shared_state.set_fs_view(crate::fs_view::PodFsView::new(fs_for_view)); shared_state.set_workflows( pod.workflow_completions() @@ -210,7 +221,6 @@ impl PodController { ); runtime_dir.write_manifest(&manifest_toml).await?; runtime_dir.write_status(&shared_state).await?; - runtime_dir.write_history(&shared_state).await?; let handle = PodHandle { method_tx, @@ -218,6 +228,7 @@ impl PodController { shared_state: shared_state.clone(), runtime_dir: runtime_dir.clone(), alerter: alerter.clone(), + sink: pod.sink(), }; let socket_server = SocketServer::start(&handle).await?; @@ -251,16 +262,30 @@ impl PodController { /// Wire the per-event broadcast bridges on the Pod's Worker. Each callback /// re-publishes a worker-level signal as a `protocol::Event` on `event_tx` /// so subscribers (TUI, socket clients) get a single typed stream. +/// +/// Also wires `on_history_append` into the per-item drain channel so +/// every history append observed by the worker becomes a typed +/// `LogEntry` commit (via the drain task). fn wire_event_bridges_on_worker( pod: &mut Pod, event_tx: &broadcast::Sender, alerter: &Alerter, + log_cmd_tx: mpsc::UnboundedSender, ) where C: LlmClient + Clone + 'static, St: Store + Clone + 'static, { let worker = pod.worker_mut(); + // Per-history-append → drain channel. Sends are infallible-by-design + // here (UnboundedSender never blocks); a closed receiver just means + // the controller is shutting down, in which case dropping the item + // is acceptable. + let drain_tx = log_cmd_tx.clone(); + worker.on_history_append(move |item| { + let _ = drain_tx.send(LogCommand::Item(item.clone())); + }); + let tx = event_tx.clone(); worker.on_turn_start(move |turn| { let _ = tx.send(Event::TurnStart { turn }); @@ -365,13 +390,80 @@ fn wire_event_bridges_on_worker( alerter_for_worker.alert(AlertLevel::Warn, AlertSource::Worker, message.to_owned()); }); - let tx = event_tx.clone(); - worker.on_history_append(move |item| { - if is_system_message_item(item) { - let value = serde_json::to_value(item).expect("Item is Serialize"); - let _ = tx.send(Event::SystemMessage { item: value }); + // History-append broadcasts (previously `Event::SystemMessage`) + // have been removed: every persistent history item is now committed + // through the session-log sink as a typed `LogEntry`, and clients + // see it via `Event::Snapshot` + live `Event::Entry`. The + // per-item commit channel is wired at the top of this function. +} + +/// Drain task: consumes `LogCommand::Item` and `LogCommand::Flush` +/// off the channel and commits each item as a typed `LogEntry` through +/// the supplied store + sink. Lives as long as the controller; exits +/// when the sender is dropped (controller shutdown). +async fn run_log_drain( + mut rx: mpsc::UnboundedReceiver, + ctx: LogDrainHandle, +) where + St: session_store::Store + Clone + Send + 'static, +{ + while let Some(cmd) = rx.recv().await { + match cmd { + LogCommand::Item(item) => { + let Some(entry) = classify_history_item(item) else { + continue; + }; + let mut head = ctx.session_head.lock().await; + match session_store::append_entry_with_hash( + &ctx.store, + head.session_id, + &mut head.head_hash, + entry.clone(), + ) + .await + { + Ok(_) => { + // Publish under the same critical section view + // a `subscribe_with_snapshot` would observe. + ctx.sink.publish(entry); + } + Err(e) => { + tracing::warn!(error = %e, "drain: append_entry failed; entry dropped"); + } + } + } + LogCommand::Flush(ack) => { + let _ = ack.send(()); + } } - }); + } +} + +/// Map a single worker-history `Item` to its corresponding `LogEntry` +/// classification. `None` is the skip signal for `user_message` items — +/// those are committed via `LogEntry::UserInput` by `Pod::run` at +/// submit time and would otherwise produce a duplicate entry here. +fn classify_history_item(item: Item) -> Option { + let ts = session_log::now_millis(); + if item.is_user_message() { + return None; + } + if item.is_tool_result() { + return Some(LogEntry::ToolResults { + ts, + items: vec![session_store::LoggedItem::from(&item)], + }); + } + if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() { + return Some(LogEntry::AssistantItems { + ts, + items: vec![session_store::LoggedItem::from(&item)], + }); + } + Some(LogEntry::HookInjectedItems { + ts, + items: vec![session_store::LoggedItem::from(&item)], + }) } /// Register the builtin file-manipulation tools, optional memory tools, @@ -656,10 +748,9 @@ async fn controller_loop( break; } - // GetHistory / ListCompletions are handled at the socket - // layer (direct response). If they reach the controller, - // ignore them. - Method::GetHistory | Method::ListCompletions { .. } => {} + // ListCompletions is handled at the socket layer (direct + // response). If it reaches the controller, ignore it. + Method::ListCompletions { .. } => {} Method::PodEvent(event) => { // Echo the received event to all subscribers so every @@ -820,7 +911,7 @@ where // drain it at its next pre_llm_request. notify_buffer.push(message); } - Some(Method::GetHistory | Method::ListCompletions { .. }) => {} + Some(Method::ListCompletions { .. }) => {} Some(Method::PodEvent(event)) => { let _ = event_tx.send(Event::PodEvent(event.clone())); // mpsc is consume-once, so we cannot defer this diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index 1a52b364..664be199 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -62,6 +62,13 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { let mut reader = JsonLineReader::new(reader); let mut writer = JsonLineWriter::new(writer); + // Atomically subscribe to the session-log mirror first. The + // returned (snapshot, rx) pair partitions the entry timeline: + // entries committed before this call appear in `entries`, every + // entry after lands on `entry_rx`. Doing this before the alert + // snapshot keeps both ordering pairs internally consistent. + let (entries_snapshot, mut entry_rx) = handle.sink.subscribe_with_snapshot(); + // Atomically subscribe and snapshot buffered alerts so that // warnings emitted before this client connected are replayed // exactly once — they appear in the snapshot, and any alert @@ -73,8 +80,41 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { } } + // Send the typed snapshot up front so late attachers can + // reconstruct view state without an extra round trip. + let snapshot_event = Event::Snapshot { + entries: entries_snapshot + .into_iter() + .map(|e| serde_json::to_value(&e).expect("LogEntry is Serialize")) + .collect(), + greeting: handle.shared_state.greeting.clone(), + status: handle.shared_state.get_status(), + }; + if writer.write(&snapshot_event).await.is_err() { + return; + } + loop { tokio::select! { + // Live session-log entries → this client as Event::Entry. + entry = entry_rx.recv() => { + match entry { + Ok(entry) => { + let value = serde_json::to_value(&entry) + .expect("LogEntry is Serialize"); + if writer.write(&Event::Entry { entry: value }).await.is_err() { + break; + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { + // Slow client fell behind the broadcast buffer. + // Drop the connection so the next reconnect + // re-seeds the prefix via subscribe_with_snapshot. + break; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } // Broadcast events → this client event = rx.recv() => { match event { @@ -129,57 +169,6 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { break; } } - Ok(Some(Method::GetHistory)) => { - let items = handle.shared_state.history(); - let segments_per_user = handle.shared_state.user_segments(); - // Embed `segments` on user-message JSON values so - // the TUI can re-render typed atoms on restore. - // Alignment: segments are recorded only for - // submissions made during the live session, never - // for seed history loaded via `SessionStart.history` - // (post-compaction). The seed user_messages always - // come first in worker history, so the last - // `segments_per_user.len()` user_messages are the - // ones that map 1:1 to the segments list. - let total_user_msgs = - items.iter().filter(|i| i.is_user_message()).count(); - let skip = total_user_msgs.saturating_sub(segments_per_user.len()); - let mut user_idx = 0usize; - let values = items - .iter() - .map(|item| { - let mut value = - serde_json::to_value(item).expect("Item is Serialize"); - if item.is_user_message() { - if user_idx >= skip { - let seg_idx = user_idx - skip; - if let Some(obj) = value.as_object_mut() { - let segs = serde_json::to_value( - &segments_per_user[seg_idx], - ) - .expect("Segment is Serialize"); - obj.insert("segments".into(), segs); - } - } - user_idx += 1; - } - value - }) - .collect(); - let greeting = handle.shared_state.greeting.clone(); - let status = handle.shared_state.get_status(); - if writer - .write(&Event::History { - items: values, - greeting, - status, - }) - .await - .is_err() - { - break; - } - } Ok(Some(method)) => { let _ = handle.send(method).await; } diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index c9b030a6..17c44114 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -5,6 +5,7 @@ pub mod hook; pub mod ipc; pub mod prompt; pub mod runtime; +pub mod session_log_sink; pub mod shared_state; pub mod spawn; pub mod workflow; @@ -30,4 +31,5 @@ pub use prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTem pub use protocol::{ErrorCode, Event, Method, PodStatus, TurnResult}; pub use provider::{ProviderError, build_client}; pub use runtime::dir::RuntimeDir; +pub use session_log_sink::{SessionLogSink, SessionLogWriter}; pub use shared_state::PodSharedState; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 351a327d..653276e7 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -7,10 +7,40 @@ use llm_worker::Item; use llm_worker::llm_client::RequestConfig; use llm_worker::llm_client::client::LlmClient; use llm_worker::state::Mutable; -use llm_worker::{Role, ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; -use session_store::{EntryHash, PodScopeSnapshot, SessionId, SessionStartState, Store, StoreError}; +use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; +use session_store::{ + EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, session_log, + to_logged, +}; use tracing::{info, warn}; +use crate::session_log_sink::SessionLogSink; + +/// Command sent to the per-Pod history-drain task. +/// +/// `Item` carries one worker-history append observed via +/// `Worker::on_history_append`; the drain classifies it into a +/// `LogEntry::AssistantItems` / `LogEntry::ToolResults` / +/// `LogEntry::HookInjectedItems` and commits it through the sink. +/// `Flush(ack)` is the barrier used by `persist_turn` to ensure every +/// in-flight item is committed before the trailing `TurnEnd` entry +/// lands. +#[derive(Debug)] +pub enum LogCommand { + Item(Item), + Flush(tokio::sync::oneshot::Sender<()>), +} + +/// State shared between Pod and the controller-spawned history-drain +/// task: store + session-head lock + broadcast sink. All three are +/// `Clone`able (the latter two as `Arc` clones, the store per its +/// `Clone` impl) so handing a copy to the drain task is cheap. +pub struct LogDrainHandle { + pub store: St, + pub session_head: Arc>, + pub sink: SessionLogSink, +} + use manifest::{ Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError, ScopeRule, SharedScope, WorkerManifest, @@ -38,9 +68,9 @@ use protocol::{AlertLevel, AlertSource, Event, Segment}; use tokio::sync::broadcast; use tokio::task::JoinHandle; -struct SessionHead { - session_id: SessionId, - head_hash: Option, +pub struct SessionHead { + pub session_id: SessionId, + pub head_hash: Option, } /// Pre-LLM-request hook that records `history.len()` at send time into a @@ -190,9 +220,22 @@ pub struct Pod { /// the K-th `Item::user_message` in `worker.history()` (modulo seed /// history loaded via `SessionStart.history`, whose original segments /// are not preserved). Populated from log on `restore_from_manifest`, - /// appended after `save_user_input` on each `run`. Mirrored to - /// `PodSharedState` by the controller for `Event::History` use. + /// appended after `save_user_input` on each `run`. Pre-`Event::Snapshot` + /// this fed `PodSharedState.user_segments`; the new wire format + /// carries typed atoms via `LogEntry::UserInput { segments }` so + /// this remains purely an in-memory tracker for compact alignment. user_segments: Vec>, + /// Pod-side session-log mirror + broadcast sink. Populated alongside + /// every successful `session_store::append_entry` write so connected + /// clients see a `(snapshot, live)` stream consistent with what's + /// on disk. + sink: SessionLogSink, + /// Sender into the controller-spawned history-drain task. + /// `None` when no controller has wired one (tests, low-level Pod + /// usage). The drain task is the source of mid-turn `AssistantItems` + /// / `ToolResults` / `HookInjectedItems` commits, fed by the + /// `Worker::on_history_append` callback. + log_cmd_tx: Option>, } impl Pod { @@ -249,6 +292,22 @@ impl Pod { extract_pointer: self.extract_pointer.clone(), memory_task: None, user_segments: self.user_segments.clone(), + // The memory-task clone never appends to the session log + // (it only reads `worker.history()`), so a fresh sink is + // fine — nothing observes its broadcast. + sink: SessionLogSink::new(), + log_cmd_tx: None, + } + } + + /// Build a `LogDrainHandle` carrying everything the controller's + /// drain task needs: store handle, the shared session-head lock, + /// and the broadcast sink. All three are cheap clones. + pub fn log_drain_handle(&self) -> LogDrainHandle { + LogDrainHandle { + store: self.store.clone(), + session_head: self.session_head.clone(), + sink: self.sink.clone(), } } @@ -332,6 +391,8 @@ impl Pod { extract_pointer: Arc::new(Mutex::new(None)), memory_task: None, user_segments: Vec::new(), + sink: SessionLogSink::new(), + log_cmd_tx: None, }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); @@ -426,8 +487,7 @@ impl Pod { /// can restore the narrowed scope instead of reclaiming delegated /// writes. pub async fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> { - let mut head = self.session_head.lock().await; - if head.head_hash.is_none() { + if self.session_head.lock().await.head_hash.is_none() { return Ok(()); } let snapshot = { @@ -437,8 +497,50 @@ impl Pod { deny: scope.deny_rules(), } }; - session_store::save_pod_scope(&self.store, head.session_id, &mut head.head_hash, &snapshot) - .await + let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize"); + self.commit_entry(LogEntry::Extension { + ts: session_log::now_millis(), + domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), + payload, + }) + .await + .map(|_| ()) + } + + /// Append `entry` to the session log AND publish it through the + /// broadcast sink. Holds the session-head async lock across the + /// disk write and the sink publish so subscribers see a gap-free + /// `(snapshot, live)` stream consistent with what's on disk. + pub(crate) async fn commit_entry( + &self, + entry: LogEntry, + ) -> Result { + let mut head = self.session_head.lock().await; + let hash = session_store::append_entry_with_hash( + &self.store, + head.session_id, + &mut head.head_hash, + entry.clone(), + ) + .await?; + self.sink.publish(entry); + Ok(hash) + } + + /// Cloneable sink handle. Exposed to the controller so the IPC + /// layer can `subscribe_with_snapshot` and stream entries to + /// clients without consulting any other state. + pub fn sink(&self) -> SessionLogSink { + self.sink.clone() + } + + /// Wire a history-drain task. The controller calls this once per + /// Pod after the drain task is spawned; the matching mpsc receiver + /// drives per-item commits of assistant items / tool results / + /// hook-injected items committed by the worker via + /// `Worker::on_history_append`. + pub fn attach_log_cmd_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender) { + self.log_cmd_tx = Some(tx); } /// Cloneable callback handed to dynamic-scope tools. It cannot append @@ -459,13 +561,12 @@ impl Pod { .expect("pending_scope_snapshot poisoned") .take(); if let Some(snapshot) = snapshot { - let mut head = self.session_head.lock().await; - session_store::save_pod_scope( - &self.store, - head.session_id, - &mut head.head_hash, - &snapshot, - ) + let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize"); + self.commit_entry(LogEntry::Extension { + ts: session_log::now_millis(), + domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), + payload, + }) .await?; } Ok(()) @@ -629,15 +730,13 @@ impl Pod { /// (the entry is dropped) and a `Warn` alert + `tracing::warn!` are /// emitted so the failure isn't completely silent. async fn try_record_metric(&mut self, metric: &session_metrics::Metric) { - let mut head = self.session_head.lock().await; - if let Err(err) = session_metrics::record_metric( - &self.store, - head.session_id, - &mut head.head_hash, - metric, - ) - .await - { + let payload = serde_json::to_value(metric).expect("Metric is Serialize"); + let entry = LogEntry::Extension { + ts: session_log::now_millis(), + domain: session_metrics::DOMAIN.into(), + payload, + }; + if let Err(err) = self.commit_entry(entry).await { warn!(name = %metric.name, error = %err, "failed to record session metric; dropping"); self.alert( AlertLevel::Warn, @@ -656,20 +755,6 @@ impl Pod { } } - fn broadcast_system_message_item(&self, item: &Item) { - if !matches!( - item, - Item::Message { - role: Role::System, - .. - } - ) { - return; - } - let value = serde_json::to_value(item).expect("Item is Serialize"); - self.send_event(Event::SystemMessage { item: value }); - } - /// Push a `Method::Notify` (or rendered `Method::PodEvent`) entry /// onto the pending buffer. /// @@ -975,17 +1060,12 @@ impl Pod { // Persist the user input as typed segments before the worker // pushes its flattened copy into history. save_delta deliberately // skips the resulting `is_user_message()` item to avoid double-write. - { - let mut head = self.session_head.lock().await; - self.session_id = head.session_id; - session_store::save_user_input( - &self.store, - head.session_id, - &mut head.head_hash, - input.clone(), - ) - .await?; - } + self.session_id = self.session_head.lock().await.session_id; + self.commit_entry(LogEntry::UserInput { + ts: session_log::now_millis(), + segments: input.clone(), + }) + .await?; self.user_segments.push(input.clone()); // Resolve `@` refs, `#` Knowledge refs, and `/` @@ -1330,34 +1410,64 @@ impl Pod { /// another writer has advanced the store head behind our back. async fn ensure_session_head(&mut self) -> Result<(), PodError> { let w = self.worker.as_ref().unwrap(); - let state = SessionStartState { - system_prompt: w.get_system_prompt(), - config: w.request_config(), - history: w.history(), + let prev_session_id; + let initial_state = { + let head = self.session_head.lock().await; + prev_session_id = head.session_id; + head.head_hash.is_none() }; - let mut head = self.session_head.lock().await; - if head.head_hash.is_none() { - let hash = - session_store::create_session_with_id(&self.store, head.session_id, state).await?; - head.head_hash = Some(hash); - drop(head); + if initial_state { + let initial = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: w.get_system_prompt().map(String::from), + config: w.request_config().clone(), + history: to_logged(w.history()), + forked_from: None, + compacted_from: None, + }; + self.commit_entry(initial).await?; self.persist_scope_snapshot().await?; return Ok(()); } - let prev_session_id = head.session_id; - let mut session_id = head.session_id; - let mut head_hash = head.head_hash.clone(); - session_store::ensure_head_or_fork(&self.store, &mut session_id, &mut head_hash, state) - .await?; - head.session_id = session_id; - head.head_hash = head_hash; - self.session_id = session_id; - // ensure_head_or_fork mints a fresh session_id when it auto- - // forks. Sync that to pods.json so a concurrent - // restore_from_manifest can't see "no live writer" for the new - // session and grab it. - if session_id != prev_session_id && self.scope_allocation.is_some() { - pod_registry::update_session(&self.manifest.pod.name, session_id)?; + // Check store head + auto-fork if it drifted. + let store_head = self + .store + .read_head_hash(prev_session_id) + .await + .map_err(PodError::from)?; + let mut head = self.session_head.lock().await; + if store_head == head.head_hash { + return Ok(()); + } + // Fork: mint a fresh session and switch to it. The new + // SessionStart entry replaces the mirror and is broadcast + // through the sink so existing subscribers reset their view. + let fork_id = session_store::new_session_id(); + let entry = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: w.get_system_prompt().map(String::from), + config: w.request_config().clone(), + history: to_logged(w.history()), + forked_from: None, + compacted_from: None, + }; + let hash = session_log::compute_hash(None, &entry); + let hashed = HashedEntry { + hash: hash.clone(), + prev_hash: None, + entry: entry.clone(), + }; + self.store + .create_session(fork_id, &[hashed]) + .await + .map_err(PodError::from)?; + head.session_id = fork_id; + head.head_hash = Some(hash); + self.session_id = fork_id; + self.sink.reset_with_initial(entry); + drop(head); + if self.scope_allocation.is_some() { + pod_registry::update_session(&self.manifest.pod.name, fork_id)?; } Ok(()) } @@ -1493,28 +1603,84 @@ impl Pod { history_before: usize, result: &Result, ) -> Result<(), StoreError> { - // Use direct field access for split borrows (worker immutable, - // head_hash mutable). - let w = self.worker.as_ref().unwrap(); - let new_items = &w.history()[history_before..]; - let mut head = self.session_head.lock().await; - self.session_id = head.session_id; - session_store::save_delta(&self.store, head.session_id, &mut head.head_hash, new_items) - .await?; + // Per-item commits for AssistantItems / ToolResults / + // HookInjectedItems already landed mid-turn through the + // controller-spawned drain task, fed by + // `Worker::on_history_append`. Drain the queue here so every + // in-flight item has actually been committed before the + // trailing `TurnEnd` entry. When no drain is wired (low-level + // tests / direct `Pod::new` usage) we fall back to a synchronous + // pass that replicates the legacy `save_delta` classification — + // those code paths don't fire `on_history_append`, so the items + // would otherwise be lost. + let _ = history_before; // referenced only by the fallback below. + self.session_id = self.session_head.lock().await.session_id; + if let Some(tx) = self.log_cmd_tx.as_ref() { + let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); + if tx.send(LogCommand::Flush(ack_tx)).is_ok() { + let _ = ack_rx.await; + } + } else { + // Fallback path for tests / Pod::new: classify and commit + // the post-`history_before` slice inline, matching the old + // `save_delta` shape. + let new_items: Vec = self.worker.as_ref().unwrap().history()[history_before..] + .iter() + .cloned() + .collect(); + let ts = session_log::now_millis(); + let mut i = 0; + while i < new_items.len() { + let item = &new_items[i]; + if item.is_user_message() { + i += 1; + } else if item.is_tool_result() { + let start = i; + while i < new_items.len() && new_items[i].is_tool_result() { + i += 1; + } + let items = new_items[start..i] + .iter() + .map(session_store::LoggedItem::from) + .collect(); + self.commit_entry(LogEntry::ToolResults { ts, items }).await?; + } else if item.is_assistant_message() + || item.is_tool_call() + || item.is_reasoning() + { + let start = i; + while i < new_items.len() + && (new_items[i].is_assistant_message() + || new_items[i].is_tool_call() + || new_items[i].is_reasoning()) + { + i += 1; + } + let items = new_items[start..i] + .iter() + .map(session_store::LoggedItem::from) + .collect(); + self.commit_entry(LogEntry::AssistantItems { ts, items }) + .await?; + } else { + self.commit_entry(LogEntry::HookInjectedItems { + ts, + items: vec![session_store::LoggedItem::from(&new_items[i])], + }) + .await?; + i += 1; + } + } + } - drop(head); self.flush_pending_scope_snapshot().await?; let turn_count = self.worker.as_ref().unwrap().turn_count(); - let mut head = self.session_head.lock().await; - session_store::save_turn_end( - &self.store, - head.session_id, - &mut head.head_hash, + self.commit_entry(LogEntry::TurnEnd { + ts: session_log::now_millis(), turn_count, - ) + }) .await?; - drop(head); // Flush any sync-buffered metrics from this run first // (currently `prune.fire` / `prune.skip` from the prune observer). @@ -1547,19 +1713,15 @@ impl Pod { record, correlation_id, } = recorded; - let mut head = self.session_head.lock().await; - session_store::save_usage( - &self.store, - head.session_id, - &mut head.head_hash, - record.history_len, - record.input_total_tokens, - record.cache_read_tokens, - record.cache_write_tokens, - record.output_tokens, - ) + self.commit_entry(LogEntry::LlmUsage { + ts: session_log::now_millis(), + history_len: record.history_len, + input_total_tokens: record.input_total_tokens, + cache_read_tokens: record.cache_read_tokens, + cache_write_tokens: record.cache_write_tokens, + output_tokens: record.output_tokens, + }) .await?; - drop(head); if let Some(id) = correlation_id { let metric = session_metrics::Metric::now("prune.post_request") .with_correlation_id(&id) @@ -1577,25 +1739,19 @@ impl Pod { let interrupted = self.worker.as_ref().unwrap().last_run_interrupted(); match result { Ok(r) => { - let mut head = self.session_head.lock().await; - session_store::save_run_completed( - &self.store, - head.session_id, - &mut head.head_hash, - r.clone(), + self.commit_entry(LogEntry::RunCompleted { + ts: session_log::now_millis(), interrupted, - ) + result: r.clone(), + }) .await?; } Err(e) => { - let mut head = self.session_head.lock().await; - session_store::save_run_errored( - &self.store, - head.session_id, - &mut head.head_hash, - e.to_string(), + self.commit_entry(LogEntry::RunErrored { + ts: session_log::now_millis(), interrupted, - ) + message: e.to_string(), + }) .await?; } } @@ -1824,34 +1980,46 @@ impl Pod { task_snapshot_text.clone(), )); - // Persist as a new compacted session. - let mut head = self.session_head.lock().await; - let old_session_id = head.session_id; - let old_head_hash = head - .head_hash - .clone() - .expect("head_hash should be set after at least one entry"); - - let w = self.worker.as_ref().unwrap(); - let state = SessionStartState { - system_prompt: w.get_system_prompt(), - config: w.request_config(), - history: &new_history, + // Build the SessionStart entry for the new compacted session, + // then atomically rotate to it: create on disk, swap head, reset + // the broadcast sink so existing subscribers see the new + // `SessionStart { compacted_from }` and reset their view. + let new_session_id = session_store::new_session_id(); + let session_start = { + let mut head = self.session_head.lock().await; + let old_session_id = head.session_id; + let old_head_hash = head + .head_hash + .clone() + .expect("head_hash should be set after at least one entry"); + let w = self.worker.as_ref().unwrap(); + let entry = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: w.get_system_prompt().map(String::from), + config: w.request_config().clone(), + history: to_logged(&new_history), + forked_from: None, + compacted_from: Some(session_store::SessionOrigin { + session_id: old_session_id, + at_hash: old_head_hash, + }), + }; + let hash = session_log::compute_hash(None, &entry); + let hashed = HashedEntry { + hash: hash.clone(), + prev_hash: None, + entry: entry.clone(), + }; + self.store.create_session(new_session_id, &[hashed]).await?; + head.session_id = new_session_id; + head.head_hash = Some(hash); + self.session_id = new_session_id; + entry }; - let (new_session_id, new_head_hash) = session_store::create_compacted_session( - &self.store, - state, - old_session_id, - old_head_hash, - ) - .await?; - - // Swap in the new session state. usage_history belongs to the old - // session — the new compacted session starts with no measurements - // until its first LLM call. - self.session_id = new_session_id; - head.session_id = new_session_id; - head.head_hash = Some(new_head_hash); + // Broadcast the SessionStart through the sink. This atomically + // resets the mirror to `[SessionStart]` so any subscriber + // querying after this point sees the post-compaction prefix. + self.sink.reset_with_initial(session_start); // Keep pods.json pointing at the live session_id. Without this // a concurrent `restore_from_manifest(new_session_id)` would // see no live writer and grab the session this Pod just moved @@ -1861,7 +2029,6 @@ impl Pod { if self.scope_allocation.is_some() { pod_registry::update_session(&self.manifest.pod.name, new_session_id)?; } - drop(head); // Align user_segments with the post-compaction history. Items // before `retain_from` (now folded into the summary) lose their // segments; only the user_messages surviving in retained_items @@ -1873,9 +2040,11 @@ impl Pod { } self.worker.as_mut().unwrap().set_history(new_history); - for item in &compact_introduced_system_messages { - self.broadcast_system_message_item(item); - } + // Compaction-introduced system messages are part of the new + // SessionStart's history (broadcast above) — clients derive + // their blocks from `SessionStart.history`. No per-item + // broadcast is required. + let _ = &compact_introduced_system_messages; let worker = self.worker.as_mut().unwrap(); // Anchor the prompt cache at the summary item so that Anthropic // can place a durable `cache_control` breakpoint there — our @@ -2139,18 +2308,13 @@ impl Pod { }; let payload_value = serde_json::to_value(&pointer_payload) .expect("ExtractPointerPayload is always JSON-serializable"); - { - let mut head = self.session_head.lock().await; - session_store::save_extension( - &self.store, - head.session_id, - &mut head.head_hash, - extract::EXTRACT_DOMAIN, - payload_value, - ) - .await?; - self.session_id = head.session_id; - } + self.commit_entry(LogEntry::Extension { + ts: session_log::now_millis(), + domain: extract::EXTRACT_DOMAIN.into(), + payload: payload_value, + }) + .await?; + self.session_id = self.session_head.lock().await.session_id; *self .extract_pointer @@ -2488,6 +2652,8 @@ impl Pod, St> { extract_pointer: Arc::new(Mutex::new(None)), memory_task: None, user_segments: Vec::new(), + sink: SessionLogSink::new(), + log_cmd_tx: None, }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); @@ -2559,6 +2725,8 @@ impl Pod, St> { extract_pointer: Arc::new(Mutex::new(None)), memory_task: None, user_segments: Vec::new(), + sink: SessionLogSink::new(), + log_cmd_tx: None, }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); @@ -2590,10 +2758,16 @@ impl Pod, St> { store: St, loader: PromptLoader, ) -> Result { - let state = session_store::restore(&store, session_id).await?; + // Read raw entries once so we can both reconstruct state and + // seed the broadcast sink's mirror with the same prefix that + // sits on disk. + let raw_entries = store.read_all(session_id).await?; + let state = session_store::collect_state(&raw_entries); if state.head_hash.is_none() { return Err(PodError::SessionEmpty { session_id }); } + let mirror_entries: Vec = + raw_entries.iter().map(|e| e.entry.clone()).collect(); let scope_snapshot = state .pod_scope .clone() @@ -2696,6 +2870,11 @@ impl Pod, St> { extract_pointer: Arc::new(Mutex::new(extract_pointer)), memory_task: None, user_segments: state.user_segments, + // Seed the mirror with the entries we just replayed so a + // late-attaching client sees the full prefix without an + // extra round trip. + sink: SessionLogSink::with_initial(mirror_entries), + log_cmd_tx: None, }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); diff --git a/crates/pod/src/runtime/dir.rs b/crates/pod/src/runtime/dir.rs index 8a6c6ece..07e59f7f 100644 --- a/crates/pod/src/runtime/dir.rs +++ b/crates/pod/src/runtime/dir.rs @@ -73,12 +73,6 @@ impl RuntimeDir { atomic_write(&self.path.join("manifest.toml"), toml.as_bytes()).await } - /// Write history.json atomically. - pub async fn write_history(&self, state: &PodSharedState) -> Result<(), io::Error> { - let content = state.history_json(); - atomic_write(&self.path.join("history.json"), content.as_bytes()).await - } - /// Write `spawned_pods.json` atomically. The entries are the full /// set of spawned children known to this Pod — callers pass the /// replacement list, no incremental merge. @@ -223,18 +217,6 @@ mod tests { assert_eq!(parsed[0].pod_name, "child"); } - #[tokio::test] - async fn write_history_creates_file() { - let tmp = tempfile::tempdir().unwrap(); - let rt = RuntimeDir::create(tmp.path(), "my-pod").await.unwrap(); - let state = test_state(); - - rt.write_history(&state).await.unwrap(); - - let content = std::fs::read_to_string(rt.path().join("history.json")).unwrap(); - assert_eq!(content, "[]"); - } - #[tokio::test] async fn socket_path() { let tmp = tempfile::tempdir().unwrap(); diff --git a/crates/pod/src/session_log_sink.rs b/crates/pod/src/session_log_sink.rs new file mode 100644 index 00000000..ff94b645 --- /dev/null +++ b/crates/pod/src/session_log_sink.rs @@ -0,0 +1,474 @@ +//! Pod-side session-log mirror + broadcast. +//! +//! Owns the in-memory `Vec` mirror that backs `Event::Snapshot` +//! delivery to newly connected clients and the +//! `broadcast::Sender` that fans out per-entry commits to +//! existing subscribers. Disk writes remain the responsibility of the +//! Pod (which still owns the `Store` handle); the sink stays focused on +//! the wire-side fan-out. +//! +//! Atomicity contract (see ticket `tickets/pod-state-from-session-log.md`): +//! +//! 1. Pod writes the entry to disk via the `Store`. +//! 2. Pod calls [`SessionLogSink::publish`] which acquires the mirror +//! mutex, pushes the entry, and fires `broadcast::send` — all under +//! the same critical section. +//! +//! [`SessionLogSink::subscribe_with_snapshot`] takes the same mutex, +//! so the `(snapshot, receiver)` pair returned to a connecting client +//! splits the entry sequence cleanly: every entry shows up in exactly +//! one of `snapshot` or on `receiver`. +//! +//! Disk-write failures short-circuit before `publish`, so a failed +//! entry never appears in the mirror or on the broadcast. + +use std::sync::{Arc, Mutex as StdMutex}; + +use session_store::{ + EntryHash, HashedEntry, LogEntry, SessionId, SessionStartState, Store, StoreError, session_log, +}; +use tokio::sync::{Mutex as AsyncMutex, MutexGuard, broadcast}; + +/// Broadcast capacity for the live receiver. Slow subscribers that +/// fall behind will see `RecvError::Lagged` and are expected to drop +/// the connection so that the next reconnect's `subscribe_with_snapshot` +/// re-seeds the prefix. +const BROADCAST_CAPACITY: usize = 256; + +/// In-memory mirror + broadcast fan-out for the active session log. +/// +/// Clone is cheap (`Arc` clone) — the Pod hands one to the IPC layer +/// for read-only `subscribe_with_snapshot` access and keeps one for +/// its own write path. +#[derive(Clone)] +pub struct SessionLogSink { + inner: Arc, +} + +struct SinkInner { + /// Full session log mirror in commit order. Reset on session swap + /// (compaction / fork) via [`SessionLogSink::reset_with_initial`]. + mirror: StdMutex>, + /// Broadcast channel for live entry updates. The same `Sender` + /// survives session swaps so existing subscribers keep their + /// receiver — they observe the swap as a freshly broadcast + /// `LogEntry::SessionStart` and reset their view accordingly. + broadcast_tx: broadcast::Sender, +} + +impl SessionLogSink { + /// Create a fresh sink with an empty mirror. Used before any entry + /// has been written (deferred SessionStart) or as a placeholder in + /// tests. + pub fn new() -> Self { + let (broadcast_tx, _) = broadcast::channel(BROADCAST_CAPACITY); + Self { + inner: Arc::new(SinkInner { + mirror: StdMutex::new(Vec::new()), + broadcast_tx, + }), + } + } + + /// Create a sink seeded with a prefix of entries already on disk. + /// Used by restore / fork-at-restore code paths that materialise + /// the existing log before the sink starts taking new commits. + pub fn with_initial(entries: Vec) -> Self { + let (broadcast_tx, _) = broadcast::channel(BROADCAST_CAPACITY); + Self { + inner: Arc::new(SinkInner { + mirror: StdMutex::new(entries), + broadcast_tx, + }), + } + } + + /// Push `entry` to the mirror and broadcast it. + /// + /// MUST be called only after the Pod has successfully persisted the + /// entry to the underlying `Store` — disk write is the gate. Failed + /// disk writes must not call `publish`. + pub fn publish(&self, entry: LogEntry) { + let mut mirror = self + .inner + .mirror + .lock() + .expect("session log mirror mutex poisoned"); + mirror.push(entry.clone()); + // SendError means there are zero subscribers; harmless. We hold + // the mirror lock across `send` so that `subscribe_with_snapshot` + // cannot observe an inconsistent (snapshot, receiver) pair. + let _ = self.inner.broadcast_tx.send(entry); + } + + /// Atomically swap the mirror to `[initial]` and broadcast the new + /// session-start entry. Used during compaction / fork: the new + /// `LogEntry::SessionStart` is the first entry of the replacement + /// session, and existing subscribers transition by replaying it + /// like any other live entry. + /// + /// Existing snapshot prefixes seen by old subscribers stay valid + /// for the prior session; the new `SessionStart` on the broadcast + /// is the signal to reset their derived view. + pub fn reset_with_initial(&self, initial: LogEntry) { + let mut mirror = self + .inner + .mirror + .lock() + .expect("session log mirror mutex poisoned"); + mirror.clear(); + mirror.push(initial.clone()); + let _ = self.inner.broadcast_tx.send(initial); + } + + /// Replace the mirror with the supplied prefix without broadcasting. + /// + /// Used by restore paths that load a session's complete log into + /// the mirror before any subscriber is connected. Callers that need + /// to notify existing subscribers should use [`reset_with_initial`]. + pub fn replace_silent(&self, entries: Vec) { + let mut mirror = self + .inner + .mirror + .lock() + .expect("session log mirror mutex poisoned"); + *mirror = entries; + } + + /// Atomically read the current mirror and subscribe to subsequent + /// commits. The returned snapshot and receiver split the entry + /// timeline into a duplicate-free, gap-free prefix/suffix pair. + pub fn subscribe_with_snapshot(&self) -> (Vec, broadcast::Receiver) { + let mirror = self + .inner + .mirror + .lock() + .expect("session log mirror mutex poisoned"); + let snapshot = mirror.clone(); + let rx = self.inner.broadcast_tx.subscribe(); + (snapshot, rx) + } + + /// Current entry count. Useful for tests / diagnostics. + pub fn len(&self) -> usize { + self.inner + .mirror + .lock() + .expect("session log mirror mutex poisoned") + .len() + } + + /// Whether the mirror is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl Default for SessionLogSink { + fn default() -> Self { + Self::new() + } +} + +/// Active session head for the Pod's persistent log: session id + +/// last-committed entry hash. Replaces the previous `SessionHead` +/// struct local to `Pod`; bundled here so the writer can hand a +/// cloneable handle to background tasks (e.g. the per-item drain +/// task spawned by the controller). +#[derive(Debug, Clone)] +pub struct SessionHeadState { + pub session_id: SessionId, + pub head_hash: Option, +} + +/// Pod-side session-log writer. +/// +/// Bundles the (1) persistent store, (2) the in-memory session-head +/// state (id + hash), and (3) the broadcast sink. `append_entry` +/// chains the hash on disk, advances the head, then publishes the +/// entry through the sink — under a single async mutex so two writers +/// cannot interleave the chain. +/// +/// `Clone` is a cheap `Arc` clone. The Pod keeps one writer for its +/// inline commits (UserInput, TurnEnd, Usage, RunCompleted/Errored, +/// scope snapshots, metrics) and hands clones to background tasks +/// (e.g. the controller's per-item history drain task). +pub struct SessionLogWriter { + inner: Arc>, +} + +struct WriterInner { + store: St, + head: AsyncMutex, + sink: SessionLogSink, +} + +impl Clone for SessionLogWriter { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl SessionLogWriter +where + St: Store + Clone, +{ + /// Create a writer for a fresh Pod with no entries on disk yet. + /// `head_hash` is `None` until the first `append_entry` (typically + /// the deferred `SessionStart` written by `ensure_session_head`). + pub fn new(store: St, session_id: SessionId) -> Self { + Self { + inner: Arc::new(WriterInner { + store, + head: AsyncMutex::new(SessionHeadState { + session_id, + head_hash: None, + }), + sink: SessionLogSink::new(), + }), + } + } + + /// Create a writer seeded with a session already on disk. The + /// mirror is populated with `mirror` (typically loaded via + /// `Store::read_all`), and `head_hash` should be the hash of the + /// last entry. + pub fn restored( + store: St, + session_id: SessionId, + head_hash: Option, + mirror: Vec, + ) -> Self { + Self { + inner: Arc::new(WriterInner { + store, + head: AsyncMutex::new(SessionHeadState { + session_id, + head_hash, + }), + sink: SessionLogSink::with_initial(mirror), + }), + } + } + + /// Append `entry` to the log: disk write → in-memory mirror push → + /// broadcast — atomic w.r.t. `subscribe_with_snapshot` callers. + pub async fn append_entry(&self, entry: LogEntry) -> Result { + let mut head = self.inner.head.lock().await; + let hash = session_store::append_entry_with_hash( + &self.inner.store, + head.session_id, + &mut head.head_hash, + entry.clone(), + ) + .await?; + self.inner.sink.publish(entry); + Ok(hash) + } + + /// Atomically swap to a new compacted session. + /// + /// Creates the new session on disk with `initial` as its + /// `SessionStart`, advances the head, and resets the sink mirror + /// to `[initial]` while broadcasting the entry. Existing + /// subscribers observe the swap as a freshly broadcast + /// `SessionStart` (with `compacted_from` set), which is their + /// signal to reset their derived view. + pub async fn swap_session( + &self, + new_session_id: SessionId, + initial: LogEntry, + ) -> Result { + let hash = session_log::compute_hash(None, &initial); + let hashed = HashedEntry { + hash: hash.clone(), + prev_hash: None, + entry: initial.clone(), + }; + self.inner + .store + .create_session(new_session_id, &[hashed]) + .await?; + let mut head = self.inner.head.lock().await; + head.session_id = new_session_id; + head.head_hash = Some(hash.clone()); + self.inner.sink.reset_with_initial(initial); + Ok(hash) + } + + /// If the store's head no longer matches our cached head, mint a + /// fresh session that forks from the current state and switch to + /// it. Returns `true` when a fork happened. + pub async fn ensure_head_or_fork( + &self, + state: SessionStartState<'_>, + ) -> Result { + let mut head = self.inner.head.lock().await; + let store_head = self.inner.store.read_head_hash(head.session_id).await?; + if store_head == head.head_hash { + return Ok(false); + } + let fork_id = session_store::new_session_id(); + let entry = LogEntry::SessionStart { + ts: session_log::now_millis(), + system_prompt: state.system_prompt.map(String::from), + config: state.config.clone(), + history: session_store::to_logged(state.history), + forked_from: None, + compacted_from: None, + }; + let hash = session_log::compute_hash(None, &entry); + let hashed = HashedEntry { + hash: hash.clone(), + prev_hash: None, + entry: entry.clone(), + }; + self.inner.store.create_session(fork_id, &[hashed]).await?; + head.session_id = fork_id; + head.head_hash = Some(hash); + self.inner.sink.reset_with_initial(entry); + Ok(true) + } + + /// Cloneable handle to the broadcast sink. Used by the IPC layer + /// for `subscribe_with_snapshot` and by tests that just want the + /// non-write side. + pub fn sink(&self) -> SessionLogSink { + self.inner.sink.clone() + } + + /// Underlying store handle. Direct access is preserved for callers + /// that read state (`read_all`, `read_head_hash`) without going + /// through the writer's hash chain. + pub fn store(&self) -> &St { + &self.inner.store + } + + /// Cheap snapshot of the current session id. + pub async fn current_session_id(&self) -> SessionId { + self.inner.head.lock().await.session_id + } + + /// Cheap snapshot of the current head hash. + pub async fn current_head_hash(&self) -> Option { + self.inner.head.lock().await.head_hash.clone() + } + + /// Direct lock on the head. Used by paths that need to coordinate + /// custom writes with the hash chain (currently + /// `session_metrics::record_metric`). + pub async fn lock_head(&self) -> MutexGuard<'_, SessionHeadState> { + self.inner.head.lock().await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use llm_worker::llm_client::RequestConfig; + use session_store::session_log::now_millis; + + fn session_start() -> LogEntry { + LogEntry::SessionStart { + ts: now_millis(), + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + } + } + + fn turn_end(n: usize) -> LogEntry { + LogEntry::TurnEnd { + ts: now_millis(), + turn_count: n, + } + } + + #[test] + fn publish_then_subscribe_returns_history_in_snapshot() { + let sink = SessionLogSink::new(); + sink.publish(session_start()); + sink.publish(turn_end(1)); + + let (snapshot, mut rx) = sink.subscribe_with_snapshot(); + assert_eq!(snapshot.len(), 2); + assert!(matches!(snapshot[0], LogEntry::SessionStart { .. })); + assert!(matches!(snapshot[1], LogEntry::TurnEnd { turn_count: 1, .. })); + assert!(rx.try_recv().is_err()); + } + + #[test] + fn subscribe_then_publish_delivers_live_entries() { + let sink = SessionLogSink::new(); + sink.publish(session_start()); + + let (snapshot, mut rx) = sink.subscribe_with_snapshot(); + assert_eq!(snapshot.len(), 1); + + sink.publish(turn_end(1)); + match rx.try_recv() { + Ok(LogEntry::TurnEnd { turn_count: 1, .. }) => {} + other => panic!("unexpected: {other:?}"), + } + assert!(rx.try_recv().is_err()); + } + + #[test] + fn snapshot_and_live_never_overlap() { + let sink = SessionLogSink::new(); + sink.publish(session_start()); + let (snapshot, mut rx) = sink.subscribe_with_snapshot(); + sink.publish(turn_end(1)); + + assert_eq!(snapshot.len(), 1); + match rx.try_recv() { + Ok(LogEntry::TurnEnd { turn_count: 1, .. }) => {} + other => panic!("unexpected: {other:?}"), + } + assert!(rx.try_recv().is_err()); + } + + #[test] + fn reset_with_initial_clears_and_broadcasts() { + let sink = SessionLogSink::new(); + sink.publish(session_start()); + sink.publish(turn_end(1)); + + let (_pre_snapshot, mut rx) = sink.subscribe_with_snapshot(); + sink.reset_with_initial(session_start()); + + match rx.try_recv() { + Ok(LogEntry::SessionStart { .. }) => {} + other => panic!("expected SessionStart broadcast, got {other:?}"), + } + + let (post_snapshot, _) = sink.subscribe_with_snapshot(); + assert_eq!(post_snapshot.len(), 1); + assert!(matches!(post_snapshot[0], LogEntry::SessionStart { .. })); + } + + #[test] + fn replace_silent_does_not_broadcast() { + let sink = SessionLogSink::new(); + sink.publish(session_start()); + let (_pre_snapshot, mut rx) = sink.subscribe_with_snapshot(); + + sink.replace_silent(vec![session_start(), turn_end(1)]); + + // No broadcast fired. + assert!(rx.try_recv().is_err()); + let (post_snapshot, _) = sink.subscribe_with_snapshot(); + assert_eq!(post_snapshot.len(), 2); + } + + #[test] + fn with_initial_seeds_the_mirror() { + let sink = SessionLogSink::with_initial(vec![session_start(), turn_end(1)]); + let (snapshot, _) = sink.subscribe_with_snapshot(); + assert_eq!(snapshot.len(), 2); + } +} diff --git a/crates/pod/src/shared_state.rs b/crates/pod/src/shared_state.rs index e9bb6413..c29cfe6a 100644 --- a/crates/pod/src/shared_state.rs +++ b/crates/pod/src/shared_state.rs @@ -1,7 +1,6 @@ use std::sync::{OnceLock, RwLock}; -use llm_worker::llm_client::types::Item; -use protocol::{PodStatus, Segment}; +use protocol::PodStatus; use serde_json::json; use session_store::SessionId; @@ -19,21 +18,20 @@ pub struct KnowledgeCandidate { /// Shared state between PodController and runtime directory. /// -/// Controller updates this in-memory; RuntimeDir writes it to disk. -/// Wrapped in `Arc` for sharing. +/// Controller updates this in-memory; RuntimeDir writes the status +/// snapshot to disk. Wrapped in `Arc` for sharing. +/// +/// History and typed user-segment mirrors used to live here so the +/// IPC layer could answer `Method::GetHistory`. Those reads now go +/// directly through the session-log sink (`Event::Snapshot` + +/// `Event::Entry`), so this struct holds only status, identity, +/// greeting, and completion lookup hubs. pub struct PodSharedState { pub pod_name: String, pub session_id: SessionId, pub manifest_toml: String, pub greeting: protocol::Greeting, pub status: RwLock, - pub history: RwLock>, - /// Typed user submissions in submit order. The K-th entry corresponds - /// to the K-th `Item::user_message` in `history` (modulo seed history - /// loaded from a pre-compaction `SessionStart.history`, whose original - /// segments are not preserved). Surfaced via `Event::History` so - /// clients can re-render typed atoms on session restore. - pub user_segments: RwLock>>, /// Pod-from-the-inside view of the filesystem. Set once in /// `PodController::start` after the `ScopedFs` is materialised, and /// read from the IPC server layer to answer `ListCompletions` @@ -58,8 +56,6 @@ impl PodSharedState { manifest_toml, greeting, status: RwLock::new(PodStatus::Idle), - history: RwLock::new(Vec::new()), - user_segments: RwLock::new(Vec::new()), fs_view: OnceLock::new(), workflows: OnceLock::new(), knowledge: OnceLock::new(), @@ -112,25 +108,6 @@ impl PodSharedState { .unwrap_or_default() } - pub fn user_segments(&self) -> Vec> { - self.user_segments - .read() - .map(|s| s.clone()) - .unwrap_or_default() - } - - pub fn set_user_segments(&self, segments: Vec>) { - if let Ok(mut s) = self.user_segments.write() { - *s = segments; - } - } - - pub fn push_user_segments(&self, segments: Vec) { - if let Ok(mut s) = self.user_segments.write() { - s.push(segments); - } - } - pub fn set_status(&self, status: PodStatus) { if let Ok(mut s) = self.status.write() { *s = status; @@ -141,16 +118,6 @@ impl PodSharedState { self.status.read().map(|s| *s).unwrap_or(PodStatus::Idle) } - pub fn history(&self) -> Vec { - self.history.read().map(|h| h.clone()).unwrap_or_default() - } - - pub fn update_history(&self, items: Vec) { - if let Ok(mut h) = self.history.write() { - *h = items; - } - } - /// Serialize status as JSON. pub fn status_json(&self) -> String { let status = self.get_status(); @@ -161,21 +128,11 @@ impl PodSharedState { }) .to_string() } - - /// Serialize history as JSON. - pub fn history_json(&self) -> String { - if let Ok(h) = self.history.read() { - serde_json::to_string(&*h).unwrap_or_else(|_| "[]".into()) - } else { - "[]".into() - } - } } #[cfg(test)] mod tests { use super::*; - use llm_worker::llm_client::types::{ContentPart, Item, Role}; fn test_state() -> PodSharedState { PodSharedState::new( @@ -231,29 +188,6 @@ mod tests { assert_eq!(parsed["state"], "running"); } - #[test] - fn history_json_empty_initially() { - let state = test_state(); - assert_eq!(state.history_json(), "[]"); - } - - #[test] - fn history_json_after_update() { - let state = test_state(); - let items = vec![Item::Message { - id: None, - role: Role::Assistant, - content: vec![ContentPart::Text { - text: "Hello".into(), - }], - status: None, - }]; - state.update_history(items); - let json = state.history_json(); - let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); - assert!(parsed.is_array()); - assert_eq!(parsed[0]["role"], "assistant"); - } #[test] fn knowledge_completions_empty_when_unset() { diff --git a/crates/pod/src/spawn/comm_tools.rs b/crates/pod/src/spawn/comm_tools.rs index 33d16d70..c6b88b54 100644 --- a/crates/pod/src/spawn/comm_tools.rs +++ b/crates/pod/src/spawn/comm_tools.rs @@ -19,6 +19,7 @@ use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::{ErrorCode, Event, Method}; use serde::Deserialize; +use session_store::LogEntry; use tokio::net::UnixStream; use crate::runtime::dir::SpawnedPodRecord; @@ -385,33 +386,31 @@ async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRu } } -/// Connect and ask the Pod for its conversation history. Skips -/// pre-History events (such as buffered alerts replayed to new -/// clients). Returns the raw JSON items as `serde_json::Value` since -/// the pod crate already round-trips via `Value` on the wire. +/// Connect to a Pod's socket and read the connect-time `Event::Snapshot`. +/// +/// Pods deliver the session-log mirror as the first non-Alert event on +/// every new connection, so consuming it is sufficient — no explicit +/// `GetHistory` method round trip. Returns the entries as raw JSON +/// values; callers deserialize as `session_store::LogEntry` if they +/// need typed access. async fn fetch_history(socket: &Path) -> std::io::Result> { let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) .await .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??; - let (r, w) = stream.into_split(); - let mut writer = JsonLineWriter::new(w); + let (r, _w) = stream.into_split(); let mut reader = JsonLineReader::new(r); - tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(&Method::GetHistory)) - .await - .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "write timed out"))??; - loop { let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::()) .await .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "read timed out"))??; match event { - Some(Event::History { items, .. }) => return Ok(items), + Some(Event::Snapshot { entries, .. }) => return Ok(entries), Some(_) => continue, None => { return Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, - "pod closed connection before History event", + "pod closed connection before Snapshot event", )); } } @@ -426,24 +425,36 @@ async fn is_reachable(socket: &Path) -> bool { .unwrap_or(false) } -fn extract_assistant_text(items: &[serde_json::Value]) -> String { +fn extract_assistant_text(entries: &[serde_json::Value]) -> String { let mut out = String::new(); - for value in items { - let Ok(item) = serde_json::from_value::(value.clone()) else { + for value in entries { + // The wire payload is the JSON form of `session_store::LogEntry`. + // Walk Assistant items inside each entry that can carry them: + // post-compaction `SessionStart.history` (seed) and per-LLM-call + // `AssistantItems` deltas. + let Ok(entry) = serde_json::from_value::(value.clone()) else { continue; }; - if let Item::Message { - role: Role::Assistant, - content, - .. - } = item - { - for part in content { - if let ContentPart::Text { text } = part { - if !out.is_empty() { - out.push_str("\n\n"); + let logged_items = match entry { + LogEntry::SessionStart { history, .. } => history, + LogEntry::AssistantItems { items, .. } => items, + _ => continue, + }; + for logged in logged_items { + let item: Item = logged.into(); + if let Item::Message { + role: Role::Assistant, + content, + .. + } = item + { + for part in content { + if let ContentPart::Text { text } = part { + if !out.is_empty() { + out.push_str("\n\n"); + } + out.push_str(&text); } - out.push_str(&text); } } } diff --git a/crates/pod/tests/compact_events_test.rs b/crates/pod/tests/compact_events_test.rs index ccb93460..8823b408 100644 --- a/crates/pod/tests/compact_events_test.rs +++ b/crates/pod/tests/compact_events_test.rs @@ -177,17 +177,39 @@ fn drain(rx: &mut broadcast::Receiver) -> Vec { out } -fn system_event_text(event: &Event) -> Option<&str> { - match event { - Event::SystemMessage { item } => item["content"] - .as_array() - .and_then(|parts| parts.iter().filter_map(|p| p["text"].as_str()).next()), - _ => None, +/// Collect every system-message text that the post-compaction +/// `SessionStart.history` carries, by reading the sink mirror directly. +fn system_texts_in_sink_session_start(pod: &pod::Pod) -> Vec { + let (entries, _rx) = pod.sink().subscribe_with_snapshot(); + for entry in entries.into_iter().rev() { + if let session_store::LogEntry::SessionStart { history, .. } = entry { + return history + .into_iter() + .filter_map(|logged| { + let item: Item = logged.into(); + match item { + Item::Message { + role: llm_worker::Role::System, + content, + .. + } => Some( + content + .iter() + .map(|p| p.as_text().to_owned()) + .collect::>() + .join(""), + ), + _ => None, + } + }) + .collect(); + } } + Vec::new() } #[tokio::test] -async fn compact_broadcasts_only_new_system_messages_not_retained_ones() { +async fn compact_emits_session_start_carrying_summary_and_task_snapshot() { let client = MockClient::new(vec![ single_text_events("hi"), write_summary_tool_use_events("call-1", "summary"), @@ -195,18 +217,16 @@ async fn compact_broadcasts_only_new_system_messages_not_retained_ones() { ]); let mut pod = make_pod(client).await; - let (tx, mut rx) = broadcast::channel::(64); + let (tx, _rx_keep) = broadcast::channel::(64); pod.attach_event_tx(tx); pod.run_text("first").await.unwrap(); - let retained_message = Item::system_message("[Retained system]\nold"); - pod.worker_mut().push_item(retained_message); - let _ = drain(&mut rx); - pod.compact(10_000).await.unwrap(); - let events = drain(&mut rx); - let system_texts: Vec<&str> = events.iter().filter_map(system_event_text).collect(); + let system_texts = system_texts_in_sink_session_start(&pod); + // The post-compaction `SessionStart.history` carries the new system + // messages introduced by the compactor. Clients re-seed their view + // from this entry alone, so it is the load-bearing payload. assert!( system_texts .iter() @@ -219,12 +239,6 @@ async fn compact_broadcasts_only_new_system_messages_not_retained_ones() { .any(|text| text.starts_with("[Session TaskStore snapshot]")), "task snapshot system message missing from {system_texts:?}" ); - assert!( - !system_texts - .iter() - .any(|text| text.starts_with("[Retained system]")), - "retained system message should not be rebroadcast: {system_texts:?}" - ); } #[tokio::test] diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 5f968d9f..c4629bcd 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -6,12 +6,40 @@ use async_trait::async_trait; use futures::{Stream, StreamExt}; use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; +use llm_worker::llm_client::types::Item; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; -use session_store::FsStore; +use session_store::{FsStore, LogEntry}; use pod::{Event, Method, Pod, PodController, PodHandle, PodManifest, PodStatus}; +/// Reconstruct a worker-history-like `Vec` from the live session +/// log mirror held by the Pod's broadcast sink. Replaces the previous +/// `PodSharedState.history()` test helper now that the mirror lives in +/// the sink. +fn history_from_sink(handle: &PodHandle) -> Vec { + let (entries, _rx) = handle.sink.subscribe_with_snapshot(); + let mut items = Vec::new(); + for entry in entries { + match entry { + LogEntry::SessionStart { history, .. } => { + items.extend(history.into_iter().map(Item::from)); + } + LogEntry::UserInput { segments, .. } => { + let text = protocol::Segment::flatten_to_text(&segments); + items.push(Item::user_message(text)); + } + LogEntry::AssistantItems { items: i, .. } + | LogEntry::ToolResults { items: i, .. } + | LogEntry::HookInjectedItems { items: i, .. } => { + items.extend(i.into_iter().map(Item::from)); + } + _ => {} + } + } + items +} + // --------------------------------------------------------------------------- // Mock LLM Client // --------------------------------------------------------------------------- @@ -218,8 +246,61 @@ async fn run_end_returns_to_idle_without_busy_status() { assert_eq!(handle.shared_state.get_status(), PodStatus::Idle); } +/// Mid-turn re-attach: a client connecting while the worker is still +/// running observes the in-flight `UserInput` entry in the connect-time +/// `Event::Snapshot`. This is the load-bearing property of the new +/// session-log-driven IPC: a late attacher reconstructs the running +/// view without needing the prior client's diff. #[tokio::test] -async fn attach_history_includes_current_status() { +async fn snapshot_includes_user_input_for_in_flight_turn() { + let client = MockClient::sequential(vec![MockResponse::Hang(simple_text_events())]); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + + handle + .send(Method::run_text("hello in-flight")) + .await + .unwrap(); + wait_for_status(&handle, PodStatus::Running).await; + + let stream = tokio::net::UnixStream::connect(handle.runtime_dir.socket_path()) + .await + .unwrap(); + let (reader, _writer) = stream.into_split(); + let mut reader = protocol::stream::JsonLineReader::new(reader); + + loop { + let event = reader.next::().await.unwrap().unwrap(); + match event { + Event::Snapshot { entries, .. } => { + // Walk the entries, find a `LogEntry::UserInput` and + // confirm its segments flatten to our submitted text. + let mut found = false; + for value in entries { + let entry: session_store::LogEntry = + serde_json::from_value(value).expect("LogEntry deserialise"); + if let session_store::LogEntry::UserInput { segments, .. } = entry { + let text = protocol::Segment::flatten_to_text(&segments); + if text == "hello in-flight" { + found = true; + break; + } + } + } + assert!( + found, + "snapshot must carry the in-flight UserInput entry" + ); + return; + } + Event::Alert(_) => continue, + other => panic!("expected Snapshot first, got {other:?}"), + } + } +} + +#[tokio::test] +async fn attach_snapshot_includes_current_status() { let client = MockClient::sequential(vec![MockResponse::Hang(simple_text_events())]); let pod = make_pod(client).await; let handle = spawn_controller(pod).await; @@ -230,15 +311,20 @@ async fn attach_history_includes_current_status() { let stream = tokio::net::UnixStream::connect(handle.runtime_dir.socket_path()) .await .unwrap(); - let (reader, writer) = stream.into_split(); + let (reader, _writer) = stream.into_split(); let mut reader = protocol::stream::JsonLineReader::new(reader); - let mut writer = protocol::stream::JsonLineWriter::new(writer); - writer.write(&Method::GetHistory).await.unwrap(); - let event = reader.next::().await.unwrap().unwrap(); - match event { - Event::History { status, .. } => assert_eq!(status, PodStatus::Running), - other => panic!("expected History, got {other:?}"), + // First event after connect is the snapshot — it carries the current status. + loop { + let event = reader.next::().await.unwrap().unwrap(); + match event { + Event::Snapshot { status, .. } => { + assert_eq!(status, PodStatus::Running); + return; + } + Event::Alert(_) => continue, + other => panic!("expected Snapshot, got {other:?}"), + } } } @@ -275,11 +361,11 @@ async fn run_populates_history() { tokio::time::sleep(std::time::Duration::from_millis(100)).await; - let history = handle.shared_state.history_json(); - assert_ne!(history, "[]"); - let parsed: serde_json::Value = serde_json::from_str(&history).unwrap(); - assert!(parsed.is_array()); - assert!(parsed.as_array().unwrap().len() >= 2); // user + assistant + let history = history_from_sink(&handle); + assert!( + history.len() >= 2, + "history must include user + assistant items, got {history:?}" + ); } #[tokio::test] @@ -712,7 +798,7 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() { // The notification must also be persisted into the Worker history // (and therefore eventually into history.json), per // tickets/notify-history-persist.md. - let history = handle.shared_state.history(); + let history = history_from_sink(&handle); let notify_in_history = history.iter().any(|i| { i.as_text() .is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished")) @@ -796,7 +882,7 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes // Same item must be present in worker.history (persisted lane), // not just the per-request clone — see tickets/notify-history-persist.md. - let history = handle.shared_state.history(); + let history = history_from_sink(&handle); let event_in_history = history.iter().any(|i| { i.as_text().is_some_and(|t| { t.contains("[Notification]") && t.contains("child") && t.contains("finished a turn") @@ -1149,22 +1235,42 @@ async fn pause_then_resume_transitions_and_preserves_history_consistency() { // History consistency: exactly [user "hello", assistant // "resumed output"]. No artifacts from the aborted stream // (partial text is not committed), no orphan tool_use. - let history_json = handle.shared_state.history_json(); - let items: Vec = serde_json::from_str(&history_json).unwrap(); - let roles: Vec<&str> = items.iter().filter_map(|i| i["role"].as_str()).collect(); + let history = history_from_sink(&handle); + let roles: Vec<&str> = history + .iter() + .filter_map(|i| match i { + Item::Message { role, .. } => match role { + llm_worker::Role::User => Some("user"), + llm_worker::Role::Assistant => Some("assistant"), + llm_worker::Role::System => Some("system"), + }, + _ => None, + }) + .collect(); assert_eq!( roles, vec!["user", "assistant"], - "history = user + assistant only; got {items:?}" + "history = user + assistant only; got {history:?}" ); - let assistant_text = items[1]["content"] - .as_array() - .and_then(|parts| parts.iter().filter_map(|p| p["text"].as_str()).next()) - .unwrap_or(""); - assert_eq!(assistant_text, "resumed output"); - let has_tool_call = items + let assistant_text = history .iter() - .any(|i| i["type"].as_str() == Some("tool_call")); + .find_map(|i| match i { + Item::Message { + role: llm_worker::Role::Assistant, + content, + .. + } => Some( + content + .iter() + .map(|p: &llm_worker::ContentPart| p.as_text().to_owned()) + .collect::>() + .join(""), + ), + _ => None, + }) + .unwrap_or_default(); + assert_eq!(assistant_text, "resumed output"); + let has_tool_call = history.iter().any(|i| i.is_tool_call()); assert!(!has_tool_call, "no orphan tool_call in history"); } diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index a72d276d..83483706 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -144,44 +144,41 @@ fn accept_method_and_respond( }) } -/// Pretend to be a spawned Pod that responds to `GetHistory` with a -/// fixed set of items. Accepts connections until the first one that -/// delivers a `GetHistory` method; earlier probes (empty accepts) and -/// non-history methods are ignored. Returns nothing — tests await the -/// handle only to keep the listener alive until shutdown. +/// Pretend to be a spawned Pod whose connect-time snapshot carries a +/// fixed set of assistant items. Sends `Event::Snapshot` immediately on +/// every accept — the real Pod does the same, so `ReadPodOutput`'s +/// `fetch_history` just consumes the first non-Alert event. fn serve_history(listener: UnixListener, items: Vec) -> JoinHandle<()> { tokio::spawn(async move { loop { let Ok((stream, _)) = listener.accept().await else { return; }; - let (r, w) = stream.into_split(); - let mut reader = JsonLineReader::new(r); + let (_r, w) = stream.into_split(); let mut writer = JsonLineWriter::new(w); - match reader.next::().await { - Ok(Some(Method::GetHistory)) => { - let values: Vec = items - .iter() - .map(|i| serde_json::to_value(i).unwrap()) - .collect(); - let event = Event::History { - items: values, - greeting: Greeting { - pod_name: "child".into(), - cwd: "/tmp".into(), - provider: "anthropic".into(), - model: "x".into(), - scope_summary: String::new(), - tools: Vec::new(), - }, - status: protocol::PodStatus::Idle, - }; - let _ = writer.write(&event).await; - } - Ok(Some(_)) | Ok(None) | Err(_) => { - // Ignore: loop accepts another connection. - } - } + // Wrap the assistant items in a single + // `LogEntry::AssistantItems` entry — that's the only kind + // that contributes assistant text via `extract_assistant_text`. + let logged: Vec = + items.iter().map(session_store::LoggedItem::from).collect(); + let entry = session_store::LogEntry::AssistantItems { + ts: 0, + items: logged, + }; + let entry_value = serde_json::to_value(&entry).unwrap(); + let event = Event::Snapshot { + entries: vec![entry_value], + greeting: Greeting { + pod_name: "child".into(), + cwd: "/tmp".into(), + provider: "anthropic".into(), + model: "x".into(), + scope_summary: String::new(), + tools: Vec::new(), + }, + status: protocol::PodStatus::Idle, + }; + let _ = writer.write(&event).await; } }) } diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index de83b12d..fd5bed1f 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -32,14 +32,13 @@ pub enum Method { /// synthetic tool result before the new user message is appended). Pause, Shutdown, - GetHistory, /// Request a list of completion candidates from the Pod. /// /// Reply is sent on the same socket as `Event::Completions` (not - /// broadcast). Same shape as `GetHistory` / `Event::History`: - /// the IPC server handles this directly and writes the response - /// straight back to the requesting socket. Empty results for - /// resolvers that are not yet wired up (Knowledge / Workflow). + /// broadcast). The IPC server handles this directly and writes + /// the response straight back to the requesting socket. Empty + /// results for resolvers that are not yet wired up + /// (Knowledge / Workflow). ListCompletions { kind: CompletionKind, prefix: String, @@ -224,15 +223,6 @@ pub enum Event { Notify { message: String, }, - /// Persisted `role:system` history item that should be rendered by - /// clients through the same path used for `Event::History` replay. - /// - /// The payload is the serialized history item, not an ad-hoc display - /// DTO, so live subscribers and late subscribers have the same source - /// of truth: worker history / history.json. - SystemMessage { - item: serde_json::Value, - }, /// Echo of `Method::PodEvent` received by this Pod. Same rationale /// as `Notify`: subscribers render the event as a log element, /// while a rendered summary is independently injected into the LLM @@ -312,16 +302,34 @@ pub enum Event { code: ErrorCode, message: String, }, - History { - items: Vec, + /// Sent exactly once at the start of every client connection. + /// + /// `entries` is the session-log mirror at subscribe time, serialised + /// as the JSON form of `session_store::LogEntry`. Late attachers + /// reconstruct view state by replaying entries through their own + /// `LogEntry → block` mapping, then continue applying live + /// `Event::Entry` updates received after the snapshot. + /// + /// `greeting` and `status` accompany the snapshot so clients render + /// pod identity and current controller state without an extra round + /// trip. + Snapshot { + entries: Vec, greeting: Greeting, - /// Current Pod controller status at the moment the history snapshot - /// was taken. This lets late-attaching clients render and route - /// controls from the real controller state instead of inferring from - /// replayed history. #[serde(default)] status: PodStatus, }, + /// A single session-log entry committed atomically with the disk + /// write. Streamed as the suffix following the connect-time + /// `Snapshot`; the prefix/suffix boundary is gap-free and + /// duplicate-free per `SessionLogSink` semantics. + /// + /// Payload is the JSON form of `session_store::LogEntry`. Clients + /// deserialize as needed to render typed atoms (e.g. + /// `UserInput.segments`). + Entry { + entry: serde_json::Value, + }, /// Current Pod controller status. Broadcast on every controller-level /// transition and included in `History` snapshots for late attach. Status { @@ -418,8 +426,8 @@ pub struct CompletionEntry { /// Pod self-description rendered by the TUI when a session starts empty. /// /// Built once in the Pod controller from the resolved manifest and -/// transmitted alongside `Event::History` so clients don't need their -/// own view of the manifest. +/// transmitted alongside `Event::Snapshot` so clients don't need +/// their own view of the manifest. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Greeting { pub pod_name: String, @@ -674,13 +682,6 @@ mod tests { assert_eq!(serialized, json); } - #[test] - fn method_get_history() { - let json = r#"{"method":"get_history"}"#; - let method: Method = serde_json::from_str(json).unwrap(); - assert!(matches!(method, Method::GetHistory)); - } - #[test] fn method_list_completions_roundtrip() { let method = Method::ListCompletions { @@ -734,9 +735,9 @@ mod tests { } #[test] - fn event_history_format() { - let event = Event::History { - items: vec![serde_json::json!({"type": "message", "role": "user"})], + fn event_snapshot_format() { + let event = Event::Snapshot { + entries: vec![serde_json::json!({"kind": "user_input", "ts": 1, "segments": []})], greeting: Greeting { pod_name: "test".into(), cwd: "/tmp".into(), @@ -749,14 +750,30 @@ mod tests { }; let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); - assert_eq!(parsed["event"], "history"); - assert!(parsed["data"]["items"].is_array()); - assert_eq!(parsed["data"]["items"][0]["role"], "user"); + assert_eq!(parsed["event"], "snapshot"); + assert!(parsed["data"]["entries"].is_array()); + assert_eq!(parsed["data"]["entries"][0]["kind"], "user_input"); assert_eq!(parsed["data"]["greeting"]["pod_name"], "test"); assert_eq!(parsed["data"]["greeting"]["tools"][0], "Read"); assert_eq!(parsed["data"]["status"], "paused"); } + #[test] + fn event_entry_roundtrip() { + let event = Event::Entry { + entry: serde_json::json!({"kind": "assistant_items", "ts": 42, "items": []}), + }; + let json = serde_json::to_string(&event).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["event"], "entry"); + assert_eq!(parsed["data"]["entry"]["kind"], "assistant_items"); + let decoded: Event = serde_json::from_str(&json).unwrap(); + match decoded { + Event::Entry { entry } => assert_eq!(entry["kind"], "assistant_items"), + other => panic!("expected Entry, got {other:?}"), + } + } + #[test] fn event_status_format() { let event = Event::Status { @@ -777,12 +794,12 @@ mod tests { } #[test] - fn event_history_legacy_without_status_defaults_to_idle() { - let json = r#"{"event":"history","data":{"items":[],"greeting":{"pod_name":"test","cwd":"/tmp","provider":"anthropic","model":"claude","scope_summary":"","tools":[]}}}"#; + fn event_snapshot_legacy_without_status_defaults_to_idle() { + let json = r#"{"event":"snapshot","data":{"entries":[],"greeting":{"pod_name":"test","cwd":"/tmp","provider":"anthropic","model":"claude","scope_summary":"","tools":[]}}}"#; let decoded: Event = serde_json::from_str(json).unwrap(); match decoded { - Event::History { status, .. } => assert_eq!(status, PodStatus::Idle), - other => panic!("expected History, got {other:?}"), + Event::Snapshot { status, .. } => assert_eq!(status, PodStatus::Idle), + other => panic!("expected Snapshot, got {other:?}"), } } diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index 25261277..697c0ae6 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -39,10 +39,10 @@ pub use llm_worker::UsageRecord; pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; pub use session::{ - SessionStartState, create_compacted_session, create_session, create_session_with_id, - ensure_head_or_fork, fork, fork_at, restore, save_config_changed, save_delta, save_extension, - save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage, - save_user_input, + SessionStartState, append_entry, append_entry_with_hash, create_compacted_session, + create_session, create_session_with_id, ensure_head_or_fork, fork, fork_at, restore, + save_config_changed, save_delta, save_extension, save_pod_scope, save_run_completed, + save_run_errored, save_turn_end, save_usage, save_user_input, }; pub use session_log::{ EntryHash, HashedEntry, LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, diff --git a/crates/session-store/src/session.rs b/crates/session-store/src/session.rs index 18f422ae..8351b031 100644 --- a/crates/session-store/src/session.rs +++ b/crates/session-store/src/session.rs @@ -457,14 +457,31 @@ pub async fn fork_at( Ok(fork_id) } -// ── Private helper ────────────────────────────────────────────────────── - -async fn append_entry( +/// Append a single `LogEntry`, chaining the hash and updating `head_hash`. +/// +/// Lower-level dual of the `save_*` convenience wrappers in this module. +/// Use when the caller already builds the typed entry itself (e.g. when +/// it needs the same value for an in-memory mirror + broadcast). +pub async fn append_entry( store: &impl Store, session_id: SessionId, head_hash: &mut Option, entry: LogEntry, ) -> Result<(), StoreError> { + append_entry_with_hash(store, session_id, head_hash, entry).await?; + Ok(()) +} + +/// Same as [`append_entry`] but returns the freshly computed entry hash. +/// +/// Used by paths that need the hash for downstream broadcast or mirror +/// updates (e.g. the Pod's `SessionLogSink`). +pub async fn append_entry_with_hash( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, + entry: LogEntry, +) -> Result { let hash = session_log::compute_hash(head_hash.as_ref(), &entry); let hashed_entry = HashedEntry { hash: hash.clone(), @@ -472,6 +489,6 @@ async fn append_entry( entry, }; store.append(session_id, &hashed_entry).await?; - *head_hash = Some(hash); - Ok(()) + *head_hash = Some(hash.clone()); + Ok(hash) } diff --git a/crates/tui/Cargo.toml b/crates/tui/Cargo.toml index f240581a..70c05b83 100644 --- a/crates/tui/Cargo.toml +++ b/crates/tui/Cargo.toml @@ -19,6 +19,7 @@ session-store = { workspace = true } pod-registry = { workspace = true } serde = { workspace = true, features = ["derive"] } pulldown-cmark = { version = "0.13.3", default-features = false } +llm-worker.workspace = true [dev-dependencies] tools = { workspace = true } diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index f84bc8b8..4d88f48b 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -491,9 +491,8 @@ impl App { self.blocks.push(Block::PodEvent { event }); self.assistant_streaming = false; } - Event::SystemMessage { item } => { - self.push_history_item(&item); - self.assistant_streaming = false; + Event::Entry { entry } => { + self.apply_log_entry(&entry); } Event::TurnStart { .. } => { self.set_pod_status(PodStatus::Running); @@ -726,12 +725,12 @@ impl App { message: alert.message, }); } - Event::History { - items, + Event::Snapshot { + entries, greeting, status, } => { - self.restore_history(&items, greeting); + self.restore_snapshot(&entries, greeting); self.set_pod_status(status); } Event::Status { status } => { @@ -905,10 +904,13 @@ impl App { self.input.move_down(); } - fn restore_history(&mut self, items: &[serde_json::Value], greeting: protocol::Greeting) { - // Fresh session: greeting + any replayed items. Append-only — we - // don't try to merge with already-displayed live events because - // `History` only fires on an empty live state. + /// Reset the block list and replay a connect-time `Event::Snapshot`. + /// + /// Walks the session-log entries in commit order, expanding each + /// LogEntry variant into the same blocks live events would have + /// produced. Followed by `Event::Entry` updates for anything + /// committed after the snapshot. + fn restore_snapshot(&mut self, entries: &[serde_json::Value], greeting: protocol::Greeting) { self.turn_index = 0; self.blocks.clear(); self.cache = FileCache::new(); @@ -917,14 +919,86 @@ impl App { self.blocks.push(Block::Greeting(greeting)); self.assistant_streaming = false; - for item in items { - self.push_history_item(item); + for entry in entries { + self.apply_log_entry_raw(entry); } - // Any tool_call entries that never got paired with a - // tool_result (truncated or racing mid-turn on the server side) - // stay as Executing up to this point. Surface them as - // Incomplete so the replay matches live semantics. + self.mark_orphan_tool_calls_incomplete_pass(); + } + + /// Apply a single live `Event::Entry`. + /// + /// `SessionStart` entries that arrive live (compaction / fork) + /// reset the block list to a freshly seeded view, matching what a + /// reconnect's `Event::Snapshot` would produce. + fn apply_log_entry(&mut self, entry: &serde_json::Value) { + if entry.get("kind").and_then(|k| k.as_str()) == Some("session_start") { + // Compaction / fork on the server side. Reset our derived + // view but keep the greeting (identity hasn't changed). + let greeting = self + .blocks + .iter() + .find_map(|b| match b { + Block::Greeting(g) => Some(g.clone()), + _ => None, + }); + self.turn_index = 0; + self.blocks.clear(); + self.cache = FileCache::new(); + self.task_store = TaskStore::new(); + self.task_pane_scroll = 0; + if let Some(g) = greeting { + self.blocks.push(Block::Greeting(g)); + } + } + self.apply_log_entry_raw(entry); + self.assistant_streaming = false; + } + + /// Walk a single `LogEntry` JSON value and translate it into blocks + /// the live event path would have produced. Shared between + /// `restore_snapshot` (replay path) and `apply_log_entry` (live + /// path). + fn apply_log_entry_raw(&mut self, value: &serde_json::Value) { + let Ok(entry) = serde_json::from_value::(value.clone()) else { + return; + }; + match entry { + session_store::LogEntry::SessionStart { history, .. } => { + for logged in history { + let item: llm_worker::Item = logged.into(); + let item_value = serde_json::to_value(&item).expect("Item is Serialize"); + self.push_history_item(&item_value); + } + } + session_store::LogEntry::UserInput { segments, .. } => { + self.turn_index += 1; + self.blocks.push(Block::TurnHeader { + turn: self.turn_index, + }); + if !segments.is_empty() { + self.blocks.push(Block::UserMessage { segments }); + } + } + session_store::LogEntry::AssistantItems { items, .. } + | session_store::LogEntry::ToolResults { items, .. } + | session_store::LogEntry::HookInjectedItems { items, .. } => { + for logged in items { + let item: llm_worker::Item = logged.into(); + let item_value = serde_json::to_value(&item).expect("Item is Serialize"); + self.push_history_item(&item_value); + } + } + // Non-history-bearing variants don't affect the block view. + _ => {} + } + } + + /// Sweep all current tool-call blocks: any that never resolved into + /// a Done / Error state get marked Incomplete. Called after a + /// snapshot replay so dangling in-flight tool calls in the seed + /// log match live semantics. + fn mark_orphan_tool_calls_incomplete_pass(&mut self) { for b in self.blocks.iter_mut() { if let Block::ToolCall(tc) = b && matches!( @@ -1325,18 +1399,22 @@ mod completion_flow_tests { } #[test] - fn history_restore_renders_system_message_block() { + fn snapshot_renders_system_message_block_from_session_start() { let mut app = App::new("test".into()); - app.handle_pod_event(Event::History { + let session_start = session_store::LogEntry::SessionStart { + ts: 1, + system_prompt: None, + config: Default::default(), + history: vec![session_store::LoggedItem::from( + &llm_worker::Item::system_message("[File: src/main.rs]\nfn main() {}"), + )], + forked_from: None, + compacted_from: None, + }; + let session_start_value = serde_json::to_value(&session_start).unwrap(); + app.handle_pod_event(Event::Snapshot { greeting: test_greeting(), - items: vec![serde_json::json!({ - "type": "message", - "role": "system", - "content": [{ - "type": "text", - "text": "[File: src/main.rs]\nfn main() {}", - }], - })], + entries: vec![session_start_value], status: PodStatus::Running, }); @@ -1349,18 +1427,18 @@ mod completion_flow_tests { } #[test] - fn live_system_message_event_uses_history_item_path() { + fn live_entry_routes_system_message_via_hook_injected_items() { let mut app = App::new("test".into()); - app.handle_pod_event(Event::SystemMessage { - item: serde_json::json!({ - "type": "message", + let entry = serde_json::json!({ + "kind": "hook_injected_items", + "ts": 1, + "items": [{ + "kind": "message", "role": "system", - "content": [{ - "type": "text", - "text": "[Workflow /build]\nRun the build", - }], - }), + "content": [{ "kind": "text", "text": "[Workflow /build]\nRun the build" }], + }], }); + app.handle_pod_event(Event::Entry { entry }); assert!(matches!( app.blocks.as_slice(), @@ -1504,11 +1582,15 @@ mod completion_flow_tests { ```json\n{\n \"tasks\": [\n {\n \"taskid\": 4,\n \ \"status\": \"inprogress\",\n \"subject\": \"from snapshot\",\n \ \"description\": \"d\"\n }\n ]\n}\n```\n"; - app.handle_pod_event(Event::SystemMessage { - item: serde_json::json!({ - "type": "message", - "role": "system", - "content": [{ "type": "text", "text": snapshot }], + app.handle_pod_event(Event::Entry { + entry: serde_json::json!({ + "kind": "hook_injected_items", + "ts": 1, + "items": [{ + "kind": "message", + "role": "system", + "content": [{ "kind": "text", "text": snapshot }], + }], }), }); @@ -1519,10 +1601,10 @@ mod completion_flow_tests { } #[test] - fn history_replay_reconstructs_task_store() { + fn snapshot_reconstructs_task_store() { let mut app = App::new("test".into()); - // Live tool call before history lands — restore_history must - // wipe this so it doesn't double-count after replay. + // Live tool call before the snapshot lands — restore must wipe + // this so it doesn't double-count after replay. app.handle_pod_event(Event::ToolCallStart { id: "live".into(), name: "TaskCreate".into(), @@ -1533,28 +1615,33 @@ mod completion_flow_tests { arguments: r#"{"subject":"live","description":""}"#.into(), }); - app.handle_pod_event(Event::History { - greeting: test_greeting(), - items: vec![ - serde_json::json!({ - "type": "tool_call", + let assistant_items_entry = serde_json::json!({ + "kind": "assistant_items", + "ts": 1, + "items": [ + { + "kind": "tool_call", "call_id": "c1", "name": "TaskCreate", "arguments": r#"{"subject":"a","description":"A"}"#, - }), - serde_json::json!({ - "type": "tool_call", + }, + { + "kind": "tool_call", "call_id": "c2", "name": "TaskCreate", "arguments": r#"{"subject":"b","description":"B"}"#, - }), - serde_json::json!({ - "type": "tool_call", + }, + { + "kind": "tool_call", "call_id": "u1", "name": "TaskUpdate", "arguments": r#"{"taskid":2,"status":"inprogress"}"#, - }), + }, ], + }); + app.handle_pod_event(Event::Snapshot { + greeting: test_greeting(), + entries: vec![assistant_items_entry], status: PodStatus::Running, }); diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index f24425f8..95e96f2a 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -256,9 +256,10 @@ async fn run( let mut app = App::new(pod_name); match PodClient::connect(socket_path).await { - Ok(mut client) => { + Ok(client) => { app.connected = true; - let _ = client.send(&Method::GetHistory).await; + // The Pod sends `Event::Snapshot` automatically on connect; + // no explicit method call is required to fetch history. run_loop(terminal, &mut app, client).await?; } Err(e) => { diff --git a/tickets/pod-state-from-session-log.md b/tickets/pod-state-from-session-log.md deleted file mode 100644 index 2244bea8..00000000 --- a/tickets/pod-state-from-session-log.md +++ /dev/null @@ -1,55 +0,0 @@ -# Pod: 状態と socket 配信を session log 正本に統合する - -## 背景 - -Pod の状態は現在 3 つの形で同居している: - -1. **session log** (`crates/session-store/src/session_log.rs:LogEntry`) — append-only の typed 正本。 `UserInput { segments }` / `AssistantItems` / `ToolResults` / `HookInjectedItems` / `TurnEnd` / `SessionStart` で会話全体を表現できる。 -2. **worker.history** (`Vec`) — LLM に投げるために flatten / 加工された派生 view。 user_message は `Vec` を flatten した String になっている。 -3. **PodSharedState** の `history` + `user_segments` — worker.history を ipc 層に渡すための中継ミラー。 typed segments は parallel 配列で別途保持。 - -`Method::GetHistory` (`crates/pod/src/ipc/server.rs:132-182`) は (3) の中継から組み立てており、 平坦化された user_message に segments を後付けする overlay + skip-align ロジックが必要になっている。 broadcast (`event_tx`) はライブイベントだけを流し、 接続時 snapshot は別経路 + 別 Event 型 (`Event::History`) で返るため、 再アタッチ時に snapshot ↔ live が連続しない (`tickets/pod-socket-state-view.md` の問題)。 - -派生方向が逆転している: 正本は session log なのに (3) は (2) を経由した二次派生になっており、 (1) が既に持ってる typed 情報を flatten/復元で往復する歪んだ構造を生んでいる。 また `Method::GetHistory` が RPC 形を取っていることで、 同じ socket writer に「broadcast forwarder」 と「query handler」 の 2 経路が同居している。 - -## 方針 - -- session log を Pod 状態の単一正本として位置付け、 worker.history は LLM context 投影用の内部 view に格下げる。 ipc 経路には worker.history が現れない。 -- 接続クライアントへの配信を 「session log の prefix (snapshot) + suffix (live)」 という同型ストリームに統合する。 query/reply 型の `Method::GetHistory` を廃止し、 接続自体が暗黙の subscribe-with-replay として動作する。 -- ストリーミング系イベント (`TextDelta` / `ToolCallStart` / `ToolCallArgsDelta` 等) は progressive 描画用の best-effort hint に役割を限定する。 late attach で過去 delta が失われるのは仕様。 確定情報は session log entry の broadcast で別途到達する。 -- entry commit の hook 点は worker 側の確定 callback に置く。 現状 `Pod::run()` 末尾で `persist_turn` が `history_before..` を一括 flush しているが (`crates/pod/src/pod.rs:1491-1502`)、 これを 「worker が assistant block / tool call / tool result を確定した瞬間に append_entry を呼ぶ」 形へ移す。 `wire_event_bridges_on_worker` で worker → event_tx を bridge しているのと同じ箇所に append_entry hook を追加する想定で、 worker 内部構造への介入は確定 callback 受け口の追加に限定する。 -- atomicity の中身は「disk write が成功した entry のみ broadcast に乗る」 順序保証。 alerter は memory-only なので buffer lock + `broadcast::send` で完結するが、 session log は disk I/O が混じるため対称ではない。 `append_entry` は (1) disk write → (2) in-memory mirror 更新 → (3) `Event::Entry` broadcast の順で、 (1) 失敗時は (2)(3) を行わず error を上に返す。 (2)(3) は同一の subscribe lock 下で行い、 `subscribe_with_snapshot` が見る mirror と receiver 側のイベント列に重複・欠落・順序逆転が出ないようにする。 -- `Event::SystemMessage` 廃止に伴い、 system_message を LogEntry に焼く責務は controller 側の `Event::SystemMessage` 送信点 (`crates/pod/src/controller.rs:372`) を `LogEntry::HookInjectedItems` の append_entry 呼び出しに置き換える形で取る。 「context に乗せる前に history に commit する」 という CLAUDE.md の加工原則に揃う。 notify 系の history 焼き込みは `tickets/notify-history-persist.md` が別途扱う領域で、 本チケットは system_message 経路の置換のみを範囲とする。 - -## 要件 - -- session log entry の commit は単一経路 (`Pod::append_entry` 相当) を通り、 「永続書き込み + in-memory mirror 更新 + `Event::Entry(LogEntry)` broadcast」 を atomic に行う。 atomicity は alerter と同じパターンの `subscribe_with_snapshot` 用ロックで保証される。 -- entry commit は **per-item / per-block 粒度** で行う。 現在の turn 末尾一括の `persist_turn` / `save_delta` を分解し、 mid-turn 接続で進行中の tool call / 確定済み assistant block / user input すべてが snapshot から見える状態にする。 -- 接続クライアントは接続時に `Event::Snapshot { entries: Vec, greeting, status }` を受信し、 続けて live `Event::Entry(LogEntry)` を時系列で受信する。 prefix と suffix の境目に重複・欠落が無い。 -- typed user input (`Vec`) は flatten/復元の往復なく client に届く。 `PodSharedState.user_segments` と GetHistory の overlay+skip-align ロジックを廃止する。 -- ストリーミング hint は変更なし継続。 ただし「確定情報は entry にあり、 hint は描画進捗のみ」 という分担を protocol 上のドキュメントで明記する。 -- TUI は `Event::Snapshot` / `Event::Entry` 駆動で view を組み立てる。 既存ブロック描画と等価な LogEntry → Block mapping を実装する。 -- inter-pod query (`crates/pod/src/spawn/comm_tools.rs` の GetHistory 経路) は新 snapshot 形式に追従する。 - -## 完了条件 - -- session log entry 1 件の commit が、 永続書き込みと `Event::Entry` broadcast を atomic に同期させる経路で行われる。 mid-turn の任意の瞬間で、 session log と Event::Entry の到達順が常に整合する。 -- 接続時に `Event::Snapshot` が必ず流れ、 直後から live `Event::Entry` が同型で連続する。 mid-turn 再アタッチで進行中の user input / 確定済み assistant 出力 / 進行中の tool call / 確定済み tool result が view に再現される。 -- `Method::GetHistory` / `Event::History` / `Event::SystemMessage` が protocol から削除されている。 後者 2 つは `Event::Entry` (`HookInjectedItems` バリアント等) で代替される。 -- `PodSharedState.history` / `PodSharedState.user_segments` が削除されている。 `PodSharedState` は status / greeting / fs_view / workflow / knowledge の lookup ハブとして残る。 -- `crates/pod/src/runtime/dir.rs` の `history.json` write は廃止または用途縮小される (session log が正本)。 -- 既存テスト (`crates/pod/tests/controller_test.rs`、 `crates/session-store/tests/`、 TUI 関連) が通る。 ターン中再アタッチで in-flight turn の user_input が view に含まれることを示すテストが新規追加されている。 - -## 範囲外 - -- `LogEntry` スキーマの変更 (バリアントは現状維持)。 -- compaction / fork 動作の変更 (既存の `SessionStart.{compacted_from, forked_from}` がそのまま使われる)。 -- TUI rendering の機能拡張。 LogEntry → 既存 Block の mapping は等価再構成に留め、 装飾追加は別チケット。 -- `PodSharedState` の完全廃止と Pod 借用構造の分解。 controller が `&mut Pod` を握る構造は変えない。 -- broadcast cap (256) の最適化、 ストリーミング hint の replay buffer 化。 -- `Method::ListCompletions` の subscribe 化 (これは真の query なので RPC 形のまま残す)。 - -## 関連 - -- `tickets/pod-persistent-state.md` の「session log は引き続き会話状態の唯一の復元ソース」方針と整合する。 Pod identity 永続化は引き続き別チケット領域。 -- `tickets/notify-history-persist.md` の「context に乗せる前に history に commit」 原則と同根。 本チケットは system_message 経路の置換まで、 notify 経路は当該チケットで扱う。