From 74aca6f6c5d0a367c04a8f8934afc9accaaa6df9 Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 21 Jun 2026 20:30:01 +0900 Subject: [PATCH 1/2] fix: snapshot in-flight stream state --- crates/pod/src/controller.rs | 62 ++-- crates/pod/src/discovery.rs | 6 + crates/pod/src/in_flight.rs | 370 ++++++++++++++++++++++++ crates/pod/src/ipc/alerter.rs | 20 +- crates/pod/src/ipc/server.rs | 5 +- crates/pod/src/lib.rs | 1 + crates/pod/src/pod.rs | 18 ++ crates/pod/src/spawn/comm_tools.rs | 1 + crates/pod/src/ticket_event_notify.rs | 2 + crates/pod/tests/pod_comm_tools_test.rs | 2 + crates/pod/tests/pod_events_test.rs | 1 + crates/pod/tests/spawn_pod_test.rs | 1 + crates/protocol/src/lib.rs | 121 ++++++++ crates/tui/src/app.rs | 110 ++++++- crates/tui/src/console/mod.rs | 2 + crates/tui/src/dashboard/tests.rs | 2 + crates/tui/src/pod_list.rs | 1 + 17 files changed, 683 insertions(+), 42 deletions(-) create mode 100644 crates/pod/src/in_flight.rs diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 8b7c9fcd..c0b64ea1 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -14,6 +14,7 @@ use tracing::{debug, warn}; use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool, send_to_peer_pod_tool}; use crate::feature::FeatureRegistryBuilder; +use crate::in_flight::InFlightEvents; use crate::ipc::alerter::Alerter; use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::server::SocketServer; @@ -47,6 +48,7 @@ pub struct PodHandle { pub shared_state: Arc, pub runtime_dir: Arc, pub alerter: Alerter, + pub in_flight: InFlightEvents, /// Segment-log mirror + broadcast handle. The IPC server snapshots /// it on every new connection (Event::Snapshot) and forwards /// subsequent commits (Event::Entry) on the receiver. @@ -159,6 +161,8 @@ impl PodController { let (method_tx, method_rx) = mpsc::channel::(32); let (event_tx, _) = broadcast::channel::(256); let alerter = Alerter::new(event_tx.clone()); + let in_flight = InFlightEvents::new(event_tx.clone()); + pod.attach_in_flight_events(in_flight.clone()); // Runtime directory is created before tool registration because // the spawn-tool factories need its socket path, and before the @@ -225,7 +229,7 @@ impl PodController { pod.wire_history_persistence(); // === 2. Worker event bridge wiring === - wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter); + wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter, &in_flight); // === 3. Tool registration (builtin / memory / spawn-orchestration) === let fs_for_view = register_pod_tools( @@ -289,6 +293,7 @@ impl PodController { shared_state: shared_state.clone(), runtime_dir: runtime_dir.clone(), alerter: alerter.clone(), + in_flight: in_flight.clone(), sink: pod.sink(), }; @@ -333,6 +338,7 @@ fn wire_event_bridges_on_worker( pod: &mut Pod, event_tx: &broadcast::Sender, alerter: &Alerter, + in_flight: &InFlightEvents, ) where C: LlmClient + Clone + 'static, St: Store + PodMetadataStore + Clone + 'static, @@ -386,83 +392,66 @@ fn wire_event_bridges_on_worker( }); }); - let tx = event_tx.clone(); + let in_flight_text = in_flight.clone(); let activity = ai_activity.clone(); worker.on_text_block(move |block| { - let tx_d = tx.clone(); + let block_id = in_flight_text.start_text_block(); + let in_flight_d = in_flight_text.clone(); let activity_d = activity.clone(); block.on_delta(move |text| { activity_d.fetch_add(1, Ordering::SeqCst); - let _ = tx_d.send(Event::TextDelta { - text: text.to_owned(), - }); + in_flight_d.text_delta(block_id, text.to_owned()); }); - let tx_s = tx.clone(); + let in_flight_s = in_flight_text.clone(); let activity_s = activity.clone(); block.on_stop(move |text| { if !text.is_empty() { activity_s.fetch_add(1, Ordering::SeqCst); } - let _ = tx_s.send(Event::TextDone { - text: text.to_owned(), - }); + in_flight_s.text_done(block_id, text.to_owned()); }); }); - let tx = event_tx.clone(); + let in_flight_thinking = in_flight.clone(); let activity = ai_activity.clone(); worker.on_thinking_block(move |block| { // Start fires unconditionally so the TUI can show "Thinking..." // even when the provider doesn't emit plaintext deltas. activity.fetch_add(1, Ordering::SeqCst); - let _ = tx.send(Event::ThinkingStart); - let tx_d = tx.clone(); + let block_id = in_flight_thinking.thinking_start(); + let in_flight_d = in_flight_thinking.clone(); let activity_d = activity.clone(); block.on_delta(move |text| { activity_d.fetch_add(1, Ordering::SeqCst); - let _ = tx_d.send(Event::ThinkingDelta { - text: text.to_owned(), - }); + in_flight_d.thinking_delta(block_id, text.to_owned()); }); - let tx_s = tx.clone(); + let in_flight_s = in_flight_thinking.clone(); let activity_s = activity.clone(); block.on_stop(move |text| { if !text.is_empty() { activity_s.fetch_add(1, Ordering::SeqCst); } - let _ = tx_s.send(Event::ThinkingDone { - text: text.to_owned(), - }); + in_flight_s.thinking_done(block_id, text.to_owned()); }); }); - let tx = event_tx.clone(); + let in_flight_tool = in_flight.clone(); let activity = ai_activity.clone(); worker.on_tool_use_block(move |start, block| { activity.fetch_add(1, Ordering::SeqCst); - let _ = tx.send(Event::ToolCallStart { - id: start.id.clone(), - name: start.name.clone(), - }); + let block_id = in_flight_tool.tool_call_start(start.id.clone(), start.name.clone()); let id_for_delta = start.id.clone(); - let tx_d = tx.clone(); + let in_flight_d = in_flight_tool.clone(); let activity_d = activity.clone(); block.on_delta(move |json| { activity_d.fetch_add(1, Ordering::SeqCst); - let _ = tx_d.send(Event::ToolCallArgsDelta { - id: id_for_delta.clone(), - json: json.to_owned(), - }); + in_flight_d.tool_call_args_delta(block_id, id_for_delta.clone(), json.to_owned()); }); - let tx_s = tx.clone(); + let in_flight_s = in_flight_tool.clone(); let activity_s = activity.clone(); block.on_stop(move |call| { activity_s.fetch_add(1, Ordering::SeqCst); - let _ = tx_s.send(Event::ToolCallDone { - id: call.id.clone(), - name: call.name.clone(), - arguments: call.input.to_string(), - }); + in_flight_s.tool_call_done(block_id, call.id.clone(), call.input.to_string()); }); }); @@ -1535,6 +1524,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), }) .await .ok()?; diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs index 935238ae..254d90ea 100644 --- a/crates/pod/src/discovery.rs +++ b/crates/pod/src/discovery.rs @@ -1463,6 +1463,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), }) .await .unwrap(); @@ -1494,6 +1495,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), }) .await .unwrap(); @@ -1579,6 +1581,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), }) .await .unwrap(); @@ -1601,6 +1604,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), }) .await .unwrap(); @@ -1700,6 +1704,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Paused, + in_flight: Default::default(), }) .await .unwrap(); @@ -1748,6 +1753,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), }) .await; }); diff --git a/crates/pod/src/in_flight.rs b/crates/pod/src/in_flight.rs new file mode 100644 index 00000000..1ad2f216 --- /dev/null +++ b/crates/pod/src/in_flight.rs @@ -0,0 +1,370 @@ +use std::sync::{Arc, Mutex, MutexGuard}; + +use protocol::{Event, InFlightBlock, InFlightSnapshot, InFlightToolCallState}; +use session_store::{LoggedContentPart, LoggedItem}; +use tokio::sync::broadcast; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct InFlightBlockId(u64); + +#[derive(Debug, Clone)] +pub struct InFlightEvents { + inner: Arc>, + event_tx: broadcast::Sender, +} + +#[derive(Debug)] +pub(crate) struct InFlightInner { + next_block_id: u64, + blocks: Vec, +} + +#[derive(Debug, Clone)] +enum TrackedBlock { + Text { + block_id: InFlightBlockId, + text: String, + finished: bool, + }, + Thinking { + block_id: InFlightBlockId, + text: String, + finished: bool, + }, + ToolCall { + block_id: InFlightBlockId, + id: String, + name: String, + args: String, + state: InFlightToolCallState, + }, +} + +impl InFlightEvents { + pub(crate) fn new(event_tx: broadcast::Sender) -> Self { + Self { + inner: Arc::new(Mutex::new(InFlightInner { + next_block_id: 1, + blocks: Vec::new(), + })), + event_tx, + } + } + + pub(crate) fn snapshot_guard(&self) -> MutexGuard<'_, InFlightInner> { + self.inner.lock().expect("in-flight event mutex poisoned") + } + + pub(crate) fn start_text_block(&self) -> InFlightBlockId { + let mut inner = self.lock(); + let block_id = inner.next_id(); + inner.blocks.push(TrackedBlock::Text { + block_id, + text: String::new(), + finished: false, + }); + block_id + } + + pub(crate) fn text_delta(&self, block_id: InFlightBlockId, text: String) { + let mut inner = self.lock(); + if let Some(TrackedBlock::Text { + text: current, + finished, + .. + }) = inner.find_block_mut(block_id) + { + current.push_str(&text); + *finished = false; + } + let _ = self.event_tx.send(Event::TextDelta { text }); + } + + pub(crate) fn text_done(&self, block_id: InFlightBlockId, text: String) { + let mut inner = self.lock(); + if let Some(TrackedBlock::Text { + text: current, + finished, + .. + }) = inner.find_block_mut(block_id) + { + if current.is_empty() { + *current = text.clone(); + } + *finished = true; + } + let _ = self.event_tx.send(Event::TextDone { text }); + } + + pub(crate) fn thinking_start(&self) -> InFlightBlockId { + let mut inner = self.lock(); + let block_id = inner.next_id(); + inner.blocks.push(TrackedBlock::Thinking { + block_id, + text: String::new(), + finished: false, + }); + let _ = self.event_tx.send(Event::ThinkingStart); + block_id + } + + pub(crate) fn thinking_delta(&self, block_id: InFlightBlockId, text: String) { + let mut inner = self.lock(); + if let Some(TrackedBlock::Thinking { + text: current, + finished, + .. + }) = inner.find_block_mut(block_id) + { + current.push_str(&text); + *finished = false; + } + let _ = self.event_tx.send(Event::ThinkingDelta { text }); + } + + pub(crate) fn thinking_done(&self, block_id: InFlightBlockId, text: String) { + let mut inner = self.lock(); + if let Some(TrackedBlock::Thinking { + text: current, + finished, + .. + }) = inner.find_block_mut(block_id) + { + if current.is_empty() { + *current = text.clone(); + } + *finished = true; + } + let _ = self.event_tx.send(Event::ThinkingDone { text }); + } + + pub(crate) fn tool_call_start(&self, id: String, name: String) -> InFlightBlockId { + let mut inner = self.lock(); + let block_id = inner.next_id(); + inner.blocks.push(TrackedBlock::ToolCall { + block_id, + id: id.clone(), + name: name.clone(), + args: String::new(), + state: InFlightToolCallState::Pending, + }); + let _ = self.event_tx.send(Event::ToolCallStart { id, name }); + block_id + } + + pub(crate) fn tool_call_args_delta( + &self, + block_id: InFlightBlockId, + id: String, + delta: String, + ) { + let mut inner = self.lock(); + if let Some(TrackedBlock::ToolCall { args, state, .. }) = inner.find_block_mut(block_id) { + args.push_str(&delta); + *state = InFlightToolCallState::StreamingArgs; + } + let _ = self + .event_tx + .send(Event::ToolCallArgsDelta { id, json: delta }); + } + + pub(crate) fn tool_call_done(&self, block_id: InFlightBlockId, id: String, args: String) { + let mut inner = self.lock(); + let mut name = String::new(); + if let Some(TrackedBlock::ToolCall { + name: current_name, + args: current, + state, + .. + }) = inner.find_block_mut(block_id) + { + name = current_name.clone(); + if current.is_empty() { + *current = args.clone(); + } + *state = InFlightToolCallState::Done; + } + let _ = self.event_tx.send(Event::ToolCallDone { + id, + name, + arguments: args, + }); + } + + pub(crate) fn clear_for_committed_item(&self, item: &LoggedItem) { + let mut inner = self.lock(); + match item { + LoggedItem::Message { role, content } + if matches!(role, session_store::LoggedRole::Assistant) => + { + let text = content + .iter() + .filter_map(|part| match part { + LoggedContentPart::Text { text } => Some(text.as_str()), + LoggedContentPart::Refusal { refusal } => Some(refusal.as_str()), + }) + .collect::(); + if !text.is_empty() { + inner.remove_first_text_matching(&text); + } + } + LoggedItem::Reasoning { text, .. } => { + inner.remove_first_thinking_matching(text); + } + LoggedItem::ToolCall { call_id, .. } => { + inner.remove_tool_call(call_id); + } + _ => {} + } + } + + fn lock(&self) -> MutexGuard<'_, InFlightInner> { + self.inner.lock().expect("in-flight event mutex poisoned") + } +} + +impl InFlightInner { + fn next_id(&mut self) -> InFlightBlockId { + let id = InFlightBlockId(self.next_block_id); + self.next_block_id = self.next_block_id.saturating_add(1); + id + } + + fn find_block_mut(&mut self, block_id: InFlightBlockId) -> Option<&mut TrackedBlock> { + self.blocks + .iter_mut() + .find(|block| block.block_id() == block_id) + } + + fn snapshot(&self) -> InFlightSnapshot { + InFlightSnapshot { + blocks: self + .blocks + .iter() + .filter_map(TrackedBlock::to_snapshot_block) + .collect(), + } + } + + fn remove_first_text_matching(&mut self, committed: &str) { + if let Some(index) = self.blocks.iter().position(|block| match block { + TrackedBlock::Text { text, .. } => text == committed, + _ => false, + }) { + self.blocks.remove(index); + } + } + + fn remove_first_thinking_matching(&mut self, committed: &str) { + if let Some(index) = self.blocks.iter().position(|block| match block { + TrackedBlock::Thinking { text, .. } => text == committed, + _ => false, + }) { + self.blocks.remove(index); + } + } + + fn remove_tool_call(&mut self, call_id: &str) { + if let Some(index) = self.blocks.iter().position(|block| match block { + TrackedBlock::ToolCall { id, .. } => id == call_id, + _ => false, + }) { + self.blocks.remove(index); + } + } +} + +impl TrackedBlock { + fn block_id(&self) -> InFlightBlockId { + match self { + TrackedBlock::Text { block_id, .. } + | TrackedBlock::Thinking { block_id, .. } + | TrackedBlock::ToolCall { block_id, .. } => *block_id, + } + } + + fn to_snapshot_block(&self) -> Option { + match self { + TrackedBlock::Text { text, finished, .. } => { + if text.is_empty() { + None + } else { + Some(InFlightBlock::Text { + text: text.clone(), + finished: *finished, + }) + } + } + TrackedBlock::Thinking { text, finished, .. } => Some(InFlightBlock::Thinking { + text: text.clone(), + finished: *finished, + }), + TrackedBlock::ToolCall { + id, + name, + args, + state, + .. + } => Some(InFlightBlock::ToolCall { + id: id.clone(), + name: name.clone(), + args: args.clone(), + state: *state, + }), + } + } +} + +pub(crate) fn snapshot_from_guard(guard: &MutexGuard<'_, InFlightInner>) -> InFlightSnapshot { + guard.snapshot() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn snapshot_boundary_does_not_duplicate_or_gap_delta_sent_after_subscribe() { + let (event_tx, _) = broadcast::channel(16); + let in_flight = InFlightEvents::new(event_tx.clone()); + let block_id = in_flight.start_text_block(); + in_flight.text_delta(block_id, "hel".into()); + + let guard = in_flight.snapshot_guard(); + let mut rx = event_tx.subscribe(); + let snapshot = snapshot_from_guard(&guard); + drop(guard); + + in_flight.text_delta(block_id, "lo".into()); + + assert_eq!( + snapshot.blocks, + vec![InFlightBlock::Text { + text: "hel".into(), + finished: false, + }] + ); + assert!(matches!( + rx.try_recv().unwrap(), + Event::TextDelta { text } if text == "lo" + )); + assert!(rx.try_recv().is_err()); + } + + #[test] + fn committed_item_clears_matching_in_flight_block() { + let (event_tx, _) = broadcast::channel(16); + let in_flight = InFlightEvents::new(event_tx); + let block_id = in_flight.start_text_block(); + in_flight.text_delta(block_id, "done".into()); + in_flight.clear_for_committed_item(&LoggedItem::Message { + role: session_store::LoggedRole::Assistant, + content: vec![LoggedContentPart::Text { + text: "done".into(), + }], + }); + + let guard = in_flight.snapshot_guard(); + assert!(snapshot_from_guard(&guard).is_empty()); + } +} diff --git a/crates/pod/src/ipc/alerter.rs b/crates/pod/src/ipc/alerter.rs index 390e3158..c1089099 100644 --- a/crates/pod/src/ipc/alerter.rs +++ b/crates/pod/src/ipc/alerter.rs @@ -14,7 +14,9 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::broadcast; -use protocol::{Alert, AlertLevel, AlertSource, Event}; +use protocol::{Alert, AlertLevel, AlertSource, Event, InFlightSnapshot}; + +use crate::in_flight::{InFlightEvents, snapshot_from_guard}; /// Upper bound on buffered alerts. When exceeded, the oldest /// entries are discarded so a long-running session cannot leak @@ -85,6 +87,22 @@ impl Alerter { let snapshot: Vec = buf.iter().cloned().collect(); (snapshot, rx) } + + pub fn subscribe_with_alerts_and_in_flight_snapshot( + &self, + in_flight: &InFlightEvents, + ) -> (Vec, InFlightSnapshot, broadcast::Receiver) { + let buf = self + .inner + .buffer + .lock() + .expect("alerter buffer mutex poisoned"); + let in_flight_guard = in_flight.snapshot_guard(); + let rx = self.inner.event_tx.subscribe(); + let alerts: Vec = buf.iter().cloned().collect(); + let in_flight = snapshot_from_guard(&in_flight_guard); + (alerts, in_flight, rx) + } } fn now_ms() -> i64 { diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index 0ebe0752..a6be3432 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -115,7 +115,9 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { // warnings emitted before this client connected are replayed // exactly once — they appear in the snapshot, and any alert // arriving afterwards reaches us through `rx`. - let (alert_snapshot, mut rx) = handle.alerter.subscribe_with_snapshot(); + let (alert_snapshot, in_flight, mut rx) = handle + .alerter + .subscribe_with_alerts_and_in_flight_snapshot(&handle.in_flight); for alert in alert_snapshot { if writer.write(&Event::Alert(alert)).await.is_err() { return; @@ -131,6 +133,7 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { .collect(), greeting: handle.shared_state.greeting.clone(), status: handle.shared_state.get_status(), + in_flight, }; if writer.write(&snapshot_event).await.is_err() { return; diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 2153b268..15e37bac 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -6,6 +6,7 @@ pub mod entrypoint; pub mod feature; pub mod fs_view; pub mod hook; +pub(crate) mod in_flight; pub mod ipc; pub mod prompt; pub mod runtime; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 010ceafc..58b83e96 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -35,6 +35,7 @@ use crate::hook::{ Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest, PreToolCall, }; +use crate::in_flight::InFlightEvents; use crate::ipc::alerter::Alerter; use crate::ipc::interceptor::PodInterceptor; use crate::ipc::notify_buffer::NotifyBuffer; @@ -167,6 +168,7 @@ pub struct LogWriterHandle { pub store: St, pub state: Arc, pub sink: SegmentLogSink, + pub in_flight: Option, } impl LogWriterHandle @@ -181,6 +183,11 @@ where let loc = self.state.location(); self.store.append(loc.session_id, loc.segment_id, &entry)?; self.state.increment_entries(); + if let Some(in_flight) = &self.in_flight { + if let LogEntry::AssistantItem { item, .. } = &entry { + in_flight.clear_for_committed_item(item); + } + } self.sink.publish(entry); Ok(()) } @@ -296,6 +303,7 @@ pub struct Pod { /// notifications, events sent here are NOT replayed to clients that /// connect after the fact — they are fire-and-forget broadcasts. event_tx: Option>, + in_flight: Option, /// Monotonic counter incremented by worker event bridges when an /// assistant-side execution artifact becomes visible to clients before /// it is necessarily committed to history (e.g. streaming text deltas). @@ -449,6 +457,7 @@ impl Pod { system_prompt_template: None, alerter: self.alerter.clone(), event_tx: self.event_tx.clone(), + in_flight: self.in_flight.clone(), ai_activity_counter: self.ai_activity_counter.clone(), pending_notifies: NotifyBuffer::new(), pending_attachments: Arc::new(Mutex::new(Vec::::new())), @@ -484,6 +493,7 @@ impl Pod { store: self.store.clone(), state: self.segment_state.clone(), sink: self.sink.clone(), + in_flight: self.in_flight.clone(), } } @@ -495,6 +505,10 @@ impl Pod { self.log_writer = Some(writer); } + pub fn attach_in_flight_events(&mut self, in_flight: InFlightEvents) { + self.in_flight = Some(in_flight); + } + /// Wire `Worker::on_history_append` to commit each appended item /// directly as a singular `LogEntry::AssistantItem` / `ToolResult` /// through the writer. The controller calls this once per spawned @@ -633,6 +647,7 @@ impl Pod { system_prompt_template: None, alerter: None, event_tx: None, + in_flight: None, ai_activity_counter: Arc::new(AtomicUsize::new(0)), pending_notifies: NotifyBuffer::new(), pending_attachments: Arc::new(Mutex::new(Vec::::new())), @@ -3842,6 +3857,7 @@ where system_prompt_template: common.system_prompt_template, alerter: None, event_tx: None, + in_flight: None, ai_activity_counter: Arc::new(AtomicUsize::new(0)), pending_notifies: NotifyBuffer::new(), pending_attachments: Arc::new(Mutex::new(Vec::::new())), @@ -3951,6 +3967,7 @@ where system_prompt_template: common.system_prompt_template, alerter: None, event_tx: None, + in_flight: None, ai_activity_counter: Arc::new(AtomicUsize::new(0)), pending_notifies: NotifyBuffer::new(), pending_attachments: Arc::new(Mutex::new(Vec::::new())), @@ -4187,6 +4204,7 @@ where system_prompt_template: None, alerter: None, event_tx: None, + in_flight: None, ai_activity_counter: Arc::new(AtomicUsize::new(0)), pending_notifies: NotifyBuffer::new(), pending_attachments: Arc::new(Mutex::new(Vec::::new())), diff --git a/crates/pod/src/spawn/comm_tools.rs b/crates/pod/src/spawn/comm_tools.rs index 27eed70d..57b953b4 100644 --- a/crates/pod/src/spawn/comm_tools.rs +++ b/crates/pod/src/spawn/comm_tools.rs @@ -515,6 +515,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), } } diff --git a/crates/pod/src/ticket_event_notify.rs b/crates/pod/src/ticket_event_notify.rs index 0cbc9fe7..cabc946c 100644 --- a/crates/pod/src/ticket_event_notify.rs +++ b/crates/pod/src/ticket_event_notify.rs @@ -435,6 +435,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), }) .await .unwrap(); @@ -457,6 +458,7 @@ mod tests { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), }) .await .unwrap(); diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index fa3206ea..6bd63e93 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -129,6 +129,7 @@ fn empty_snapshot() -> Event { context_tokens: 0, }, status: protocol::PodStatus::Idle, + in_flight: Default::default(), } } @@ -203,6 +204,7 @@ fn serve_history(listener: UnixListener, items: Vec) -> JoinHandle<()> { context_tokens: 0, }, status: protocol::PodStatus::Idle, + in_flight: Default::default(), }; let _ = writer.write(&event).await; } diff --git a/crates/pod/tests/pod_events_test.rs b/crates/pod/tests/pod_events_test.rs index 7e909e77..8247fdbf 100644 --- a/crates/pod/tests/pod_events_test.rs +++ b/crates/pod/tests/pod_events_test.rs @@ -91,6 +91,7 @@ fn empty_snapshot() -> Event { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), } } diff --git a/crates/pod/tests/spawn_pod_test.rs b/crates/pod/tests/spawn_pod_test.rs index db8e15b5..a175da4b 100644 --- a/crates/pod/tests/spawn_pod_test.rs +++ b/crates/pod/tests/spawn_pod_test.rs @@ -123,6 +123,7 @@ fn accept_one_method(listener: UnixListener) -> tokio::task::JoinHandle bool { *value } +fn is_false(value: &bool) -> bool { + !*value +} + // --------------------------------------------------------------------------- // Method (Client → Pod via Unix Socket) // --------------------------------------------------------------------------- @@ -453,6 +457,10 @@ pub enum Event { greeting: Greeting, #[serde(default)] status: PodStatus, + /// Unfinished model output that has already streamed in the current + /// run but is not yet represented by committed snapshot entries. + #[serde(default, skip_serializing_if = "InFlightSnapshot::is_empty")] + in_flight: InFlightSnapshot, }, /// Server-side segment log rotated to a fresh `SegmentStart`. /// @@ -636,6 +644,62 @@ pub struct RewindSummary { /// Built once in the Pod controller from the resolved manifest and /// transmitted alongside `Event::Snapshot` so clients don't need /// their own view of the manifest. +/// Unfinished model output included in `Event::Snapshot` for clients that +/// attach while an LLM response is still streaming. +/// +/// These blocks are presentation state only: they are reconstructed from the +/// active Pod controller and must not be treated as committed assistant +/// history. Finalized assistant items continue to come from ordinary snapshot +/// entries. +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct InFlightSnapshot { + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub blocks: Vec, +} + +impl InFlightSnapshot { + pub fn is_empty(&self) -> bool { + self.blocks.is_empty() + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum InFlightBlock { + Text { + text: String, + #[serde(default, skip_serializing_if = "is_false")] + finished: bool, + }, + Thinking { + text: String, + #[serde(default, skip_serializing_if = "is_false")] + finished: bool, + }, + ToolCall { + id: String, + name: String, + args: String, + #[serde(default, skip_serializing_if = "InFlightToolCallState::is_pending")] + state: InFlightToolCallState, + }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum InFlightToolCallState { + #[default] + Pending, + StreamingArgs, + Done, +} + +impl InFlightToolCallState { + pub fn is_pending(&self) -> bool { + matches!(self, Self::Pending) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Greeting { pub pod_name: String, @@ -1129,6 +1193,7 @@ mod tests { context_tokens: 42_000, }, status: PodStatus::Paused, + in_flight: InFlightSnapshot::default(), }; let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); @@ -1142,6 +1207,62 @@ mod tests { assert_eq!(parsed["data"]["status"], "paused"); } + #[test] + fn event_snapshot_in_flight_roundtrip_and_default() { + let inbound = r#"{"event":"snapshot","data":{"entries":[],"greeting":{"pod_name":"test","cwd":"/tmp","provider":"p","model":"m","scope_summary":"s","tools":[]},"status":"running"}}"#; + let decoded: Event = serde_json::from_str(inbound).unwrap(); + match decoded { + Event::Snapshot { in_flight, .. } => assert!(in_flight.is_empty()), + other => panic!("expected Snapshot, got {other:?}"), + } + + let event = Event::Snapshot { + entries: Vec::new(), + greeting: Greeting { + pod_name: "test".into(), + cwd: "/tmp".into(), + provider: "p".into(), + model: "m".into(), + scope_summary: "s".into(), + tools: Vec::new(), + context_window: 0, + context_tokens: 0, + }, + status: PodStatus::Running, + in_flight: InFlightSnapshot { + blocks: vec![ + InFlightBlock::Text { + text: "hel".into(), + finished: false, + }, + InFlightBlock::Thinking { + text: "why".into(), + finished: true, + }, + InFlightBlock::ToolCall { + id: "call_1".into(), + name: "Read".into(), + args: r#"{"file"#.into(), + state: InFlightToolCallState::StreamingArgs, + }, + ], + }, + }; + let json = serde_json::to_string(&event).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["data"]["in_flight"]["blocks"][0]["text"], "hel"); + assert_eq!(parsed["data"]["in_flight"]["blocks"][1]["finished"], true); + assert_eq!( + parsed["data"]["in_flight"]["blocks"][2]["state"], + "streaming_args" + ); + + match serde_json::from_str::(&json).unwrap() { + Event::Snapshot { in_flight, .. } => assert_eq!(in_flight.blocks.len(), 3), + other => panic!("expected Snapshot, got {other:?}"), + } + } + #[test] fn event_segment_rotated_roundtrip() { let event = Event::SegmentRotated { diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index a1c3f130..87f77ffa 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -3,8 +3,8 @@ use std::path::Path; use std::time::{Duration, Instant}; use protocol::{ - AlertLevel, AlertSource, CompletionEntry, CompletionKind, ErrorCode, Event, Method, PodStatus, - RewindTarget, RunResult, Segment, + AlertLevel, AlertSource, CompletionEntry, CompletionKind, ErrorCode, Event, InFlightBlock, + InFlightSnapshot, InFlightToolCallState, Method, PodStatus, RewindTarget, RunResult, Segment, }; use crate::block::{ @@ -1279,9 +1279,10 @@ impl App { entries, greeting, status, + in_flight, } => { self.rewind_refresh_fence = false; - self.restore_snapshot(&entries, greeting); + self.restore_snapshot(&entries, greeting, in_flight); self.set_pod_status(status); } Event::Status { status } => { @@ -1410,6 +1411,50 @@ impl App { }); } + fn apply_in_flight_snapshot(&mut self, snapshot: InFlightSnapshot) { + for block in snapshot.blocks { + match block { + InFlightBlock::Text { text, finished } => { + self.blocks.push(Block::AssistantText { text }); + self.assistant_streaming = !finished; + } + InFlightBlock::Thinking { text, finished } => { + let state = if finished { + ThinkingState::Finished { elapsed_secs: None } + } else { + ThinkingState::Streaming { + started_at: Instant::now(), + } + }; + self.blocks + .push(Block::Thinking(ThinkingBlock { text, state })); + } + InFlightBlock::ToolCall { + id, + name, + args, + state, + } => { + let (tool_state, arguments) = match state { + InFlightToolCallState::Pending => (ToolCallState::Pending, None), + InFlightToolCallState::StreamingArgs => (ToolCallState::Streaming, None), + InFlightToolCallState::Done => { + (ToolCallState::Executing, Some(args.clone())) + } + }; + self.blocks.push(Block::ToolCall(ToolCallBlock { + id, + name, + args_stream: args, + arguments, + state: tool_state, + edit_snapshot: None, + })); + } + } + } + } + fn append_assistant_text(&mut self, text: &str) { if self.assistant_streaming { if let Some(Block::AssistantText { text: existing }) = self.blocks.last_mut() { @@ -1913,11 +1958,17 @@ impl App { /// LogEntry variant into the same blocks live events would have /// produced. Followed by `Event::Entry` updates for anything /// committed after the snapshot. - fn restore_snapshot(&mut self, entries: &[serde_json::Value], greeting: protocol::Greeting) { + fn restore_snapshot( + &mut self, + entries: &[serde_json::Value], + greeting: protocol::Greeting, + in_flight: InFlightSnapshot, + ) { self.greeting = Some(greeting.clone()); self.context_window = greeting.context_window; self.session_context_tokens = greeting.context_tokens; self.restore_entries(entries, Some(greeting)); + self.apply_in_flight_snapshot(in_flight); } /// Restore after a successful destructive rewind. The Pod's @@ -3151,6 +3202,7 @@ mod completion_flow_tests { greeting: test_greeting(), entries: vec![session_start_value], status: PodStatus::Running, + in_flight: Default::default(), }); assert!(matches!(app.pod_status, PodStatus::Running)); @@ -3161,6 +3213,54 @@ mod completion_flow_tests { )); } + #[test] + fn snapshot_in_flight_blocks_continue_with_live_deltas() { + let mut app = App::new("test".into()); + app.handle_pod_event(Event::Snapshot { + greeting: test_greeting(), + entries: Vec::new(), + status: PodStatus::Running, + in_flight: InFlightSnapshot { + blocks: vec![ + InFlightBlock::Thinking { + text: "why".into(), + finished: false, + }, + InFlightBlock::ToolCall { + id: "call_1".into(), + name: "Read".into(), + args: r#"{\"file"#.into(), + state: InFlightToolCallState::StreamingArgs, + }, + InFlightBlock::Text { + text: "hel".into(), + finished: false, + }, + ], + }, + }); + + app.handle_pod_event(Event::TextDelta { text: "lo".into() }); + app.handle_pod_event(Event::ThinkingDelta { text: "?".into() }); + app.handle_pod_event(Event::ToolCallArgsDelta { + id: "call_1".into(), + json: r#"\":\"src/lib.rs\"}"#.into(), + }); + + assert!(matches!( + app.blocks.iter().find(|block| matches!(block, Block::AssistantText { .. })), + Some(Block::AssistantText { text }) if text == "hello" + )); + assert!(matches!( + app.blocks.iter().find(|block| matches!(block, Block::Thinking(_))), + Some(Block::Thinking(thinking)) if thinking.text == "why?" + )); + assert!(matches!( + app.blocks.iter().find(|block| matches!(block, Block::ToolCall(_))), + Some(Block::ToolCall(call)) if call.args_stream == r#"{\"file\":\"src/lib.rs\"}"# + )); + } + #[test] fn live_system_item_workflow_appends_system_message_block() { let mut app = App::new("test".into()); @@ -3294,6 +3394,7 @@ mod completion_flow_tests { entries: Vec::new(), greeting, status: PodStatus::Idle, + in_flight: Default::default(), }); assert_eq!(app.context_window, 123_000); @@ -3492,6 +3593,7 @@ mod completion_flow_tests { greeting: test_greeting(), entries: assistant_item_entries, status: PodStatus::Running, + in_flight: Default::default(), }); let tasks = app.task_store.tasks(); diff --git a/crates/tui/src/console/mod.rs b/crates/tui/src/console/mod.rs index dccd8a58..5bfbe6c2 100644 --- a/crates/tui/src/console/mod.rs +++ b/crates/tui/src/console/mod.rs @@ -1922,6 +1922,7 @@ mod tests { greeting: test_greeting(), entries: vec![], status: PodStatus::Idle, + in_flight: Default::default(), }); app.handle_pod_event(Event::RewindApplied { entries: vec![], @@ -1947,6 +1948,7 @@ mod tests { greeting: test_greeting(), entries: vec![], status: PodStatus::Idle, + in_flight: Default::default(), }); type_keys(&mut app, "draft"); diff --git a/crates/tui/src/dashboard/tests.rs b/crates/tui/src/dashboard/tests.rs index c8e67469..9a8367f4 100644 --- a/crates/tui/src/dashboard/tests.rs +++ b/crates/tui/src/dashboard/tests.rs @@ -868,6 +868,7 @@ async fn ticket_queue_notification_sends_notify_when_socket_available() { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), }) .await .unwrap(); @@ -908,6 +909,7 @@ async fn send_notify_only_can_deliver_weak_notification_without_auto_run() { context_tokens: 0, }, status: PodStatus::Idle, + in_flight: Default::default(), }) .await .unwrap(); diff --git a/crates/tui/src/pod_list.rs b/crates/tui/src/pod_list.rs index bc0210a1..838a5497 100644 --- a/crates/tui/src/pod_list.rs +++ b/crates/tui/src/pod_list.rs @@ -819,6 +819,7 @@ mod tests { entries: vec![], greeting: test_greeting(), status: PodStatus::Idle, + in_flight: Default::default(), }, ]; From 061136d798172cdb0b8bdc2de27644feca062a28 Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 21 Jun 2026 20:51:48 +0900 Subject: [PATCH 2/2] fix: close in-flight snapshot commit race --- crates/pod/src/in_flight.rs | 167 ++++++++++++++++++++++++++++------ crates/pod/src/ipc/alerter.rs | 20 +--- crates/pod/src/ipc/server.rs | 29 +++--- crates/pod/src/pod.rs | 6 +- crates/protocol/src/lib.rs | 10 +- 5 files changed, 164 insertions(+), 68 deletions(-) diff --git a/crates/pod/src/in_flight.rs b/crates/pod/src/in_flight.rs index 1ad2f216..b8ca6c58 100644 --- a/crates/pod/src/in_flight.rs +++ b/crates/pod/src/in_flight.rs @@ -191,31 +191,14 @@ impl InFlightEvents { }); } - pub(crate) fn clear_for_committed_item(&self, item: &LoggedItem) { + pub(crate) fn clear_for_committed_item_then( + &self, + item: &LoggedItem, + f: impl FnOnce() -> R, + ) -> R { let mut inner = self.lock(); - match item { - LoggedItem::Message { role, content } - if matches!(role, session_store::LoggedRole::Assistant) => - { - let text = content - .iter() - .filter_map(|part| match part { - LoggedContentPart::Text { text } => Some(text.as_str()), - LoggedContentPart::Refusal { refusal } => Some(refusal.as_str()), - }) - .collect::(); - if !text.is_empty() { - inner.remove_first_text_matching(&text); - } - } - LoggedItem::Reasoning { text, .. } => { - inner.remove_first_thinking_matching(text); - } - LoggedItem::ToolCall { call_id, .. } => { - inner.remove_tool_call(call_id); - } - _ => {} - } + inner.clear_for_committed_item(item); + f() } fn lock(&self) -> MutexGuard<'_, InFlightInner> { @@ -236,6 +219,32 @@ impl InFlightInner { .find(|block| block.block_id() == block_id) } + fn clear_for_committed_item(&mut self, item: &LoggedItem) { + match item { + LoggedItem::Message { role, content } + if matches!(role, session_store::LoggedRole::Assistant) => + { + let text = content + .iter() + .filter_map(|part| match part { + LoggedContentPart::Text { text } => Some(text.as_str()), + LoggedContentPart::Refusal { refusal } => Some(refusal.as_str()), + }) + .collect::(); + if !text.is_empty() { + self.remove_first_text_matching(&text); + } + } + LoggedItem::Reasoning { text, .. } => { + self.remove_first_thinking_matching(text); + } + LoggedItem::ToolCall { call_id, .. } => { + self.remove_tool_call(call_id); + } + _ => {} + } + } + fn snapshot(&self) -> InFlightSnapshot { InFlightSnapshot { blocks: self @@ -351,18 +360,116 @@ mod tests { assert!(rx.try_recv().is_err()); } + #[test] + fn session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap() { + use std::sync::mpsc; + use std::thread; + + use crate::segment_log_sink::SegmentLogSink; + use session_store::{LogEntry, LoggedRole}; + + let (event_tx, _) = broadcast::channel(16); + let sink = SegmentLogSink::new(); + let in_flight = InFlightEvents::new(event_tx); + let block_id = in_flight.start_text_block(); + in_flight.text_delta(block_id, "done".into()); + in_flight.text_done(block_id, "done".into()); + + let assistant_item = LoggedItem::Message { + role: LoggedRole::Assistant, + content: vec![LoggedContentPart::Text { + text: "done".into(), + }], + }; + let assistant_entry = LogEntry::AssistantItem { + ts: 1, + item: assistant_item.clone(), + }; + + let in_flight_guard = in_flight.snapshot_guard(); + let in_flight_for_commit = in_flight.clone(); + let sink_for_commit = sink.clone(); + let (committed_tx, committed_rx) = mpsc::channel(); + let commit_thread = thread::spawn(move || { + // This mirrors Pod::append_entry ordering: clear in-flight first, + // then publish the finalized AssistantItem. AssistantItem entries + // are mirror-only and are not delivered as live entry events. + in_flight_for_commit.clear_for_committed_item_then(&assistant_item, || { + sink_for_commit.publish(assistant_entry); + }); + committed_tx.send(()).unwrap(); + }); + + let (entries_snapshot, mut entry_rx) = sink.subscribe_with_snapshot(); + let in_flight_snapshot = snapshot_from_guard(&in_flight_guard); + drop(in_flight_guard); + + committed_rx.recv().unwrap(); + commit_thread.join().unwrap(); + + assert!(entries_snapshot.is_empty()); + assert!(matches!( + in_flight_snapshot.blocks.as_slice(), + [InFlightBlock::Text { text, finished: true }] if text == "done" + )); + assert!(entry_rx.try_recv().is_err()); + let post_commit_guard = in_flight.snapshot_guard(); + assert!(snapshot_from_guard(&post_commit_guard).is_empty()); + } + + #[test] + fn committed_assistant_snapshot_does_not_duplicate_in_flight_block() { + use crate::segment_log_sink::SegmentLogSink; + use session_store::{LogEntry, LoggedRole}; + + let (event_tx, _) = broadcast::channel(16); + let sink = SegmentLogSink::new(); + let in_flight = InFlightEvents::new(event_tx); + let block_id = in_flight.start_text_block(); + in_flight.text_delta(block_id, "done".into()); + in_flight.text_done(block_id, "done".into()); + + let assistant_item = LoggedItem::Message { + role: LoggedRole::Assistant, + content: vec![LoggedContentPart::Text { + text: "done".into(), + }], + }; + let assistant_entry = LogEntry::AssistantItem { + ts: 1, + item: assistant_item.clone(), + }; + + in_flight.clear_for_committed_item_then(&assistant_item, || { + sink.publish(assistant_entry); + }); + + let in_flight_guard = in_flight.snapshot_guard(); + let (entries_snapshot, _entry_rx) = sink.subscribe_with_snapshot(); + let in_flight_snapshot = snapshot_from_guard(&in_flight_guard); + + assert!(matches!( + entries_snapshot.as_slice(), + [LogEntry::AssistantItem { item, .. }] if item == &assistant_item + )); + assert!(in_flight_snapshot.is_empty()); + } + #[test] fn committed_item_clears_matching_in_flight_block() { let (event_tx, _) = broadcast::channel(16); let in_flight = InFlightEvents::new(event_tx); let block_id = in_flight.start_text_block(); in_flight.text_delta(block_id, "done".into()); - in_flight.clear_for_committed_item(&LoggedItem::Message { - role: session_store::LoggedRole::Assistant, - content: vec![LoggedContentPart::Text { - text: "done".into(), - }], - }); + in_flight.clear_for_committed_item_then( + &LoggedItem::Message { + role: session_store::LoggedRole::Assistant, + content: vec![LoggedContentPart::Text { + text: "done".into(), + }], + }, + || (), + ); let guard = in_flight.snapshot_guard(); assert!(snapshot_from_guard(&guard).is_empty()); diff --git a/crates/pod/src/ipc/alerter.rs b/crates/pod/src/ipc/alerter.rs index c1089099..390e3158 100644 --- a/crates/pod/src/ipc/alerter.rs +++ b/crates/pod/src/ipc/alerter.rs @@ -14,9 +14,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::broadcast; -use protocol::{Alert, AlertLevel, AlertSource, Event, InFlightSnapshot}; - -use crate::in_flight::{InFlightEvents, snapshot_from_guard}; +use protocol::{Alert, AlertLevel, AlertSource, Event}; /// Upper bound on buffered alerts. When exceeded, the oldest /// entries are discarded so a long-running session cannot leak @@ -87,22 +85,6 @@ impl Alerter { let snapshot: Vec = buf.iter().cloned().collect(); (snapshot, rx) } - - pub fn subscribe_with_alerts_and_in_flight_snapshot( - &self, - in_flight: &InFlightEvents, - ) -> (Vec, InFlightSnapshot, broadcast::Receiver) { - let buf = self - .inner - .buffer - .lock() - .expect("alerter buffer mutex poisoned"); - let in_flight_guard = in_flight.snapshot_guard(); - let rx = self.inner.event_tx.subscribe(); - let alerts: Vec = buf.iter().cloned().collect(); - let in_flight = snapshot_from_guard(&in_flight_guard); - (alerts, in_flight, rx) - } } fn now_ms() -> i64 { diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index a6be3432..82305cb4 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -7,6 +7,7 @@ use tokio::net::UnixListener; use tokio::task::JoinHandle; use crate::controller::PodHandle; +use crate::in_flight::snapshot_from_guard; use protocol::{Event, Method}; /// Unix socket server for Pod Protocol. @@ -104,20 +105,22 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { let mut reader = JsonLineReader::new(reader); let mut writer = JsonLineWriter::new(writer); - // Atomically subscribe to the session-log mirror first. The - // returned (snapshot, rx) pair partitions the entry timeline: - // entries committed before this call appear in `entries`, every - // entry after lands on `entry_rx`. Doing this before the alert - // snapshot keeps both ordering pairs internally consistent. - let (entries_snapshot, mut entry_rx) = handle.sink.subscribe_with_snapshot(); + // Hold the in-flight stream lock while taking the session-log mirror + // snapshot. `LogEntry::AssistantItem` is mirror-only for live clients, + // so a finalized assistant block must be observed either as an already + // committed entry or as the still-present in-flight block. This lock + // order matches `append_entry` (in-flight clear before sink publish) and + // keeps the snapshot/live boundary gap-free. + let (entries_snapshot, mut entry_rx, alert_snapshot, mut rx, in_flight) = { + let in_flight_guard = handle.in_flight.snapshot_guard(); + let (entries_snapshot, entry_rx) = handle.sink.subscribe_with_snapshot(); - // Atomically subscribe and snapshot buffered alerts so that - // warnings emitted before this client connected are replayed - // exactly once — they appear in the snapshot, and any alert - // arriving afterwards reaches us through `rx`. - let (alert_snapshot, in_flight, mut rx) = handle - .alerter - .subscribe_with_alerts_and_in_flight_snapshot(&handle.in_flight); + // Atomically subscribe and snapshot buffered alerts so that warnings + // emitted before this client connected are replayed exactly once. + let (alert_snapshot, rx) = handle.alerter.subscribe_with_snapshot(); + let in_flight = snapshot_from_guard(&in_flight_guard); + (entries_snapshot, entry_rx, alert_snapshot, rx, in_flight) + }; for alert in alert_snapshot { if writer.write(&Event::Alert(alert)).await.is_err() { return; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 58b83e96..21f8a04d 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -185,7 +185,11 @@ where self.state.increment_entries(); if let Some(in_flight) = &self.in_flight { if let LogEntry::AssistantItem { item, .. } = &entry { - in_flight.clear_for_committed_item(item); + let item_for_clear = item.clone(); + in_flight.clear_for_committed_item_then(&item_for_clear, || { + self.sink.publish(entry); + }); + return Ok(()); } } self.sink.publish(entry); diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 12cdd3c8..ee2e38f8 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -639,11 +639,6 @@ pub struct RewindSummary { pub tool_side_effect_warning: bool, } -/// Pod self-description rendered by the TUI when a session starts empty. -/// -/// Built once in the Pod controller from the resolved manifest and -/// transmitted alongside `Event::Snapshot` so clients don't need -/// their own view of the manifest. /// Unfinished model output included in `Event::Snapshot` for clients that /// attach while an LLM response is still streaming. /// @@ -700,6 +695,11 @@ impl InFlightToolCallState { } } +/// Pod self-description rendered by the TUI when a session starts empty. +/// +/// Built once in the Pod controller from the resolved manifest and +/// transmitted alongside `Event::Snapshot` so clients don't need +/// their own view of the manifest. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Greeting { pub pod_name: String,