refactor: Podのメインループのリファクタリング

This commit is contained in:
Keisuke Hirata 2026-05-14 03:27:49 +09:00
parent e57e23b999
commit 1ef094f039
22 changed files with 1463 additions and 612 deletions

1
Cargo.lock generated
View File

@ -3645,6 +3645,7 @@ version = "0.1.0"
dependencies = [
"client",
"crossterm 0.28.1",
"llm-worker",
"manifest",
"pod-registry",
"protocol",

View File

@ -8,7 +8,6 @@
- Pod: 任意ターンからの Fork複数ターン巻き戻しを汎用化 → [tickets/pod-session-fork.md](tickets/pod-session-fork.md)
- Pod: 子→親の TurnEnded/Errored callback を親由来ターンのみに絞る → [tickets/pod-parent-turn-callback.md](tickets/pod-parent-turn-callback.md)
- Pod: セッションログをバックエンドにした Pod 単位の永続化 → [tickets/pod-persistent-state.md](tickets/pod-persistent-state.md)
- Pod: 状態と socket 配信を session log 正本に統合 → [tickets/pod-state-from-session-log.md](tickets/pod-state-from-session-log.md)
- 永続化層のセマンティック整理 → [tickets/persistence-semantics.md](tickets/persistence-semantics.md)
- Exchange / Turn / Call セマンティクス整理 → [tickets/exchange-turn-call-semantics.md](tickets/exchange-turn-call-semantics.md)
- llm-worker のエラー耐性

View File

@ -1013,13 +1013,16 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
self.turn_count += 1;
// Collect and commit assistant items
// Collect and commit assistant items. Routed through
// `extend_history_with_callbacks` so observers (e.g. the
// Pod-side per-item session-log committer) see each item
// as it lands.
let reasoning_items = self.reasoning_item_collector.take_collected();
let text_blocks = self.text_block_collector.take_collected();
let tool_calls = self.tool_call_collector.take_collected();
let assistant_items =
self.build_assistant_items(&reasoning_items, &text_blocks, &tool_calls);
self.history.extend(assistant_items);
self.extend_history_with_callbacks(assistant_items);
if tool_calls.is_empty() {
match self.interceptor.on_turn_end(&self.history).await {
@ -1134,14 +1137,18 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
Ok(Some(WorkerResult::Paused))
}
Ok(ToolExecutionResult::Completed(results)) => {
for result in results {
self.history.push(Item::tool_result_item(
// Route per-result pushes through the callback path so
// observers (e.g. the Pod-side per-item session-log
// committer) see each tool result as it lands.
let items = results.into_iter().map(|result| {
Item::tool_result_item(
&result.tool_use_id,
&result.summary,
result.content,
result.is_error,
));
}
)
});
self.extend_history_with_callbacks(items);
Ok(None)
}
Err(err) => {

View File

@ -104,10 +104,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"\n[shared_state] final: {}",
handle.shared_state.status_json()
);
println!(
"[history] {} bytes",
handle.shared_state.history_json().len()
);
println!("[session log] {} entries", handle.sink.len());
drop(handle);
let _ = listener.await;

View File

@ -3,15 +3,19 @@ use std::sync::Arc;
use llm_worker::WorkerError;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::llm_client::types::{Item, Role};
use session_store::Store;
use tokio::sync::{broadcast, mpsc, oneshot};
use llm_worker::Item;
use session_store::LogEntry;
use session_store::session_log;
use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::server::SocketServer;
use crate::pod::{Pod, PodError, PodRunResult};
use crate::pod::{LogCommand, LogDrainHandle, Pod, PodError, PodRunResult};
use crate::runtime::dir::RuntimeDir;
use crate::session_log_sink::SessionLogSink;
use crate::shared_state::PodSharedState;
use crate::spawn::comm_tools::{
list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool,
@ -22,16 +26,6 @@ use protocol::{
AlertLevel, AlertSource, ErrorCode, Event, Method, PodStatus, RunResult, Segment, TurnResult,
};
fn is_system_message_item(item: &Item) -> bool {
matches!(
item,
Item::Message {
role: Role::System,
..
}
)
}
// ---------------------------------------------------------------------------
// PodHandle — client-facing, Clone-able
// ---------------------------------------------------------------------------
@ -43,6 +37,10 @@ pub struct PodHandle {
pub shared_state: Arc<PodSharedState>,
pub runtime_dir: Arc<RuntimeDir>,
pub alerter: Alerter,
/// Session-log mirror + broadcast handle. The IPC server snapshots
/// it on every new connection (Event::Snapshot) and forwards
/// subsequent commits (Event::Entry) on the receiver.
pub sink: SessionLogSink,
}
impl PodHandle {
@ -86,11 +84,11 @@ async fn finish_controller_run<C, St>(
C: LlmClient + Clone + 'static,
St: Store + Clone + 'static,
{
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_user_segments(pod.user_segments().to_vec());
// history / user_segments are no longer mirrored on PodSharedState —
// clients reconstruct them from `Event::Snapshot` + live
// `Event::Entry` deliveries driven by the session-log sink. We
// only flip the status and kick post-run memory jobs here.
set_controller_status(shared_state, runtime_dir, event_tx, new_status).await;
let _ = runtime_dir.write_history(shared_state).await;
pod.spawn_post_run_memory_jobs();
}
@ -167,8 +165,23 @@ impl PodController {
}])
.map_err(std::io::Error::other)?;
// === 1.5. Per-item history-commit drain task ===
//
// Worker callbacks fire `on_history_append` for each assistant
// item / tool result / hook-injected item that lands in
// history. The drain task picks them up off an unbounded mpsc
// and commits each as a typed `LogEntry` through the sink,
// serialised against the same `session_head` lock the Pod uses
// for its own commits. This gives mid-turn snapshot visibility:
// a late-attaching client sees in-flight tool calls + completed
// assistant blocks without waiting for the turn-end persist.
let (log_cmd_tx, log_cmd_rx) = mpsc::unbounded_channel::<LogCommand>();
let drain_ctx = pod.log_drain_handle();
let _drain_task = tokio::spawn(run_log_drain(log_cmd_rx, drain_ctx));
pod.attach_log_cmd_tx(log_cmd_tx.clone());
// === 2. Worker event bridge wiring ===
wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter);
wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter, log_cmd_tx);
// === 3. Tool registration (builtin / memory / spawn-orchestration) ===
let fs_for_view = register_pod_tools(
@ -193,8 +206,6 @@ impl PodController {
manifest_toml.clone(),
greeting,
));
shared_state.update_history(pod.worker().history().to_vec());
shared_state.set_user_segments(pod.user_segments().to_vec());
shared_state.set_fs_view(crate::fs_view::PodFsView::new(fs_for_view));
shared_state.set_workflows(
pod.workflow_completions()
@ -210,7 +221,6 @@ impl PodController {
);
runtime_dir.write_manifest(&manifest_toml).await?;
runtime_dir.write_status(&shared_state).await?;
runtime_dir.write_history(&shared_state).await?;
let handle = PodHandle {
method_tx,
@ -218,6 +228,7 @@ impl PodController {
shared_state: shared_state.clone(),
runtime_dir: runtime_dir.clone(),
alerter: alerter.clone(),
sink: pod.sink(),
};
let socket_server = SocketServer::start(&handle).await?;
@ -251,16 +262,30 @@ impl PodController {
/// Wire the per-event broadcast bridges on the Pod's Worker. Each callback
/// re-publishes a worker-level signal as a `protocol::Event` on `event_tx`
/// so subscribers (TUI, socket clients) get a single typed stream.
///
/// Also wires `on_history_append` into the per-item drain channel so
/// every history append observed by the worker becomes a typed
/// `LogEntry` commit (via the drain task).
fn wire_event_bridges_on_worker<C, St>(
pod: &mut Pod<C, St>,
event_tx: &broadcast::Sender<Event>,
alerter: &Alerter,
log_cmd_tx: mpsc::UnboundedSender<LogCommand>,
) where
C: LlmClient + Clone + 'static,
St: Store + Clone + 'static,
{
let worker = pod.worker_mut();
// Per-history-append → drain channel. Sends are infallible-by-design
// here (UnboundedSender never blocks); a closed receiver just means
// the controller is shutting down, in which case dropping the item
// is acceptable.
let drain_tx = log_cmd_tx.clone();
worker.on_history_append(move |item| {
let _ = drain_tx.send(LogCommand::Item(item.clone()));
});
let tx = event_tx.clone();
worker.on_turn_start(move |turn| {
let _ = tx.send(Event::TurnStart { turn });
@ -365,13 +390,80 @@ fn wire_event_bridges_on_worker<C, St>(
alerter_for_worker.alert(AlertLevel::Warn, AlertSource::Worker, message.to_owned());
});
let tx = event_tx.clone();
worker.on_history_append(move |item| {
if is_system_message_item(item) {
let value = serde_json::to_value(item).expect("Item is Serialize");
let _ = tx.send(Event::SystemMessage { item: value });
// History-append broadcasts (previously `Event::SystemMessage`)
// have been removed: every persistent history item is now committed
// through the session-log sink as a typed `LogEntry`, and clients
// see it via `Event::Snapshot` + live `Event::Entry`. The
// per-item commit channel is wired at the top of this function.
}
/// Drain task: consumes `LogCommand::Item` and `LogCommand::Flush`
/// off the channel and commits each item as a typed `LogEntry` through
/// the supplied store + sink. Lives as long as the controller; exits
/// when the sender is dropped (controller shutdown).
async fn run_log_drain<St>(
mut rx: mpsc::UnboundedReceiver<LogCommand>,
ctx: LogDrainHandle<St>,
) where
St: session_store::Store + Clone + Send + 'static,
{
while let Some(cmd) = rx.recv().await {
match cmd {
LogCommand::Item(item) => {
let Some(entry) = classify_history_item(item) else {
continue;
};
let mut head = ctx.session_head.lock().await;
match session_store::append_entry_with_hash(
&ctx.store,
head.session_id,
&mut head.head_hash,
entry.clone(),
)
.await
{
Ok(_) => {
// Publish under the same critical section view
// a `subscribe_with_snapshot` would observe.
ctx.sink.publish(entry);
}
Err(e) => {
tracing::warn!(error = %e, "drain: append_entry failed; entry dropped");
}
}
}
LogCommand::Flush(ack) => {
let _ = ack.send(());
}
}
});
}
}
/// Map a single worker-history `Item` to its corresponding `LogEntry`
/// classification. `None` is the skip signal for `user_message` items —
/// those are committed via `LogEntry::UserInput` by `Pod::run` at
/// submit time and would otherwise produce a duplicate entry here.
fn classify_history_item(item: Item) -> Option<LogEntry> {
let ts = session_log::now_millis();
if item.is_user_message() {
return None;
}
if item.is_tool_result() {
return Some(LogEntry::ToolResults {
ts,
items: vec![session_store::LoggedItem::from(&item)],
});
}
if item.is_assistant_message() || item.is_tool_call() || item.is_reasoning() {
return Some(LogEntry::AssistantItems {
ts,
items: vec![session_store::LoggedItem::from(&item)],
});
}
Some(LogEntry::HookInjectedItems {
ts,
items: vec![session_store::LoggedItem::from(&item)],
})
}
/// Register the builtin file-manipulation tools, optional memory tools,
@ -656,10 +748,9 @@ async fn controller_loop<C, St>(
break;
}
// GetHistory / ListCompletions are handled at the socket
// layer (direct response). If they reach the controller,
// ignore them.
Method::GetHistory | Method::ListCompletions { .. } => {}
// ListCompletions is handled at the socket layer (direct
// response). If it reaches the controller, ignore it.
Method::ListCompletions { .. } => {}
Method::PodEvent(event) => {
// Echo the received event to all subscribers so every
@ -820,7 +911,7 @@ where
// drain it at its next pre_llm_request.
notify_buffer.push(message);
}
Some(Method::GetHistory | Method::ListCompletions { .. }) => {}
Some(Method::ListCompletions { .. }) => {}
Some(Method::PodEvent(event)) => {
let _ = event_tx.send(Event::PodEvent(event.clone()));
// mpsc is consume-once, so we cannot defer this

View File

@ -62,6 +62,13 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
let mut reader = JsonLineReader::new(reader);
let mut writer = JsonLineWriter::new(writer);
// Atomically subscribe to the session-log mirror first. The
// returned (snapshot, rx) pair partitions the entry timeline:
// entries committed before this call appear in `entries`, every
// entry after lands on `entry_rx`. Doing this before the alert
// snapshot keeps both ordering pairs internally consistent.
let (entries_snapshot, mut entry_rx) = handle.sink.subscribe_with_snapshot();
// Atomically subscribe and snapshot buffered alerts so that
// warnings emitted before this client connected are replayed
// exactly once — they appear in the snapshot, and any alert
@ -73,8 +80,41 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
}
}
// Send the typed snapshot up front so late attachers can
// reconstruct view state without an extra round trip.
let snapshot_event = Event::Snapshot {
entries: entries_snapshot
.into_iter()
.map(|e| serde_json::to_value(&e).expect("LogEntry is Serialize"))
.collect(),
greeting: handle.shared_state.greeting.clone(),
status: handle.shared_state.get_status(),
};
if writer.write(&snapshot_event).await.is_err() {
return;
}
loop {
tokio::select! {
// Live session-log entries → this client as Event::Entry.
entry = entry_rx.recv() => {
match entry {
Ok(entry) => {
let value = serde_json::to_value(&entry)
.expect("LogEntry is Serialize");
if writer.write(&Event::Entry { entry: value }).await.is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
// Slow client fell behind the broadcast buffer.
// Drop the connection so the next reconnect
// re-seeds the prefix via subscribe_with_snapshot.
break;
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
// Broadcast events → this client
event = rx.recv() => {
match event {
@ -129,57 +169,6 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
break;
}
}
Ok(Some(Method::GetHistory)) => {
let items = handle.shared_state.history();
let segments_per_user = handle.shared_state.user_segments();
// Embed `segments` on user-message JSON values so
// the TUI can re-render typed atoms on restore.
// Alignment: segments are recorded only for
// submissions made during the live session, never
// for seed history loaded via `SessionStart.history`
// (post-compaction). The seed user_messages always
// come first in worker history, so the last
// `segments_per_user.len()` user_messages are the
// ones that map 1:1 to the segments list.
let total_user_msgs =
items.iter().filter(|i| i.is_user_message()).count();
let skip = total_user_msgs.saturating_sub(segments_per_user.len());
let mut user_idx = 0usize;
let values = items
.iter()
.map(|item| {
let mut value =
serde_json::to_value(item).expect("Item is Serialize");
if item.is_user_message() {
if user_idx >= skip {
let seg_idx = user_idx - skip;
if let Some(obj) = value.as_object_mut() {
let segs = serde_json::to_value(
&segments_per_user[seg_idx],
)
.expect("Segment is Serialize");
obj.insert("segments".into(), segs);
}
}
user_idx += 1;
}
value
})
.collect();
let greeting = handle.shared_state.greeting.clone();
let status = handle.shared_state.get_status();
if writer
.write(&Event::History {
items: values,
greeting,
status,
})
.await
.is_err()
{
break;
}
}
Ok(Some(method)) => {
let _ = handle.send(method).await;
}

View File

@ -5,6 +5,7 @@ pub mod hook;
pub mod ipc;
pub mod prompt;
pub mod runtime;
pub mod session_log_sink;
pub mod shared_state;
pub mod spawn;
pub mod workflow;
@ -30,4 +31,5 @@ pub use prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTem
pub use protocol::{ErrorCode, Event, Method, PodStatus, TurnResult};
pub use provider::{ProviderError, build_client};
pub use runtime::dir::RuntimeDir;
pub use session_log_sink::{SessionLogSink, SessionLogWriter};
pub use shared_state::PodSharedState;

View File

@ -7,10 +7,40 @@ use llm_worker::Item;
use llm_worker::llm_client::RequestConfig;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::state::Mutable;
use llm_worker::{Role, ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
use session_store::{EntryHash, PodScopeSnapshot, SessionId, SessionStartState, Store, StoreError};
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
use session_store::{
EntryHash, HashedEntry, LogEntry, PodScopeSnapshot, SessionId, Store, StoreError, session_log,
to_logged,
};
use tracing::{info, warn};
use crate::session_log_sink::SessionLogSink;
/// Command sent to the per-Pod history-drain task.
///
/// `Item` carries one worker-history append observed via
/// `Worker::on_history_append`; the drain classifies it into a
/// `LogEntry::AssistantItems` / `LogEntry::ToolResults` /
/// `LogEntry::HookInjectedItems` and commits it through the sink.
/// `Flush(ack)` is the barrier used by `persist_turn` to ensure every
/// in-flight item is committed before the trailing `TurnEnd` entry
/// lands.
#[derive(Debug)]
pub enum LogCommand {
Item(Item),
Flush(tokio::sync::oneshot::Sender<()>),
}
/// State shared between Pod and the controller-spawned history-drain
/// task: store + session-head lock + broadcast sink. All three are
/// `Clone`able (the latter two as `Arc` clones, the store per its
/// `Clone` impl) so handing a copy to the drain task is cheap.
pub struct LogDrainHandle<St> {
pub store: St,
pub session_head: Arc<AsyncMutex<SessionHead>>,
pub sink: SessionLogSink,
}
use manifest::{
Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError,
ScopeRule, SharedScope, WorkerManifest,
@ -38,9 +68,9 @@ use protocol::{AlertLevel, AlertSource, Event, Segment};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
struct SessionHead {
session_id: SessionId,
head_hash: Option<EntryHash>,
pub struct SessionHead {
pub session_id: SessionId,
pub head_hash: Option<EntryHash>,
}
/// Pre-LLM-request hook that records `history.len()` at send time into a
@ -190,9 +220,22 @@ pub struct Pod<C: LlmClient, St: Store> {
/// the K-th `Item::user_message` in `worker.history()` (modulo seed
/// history loaded via `SessionStart.history`, whose original segments
/// are not preserved). Populated from log on `restore_from_manifest`,
/// appended after `save_user_input` on each `run`. Mirrored to
/// `PodSharedState` by the controller for `Event::History` use.
/// appended after `save_user_input` on each `run`. Pre-`Event::Snapshot`
/// this fed `PodSharedState.user_segments`; the new wire format
/// carries typed atoms via `LogEntry::UserInput { segments }` so
/// this remains purely an in-memory tracker for compact alignment.
user_segments: Vec<Vec<Segment>>,
/// Pod-side session-log mirror + broadcast sink. Populated alongside
/// every successful `session_store::append_entry` write so connected
/// clients see a `(snapshot, live)` stream consistent with what's
/// on disk.
sink: SessionLogSink,
/// Sender into the controller-spawned history-drain task.
/// `None` when no controller has wired one (tests, low-level Pod
/// usage). The drain task is the source of mid-turn `AssistantItems`
/// / `ToolResults` / `HookInjectedItems` commits, fed by the
/// `Worker::on_history_append` callback.
log_cmd_tx: Option<tokio::sync::mpsc::UnboundedSender<LogCommand>>,
}
impl<C: LlmClient + 'static, St: Store + 'static> Pod<C, St> {
@ -249,6 +292,22 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
extract_pointer: self.extract_pointer.clone(),
memory_task: None,
user_segments: self.user_segments.clone(),
// The memory-task clone never appends to the session log
// (it only reads `worker.history()`), so a fresh sink is
// fine — nothing observes its broadcast.
sink: SessionLogSink::new(),
log_cmd_tx: None,
}
}
/// Build a `LogDrainHandle` carrying everything the controller's
/// drain task needs: store handle, the shared session-head lock,
/// and the broadcast sink. All three are cheap clones.
pub fn log_drain_handle(&self) -> LogDrainHandle<St> {
LogDrainHandle {
store: self.store.clone(),
session_head: self.session_head.clone(),
sink: self.sink.clone(),
}
}
@ -332,6 +391,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
extract_pointer: Arc::new(Mutex::new(None)),
memory_task: None,
user_segments: Vec::new(),
sink: SessionLogSink::new(),
log_cmd_tx: None,
};
pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest();
@ -426,8 +487,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// can restore the narrowed scope instead of reclaiming delegated
/// writes.
pub async fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> {
let mut head = self.session_head.lock().await;
if head.head_hash.is_none() {
if self.session_head.lock().await.head_hash.is_none() {
return Ok(());
}
let snapshot = {
@ -437,8 +497,50 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
deny: scope.deny_rules(),
}
};
session_store::save_pod_scope(&self.store, head.session_id, &mut head.head_hash, &snapshot)
.await
let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize");
self.commit_entry(LogEntry::Extension {
ts: session_log::now_millis(),
domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(),
payload,
})
.await
.map(|_| ())
}
/// Append `entry` to the session log AND publish it through the
/// broadcast sink. Holds the session-head async lock across the
/// disk write and the sink publish so subscribers see a gap-free
/// `(snapshot, live)` stream consistent with what's on disk.
pub(crate) async fn commit_entry(
&self,
entry: LogEntry,
) -> Result<EntryHash, StoreError> {
let mut head = self.session_head.lock().await;
let hash = session_store::append_entry_with_hash(
&self.store,
head.session_id,
&mut head.head_hash,
entry.clone(),
)
.await?;
self.sink.publish(entry);
Ok(hash)
}
/// Cloneable sink handle. Exposed to the controller so the IPC
/// layer can `subscribe_with_snapshot` and stream entries to
/// clients without consulting any other state.
pub fn sink(&self) -> SessionLogSink {
self.sink.clone()
}
/// Wire a history-drain task. The controller calls this once per
/// Pod after the drain task is spawned; the matching mpsc receiver
/// drives per-item commits of assistant items / tool results /
/// hook-injected items committed by the worker via
/// `Worker::on_history_append`.
pub fn attach_log_cmd_tx(&mut self, tx: tokio::sync::mpsc::UnboundedSender<LogCommand>) {
self.log_cmd_tx = Some(tx);
}
/// Cloneable callback handed to dynamic-scope tools. It cannot append
@ -459,13 +561,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.expect("pending_scope_snapshot poisoned")
.take();
if let Some(snapshot) = snapshot {
let mut head = self.session_head.lock().await;
session_store::save_pod_scope(
&self.store,
head.session_id,
&mut head.head_hash,
&snapshot,
)
let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize");
self.commit_entry(LogEntry::Extension {
ts: session_log::now_millis(),
domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(),
payload,
})
.await?;
}
Ok(())
@ -629,15 +730,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// (the entry is dropped) and a `Warn` alert + `tracing::warn!` are
/// emitted so the failure isn't completely silent.
async fn try_record_metric(&mut self, metric: &session_metrics::Metric) {
let mut head = self.session_head.lock().await;
if let Err(err) = session_metrics::record_metric(
&self.store,
head.session_id,
&mut head.head_hash,
metric,
)
.await
{
let payload = serde_json::to_value(metric).expect("Metric is Serialize");
let entry = LogEntry::Extension {
ts: session_log::now_millis(),
domain: session_metrics::DOMAIN.into(),
payload,
};
if let Err(err) = self.commit_entry(entry).await {
warn!(name = %metric.name, error = %err, "failed to record session metric; dropping");
self.alert(
AlertLevel::Warn,
@ -656,20 +755,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
}
}
fn broadcast_system_message_item(&self, item: &Item) {
if !matches!(
item,
Item::Message {
role: Role::System,
..
}
) {
return;
}
let value = serde_json::to_value(item).expect("Item is Serialize");
self.send_event(Event::SystemMessage { item: value });
}
/// Push a `Method::Notify` (or rendered `Method::PodEvent`) entry
/// onto the pending buffer.
///
@ -975,17 +1060,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// Persist the user input as typed segments before the worker
// pushes its flattened copy into history. save_delta deliberately
// skips the resulting `is_user_message()` item to avoid double-write.
{
let mut head = self.session_head.lock().await;
self.session_id = head.session_id;
session_store::save_user_input(
&self.store,
head.session_id,
&mut head.head_hash,
input.clone(),
)
.await?;
}
self.session_id = self.session_head.lock().await.session_id;
self.commit_entry(LogEntry::UserInput {
ts: session_log::now_millis(),
segments: input.clone(),
})
.await?;
self.user_segments.push(input.clone());
// Resolve `@<path>` refs, `#<slug>` Knowledge refs, and `/<slug>`
@ -1330,34 +1410,64 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// another writer has advanced the store head behind our back.
async fn ensure_session_head(&mut self) -> Result<(), PodError> {
let w = self.worker.as_ref().unwrap();
let state = SessionStartState {
system_prompt: w.get_system_prompt(),
config: w.request_config(),
history: w.history(),
let prev_session_id;
let initial_state = {
let head = self.session_head.lock().await;
prev_session_id = head.session_id;
head.head_hash.is_none()
};
let mut head = self.session_head.lock().await;
if head.head_hash.is_none() {
let hash =
session_store::create_session_with_id(&self.store, head.session_id, state).await?;
head.head_hash = Some(hash);
drop(head);
if initial_state {
let initial = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: w.get_system_prompt().map(String::from),
config: w.request_config().clone(),
history: to_logged(w.history()),
forked_from: None,
compacted_from: None,
};
self.commit_entry(initial).await?;
self.persist_scope_snapshot().await?;
return Ok(());
}
let prev_session_id = head.session_id;
let mut session_id = head.session_id;
let mut head_hash = head.head_hash.clone();
session_store::ensure_head_or_fork(&self.store, &mut session_id, &mut head_hash, state)
.await?;
head.session_id = session_id;
head.head_hash = head_hash;
self.session_id = session_id;
// ensure_head_or_fork mints a fresh session_id when it auto-
// forks. Sync that to pods.json so a concurrent
// restore_from_manifest can't see "no live writer" for the new
// session and grab it.
if session_id != prev_session_id && self.scope_allocation.is_some() {
pod_registry::update_session(&self.manifest.pod.name, session_id)?;
// Check store head + auto-fork if it drifted.
let store_head = self
.store
.read_head_hash(prev_session_id)
.await
.map_err(PodError::from)?;
let mut head = self.session_head.lock().await;
if store_head == head.head_hash {
return Ok(());
}
// Fork: mint a fresh session and switch to it. The new
// SessionStart entry replaces the mirror and is broadcast
// through the sink so existing subscribers reset their view.
let fork_id = session_store::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: w.get_system_prompt().map(String::from),
config: w.request_config().clone(),
history: to_logged(w.history()),
forked_from: None,
compacted_from: None,
};
let hash = session_log::compute_hash(None, &entry);
let hashed = HashedEntry {
hash: hash.clone(),
prev_hash: None,
entry: entry.clone(),
};
self.store
.create_session(fork_id, &[hashed])
.await
.map_err(PodError::from)?;
head.session_id = fork_id;
head.head_hash = Some(hash);
self.session_id = fork_id;
self.sink.reset_with_initial(entry);
drop(head);
if self.scope_allocation.is_some() {
pod_registry::update_session(&self.manifest.pod.name, fork_id)?;
}
Ok(())
}
@ -1493,28 +1603,84 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
history_before: usize,
result: &Result<WorkerResult, WorkerError>,
) -> Result<(), StoreError> {
// Use direct field access for split borrows (worker immutable,
// head_hash mutable).
let w = self.worker.as_ref().unwrap();
let new_items = &w.history()[history_before..];
let mut head = self.session_head.lock().await;
self.session_id = head.session_id;
session_store::save_delta(&self.store, head.session_id, &mut head.head_hash, new_items)
.await?;
// Per-item commits for AssistantItems / ToolResults /
// HookInjectedItems already landed mid-turn through the
// controller-spawned drain task, fed by
// `Worker::on_history_append`. Drain the queue here so every
// in-flight item has actually been committed before the
// trailing `TurnEnd` entry. When no drain is wired (low-level
// tests / direct `Pod::new` usage) we fall back to a synchronous
// pass that replicates the legacy `save_delta` classification —
// those code paths don't fire `on_history_append`, so the items
// would otherwise be lost.
let _ = history_before; // referenced only by the fallback below.
self.session_id = self.session_head.lock().await.session_id;
if let Some(tx) = self.log_cmd_tx.as_ref() {
let (ack_tx, ack_rx) = tokio::sync::oneshot::channel();
if tx.send(LogCommand::Flush(ack_tx)).is_ok() {
let _ = ack_rx.await;
}
} else {
// Fallback path for tests / Pod::new: classify and commit
// the post-`history_before` slice inline, matching the old
// `save_delta` shape.
let new_items: Vec<Item> = self.worker.as_ref().unwrap().history()[history_before..]
.iter()
.cloned()
.collect();
let ts = session_log::now_millis();
let mut i = 0;
while i < new_items.len() {
let item = &new_items[i];
if item.is_user_message() {
i += 1;
} else if item.is_tool_result() {
let start = i;
while i < new_items.len() && new_items[i].is_tool_result() {
i += 1;
}
let items = new_items[start..i]
.iter()
.map(session_store::LoggedItem::from)
.collect();
self.commit_entry(LogEntry::ToolResults { ts, items }).await?;
} else if item.is_assistant_message()
|| item.is_tool_call()
|| item.is_reasoning()
{
let start = i;
while i < new_items.len()
&& (new_items[i].is_assistant_message()
|| new_items[i].is_tool_call()
|| new_items[i].is_reasoning())
{
i += 1;
}
let items = new_items[start..i]
.iter()
.map(session_store::LoggedItem::from)
.collect();
self.commit_entry(LogEntry::AssistantItems { ts, items })
.await?;
} else {
self.commit_entry(LogEntry::HookInjectedItems {
ts,
items: vec![session_store::LoggedItem::from(&new_items[i])],
})
.await?;
i += 1;
}
}
}
drop(head);
self.flush_pending_scope_snapshot().await?;
let turn_count = self.worker.as_ref().unwrap().turn_count();
let mut head = self.session_head.lock().await;
session_store::save_turn_end(
&self.store,
head.session_id,
&mut head.head_hash,
self.commit_entry(LogEntry::TurnEnd {
ts: session_log::now_millis(),
turn_count,
)
})
.await?;
drop(head);
// Flush any sync-buffered metrics from this run first
// (currently `prune.fire` / `prune.skip` from the prune observer).
@ -1547,19 +1713,15 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
record,
correlation_id,
} = recorded;
let mut head = self.session_head.lock().await;
session_store::save_usage(
&self.store,
head.session_id,
&mut head.head_hash,
record.history_len,
record.input_total_tokens,
record.cache_read_tokens,
record.cache_write_tokens,
record.output_tokens,
)
self.commit_entry(LogEntry::LlmUsage {
ts: session_log::now_millis(),
history_len: record.history_len,
input_total_tokens: record.input_total_tokens,
cache_read_tokens: record.cache_read_tokens,
cache_write_tokens: record.cache_write_tokens,
output_tokens: record.output_tokens,
})
.await?;
drop(head);
if let Some(id) = correlation_id {
let metric = session_metrics::Metric::now("prune.post_request")
.with_correlation_id(&id)
@ -1577,25 +1739,19 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let interrupted = self.worker.as_ref().unwrap().last_run_interrupted();
match result {
Ok(r) => {
let mut head = self.session_head.lock().await;
session_store::save_run_completed(
&self.store,
head.session_id,
&mut head.head_hash,
r.clone(),
self.commit_entry(LogEntry::RunCompleted {
ts: session_log::now_millis(),
interrupted,
)
result: r.clone(),
})
.await?;
}
Err(e) => {
let mut head = self.session_head.lock().await;
session_store::save_run_errored(
&self.store,
head.session_id,
&mut head.head_hash,
e.to_string(),
self.commit_entry(LogEntry::RunErrored {
ts: session_log::now_millis(),
interrupted,
)
message: e.to_string(),
})
.await?;
}
}
@ -1824,34 +1980,46 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
task_snapshot_text.clone(),
));
// Persist as a new compacted session.
let mut head = self.session_head.lock().await;
let old_session_id = head.session_id;
let old_head_hash = head
.head_hash
.clone()
.expect("head_hash should be set after at least one entry");
let w = self.worker.as_ref().unwrap();
let state = SessionStartState {
system_prompt: w.get_system_prompt(),
config: w.request_config(),
history: &new_history,
// Build the SessionStart entry for the new compacted session,
// then atomically rotate to it: create on disk, swap head, reset
// the broadcast sink so existing subscribers see the new
// `SessionStart { compacted_from }` and reset their view.
let new_session_id = session_store::new_session_id();
let session_start = {
let mut head = self.session_head.lock().await;
let old_session_id = head.session_id;
let old_head_hash = head
.head_hash
.clone()
.expect("head_hash should be set after at least one entry");
let w = self.worker.as_ref().unwrap();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: w.get_system_prompt().map(String::from),
config: w.request_config().clone(),
history: to_logged(&new_history),
forked_from: None,
compacted_from: Some(session_store::SessionOrigin {
session_id: old_session_id,
at_hash: old_head_hash,
}),
};
let hash = session_log::compute_hash(None, &entry);
let hashed = HashedEntry {
hash: hash.clone(),
prev_hash: None,
entry: entry.clone(),
};
self.store.create_session(new_session_id, &[hashed]).await?;
head.session_id = new_session_id;
head.head_hash = Some(hash);
self.session_id = new_session_id;
entry
};
let (new_session_id, new_head_hash) = session_store::create_compacted_session(
&self.store,
state,
old_session_id,
old_head_hash,
)
.await?;
// Swap in the new session state. usage_history belongs to the old
// session — the new compacted session starts with no measurements
// until its first LLM call.
self.session_id = new_session_id;
head.session_id = new_session_id;
head.head_hash = Some(new_head_hash);
// Broadcast the SessionStart through the sink. This atomically
// resets the mirror to `[SessionStart]` so any subscriber
// querying after this point sees the post-compaction prefix.
self.sink.reset_with_initial(session_start);
// Keep pods.json pointing at the live session_id. Without this
// a concurrent `restore_from_manifest(new_session_id)` would
// see no live writer and grab the session this Pod just moved
@ -1861,7 +2029,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
if self.scope_allocation.is_some() {
pod_registry::update_session(&self.manifest.pod.name, new_session_id)?;
}
drop(head);
// Align user_segments with the post-compaction history. Items
// before `retain_from` (now folded into the summary) lose their
// segments; only the user_messages surviving in retained_items
@ -1873,9 +2040,11 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
}
self.worker.as_mut().unwrap().set_history(new_history);
for item in &compact_introduced_system_messages {
self.broadcast_system_message_item(item);
}
// Compaction-introduced system messages are part of the new
// SessionStart's history (broadcast above) — clients derive
// their blocks from `SessionStart.history`. No per-item
// broadcast is required.
let _ = &compact_introduced_system_messages;
let worker = self.worker.as_mut().unwrap();
// Anchor the prompt cache at the summary item so that Anthropic
// can place a durable `cache_control` breakpoint there — our
@ -2139,18 +2308,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
};
let payload_value = serde_json::to_value(&pointer_payload)
.expect("ExtractPointerPayload is always JSON-serializable");
{
let mut head = self.session_head.lock().await;
session_store::save_extension(
&self.store,
head.session_id,
&mut head.head_hash,
extract::EXTRACT_DOMAIN,
payload_value,
)
.await?;
self.session_id = head.session_id;
}
self.commit_entry(LogEntry::Extension {
ts: session_log::now_millis(),
domain: extract::EXTRACT_DOMAIN.into(),
payload: payload_value,
})
.await?;
self.session_id = self.session_head.lock().await.session_id;
*self
.extract_pointer
@ -2488,6 +2652,8 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
extract_pointer: Arc::new(Mutex::new(None)),
memory_task: None,
user_segments: Vec::new(),
sink: SessionLogSink::new(),
log_cmd_tx: None,
};
pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest();
@ -2559,6 +2725,8 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
extract_pointer: Arc::new(Mutex::new(None)),
memory_task: None,
user_segments: Vec::new(),
sink: SessionLogSink::new(),
log_cmd_tx: None,
};
pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest();
@ -2590,10 +2758,16 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
store: St,
loader: PromptLoader,
) -> Result<Self, PodError> {
let state = session_store::restore(&store, session_id).await?;
// Read raw entries once so we can both reconstruct state and
// seed the broadcast sink's mirror with the same prefix that
// sits on disk.
let raw_entries = store.read_all(session_id).await?;
let state = session_store::collect_state(&raw_entries);
if state.head_hash.is_none() {
return Err(PodError::SessionEmpty { session_id });
}
let mirror_entries: Vec<LogEntry> =
raw_entries.iter().map(|e| e.entry.clone()).collect();
let scope_snapshot = state
.pod_scope
.clone()
@ -2696,6 +2870,11 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
extract_pointer: Arc::new(Mutex::new(extract_pointer)),
memory_task: None,
user_segments: state.user_segments,
// Seed the mirror with the entries we just replayed so a
// late-attaching client sees the full prefix without an
// extra round trip.
sink: SessionLogSink::with_initial(mirror_entries),
log_cmd_tx: None,
};
pod.apply_permissions_from_manifest();
pod.apply_prune_from_manifest();

View File

@ -73,12 +73,6 @@ impl RuntimeDir {
atomic_write(&self.path.join("manifest.toml"), toml.as_bytes()).await
}
/// Write history.json atomically.
pub async fn write_history(&self, state: &PodSharedState) -> Result<(), io::Error> {
let content = state.history_json();
atomic_write(&self.path.join("history.json"), content.as_bytes()).await
}
/// Write `spawned_pods.json` atomically. The entries are the full
/// set of spawned children known to this Pod — callers pass the
/// replacement list, no incremental merge.
@ -223,18 +217,6 @@ mod tests {
assert_eq!(parsed[0].pod_name, "child");
}
#[tokio::test]
async fn write_history_creates_file() {
let tmp = tempfile::tempdir().unwrap();
let rt = RuntimeDir::create(tmp.path(), "my-pod").await.unwrap();
let state = test_state();
rt.write_history(&state).await.unwrap();
let content = std::fs::read_to_string(rt.path().join("history.json")).unwrap();
assert_eq!(content, "[]");
}
#[tokio::test]
async fn socket_path() {
let tmp = tempfile::tempdir().unwrap();

View File

@ -0,0 +1,474 @@
//! Pod-side session-log mirror + broadcast.
//!
//! Owns the in-memory `Vec<LogEntry>` mirror that backs `Event::Snapshot`
//! delivery to newly connected clients and the
//! `broadcast::Sender<LogEntry>` that fans out per-entry commits to
//! existing subscribers. Disk writes remain the responsibility of the
//! Pod (which still owns the `Store` handle); the sink stays focused on
//! the wire-side fan-out.
//!
//! Atomicity contract (see ticket `tickets/pod-state-from-session-log.md`):
//!
//! 1. Pod writes the entry to disk via the `Store`.
//! 2. Pod calls [`SessionLogSink::publish`] which acquires the mirror
//! mutex, pushes the entry, and fires `broadcast::send` — all under
//! the same critical section.
//!
//! [`SessionLogSink::subscribe_with_snapshot`] takes the same mutex,
//! so the `(snapshot, receiver)` pair returned to a connecting client
//! splits the entry sequence cleanly: every entry shows up in exactly
//! one of `snapshot` or on `receiver`.
//!
//! Disk-write failures short-circuit before `publish`, so a failed
//! entry never appears in the mirror or on the broadcast.
use std::sync::{Arc, Mutex as StdMutex};
use session_store::{
EntryHash, HashedEntry, LogEntry, SessionId, SessionStartState, Store, StoreError, session_log,
};
use tokio::sync::{Mutex as AsyncMutex, MutexGuard, broadcast};
/// Broadcast capacity for the live receiver. Slow subscribers that
/// fall behind will see `RecvError::Lagged` and are expected to drop
/// the connection so that the next reconnect's `subscribe_with_snapshot`
/// re-seeds the prefix.
const BROADCAST_CAPACITY: usize = 256;
/// In-memory mirror + broadcast fan-out for the active session log.
///
/// Clone is cheap (`Arc` clone) — the Pod hands one to the IPC layer
/// for read-only `subscribe_with_snapshot` access and keeps one for
/// its own write path.
#[derive(Clone)]
pub struct SessionLogSink {
inner: Arc<SinkInner>,
}
struct SinkInner {
/// Full session log mirror in commit order. Reset on session swap
/// (compaction / fork) via [`SessionLogSink::reset_with_initial`].
mirror: StdMutex<Vec<LogEntry>>,
/// Broadcast channel for live entry updates. The same `Sender`
/// survives session swaps so existing subscribers keep their
/// receiver — they observe the swap as a freshly broadcast
/// `LogEntry::SessionStart` and reset their view accordingly.
broadcast_tx: broadcast::Sender<LogEntry>,
}
impl SessionLogSink {
/// Create a fresh sink with an empty mirror. Used before any entry
/// has been written (deferred SessionStart) or as a placeholder in
/// tests.
pub fn new() -> Self {
let (broadcast_tx, _) = broadcast::channel(BROADCAST_CAPACITY);
Self {
inner: Arc::new(SinkInner {
mirror: StdMutex::new(Vec::new()),
broadcast_tx,
}),
}
}
/// Create a sink seeded with a prefix of entries already on disk.
/// Used by restore / fork-at-restore code paths that materialise
/// the existing log before the sink starts taking new commits.
pub fn with_initial(entries: Vec<LogEntry>) -> Self {
let (broadcast_tx, _) = broadcast::channel(BROADCAST_CAPACITY);
Self {
inner: Arc::new(SinkInner {
mirror: StdMutex::new(entries),
broadcast_tx,
}),
}
}
/// Push `entry` to the mirror and broadcast it.
///
/// MUST be called only after the Pod has successfully persisted the
/// entry to the underlying `Store` — disk write is the gate. Failed
/// disk writes must not call `publish`.
pub fn publish(&self, entry: LogEntry) {
let mut mirror = self
.inner
.mirror
.lock()
.expect("session log mirror mutex poisoned");
mirror.push(entry.clone());
// SendError means there are zero subscribers; harmless. We hold
// the mirror lock across `send` so that `subscribe_with_snapshot`
// cannot observe an inconsistent (snapshot, receiver) pair.
let _ = self.inner.broadcast_tx.send(entry);
}
/// Atomically swap the mirror to `[initial]` and broadcast the new
/// session-start entry. Used during compaction / fork: the new
/// `LogEntry::SessionStart` is the first entry of the replacement
/// session, and existing subscribers transition by replaying it
/// like any other live entry.
///
/// Existing snapshot prefixes seen by old subscribers stay valid
/// for the prior session; the new `SessionStart` on the broadcast
/// is the signal to reset their derived view.
pub fn reset_with_initial(&self, initial: LogEntry) {
let mut mirror = self
.inner
.mirror
.lock()
.expect("session log mirror mutex poisoned");
mirror.clear();
mirror.push(initial.clone());
let _ = self.inner.broadcast_tx.send(initial);
}
/// Replace the mirror with the supplied prefix without broadcasting.
///
/// Used by restore paths that load a session's complete log into
/// the mirror before any subscriber is connected. Callers that need
/// to notify existing subscribers should use [`reset_with_initial`].
pub fn replace_silent(&self, entries: Vec<LogEntry>) {
let mut mirror = self
.inner
.mirror
.lock()
.expect("session log mirror mutex poisoned");
*mirror = entries;
}
/// Atomically read the current mirror and subscribe to subsequent
/// commits. The returned snapshot and receiver split the entry
/// timeline into a duplicate-free, gap-free prefix/suffix pair.
pub fn subscribe_with_snapshot(&self) -> (Vec<LogEntry>, broadcast::Receiver<LogEntry>) {
let mirror = self
.inner
.mirror
.lock()
.expect("session log mirror mutex poisoned");
let snapshot = mirror.clone();
let rx = self.inner.broadcast_tx.subscribe();
(snapshot, rx)
}
/// Current entry count. Useful for tests / diagnostics.
pub fn len(&self) -> usize {
self.inner
.mirror
.lock()
.expect("session log mirror mutex poisoned")
.len()
}
/// Whether the mirror is empty.
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
impl Default for SessionLogSink {
fn default() -> Self {
Self::new()
}
}
/// Active session head for the Pod's persistent log: session id +
/// last-committed entry hash. Replaces the previous `SessionHead`
/// struct local to `Pod`; bundled here so the writer can hand a
/// cloneable handle to background tasks (e.g. the per-item drain
/// task spawned by the controller).
#[derive(Debug, Clone)]
pub struct SessionHeadState {
pub session_id: SessionId,
pub head_hash: Option<EntryHash>,
}
/// Pod-side session-log writer.
///
/// Bundles the (1) persistent store, (2) the in-memory session-head
/// state (id + hash), and (3) the broadcast sink. `append_entry`
/// chains the hash on disk, advances the head, then publishes the
/// entry through the sink — under a single async mutex so two writers
/// cannot interleave the chain.
///
/// `Clone` is a cheap `Arc` clone. The Pod keeps one writer for its
/// inline commits (UserInput, TurnEnd, Usage, RunCompleted/Errored,
/// scope snapshots, metrics) and hands clones to background tasks
/// (e.g. the controller's per-item history drain task).
pub struct SessionLogWriter<St> {
inner: Arc<WriterInner<St>>,
}
struct WriterInner<St> {
store: St,
head: AsyncMutex<SessionHeadState>,
sink: SessionLogSink,
}
impl<St> Clone for SessionLogWriter<St> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<St> SessionLogWriter<St>
where
St: Store + Clone,
{
/// Create a writer for a fresh Pod with no entries on disk yet.
/// `head_hash` is `None` until the first `append_entry` (typically
/// the deferred `SessionStart` written by `ensure_session_head`).
pub fn new(store: St, session_id: SessionId) -> Self {
Self {
inner: Arc::new(WriterInner {
store,
head: AsyncMutex::new(SessionHeadState {
session_id,
head_hash: None,
}),
sink: SessionLogSink::new(),
}),
}
}
/// Create a writer seeded with a session already on disk. The
/// mirror is populated with `mirror` (typically loaded via
/// `Store::read_all`), and `head_hash` should be the hash of the
/// last entry.
pub fn restored(
store: St,
session_id: SessionId,
head_hash: Option<EntryHash>,
mirror: Vec<LogEntry>,
) -> Self {
Self {
inner: Arc::new(WriterInner {
store,
head: AsyncMutex::new(SessionHeadState {
session_id,
head_hash,
}),
sink: SessionLogSink::with_initial(mirror),
}),
}
}
/// Append `entry` to the log: disk write → in-memory mirror push →
/// broadcast — atomic w.r.t. `subscribe_with_snapshot` callers.
pub async fn append_entry(&self, entry: LogEntry) -> Result<EntryHash, StoreError> {
let mut head = self.inner.head.lock().await;
let hash = session_store::append_entry_with_hash(
&self.inner.store,
head.session_id,
&mut head.head_hash,
entry.clone(),
)
.await?;
self.inner.sink.publish(entry);
Ok(hash)
}
/// Atomically swap to a new compacted session.
///
/// Creates the new session on disk with `initial` as its
/// `SessionStart`, advances the head, and resets the sink mirror
/// to `[initial]` while broadcasting the entry. Existing
/// subscribers observe the swap as a freshly broadcast
/// `SessionStart` (with `compacted_from` set), which is their
/// signal to reset their derived view.
pub async fn swap_session(
&self,
new_session_id: SessionId,
initial: LogEntry,
) -> Result<EntryHash, StoreError> {
let hash = session_log::compute_hash(None, &initial);
let hashed = HashedEntry {
hash: hash.clone(),
prev_hash: None,
entry: initial.clone(),
};
self.inner
.store
.create_session(new_session_id, &[hashed])
.await?;
let mut head = self.inner.head.lock().await;
head.session_id = new_session_id;
head.head_hash = Some(hash.clone());
self.inner.sink.reset_with_initial(initial);
Ok(hash)
}
/// If the store's head no longer matches our cached head, mint a
/// fresh session that forks from the current state and switch to
/// it. Returns `true` when a fork happened.
pub async fn ensure_head_or_fork(
&self,
state: SessionStartState<'_>,
) -> Result<bool, StoreError> {
let mut head = self.inner.head.lock().await;
let store_head = self.inner.store.read_head_hash(head.session_id).await?;
if store_head == head.head_hash {
return Ok(false);
}
let fork_id = session_store::new_session_id();
let entry = LogEntry::SessionStart {
ts: session_log::now_millis(),
system_prompt: state.system_prompt.map(String::from),
config: state.config.clone(),
history: session_store::to_logged(state.history),
forked_from: None,
compacted_from: None,
};
let hash = session_log::compute_hash(None, &entry);
let hashed = HashedEntry {
hash: hash.clone(),
prev_hash: None,
entry: entry.clone(),
};
self.inner.store.create_session(fork_id, &[hashed]).await?;
head.session_id = fork_id;
head.head_hash = Some(hash);
self.inner.sink.reset_with_initial(entry);
Ok(true)
}
/// Cloneable handle to the broadcast sink. Used by the IPC layer
/// for `subscribe_with_snapshot` and by tests that just want the
/// non-write side.
pub fn sink(&self) -> SessionLogSink {
self.inner.sink.clone()
}
/// Underlying store handle. Direct access is preserved for callers
/// that read state (`read_all`, `read_head_hash`) without going
/// through the writer's hash chain.
pub fn store(&self) -> &St {
&self.inner.store
}
/// Cheap snapshot of the current session id.
pub async fn current_session_id(&self) -> SessionId {
self.inner.head.lock().await.session_id
}
/// Cheap snapshot of the current head hash.
pub async fn current_head_hash(&self) -> Option<EntryHash> {
self.inner.head.lock().await.head_hash.clone()
}
/// Direct lock on the head. Used by paths that need to coordinate
/// custom writes with the hash chain (currently
/// `session_metrics::record_metric`).
pub async fn lock_head(&self) -> MutexGuard<'_, SessionHeadState> {
self.inner.head.lock().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use llm_worker::llm_client::RequestConfig;
use session_store::session_log::now_millis;
fn session_start() -> LogEntry {
LogEntry::SessionStart {
ts: now_millis(),
system_prompt: None,
config: RequestConfig::default(),
history: vec![],
forked_from: None,
compacted_from: None,
}
}
fn turn_end(n: usize) -> LogEntry {
LogEntry::TurnEnd {
ts: now_millis(),
turn_count: n,
}
}
#[test]
fn publish_then_subscribe_returns_history_in_snapshot() {
let sink = SessionLogSink::new();
sink.publish(session_start());
sink.publish(turn_end(1));
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
assert_eq!(snapshot.len(), 2);
assert!(matches!(snapshot[0], LogEntry::SessionStart { .. }));
assert!(matches!(snapshot[1], LogEntry::TurnEnd { turn_count: 1, .. }));
assert!(rx.try_recv().is_err());
}
#[test]
fn subscribe_then_publish_delivers_live_entries() {
let sink = SessionLogSink::new();
sink.publish(session_start());
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
assert_eq!(snapshot.len(), 1);
sink.publish(turn_end(1));
match rx.try_recv() {
Ok(LogEntry::TurnEnd { turn_count: 1, .. }) => {}
other => panic!("unexpected: {other:?}"),
}
assert!(rx.try_recv().is_err());
}
#[test]
fn snapshot_and_live_never_overlap() {
let sink = SessionLogSink::new();
sink.publish(session_start());
let (snapshot, mut rx) = sink.subscribe_with_snapshot();
sink.publish(turn_end(1));
assert_eq!(snapshot.len(), 1);
match rx.try_recv() {
Ok(LogEntry::TurnEnd { turn_count: 1, .. }) => {}
other => panic!("unexpected: {other:?}"),
}
assert!(rx.try_recv().is_err());
}
#[test]
fn reset_with_initial_clears_and_broadcasts() {
let sink = SessionLogSink::new();
sink.publish(session_start());
sink.publish(turn_end(1));
let (_pre_snapshot, mut rx) = sink.subscribe_with_snapshot();
sink.reset_with_initial(session_start());
match rx.try_recv() {
Ok(LogEntry::SessionStart { .. }) => {}
other => panic!("expected SessionStart broadcast, got {other:?}"),
}
let (post_snapshot, _) = sink.subscribe_with_snapshot();
assert_eq!(post_snapshot.len(), 1);
assert!(matches!(post_snapshot[0], LogEntry::SessionStart { .. }));
}
#[test]
fn replace_silent_does_not_broadcast() {
let sink = SessionLogSink::new();
sink.publish(session_start());
let (_pre_snapshot, mut rx) = sink.subscribe_with_snapshot();
sink.replace_silent(vec![session_start(), turn_end(1)]);
// No broadcast fired.
assert!(rx.try_recv().is_err());
let (post_snapshot, _) = sink.subscribe_with_snapshot();
assert_eq!(post_snapshot.len(), 2);
}
#[test]
fn with_initial_seeds_the_mirror() {
let sink = SessionLogSink::with_initial(vec![session_start(), turn_end(1)]);
let (snapshot, _) = sink.subscribe_with_snapshot();
assert_eq!(snapshot.len(), 2);
}
}

View File

@ -1,7 +1,6 @@
use std::sync::{OnceLock, RwLock};
use llm_worker::llm_client::types::Item;
use protocol::{PodStatus, Segment};
use protocol::PodStatus;
use serde_json::json;
use session_store::SessionId;
@ -19,21 +18,20 @@ pub struct KnowledgeCandidate {
/// Shared state between PodController and runtime directory.
///
/// Controller updates this in-memory; RuntimeDir writes it to disk.
/// Wrapped in `Arc` for sharing.
/// Controller updates this in-memory; RuntimeDir writes the status
/// snapshot to disk. Wrapped in `Arc` for sharing.
///
/// History and typed user-segment mirrors used to live here so the
/// IPC layer could answer `Method::GetHistory`. Those reads now go
/// directly through the session-log sink (`Event::Snapshot` +
/// `Event::Entry`), so this struct holds only status, identity,
/// greeting, and completion lookup hubs.
pub struct PodSharedState {
pub pod_name: String,
pub session_id: SessionId,
pub manifest_toml: String,
pub greeting: protocol::Greeting,
pub status: RwLock<PodStatus>,
pub history: RwLock<Vec<Item>>,
/// Typed user submissions in submit order. The K-th entry corresponds
/// to the K-th `Item::user_message` in `history` (modulo seed history
/// loaded from a pre-compaction `SessionStart.history`, whose original
/// segments are not preserved). Surfaced via `Event::History` so
/// clients can re-render typed atoms on session restore.
pub user_segments: RwLock<Vec<Vec<Segment>>>,
/// Pod-from-the-inside view of the filesystem. Set once in
/// `PodController::start` after the `ScopedFs` is materialised, and
/// read from the IPC server layer to answer `ListCompletions`
@ -58,8 +56,6 @@ impl PodSharedState {
manifest_toml,
greeting,
status: RwLock::new(PodStatus::Idle),
history: RwLock::new(Vec::new()),
user_segments: RwLock::new(Vec::new()),
fs_view: OnceLock::new(),
workflows: OnceLock::new(),
knowledge: OnceLock::new(),
@ -112,25 +108,6 @@ impl PodSharedState {
.unwrap_or_default()
}
pub fn user_segments(&self) -> Vec<Vec<Segment>> {
self.user_segments
.read()
.map(|s| s.clone())
.unwrap_or_default()
}
pub fn set_user_segments(&self, segments: Vec<Vec<Segment>>) {
if let Ok(mut s) = self.user_segments.write() {
*s = segments;
}
}
pub fn push_user_segments(&self, segments: Vec<Segment>) {
if let Ok(mut s) = self.user_segments.write() {
s.push(segments);
}
}
pub fn set_status(&self, status: PodStatus) {
if let Ok(mut s) = self.status.write() {
*s = status;
@ -141,16 +118,6 @@ impl PodSharedState {
self.status.read().map(|s| *s).unwrap_or(PodStatus::Idle)
}
pub fn history(&self) -> Vec<Item> {
self.history.read().map(|h| h.clone()).unwrap_or_default()
}
pub fn update_history(&self, items: Vec<Item>) {
if let Ok(mut h) = self.history.write() {
*h = items;
}
}
/// Serialize status as JSON.
pub fn status_json(&self) -> String {
let status = self.get_status();
@ -161,21 +128,11 @@ impl PodSharedState {
})
.to_string()
}
/// Serialize history as JSON.
pub fn history_json(&self) -> String {
if let Ok(h) = self.history.read() {
serde_json::to_string(&*h).unwrap_or_else(|_| "[]".into())
} else {
"[]".into()
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use llm_worker::llm_client::types::{ContentPart, Item, Role};
fn test_state() -> PodSharedState {
PodSharedState::new(
@ -231,29 +188,6 @@ mod tests {
assert_eq!(parsed["state"], "running");
}
#[test]
fn history_json_empty_initially() {
let state = test_state();
assert_eq!(state.history_json(), "[]");
}
#[test]
fn history_json_after_update() {
let state = test_state();
let items = vec![Item::Message {
id: None,
role: Role::Assistant,
content: vec![ContentPart::Text {
text: "Hello".into(),
}],
status: None,
}];
state.update_history(items);
let json = state.history_json();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert!(parsed.is_array());
assert_eq!(parsed[0]["role"], "assistant");
}
#[test]
fn knowledge_completions_empty_when_unset() {

View File

@ -19,6 +19,7 @@ use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{ErrorCode, Event, Method};
use serde::Deserialize;
use session_store::LogEntry;
use tokio::net::UnixStream;
use crate::runtime::dir::SpawnedPodRecord;
@ -385,33 +386,31 @@ async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRu
}
}
/// Connect and ask the Pod for its conversation history. Skips
/// pre-History events (such as buffered alerts replayed to new
/// clients). Returns the raw JSON items as `serde_json::Value` since
/// the pod crate already round-trips via `Value` on the wire.
/// Connect to a Pod's socket and read the connect-time `Event::Snapshot`.
///
/// Pods deliver the session-log mirror as the first non-Alert event on
/// every new connection, so consuming it is sufficient — no explicit
/// `GetHistory` method round trip. Returns the entries as raw JSON
/// values; callers deserialize as `session_store::LogEntry` if they
/// need typed access.
async fn fetch_history(socket: &Path) -> std::io::Result<Vec<serde_json::Value>> {
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??;
let (r, w) = stream.into_split();
let mut writer = JsonLineWriter::new(w);
let (r, _w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(&Method::GetHistory))
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "write timed out"))??;
loop {
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "read timed out"))??;
match event {
Some(Event::History { items, .. }) => return Ok(items),
Some(Event::Snapshot { entries, .. }) => return Ok(entries),
Some(_) => continue,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"pod closed connection before History event",
"pod closed connection before Snapshot event",
));
}
}
@ -426,24 +425,36 @@ async fn is_reachable(socket: &Path) -> bool {
.unwrap_or(false)
}
fn extract_assistant_text(items: &[serde_json::Value]) -> String {
fn extract_assistant_text(entries: &[serde_json::Value]) -> String {
let mut out = String::new();
for value in items {
let Ok(item) = serde_json::from_value::<Item>(value.clone()) else {
for value in entries {
// The wire payload is the JSON form of `session_store::LogEntry`.
// Walk Assistant items inside each entry that can carry them:
// post-compaction `SessionStart.history` (seed) and per-LLM-call
// `AssistantItems` deltas.
let Ok(entry) = serde_json::from_value::<LogEntry>(value.clone()) else {
continue;
};
if let Item::Message {
role: Role::Assistant,
content,
..
} = item
{
for part in content {
if let ContentPart::Text { text } = part {
if !out.is_empty() {
out.push_str("\n\n");
let logged_items = match entry {
LogEntry::SessionStart { history, .. } => history,
LogEntry::AssistantItems { items, .. } => items,
_ => continue,
};
for logged in logged_items {
let item: Item = logged.into();
if let Item::Message {
role: Role::Assistant,
content,
..
} = item
{
for part in content {
if let ContentPart::Text { text } = part {
if !out.is_empty() {
out.push_str("\n\n");
}
out.push_str(&text);
}
out.push_str(&text);
}
}
}

View File

@ -177,17 +177,39 @@ fn drain(rx: &mut broadcast::Receiver<Event>) -> Vec<Event> {
out
}
fn system_event_text(event: &Event) -> Option<&str> {
match event {
Event::SystemMessage { item } => item["content"]
.as_array()
.and_then(|parts| parts.iter().filter_map(|p| p["text"].as_str()).next()),
_ => None,
/// Collect every system-message text that the post-compaction
/// `SessionStart.history` carries, by reading the sink mirror directly.
fn system_texts_in_sink_session_start(pod: &pod::Pod<impl llm_worker::llm_client::client::LlmClient + Clone + 'static, impl session_store::Store + Clone + 'static>) -> Vec<String> {
let (entries, _rx) = pod.sink().subscribe_with_snapshot();
for entry in entries.into_iter().rev() {
if let session_store::LogEntry::SessionStart { history, .. } = entry {
return history
.into_iter()
.filter_map(|logged| {
let item: Item = logged.into();
match item {
Item::Message {
role: llm_worker::Role::System,
content,
..
} => Some(
content
.iter()
.map(|p| p.as_text().to_owned())
.collect::<Vec<_>>()
.join(""),
),
_ => None,
}
})
.collect();
}
}
Vec::new()
}
#[tokio::test]
async fn compact_broadcasts_only_new_system_messages_not_retained_ones() {
async fn compact_emits_session_start_carrying_summary_and_task_snapshot() {
let client = MockClient::new(vec![
single_text_events("hi"),
write_summary_tool_use_events("call-1", "summary"),
@ -195,18 +217,16 @@ async fn compact_broadcasts_only_new_system_messages_not_retained_ones() {
]);
let mut pod = make_pod(client).await;
let (tx, mut rx) = broadcast::channel::<Event>(64);
let (tx, _rx_keep) = broadcast::channel::<Event>(64);
pod.attach_event_tx(tx);
pod.run_text("first").await.unwrap();
let retained_message = Item::system_message("[Retained system]\nold");
pod.worker_mut().push_item(retained_message);
let _ = drain(&mut rx);
pod.compact(10_000).await.unwrap();
let events = drain(&mut rx);
let system_texts: Vec<&str> = events.iter().filter_map(system_event_text).collect();
let system_texts = system_texts_in_sink_session_start(&pod);
// The post-compaction `SessionStart.history` carries the new system
// messages introduced by the compactor. Clients re-seed their view
// from this entry alone, so it is the load-bearing payload.
assert!(
system_texts
.iter()
@ -219,12 +239,6 @@ async fn compact_broadcasts_only_new_system_messages_not_retained_ones() {
.any(|text| text.starts_with("[Session TaskStore snapshot]")),
"task snapshot system message missing from {system_texts:?}"
);
assert!(
!system_texts
.iter()
.any(|text| text.starts_with("[Retained system]")),
"retained system message should not be rebroadcast: {system_texts:?}"
);
}
#[tokio::test]

View File

@ -6,12 +6,40 @@ use async_trait::async_trait;
use futures::{Stream, StreamExt};
use llm_worker::Worker;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::types::Item;
use llm_worker::llm_client::{ClientError, LlmClient, Request};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use session_store::FsStore;
use session_store::{FsStore, LogEntry};
use pod::{Event, Method, Pod, PodController, PodHandle, PodManifest, PodStatus};
/// Reconstruct a worker-history-like `Vec<Item>` from the live session
/// log mirror held by the Pod's broadcast sink. Replaces the previous
/// `PodSharedState.history()` test helper now that the mirror lives in
/// the sink.
fn history_from_sink(handle: &PodHandle) -> Vec<Item> {
let (entries, _rx) = handle.sink.subscribe_with_snapshot();
let mut items = Vec::new();
for entry in entries {
match entry {
LogEntry::SessionStart { history, .. } => {
items.extend(history.into_iter().map(Item::from));
}
LogEntry::UserInput { segments, .. } => {
let text = protocol::Segment::flatten_to_text(&segments);
items.push(Item::user_message(text));
}
LogEntry::AssistantItems { items: i, .. }
| LogEntry::ToolResults { items: i, .. }
| LogEntry::HookInjectedItems { items: i, .. } => {
items.extend(i.into_iter().map(Item::from));
}
_ => {}
}
}
items
}
// ---------------------------------------------------------------------------
// Mock LLM Client
// ---------------------------------------------------------------------------
@ -218,8 +246,61 @@ async fn run_end_returns_to_idle_without_busy_status() {
assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);
}
/// Mid-turn re-attach: a client connecting while the worker is still
/// running observes the in-flight `UserInput` entry in the connect-time
/// `Event::Snapshot`. This is the load-bearing property of the new
/// session-log-driven IPC: a late attacher reconstructs the running
/// view without needing the prior client's diff.
#[tokio::test]
async fn attach_history_includes_current_status() {
async fn snapshot_includes_user_input_for_in_flight_turn() {
let client = MockClient::sequential(vec![MockResponse::Hang(simple_text_events())]);
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
handle
.send(Method::run_text("hello in-flight"))
.await
.unwrap();
wait_for_status(&handle, PodStatus::Running).await;
let stream = tokio::net::UnixStream::connect(handle.runtime_dir.socket_path())
.await
.unwrap();
let (reader, _writer) = stream.into_split();
let mut reader = protocol::stream::JsonLineReader::new(reader);
loop {
let event = reader.next::<Event>().await.unwrap().unwrap();
match event {
Event::Snapshot { entries, .. } => {
// Walk the entries, find a `LogEntry::UserInput` and
// confirm its segments flatten to our submitted text.
let mut found = false;
for value in entries {
let entry: session_store::LogEntry =
serde_json::from_value(value).expect("LogEntry deserialise");
if let session_store::LogEntry::UserInput { segments, .. } = entry {
let text = protocol::Segment::flatten_to_text(&segments);
if text == "hello in-flight" {
found = true;
break;
}
}
}
assert!(
found,
"snapshot must carry the in-flight UserInput entry"
);
return;
}
Event::Alert(_) => continue,
other => panic!("expected Snapshot first, got {other:?}"),
}
}
}
#[tokio::test]
async fn attach_snapshot_includes_current_status() {
let client = MockClient::sequential(vec![MockResponse::Hang(simple_text_events())]);
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
@ -230,15 +311,20 @@ async fn attach_history_includes_current_status() {
let stream = tokio::net::UnixStream::connect(handle.runtime_dir.socket_path())
.await
.unwrap();
let (reader, writer) = stream.into_split();
let (reader, _writer) = stream.into_split();
let mut reader = protocol::stream::JsonLineReader::new(reader);
let mut writer = protocol::stream::JsonLineWriter::new(writer);
writer.write(&Method::GetHistory).await.unwrap();
let event = reader.next::<Event>().await.unwrap().unwrap();
match event {
Event::History { status, .. } => assert_eq!(status, PodStatus::Running),
other => panic!("expected History, got {other:?}"),
// First event after connect is the snapshot — it carries the current status.
loop {
let event = reader.next::<Event>().await.unwrap().unwrap();
match event {
Event::Snapshot { status, .. } => {
assert_eq!(status, PodStatus::Running);
return;
}
Event::Alert(_) => continue,
other => panic!("expected Snapshot, got {other:?}"),
}
}
}
@ -275,11 +361,11 @@ async fn run_populates_history() {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let history = handle.shared_state.history_json();
assert_ne!(history, "[]");
let parsed: serde_json::Value = serde_json::from_str(&history).unwrap();
assert!(parsed.is_array());
assert!(parsed.as_array().unwrap().len() >= 2); // user + assistant
let history = history_from_sink(&handle);
assert!(
history.len() >= 2,
"history must include user + assistant items, got {history:?}"
);
}
#[tokio::test]
@ -712,7 +798,7 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
// The notification must also be persisted into the Worker history
// (and therefore eventually into history.json), per
// tickets/notify-history-persist.md.
let history = handle.shared_state.history();
let history = history_from_sink(&handle);
let notify_in_history = history.iter().any(|i| {
i.as_text()
.is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished"))
@ -796,7 +882,7 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes
// Same item must be present in worker.history (persisted lane),
// not just the per-request clone — see tickets/notify-history-persist.md.
let history = handle.shared_state.history();
let history = history_from_sink(&handle);
let event_in_history = history.iter().any(|i| {
i.as_text().is_some_and(|t| {
t.contains("[Notification]") && t.contains("child") && t.contains("finished a turn")
@ -1149,22 +1235,42 @@ async fn pause_then_resume_transitions_and_preserves_history_consistency() {
// History consistency: exactly [user "hello", assistant
// "resumed output"]. No artifacts from the aborted stream
// (partial text is not committed), no orphan tool_use.
let history_json = handle.shared_state.history_json();
let items: Vec<serde_json::Value> = serde_json::from_str(&history_json).unwrap();
let roles: Vec<&str> = items.iter().filter_map(|i| i["role"].as_str()).collect();
let history = history_from_sink(&handle);
let roles: Vec<&str> = history
.iter()
.filter_map(|i| match i {
Item::Message { role, .. } => match role {
llm_worker::Role::User => Some("user"),
llm_worker::Role::Assistant => Some("assistant"),
llm_worker::Role::System => Some("system"),
},
_ => None,
})
.collect();
assert_eq!(
roles,
vec!["user", "assistant"],
"history = user + assistant only; got {items:?}"
"history = user + assistant only; got {history:?}"
);
let assistant_text = items[1]["content"]
.as_array()
.and_then(|parts| parts.iter().filter_map(|p| p["text"].as_str()).next())
.unwrap_or("");
assert_eq!(assistant_text, "resumed output");
let has_tool_call = items
let assistant_text = history
.iter()
.any(|i| i["type"].as_str() == Some("tool_call"));
.find_map(|i| match i {
Item::Message {
role: llm_worker::Role::Assistant,
content,
..
} => Some(
content
.iter()
.map(|p: &llm_worker::ContentPart| p.as_text().to_owned())
.collect::<Vec<_>>()
.join(""),
),
_ => None,
})
.unwrap_or_default();
assert_eq!(assistant_text, "resumed output");
let has_tool_call = history.iter().any(|i| i.is_tool_call());
assert!(!has_tool_call, "no orphan tool_call in history");
}

View File

@ -144,44 +144,41 @@ fn accept_method_and_respond(
})
}
/// Pretend to be a spawned Pod that responds to `GetHistory` with a
/// fixed set of items. Accepts connections until the first one that
/// delivers a `GetHistory` method; earlier probes (empty accepts) and
/// non-history methods are ignored. Returns nothing — tests await the
/// handle only to keep the listener alive until shutdown.
/// Pretend to be a spawned Pod whose connect-time snapshot carries a
/// fixed set of assistant items. Sends `Event::Snapshot` immediately on
/// every accept — the real Pod does the same, so `ReadPodOutput`'s
/// `fetch_history` just consumes the first non-Alert event.
fn serve_history(listener: UnixListener, items: Vec<Item>) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
let Ok((stream, _)) = listener.accept().await else {
return;
};
let (r, w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
let (_r, w) = stream.into_split();
let mut writer = JsonLineWriter::new(w);
match reader.next::<Method>().await {
Ok(Some(Method::GetHistory)) => {
let values: Vec<serde_json::Value> = items
.iter()
.map(|i| serde_json::to_value(i).unwrap())
.collect();
let event = Event::History {
items: values,
greeting: Greeting {
pod_name: "child".into(),
cwd: "/tmp".into(),
provider: "anthropic".into(),
model: "x".into(),
scope_summary: String::new(),
tools: Vec::new(),
},
status: protocol::PodStatus::Idle,
};
let _ = writer.write(&event).await;
}
Ok(Some(_)) | Ok(None) | Err(_) => {
// Ignore: loop accepts another connection.
}
}
// Wrap the assistant items in a single
// `LogEntry::AssistantItems` entry — that's the only kind
// that contributes assistant text via `extract_assistant_text`.
let logged: Vec<session_store::LoggedItem> =
items.iter().map(session_store::LoggedItem::from).collect();
let entry = session_store::LogEntry::AssistantItems {
ts: 0,
items: logged,
};
let entry_value = serde_json::to_value(&entry).unwrap();
let event = Event::Snapshot {
entries: vec![entry_value],
greeting: Greeting {
pod_name: "child".into(),
cwd: "/tmp".into(),
provider: "anthropic".into(),
model: "x".into(),
scope_summary: String::new(),
tools: Vec::new(),
},
status: protocol::PodStatus::Idle,
};
let _ = writer.write(&event).await;
}
})
}

View File

@ -32,14 +32,13 @@ pub enum Method {
/// synthetic tool result before the new user message is appended).
Pause,
Shutdown,
GetHistory,
/// Request a list of completion candidates from the Pod.
///
/// Reply is sent on the same socket as `Event::Completions` (not
/// broadcast). Same shape as `GetHistory` / `Event::History`:
/// the IPC server handles this directly and writes the response
/// straight back to the requesting socket. Empty results for
/// resolvers that are not yet wired up (Knowledge / Workflow).
/// broadcast). The IPC server handles this directly and writes
/// the response straight back to the requesting socket. Empty
/// results for resolvers that are not yet wired up
/// (Knowledge / Workflow).
ListCompletions {
kind: CompletionKind,
prefix: String,
@ -224,15 +223,6 @@ pub enum Event {
Notify {
message: String,
},
/// Persisted `role:system` history item that should be rendered by
/// clients through the same path used for `Event::History` replay.
///
/// The payload is the serialized history item, not an ad-hoc display
/// DTO, so live subscribers and late subscribers have the same source
/// of truth: worker history / history.json.
SystemMessage {
item: serde_json::Value,
},
/// Echo of `Method::PodEvent` received by this Pod. Same rationale
/// as `Notify`: subscribers render the event as a log element,
/// while a rendered summary is independently injected into the LLM
@ -312,16 +302,34 @@ pub enum Event {
code: ErrorCode,
message: String,
},
History {
items: Vec<serde_json::Value>,
/// Sent exactly once at the start of every client connection.
///
/// `entries` is the session-log mirror at subscribe time, serialised
/// as the JSON form of `session_store::LogEntry`. Late attachers
/// reconstruct view state by replaying entries through their own
/// `LogEntry → block` mapping, then continue applying live
/// `Event::Entry` updates received after the snapshot.
///
/// `greeting` and `status` accompany the snapshot so clients render
/// pod identity and current controller state without an extra round
/// trip.
Snapshot {
entries: Vec<serde_json::Value>,
greeting: Greeting,
/// Current Pod controller status at the moment the history snapshot
/// was taken. This lets late-attaching clients render and route
/// controls from the real controller state instead of inferring from
/// replayed history.
#[serde(default)]
status: PodStatus,
},
/// A single session-log entry committed atomically with the disk
/// write. Streamed as the suffix following the connect-time
/// `Snapshot`; the prefix/suffix boundary is gap-free and
/// duplicate-free per `SessionLogSink` semantics.
///
/// Payload is the JSON form of `session_store::LogEntry`. Clients
/// deserialize as needed to render typed atoms (e.g.
/// `UserInput.segments`).
Entry {
entry: serde_json::Value,
},
/// Current Pod controller status. Broadcast on every controller-level
/// transition and included in `History` snapshots for late attach.
Status {
@ -418,8 +426,8 @@ pub struct CompletionEntry {
/// Pod self-description rendered by the TUI when a session starts empty.
///
/// Built once in the Pod controller from the resolved manifest and
/// transmitted alongside `Event::History` so clients don't need their
/// own view of the manifest.
/// transmitted alongside `Event::Snapshot` so clients don't need
/// their own view of the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Greeting {
pub pod_name: String,
@ -674,13 +682,6 @@ mod tests {
assert_eq!(serialized, json);
}
#[test]
fn method_get_history() {
let json = r#"{"method":"get_history"}"#;
let method: Method = serde_json::from_str(json).unwrap();
assert!(matches!(method, Method::GetHistory));
}
#[test]
fn method_list_completions_roundtrip() {
let method = Method::ListCompletions {
@ -734,9 +735,9 @@ mod tests {
}
#[test]
fn event_history_format() {
let event = Event::History {
items: vec![serde_json::json!({"type": "message", "role": "user"})],
fn event_snapshot_format() {
let event = Event::Snapshot {
entries: vec![serde_json::json!({"kind": "user_input", "ts": 1, "segments": []})],
greeting: Greeting {
pod_name: "test".into(),
cwd: "/tmp".into(),
@ -749,14 +750,30 @@ mod tests {
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "history");
assert!(parsed["data"]["items"].is_array());
assert_eq!(parsed["data"]["items"][0]["role"], "user");
assert_eq!(parsed["event"], "snapshot");
assert!(parsed["data"]["entries"].is_array());
assert_eq!(parsed["data"]["entries"][0]["kind"], "user_input");
assert_eq!(parsed["data"]["greeting"]["pod_name"], "test");
assert_eq!(parsed["data"]["greeting"]["tools"][0], "Read");
assert_eq!(parsed["data"]["status"], "paused");
}
#[test]
fn event_entry_roundtrip() {
let event = Event::Entry {
entry: serde_json::json!({"kind": "assistant_items", "ts": 42, "items": []}),
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "entry");
assert_eq!(parsed["data"]["entry"]["kind"], "assistant_items");
let decoded: Event = serde_json::from_str(&json).unwrap();
match decoded {
Event::Entry { entry } => assert_eq!(entry["kind"], "assistant_items"),
other => panic!("expected Entry, got {other:?}"),
}
}
#[test]
fn event_status_format() {
let event = Event::Status {
@ -777,12 +794,12 @@ mod tests {
}
#[test]
fn event_history_legacy_without_status_defaults_to_idle() {
let json = r#"{"event":"history","data":{"items":[],"greeting":{"pod_name":"test","cwd":"/tmp","provider":"anthropic","model":"claude","scope_summary":"","tools":[]}}}"#;
fn event_snapshot_legacy_without_status_defaults_to_idle() {
let json = r#"{"event":"snapshot","data":{"entries":[],"greeting":{"pod_name":"test","cwd":"/tmp","provider":"anthropic","model":"claude","scope_summary":"","tools":[]}}}"#;
let decoded: Event = serde_json::from_str(json).unwrap();
match decoded {
Event::History { status, .. } => assert_eq!(status, PodStatus::Idle),
other => panic!("expected History, got {other:?}"),
Event::Snapshot { status, .. } => assert_eq!(status, PodStatus::Idle),
other => panic!("expected Snapshot, got {other:?}"),
}
}

View File

@ -39,10 +39,10 @@ pub use llm_worker::UsageRecord;
pub use llm_worker::llm_client::types::{ContentPart, Item, Role};
pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged};
pub use session::{
SessionStartState, create_compacted_session, create_session, create_session_with_id,
ensure_head_or_fork, fork, fork_at, restore, save_config_changed, save_delta, save_extension,
save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage,
save_user_input,
SessionStartState, append_entry, append_entry_with_hash, create_compacted_session,
create_session, create_session_with_id, ensure_head_or_fork, fork, fork_at, restore,
save_config_changed, save_delta, save_extension, save_pod_scope, save_run_completed,
save_run_errored, save_turn_end, save_usage, save_user_input,
};
pub use session_log::{
EntryHash, HashedEntry, LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState,

View File

@ -457,14 +457,31 @@ pub async fn fork_at(
Ok(fork_id)
}
// ── Private helper ──────────────────────────────────────────────────────
async fn append_entry(
/// Append a single `LogEntry`, chaining the hash and updating `head_hash`.
///
/// Lower-level dual of the `save_*` convenience wrappers in this module.
/// Use when the caller already builds the typed entry itself (e.g. when
/// it needs the same value for an in-memory mirror + broadcast).
pub async fn append_entry(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
entry: LogEntry,
) -> Result<(), StoreError> {
append_entry_with_hash(store, session_id, head_hash, entry).await?;
Ok(())
}
/// Same as [`append_entry`] but returns the freshly computed entry hash.
///
/// Used by paths that need the hash for downstream broadcast or mirror
/// updates (e.g. the Pod's `SessionLogSink`).
pub async fn append_entry_with_hash(
store: &impl Store,
session_id: SessionId,
head_hash: &mut Option<EntryHash>,
entry: LogEntry,
) -> Result<EntryHash, StoreError> {
let hash = session_log::compute_hash(head_hash.as_ref(), &entry);
let hashed_entry = HashedEntry {
hash: hash.clone(),
@ -472,6 +489,6 @@ async fn append_entry(
entry,
};
store.append(session_id, &hashed_entry).await?;
*head_hash = Some(hash);
Ok(())
*head_hash = Some(hash.clone());
Ok(hash)
}

View File

@ -19,6 +19,7 @@ session-store = { workspace = true }
pod-registry = { workspace = true }
serde = { workspace = true, features = ["derive"] }
pulldown-cmark = { version = "0.13.3", default-features = false }
llm-worker.workspace = true
[dev-dependencies]
tools = { workspace = true }

View File

@ -491,9 +491,8 @@ impl App {
self.blocks.push(Block::PodEvent { event });
self.assistant_streaming = false;
}
Event::SystemMessage { item } => {
self.push_history_item(&item);
self.assistant_streaming = false;
Event::Entry { entry } => {
self.apply_log_entry(&entry);
}
Event::TurnStart { .. } => {
self.set_pod_status(PodStatus::Running);
@ -726,12 +725,12 @@ impl App {
message: alert.message,
});
}
Event::History {
items,
Event::Snapshot {
entries,
greeting,
status,
} => {
self.restore_history(&items, greeting);
self.restore_snapshot(&entries, greeting);
self.set_pod_status(status);
}
Event::Status { status } => {
@ -905,10 +904,13 @@ impl App {
self.input.move_down();
}
fn restore_history(&mut self, items: &[serde_json::Value], greeting: protocol::Greeting) {
// Fresh session: greeting + any replayed items. Append-only — we
// don't try to merge with already-displayed live events because
// `History` only fires on an empty live state.
/// Reset the block list and replay a connect-time `Event::Snapshot`.
///
/// Walks the session-log entries in commit order, expanding each
/// LogEntry variant into the same blocks live events would have
/// produced. Followed by `Event::Entry` updates for anything
/// committed after the snapshot.
fn restore_snapshot(&mut self, entries: &[serde_json::Value], greeting: protocol::Greeting) {
self.turn_index = 0;
self.blocks.clear();
self.cache = FileCache::new();
@ -917,14 +919,86 @@ impl App {
self.blocks.push(Block::Greeting(greeting));
self.assistant_streaming = false;
for item in items {
self.push_history_item(item);
for entry in entries {
self.apply_log_entry_raw(entry);
}
// Any tool_call entries that never got paired with a
// tool_result (truncated or racing mid-turn on the server side)
// stay as Executing up to this point. Surface them as
// Incomplete so the replay matches live semantics.
self.mark_orphan_tool_calls_incomplete_pass();
}
/// Apply a single live `Event::Entry`.
///
/// `SessionStart` entries that arrive live (compaction / fork)
/// reset the block list to a freshly seeded view, matching what a
/// reconnect's `Event::Snapshot` would produce.
fn apply_log_entry(&mut self, entry: &serde_json::Value) {
if entry.get("kind").and_then(|k| k.as_str()) == Some("session_start") {
// Compaction / fork on the server side. Reset our derived
// view but keep the greeting (identity hasn't changed).
let greeting = self
.blocks
.iter()
.find_map(|b| match b {
Block::Greeting(g) => Some(g.clone()),
_ => None,
});
self.turn_index = 0;
self.blocks.clear();
self.cache = FileCache::new();
self.task_store = TaskStore::new();
self.task_pane_scroll = 0;
if let Some(g) = greeting {
self.blocks.push(Block::Greeting(g));
}
}
self.apply_log_entry_raw(entry);
self.assistant_streaming = false;
}
/// Walk a single `LogEntry` JSON value and translate it into blocks
/// the live event path would have produced. Shared between
/// `restore_snapshot` (replay path) and `apply_log_entry` (live
/// path).
fn apply_log_entry_raw(&mut self, value: &serde_json::Value) {
let Ok(entry) = serde_json::from_value::<session_store::LogEntry>(value.clone()) else {
return;
};
match entry {
session_store::LogEntry::SessionStart { history, .. } => {
for logged in history {
let item: llm_worker::Item = logged.into();
let item_value = serde_json::to_value(&item).expect("Item is Serialize");
self.push_history_item(&item_value);
}
}
session_store::LogEntry::UserInput { segments, .. } => {
self.turn_index += 1;
self.blocks.push(Block::TurnHeader {
turn: self.turn_index,
});
if !segments.is_empty() {
self.blocks.push(Block::UserMessage { segments });
}
}
session_store::LogEntry::AssistantItems { items, .. }
| session_store::LogEntry::ToolResults { items, .. }
| session_store::LogEntry::HookInjectedItems { items, .. } => {
for logged in items {
let item: llm_worker::Item = logged.into();
let item_value = serde_json::to_value(&item).expect("Item is Serialize");
self.push_history_item(&item_value);
}
}
// Non-history-bearing variants don't affect the block view.
_ => {}
}
}
/// Sweep all current tool-call blocks: any that never resolved into
/// a Done / Error state get marked Incomplete. Called after a
/// snapshot replay so dangling in-flight tool calls in the seed
/// log match live semantics.
fn mark_orphan_tool_calls_incomplete_pass(&mut self) {
for b in self.blocks.iter_mut() {
if let Block::ToolCall(tc) = b
&& matches!(
@ -1325,18 +1399,22 @@ mod completion_flow_tests {
}
#[test]
fn history_restore_renders_system_message_block() {
fn snapshot_renders_system_message_block_from_session_start() {
let mut app = App::new("test".into());
app.handle_pod_event(Event::History {
let session_start = session_store::LogEntry::SessionStart {
ts: 1,
system_prompt: None,
config: Default::default(),
history: vec![session_store::LoggedItem::from(
&llm_worker::Item::system_message("[File: src/main.rs]\nfn main() {}"),
)],
forked_from: None,
compacted_from: None,
};
let session_start_value = serde_json::to_value(&session_start).unwrap();
app.handle_pod_event(Event::Snapshot {
greeting: test_greeting(),
items: vec![serde_json::json!({
"type": "message",
"role": "system",
"content": [{
"type": "text",
"text": "[File: src/main.rs]\nfn main() {}",
}],
})],
entries: vec![session_start_value],
status: PodStatus::Running,
});
@ -1349,18 +1427,18 @@ mod completion_flow_tests {
}
#[test]
fn live_system_message_event_uses_history_item_path() {
fn live_entry_routes_system_message_via_hook_injected_items() {
let mut app = App::new("test".into());
app.handle_pod_event(Event::SystemMessage {
item: serde_json::json!({
"type": "message",
let entry = serde_json::json!({
"kind": "hook_injected_items",
"ts": 1,
"items": [{
"kind": "message",
"role": "system",
"content": [{
"type": "text",
"text": "[Workflow /build]\nRun the build",
}],
}),
"content": [{ "kind": "text", "text": "[Workflow /build]\nRun the build" }],
}],
});
app.handle_pod_event(Event::Entry { entry });
assert!(matches!(
app.blocks.as_slice(),
@ -1504,11 +1582,15 @@ mod completion_flow_tests {
```json\n{\n \"tasks\": [\n {\n \"taskid\": 4,\n \
\"status\": \"inprogress\",\n \"subject\": \"from snapshot\",\n \
\"description\": \"d\"\n }\n ]\n}\n```\n";
app.handle_pod_event(Event::SystemMessage {
item: serde_json::json!({
"type": "message",
"role": "system",
"content": [{ "type": "text", "text": snapshot }],
app.handle_pod_event(Event::Entry {
entry: serde_json::json!({
"kind": "hook_injected_items",
"ts": 1,
"items": [{
"kind": "message",
"role": "system",
"content": [{ "kind": "text", "text": snapshot }],
}],
}),
});
@ -1519,10 +1601,10 @@ mod completion_flow_tests {
}
#[test]
fn history_replay_reconstructs_task_store() {
fn snapshot_reconstructs_task_store() {
let mut app = App::new("test".into());
// Live tool call before history lands — restore_history must
// wipe this so it doesn't double-count after replay.
// Live tool call before the snapshot lands — restore must wipe
// this so it doesn't double-count after replay.
app.handle_pod_event(Event::ToolCallStart {
id: "live".into(),
name: "TaskCreate".into(),
@ -1533,28 +1615,33 @@ mod completion_flow_tests {
arguments: r#"{"subject":"live","description":""}"#.into(),
});
app.handle_pod_event(Event::History {
greeting: test_greeting(),
items: vec![
serde_json::json!({
"type": "tool_call",
let assistant_items_entry = serde_json::json!({
"kind": "assistant_items",
"ts": 1,
"items": [
{
"kind": "tool_call",
"call_id": "c1",
"name": "TaskCreate",
"arguments": r#"{"subject":"a","description":"A"}"#,
}),
serde_json::json!({
"type": "tool_call",
},
{
"kind": "tool_call",
"call_id": "c2",
"name": "TaskCreate",
"arguments": r#"{"subject":"b","description":"B"}"#,
}),
serde_json::json!({
"type": "tool_call",
},
{
"kind": "tool_call",
"call_id": "u1",
"name": "TaskUpdate",
"arguments": r#"{"taskid":2,"status":"inprogress"}"#,
}),
},
],
});
app.handle_pod_event(Event::Snapshot {
greeting: test_greeting(),
entries: vec![assistant_items_entry],
status: PodStatus::Running,
});

View File

@ -256,9 +256,10 @@ async fn run(
let mut app = App::new(pod_name);
match PodClient::connect(socket_path).await {
Ok(mut client) => {
Ok(client) => {
app.connected = true;
let _ = client.send(&Method::GetHistory).await;
// The Pod sends `Event::Snapshot` automatically on connect;
// no explicit method call is required to fetch history.
run_loop(terminal, &mut app, client).await?;
}
Err(e) => {

View File

@ -1,55 +0,0 @@
# Pod: 状態と socket 配信を session log 正本に統合する
## 背景
Pod の状態は現在 3 つの形で同居している:
1. **session log** (`crates/session-store/src/session_log.rs:LogEntry`) — append-only の typed 正本。 `UserInput { segments }` / `AssistantItems` / `ToolResults` / `HookInjectedItems` / `TurnEnd` / `SessionStart` で会話全体を表現できる。
2. **worker.history** (`Vec<Item>`) — LLM に投げるために flatten / 加工された派生 view。 user_message は `Vec<Segment>` を flatten した String になっている。
3. **PodSharedState**`history` + `user_segments` — worker.history を ipc 層に渡すための中継ミラー。 typed segments は parallel 配列で別途保持。
`Method::GetHistory` (`crates/pod/src/ipc/server.rs:132-182`) は (3) の中継から組み立てており、 平坦化された user_message に segments を後付けする overlay + skip-align ロジックが必要になっている。 broadcast (`event_tx`) はライブイベントだけを流し、 接続時 snapshot は別経路 + 別 Event 型 (`Event::History`) で返るため、 再アタッチ時に snapshot ↔ live が連続しない (`tickets/pod-socket-state-view.md` の問題)。
派生方向が逆転している: 正本は session log なのに (3) は (2) を経由した二次派生になっており、 (1) が既に持ってる typed 情報を flatten/復元で往復する歪んだ構造を生んでいる。 また `Method::GetHistory` が RPC 形を取っていることで、 同じ socket writer に「broadcast forwarder」 と「query handler」 の 2 経路が同居している。
## 方針
- session log を Pod 状態の単一正本として位置付け、 worker.history は LLM context 投影用の内部 view に格下げる。 ipc 経路には worker.history が現れない。
- 接続クライアントへの配信を 「session log の prefix (snapshot) + suffix (live)」 という同型ストリームに統合する。 query/reply 型の `Method::GetHistory` を廃止し、 接続自体が暗黙の subscribe-with-replay として動作する。
- ストリーミング系イベント (`TextDelta` / `ToolCallStart` / `ToolCallArgsDelta` 等) は progressive 描画用の best-effort hint に役割を限定する。 late attach で過去 delta が失われるのは仕様。 確定情報は session log entry の broadcast で別途到達する。
- entry commit の hook 点は worker 側の確定 callback に置く。 現状 `Pod::run()` 末尾で `persist_turn``history_before..` を一括 flush しているが (`crates/pod/src/pod.rs:1491-1502`)、 これを 「worker が assistant block / tool call / tool result を確定した瞬間に append_entry を呼ぶ」 形へ移す。 `wire_event_bridges_on_worker` で worker → event_tx を bridge しているのと同じ箇所に append_entry hook を追加する想定で、 worker 内部構造への介入は確定 callback 受け口の追加に限定する。
- atomicity の中身は「disk write が成功した entry のみ broadcast に乗る」 順序保証。 alerter は memory-only なので buffer lock + `broadcast::send` で完結するが、 session log は disk I/O が混じるため対称ではない。 `append_entry` は (1) disk write → (2) in-memory mirror 更新 → (3) `Event::Entry` broadcast の順で、 (1) 失敗時は (2)(3) を行わず error を上に返す。 (2)(3) は同一の subscribe lock 下で行い、 `subscribe_with_snapshot` が見る mirror と receiver 側のイベント列に重複・欠落・順序逆転が出ないようにする。
- `Event::SystemMessage` 廃止に伴い、 system_message を LogEntry に焼く責務は controller 側の `Event::SystemMessage` 送信点 (`crates/pod/src/controller.rs:372`) を `LogEntry::HookInjectedItems` の append_entry 呼び出しに置き換える形で取る。 「context に乗せる前に history に commit する」 という CLAUDE.md の加工原則に揃う。 notify 系の history 焼き込みは `tickets/notify-history-persist.md` が別途扱う領域で、 本チケットは system_message 経路の置換のみを範囲とする。
## 要件
- session log entry の commit は単一経路 (`Pod::append_entry` 相当) を通り、 「永続書き込み + in-memory mirror 更新 + `Event::Entry(LogEntry)` broadcast」 を atomic に行う。 atomicity は alerter と同じパターンの `subscribe_with_snapshot` 用ロックで保証される。
- entry commit は **per-item / per-block 粒度** で行う。 現在の turn 末尾一括の `persist_turn` / `save_delta` を分解し、 mid-turn 接続で進行中の tool call / 確定済み assistant block / user input すべてが snapshot から見える状態にする。
- 接続クライアントは接続時に `Event::Snapshot { entries: Vec<LogEntry>, greeting, status }` を受信し、 続けて live `Event::Entry(LogEntry)` を時系列で受信する。 prefix と suffix の境目に重複・欠落が無い。
- typed user input (`Vec<Segment>`) は flatten/復元の往復なく client に届く。 `PodSharedState.user_segments` と GetHistory の overlay+skip-align ロジックを廃止する。
- ストリーミング hint は変更なし継続。 ただし「確定情報は entry にあり、 hint は描画進捗のみ」 という分担を protocol 上のドキュメントで明記する。
- TUI は `Event::Snapshot` / `Event::Entry` 駆動で view を組み立てる。 既存ブロック描画と等価な LogEntry → Block mapping を実装する。
- inter-pod query (`crates/pod/src/spawn/comm_tools.rs` の GetHistory 経路) は新 snapshot 形式に追従する。
## 完了条件
- session log entry 1 件の commit が、 永続書き込みと `Event::Entry` broadcast を atomic に同期させる経路で行われる。 mid-turn の任意の瞬間で、 session log と Event::Entry の到達順が常に整合する。
- 接続時に `Event::Snapshot` が必ず流れ、 直後から live `Event::Entry` が同型で連続する。 mid-turn 再アタッチで進行中の user input / 確定済み assistant 出力 / 進行中の tool call / 確定済み tool result が view に再現される。
- `Method::GetHistory` / `Event::History` / `Event::SystemMessage` が protocol から削除されている。 後者 2 つは `Event::Entry` (`HookInjectedItems` バリアント等) で代替される。
- `PodSharedState.history` / `PodSharedState.user_segments` が削除されている。 `PodSharedState` は status / greeting / fs_view / workflow / knowledge の lookup ハブとして残る。
- `crates/pod/src/runtime/dir.rs``history.json` write は廃止または用途縮小される (session log が正本)。
- 既存テスト (`crates/pod/tests/controller_test.rs`、 `crates/session-store/tests/`、 TUI 関連) が通る。 ターン中再アタッチで in-flight turn の user_input が view に含まれることを示すテストが新規追加されている。
## 範囲外
- `LogEntry` スキーマの変更 (バリアントは現状維持)。
- compaction / fork 動作の変更 (既存の `SessionStart.{compacted_from, forked_from}` がそのまま使われる)。
- TUI rendering の機能拡張。 LogEntry → 既存 Block の mapping は等価再構成に留め、 装飾追加は別チケット。
- `PodSharedState` の完全廃止と Pod 借用構造の分解。 controller が `&mut Pod` を握る構造は変えない。
- broadcast cap (256) の最適化、 ストリーミング hint の replay buffer 化。
- `Method::ListCompletions` の subscribe 化 (これは真の query なので RPC 形のまま残す)。
## 関連
- `tickets/pod-persistent-state.md` の「session log は引き続き会話状態の唯一の復元ソース」方針と整合する。 Pod identity 永続化は引き続き別チケット領域。
- `tickets/notify-history-persist.md` の「context に乗せる前に history に commit」 原則と同根。 本チケットは system_message 経路の置換まで、 notify 経路は当該チケットで扱う。