diff --git a/crates/pod/src/in_flight.rs b/crates/pod/src/in_flight.rs index 1ad2f216..b8ca6c58 100644 --- a/crates/pod/src/in_flight.rs +++ b/crates/pod/src/in_flight.rs @@ -191,31 +191,14 @@ impl InFlightEvents { }); } - pub(crate) fn clear_for_committed_item(&self, item: &LoggedItem) { + pub(crate) fn clear_for_committed_item_then( + &self, + item: &LoggedItem, + f: impl FnOnce() -> R, + ) -> R { let mut inner = self.lock(); - match item { - LoggedItem::Message { role, content } - if matches!(role, session_store::LoggedRole::Assistant) => - { - let text = content - .iter() - .filter_map(|part| match part { - LoggedContentPart::Text { text } => Some(text.as_str()), - LoggedContentPart::Refusal { refusal } => Some(refusal.as_str()), - }) - .collect::(); - if !text.is_empty() { - inner.remove_first_text_matching(&text); - } - } - LoggedItem::Reasoning { text, .. } => { - inner.remove_first_thinking_matching(text); - } - LoggedItem::ToolCall { call_id, .. } => { - inner.remove_tool_call(call_id); - } - _ => {} - } + inner.clear_for_committed_item(item); + f() } fn lock(&self) -> MutexGuard<'_, InFlightInner> { @@ -236,6 +219,32 @@ impl InFlightInner { .find(|block| block.block_id() == block_id) } + fn clear_for_committed_item(&mut self, item: &LoggedItem) { + match item { + LoggedItem::Message { role, content } + if matches!(role, session_store::LoggedRole::Assistant) => + { + let text = content + .iter() + .filter_map(|part| match part { + LoggedContentPart::Text { text } => Some(text.as_str()), + LoggedContentPart::Refusal { refusal } => Some(refusal.as_str()), + }) + .collect::(); + if !text.is_empty() { + self.remove_first_text_matching(&text); + } + } + LoggedItem::Reasoning { text, .. } => { + self.remove_first_thinking_matching(text); + } + LoggedItem::ToolCall { call_id, .. } => { + self.remove_tool_call(call_id); + } + _ => {} + } + } + fn snapshot(&self) -> InFlightSnapshot { InFlightSnapshot { blocks: self @@ -351,18 +360,116 @@ mod tests { assert!(rx.try_recv().is_err()); } + #[test] + fn session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap() { + use std::sync::mpsc; + use std::thread; + + use crate::segment_log_sink::SegmentLogSink; + use session_store::{LogEntry, LoggedRole}; + + let (event_tx, _) = broadcast::channel(16); + let sink = SegmentLogSink::new(); + let in_flight = InFlightEvents::new(event_tx); + let block_id = in_flight.start_text_block(); + in_flight.text_delta(block_id, "done".into()); + in_flight.text_done(block_id, "done".into()); + + let assistant_item = LoggedItem::Message { + role: LoggedRole::Assistant, + content: vec![LoggedContentPart::Text { + text: "done".into(), + }], + }; + let assistant_entry = LogEntry::AssistantItem { + ts: 1, + item: assistant_item.clone(), + }; + + let in_flight_guard = in_flight.snapshot_guard(); + let in_flight_for_commit = in_flight.clone(); + let sink_for_commit = sink.clone(); + let (committed_tx, committed_rx) = mpsc::channel(); + let commit_thread = thread::spawn(move || { + // This mirrors Pod::append_entry ordering: clear in-flight first, + // then publish the finalized AssistantItem. AssistantItem entries + // are mirror-only and are not delivered as live entry events. + in_flight_for_commit.clear_for_committed_item_then(&assistant_item, || { + sink_for_commit.publish(assistant_entry); + }); + committed_tx.send(()).unwrap(); + }); + + let (entries_snapshot, mut entry_rx) = sink.subscribe_with_snapshot(); + let in_flight_snapshot = snapshot_from_guard(&in_flight_guard); + drop(in_flight_guard); + + committed_rx.recv().unwrap(); + commit_thread.join().unwrap(); + + assert!(entries_snapshot.is_empty()); + assert!(matches!( + in_flight_snapshot.blocks.as_slice(), + [InFlightBlock::Text { text, finished: true }] if text == "done" + )); + assert!(entry_rx.try_recv().is_err()); + let post_commit_guard = in_flight.snapshot_guard(); + assert!(snapshot_from_guard(&post_commit_guard).is_empty()); + } + + #[test] + fn committed_assistant_snapshot_does_not_duplicate_in_flight_block() { + use crate::segment_log_sink::SegmentLogSink; + use session_store::{LogEntry, LoggedRole}; + + let (event_tx, _) = broadcast::channel(16); + let sink = SegmentLogSink::new(); + let in_flight = InFlightEvents::new(event_tx); + let block_id = in_flight.start_text_block(); + in_flight.text_delta(block_id, "done".into()); + in_flight.text_done(block_id, "done".into()); + + let assistant_item = LoggedItem::Message { + role: LoggedRole::Assistant, + content: vec![LoggedContentPart::Text { + text: "done".into(), + }], + }; + let assistant_entry = LogEntry::AssistantItem { + ts: 1, + item: assistant_item.clone(), + }; + + in_flight.clear_for_committed_item_then(&assistant_item, || { + sink.publish(assistant_entry); + }); + + let in_flight_guard = in_flight.snapshot_guard(); + let (entries_snapshot, _entry_rx) = sink.subscribe_with_snapshot(); + let in_flight_snapshot = snapshot_from_guard(&in_flight_guard); + + assert!(matches!( + entries_snapshot.as_slice(), + [LogEntry::AssistantItem { item, .. }] if item == &assistant_item + )); + assert!(in_flight_snapshot.is_empty()); + } + #[test] fn committed_item_clears_matching_in_flight_block() { let (event_tx, _) = broadcast::channel(16); let in_flight = InFlightEvents::new(event_tx); let block_id = in_flight.start_text_block(); in_flight.text_delta(block_id, "done".into()); - in_flight.clear_for_committed_item(&LoggedItem::Message { - role: session_store::LoggedRole::Assistant, - content: vec![LoggedContentPart::Text { - text: "done".into(), - }], - }); + in_flight.clear_for_committed_item_then( + &LoggedItem::Message { + role: session_store::LoggedRole::Assistant, + content: vec![LoggedContentPart::Text { + text: "done".into(), + }], + }, + || (), + ); let guard = in_flight.snapshot_guard(); assert!(snapshot_from_guard(&guard).is_empty()); diff --git a/crates/pod/src/ipc/alerter.rs b/crates/pod/src/ipc/alerter.rs index c1089099..390e3158 100644 --- a/crates/pod/src/ipc/alerter.rs +++ b/crates/pod/src/ipc/alerter.rs @@ -14,9 +14,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::broadcast; -use protocol::{Alert, AlertLevel, AlertSource, Event, InFlightSnapshot}; - -use crate::in_flight::{InFlightEvents, snapshot_from_guard}; +use protocol::{Alert, AlertLevel, AlertSource, Event}; /// Upper bound on buffered alerts. When exceeded, the oldest /// entries are discarded so a long-running session cannot leak @@ -87,22 +85,6 @@ impl Alerter { let snapshot: Vec = buf.iter().cloned().collect(); (snapshot, rx) } - - pub fn subscribe_with_alerts_and_in_flight_snapshot( - &self, - in_flight: &InFlightEvents, - ) -> (Vec, InFlightSnapshot, broadcast::Receiver) { - let buf = self - .inner - .buffer - .lock() - .expect("alerter buffer mutex poisoned"); - let in_flight_guard = in_flight.snapshot_guard(); - let rx = self.inner.event_tx.subscribe(); - let alerts: Vec = buf.iter().cloned().collect(); - let in_flight = snapshot_from_guard(&in_flight_guard); - (alerts, in_flight, rx) - } } fn now_ms() -> i64 { diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index a6be3432..82305cb4 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -7,6 +7,7 @@ use tokio::net::UnixListener; use tokio::task::JoinHandle; use crate::controller::PodHandle; +use crate::in_flight::snapshot_from_guard; use protocol::{Event, Method}; /// Unix socket server for Pod Protocol. @@ -104,20 +105,22 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { let mut reader = JsonLineReader::new(reader); let mut writer = JsonLineWriter::new(writer); - // Atomically subscribe to the session-log mirror first. The - // returned (snapshot, rx) pair partitions the entry timeline: - // entries committed before this call appear in `entries`, every - // entry after lands on `entry_rx`. Doing this before the alert - // snapshot keeps both ordering pairs internally consistent. - let (entries_snapshot, mut entry_rx) = handle.sink.subscribe_with_snapshot(); + // Hold the in-flight stream lock while taking the session-log mirror + // snapshot. `LogEntry::AssistantItem` is mirror-only for live clients, + // so a finalized assistant block must be observed either as an already + // committed entry or as the still-present in-flight block. This lock + // order matches `append_entry` (in-flight clear before sink publish) and + // keeps the snapshot/live boundary gap-free. + let (entries_snapshot, mut entry_rx, alert_snapshot, mut rx, in_flight) = { + let in_flight_guard = handle.in_flight.snapshot_guard(); + let (entries_snapshot, entry_rx) = handle.sink.subscribe_with_snapshot(); - // Atomically subscribe and snapshot buffered alerts so that - // warnings emitted before this client connected are replayed - // exactly once — they appear in the snapshot, and any alert - // arriving afterwards reaches us through `rx`. - let (alert_snapshot, in_flight, mut rx) = handle - .alerter - .subscribe_with_alerts_and_in_flight_snapshot(&handle.in_flight); + // Atomically subscribe and snapshot buffered alerts so that warnings + // emitted before this client connected are replayed exactly once. + let (alert_snapshot, rx) = handle.alerter.subscribe_with_snapshot(); + let in_flight = snapshot_from_guard(&in_flight_guard); + (entries_snapshot, entry_rx, alert_snapshot, rx, in_flight) + }; for alert in alert_snapshot { if writer.write(&Event::Alert(alert)).await.is_err() { return; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 58b83e96..21f8a04d 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -185,7 +185,11 @@ where self.state.increment_entries(); if let Some(in_flight) = &self.in_flight { if let LogEntry::AssistantItem { item, .. } = &entry { - in_flight.clear_for_committed_item(item); + let item_for_clear = item.clone(); + in_flight.clear_for_committed_item_then(&item_for_clear, || { + self.sink.publish(entry); + }); + return Ok(()); } } self.sink.publish(entry); diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 12cdd3c8..ee2e38f8 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -639,11 +639,6 @@ pub struct RewindSummary { pub tool_side_effect_warning: bool, } -/// Pod self-description rendered by the TUI when a session starts empty. -/// -/// Built once in the Pod controller from the resolved manifest and -/// transmitted alongside `Event::Snapshot` so clients don't need -/// their own view of the manifest. /// Unfinished model output included in `Event::Snapshot` for clients that /// attach while an LLM response is still streaming. /// @@ -700,6 +695,11 @@ impl InFlightToolCallState { } } +/// Pod self-description rendered by the TUI when a session starts empty. +/// +/// Built once in the Pod controller from the resolved manifest and +/// transmitted alongside `Event::Snapshot` so clients don't need +/// their own view of the manifest. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Greeting { pub pod_name: String,