fix: snapshot in-flight stream state

This commit is contained in:
Keisuke Hirata 2026-06-21 20:30:01 +09:00
parent 155e039e66
commit 74aca6f6c5
No known key found for this signature in database
17 changed files with 683 additions and 42 deletions

View File

@ -14,6 +14,7 @@ use tracing::{debug, warn};
use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool, send_to_peer_pod_tool}; use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool, send_to_peer_pod_tool};
use crate::feature::FeatureRegistryBuilder; use crate::feature::FeatureRegistryBuilder;
use crate::in_flight::InFlightEvents;
use crate::ipc::alerter::Alerter; use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::server::SocketServer; use crate::ipc::server::SocketServer;
@ -47,6 +48,7 @@ pub struct PodHandle {
pub shared_state: Arc<PodSharedState>, pub shared_state: Arc<PodSharedState>,
pub runtime_dir: Arc<RuntimeDir>, pub runtime_dir: Arc<RuntimeDir>,
pub alerter: Alerter, pub alerter: Alerter,
pub in_flight: InFlightEvents,
/// Segment-log mirror + broadcast handle. The IPC server snapshots /// Segment-log mirror + broadcast handle. The IPC server snapshots
/// it on every new connection (Event::Snapshot) and forwards /// it on every new connection (Event::Snapshot) and forwards
/// subsequent commits (Event::Entry) on the receiver. /// subsequent commits (Event::Entry) on the receiver.
@ -159,6 +161,8 @@ impl PodController {
let (method_tx, method_rx) = mpsc::channel::<Method>(32); let (method_tx, method_rx) = mpsc::channel::<Method>(32);
let (event_tx, _) = broadcast::channel::<Event>(256); let (event_tx, _) = broadcast::channel::<Event>(256);
let alerter = Alerter::new(event_tx.clone()); let alerter = Alerter::new(event_tx.clone());
let in_flight = InFlightEvents::new(event_tx.clone());
pod.attach_in_flight_events(in_flight.clone());
// Runtime directory is created before tool registration because // Runtime directory is created before tool registration because
// the spawn-tool factories need its socket path, and before the // the spawn-tool factories need its socket path, and before the
@ -225,7 +229,7 @@ impl PodController {
pod.wire_history_persistence(); pod.wire_history_persistence();
// === 2. Worker event bridge wiring === // === 2. Worker event bridge wiring ===
wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter); wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter, &in_flight);
// === 3. Tool registration (builtin / memory / spawn-orchestration) === // === 3. Tool registration (builtin / memory / spawn-orchestration) ===
let fs_for_view = register_pod_tools( let fs_for_view = register_pod_tools(
@ -289,6 +293,7 @@ impl PodController {
shared_state: shared_state.clone(), shared_state: shared_state.clone(),
runtime_dir: runtime_dir.clone(), runtime_dir: runtime_dir.clone(),
alerter: alerter.clone(), alerter: alerter.clone(),
in_flight: in_flight.clone(),
sink: pod.sink(), sink: pod.sink(),
}; };
@ -333,6 +338,7 @@ fn wire_event_bridges_on_worker<C, St>(
pod: &mut Pod<C, St>, pod: &mut Pod<C, St>,
event_tx: &broadcast::Sender<Event>, event_tx: &broadcast::Sender<Event>,
alerter: &Alerter, alerter: &Alerter,
in_flight: &InFlightEvents,
) where ) where
C: LlmClient + Clone + 'static, C: LlmClient + Clone + 'static,
St: Store + PodMetadataStore + Clone + 'static, St: Store + PodMetadataStore + Clone + 'static,
@ -386,83 +392,66 @@ fn wire_event_bridges_on_worker<C, St>(
}); });
}); });
let tx = event_tx.clone(); let in_flight_text = in_flight.clone();
let activity = ai_activity.clone(); let activity = ai_activity.clone();
worker.on_text_block(move |block| { worker.on_text_block(move |block| {
let tx_d = tx.clone(); let block_id = in_flight_text.start_text_block();
let in_flight_d = in_flight_text.clone();
let activity_d = activity.clone(); let activity_d = activity.clone();
block.on_delta(move |text| { block.on_delta(move |text| {
activity_d.fetch_add(1, Ordering::SeqCst); activity_d.fetch_add(1, Ordering::SeqCst);
let _ = tx_d.send(Event::TextDelta { in_flight_d.text_delta(block_id, text.to_owned());
text: text.to_owned(),
});
}); });
let tx_s = tx.clone(); let in_flight_s = in_flight_text.clone();
let activity_s = activity.clone(); let activity_s = activity.clone();
block.on_stop(move |text| { block.on_stop(move |text| {
if !text.is_empty() { if !text.is_empty() {
activity_s.fetch_add(1, Ordering::SeqCst); activity_s.fetch_add(1, Ordering::SeqCst);
} }
let _ = tx_s.send(Event::TextDone { in_flight_s.text_done(block_id, text.to_owned());
text: text.to_owned(),
});
}); });
}); });
let tx = event_tx.clone(); let in_flight_thinking = in_flight.clone();
let activity = ai_activity.clone(); let activity = ai_activity.clone();
worker.on_thinking_block(move |block| { worker.on_thinking_block(move |block| {
// Start fires unconditionally so the TUI can show "Thinking..." // Start fires unconditionally so the TUI can show "Thinking..."
// even when the provider doesn't emit plaintext deltas. // even when the provider doesn't emit plaintext deltas.
activity.fetch_add(1, Ordering::SeqCst); activity.fetch_add(1, Ordering::SeqCst);
let _ = tx.send(Event::ThinkingStart); let block_id = in_flight_thinking.thinking_start();
let tx_d = tx.clone(); let in_flight_d = in_flight_thinking.clone();
let activity_d = activity.clone(); let activity_d = activity.clone();
block.on_delta(move |text| { block.on_delta(move |text| {
activity_d.fetch_add(1, Ordering::SeqCst); activity_d.fetch_add(1, Ordering::SeqCst);
let _ = tx_d.send(Event::ThinkingDelta { in_flight_d.thinking_delta(block_id, text.to_owned());
text: text.to_owned(),
});
}); });
let tx_s = tx.clone(); let in_flight_s = in_flight_thinking.clone();
let activity_s = activity.clone(); let activity_s = activity.clone();
block.on_stop(move |text| { block.on_stop(move |text| {
if !text.is_empty() { if !text.is_empty() {
activity_s.fetch_add(1, Ordering::SeqCst); activity_s.fetch_add(1, Ordering::SeqCst);
} }
let _ = tx_s.send(Event::ThinkingDone { in_flight_s.thinking_done(block_id, text.to_owned());
text: text.to_owned(),
});
}); });
}); });
let tx = event_tx.clone(); let in_flight_tool = in_flight.clone();
let activity = ai_activity.clone(); let activity = ai_activity.clone();
worker.on_tool_use_block(move |start, block| { worker.on_tool_use_block(move |start, block| {
activity.fetch_add(1, Ordering::SeqCst); activity.fetch_add(1, Ordering::SeqCst);
let _ = tx.send(Event::ToolCallStart { let block_id = in_flight_tool.tool_call_start(start.id.clone(), start.name.clone());
id: start.id.clone(),
name: start.name.clone(),
});
let id_for_delta = start.id.clone(); let id_for_delta = start.id.clone();
let tx_d = tx.clone(); let in_flight_d = in_flight_tool.clone();
let activity_d = activity.clone(); let activity_d = activity.clone();
block.on_delta(move |json| { block.on_delta(move |json| {
activity_d.fetch_add(1, Ordering::SeqCst); activity_d.fetch_add(1, Ordering::SeqCst);
let _ = tx_d.send(Event::ToolCallArgsDelta { in_flight_d.tool_call_args_delta(block_id, id_for_delta.clone(), json.to_owned());
id: id_for_delta.clone(),
json: json.to_owned(),
});
}); });
let tx_s = tx.clone(); let in_flight_s = in_flight_tool.clone();
let activity_s = activity.clone(); let activity_s = activity.clone();
block.on_stop(move |call| { block.on_stop(move |call| {
activity_s.fetch_add(1, Ordering::SeqCst); activity_s.fetch_add(1, Ordering::SeqCst);
let _ = tx_s.send(Event::ToolCallDone { in_flight_s.tool_call_done(block_id, call.id.clone(), call.input.to_string());
id: call.id.clone(),
name: call.name.clone(),
arguments: call.input.to_string(),
});
}); });
}); });
@ -1535,6 +1524,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.ok()?; .ok()?;

View File

@ -1463,6 +1463,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -1494,6 +1495,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -1579,6 +1581,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -1601,6 +1604,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -1700,6 +1704,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Paused, status: PodStatus::Paused,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -1748,6 +1753,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await; .await;
}); });

370
crates/pod/src/in_flight.rs Normal file
View File

@ -0,0 +1,370 @@
use std::sync::{Arc, Mutex, MutexGuard};
use protocol::{Event, InFlightBlock, InFlightSnapshot, InFlightToolCallState};
use session_store::{LoggedContentPart, LoggedItem};
use tokio::sync::broadcast;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InFlightBlockId(u64);
#[derive(Debug, Clone)]
pub struct InFlightEvents {
inner: Arc<Mutex<InFlightInner>>,
event_tx: broadcast::Sender<Event>,
}
#[derive(Debug)]
pub(crate) struct InFlightInner {
next_block_id: u64,
blocks: Vec<TrackedBlock>,
}
#[derive(Debug, Clone)]
enum TrackedBlock {
Text {
block_id: InFlightBlockId,
text: String,
finished: bool,
},
Thinking {
block_id: InFlightBlockId,
text: String,
finished: bool,
},
ToolCall {
block_id: InFlightBlockId,
id: String,
name: String,
args: String,
state: InFlightToolCallState,
},
}
impl InFlightEvents {
pub(crate) fn new(event_tx: broadcast::Sender<Event>) -> Self {
Self {
inner: Arc::new(Mutex::new(InFlightInner {
next_block_id: 1,
blocks: Vec::new(),
})),
event_tx,
}
}
pub(crate) fn snapshot_guard(&self) -> MutexGuard<'_, InFlightInner> {
self.inner.lock().expect("in-flight event mutex poisoned")
}
pub(crate) fn start_text_block(&self) -> InFlightBlockId {
let mut inner = self.lock();
let block_id = inner.next_id();
inner.blocks.push(TrackedBlock::Text {
block_id,
text: String::new(),
finished: false,
});
block_id
}
pub(crate) fn text_delta(&self, block_id: InFlightBlockId, text: String) {
let mut inner = self.lock();
if let Some(TrackedBlock::Text {
text: current,
finished,
..
}) = inner.find_block_mut(block_id)
{
current.push_str(&text);
*finished = false;
}
let _ = self.event_tx.send(Event::TextDelta { text });
}
pub(crate) fn text_done(&self, block_id: InFlightBlockId, text: String) {
let mut inner = self.lock();
if let Some(TrackedBlock::Text {
text: current,
finished,
..
}) = inner.find_block_mut(block_id)
{
if current.is_empty() {
*current = text.clone();
}
*finished = true;
}
let _ = self.event_tx.send(Event::TextDone { text });
}
pub(crate) fn thinking_start(&self) -> InFlightBlockId {
let mut inner = self.lock();
let block_id = inner.next_id();
inner.blocks.push(TrackedBlock::Thinking {
block_id,
text: String::new(),
finished: false,
});
let _ = self.event_tx.send(Event::ThinkingStart);
block_id
}
pub(crate) fn thinking_delta(&self, block_id: InFlightBlockId, text: String) {
let mut inner = self.lock();
if let Some(TrackedBlock::Thinking {
text: current,
finished,
..
}) = inner.find_block_mut(block_id)
{
current.push_str(&text);
*finished = false;
}
let _ = self.event_tx.send(Event::ThinkingDelta { text });
}
pub(crate) fn thinking_done(&self, block_id: InFlightBlockId, text: String) {
let mut inner = self.lock();
if let Some(TrackedBlock::Thinking {
text: current,
finished,
..
}) = inner.find_block_mut(block_id)
{
if current.is_empty() {
*current = text.clone();
}
*finished = true;
}
let _ = self.event_tx.send(Event::ThinkingDone { text });
}
pub(crate) fn tool_call_start(&self, id: String, name: String) -> InFlightBlockId {
let mut inner = self.lock();
let block_id = inner.next_id();
inner.blocks.push(TrackedBlock::ToolCall {
block_id,
id: id.clone(),
name: name.clone(),
args: String::new(),
state: InFlightToolCallState::Pending,
});
let _ = self.event_tx.send(Event::ToolCallStart { id, name });
block_id
}
pub(crate) fn tool_call_args_delta(
&self,
block_id: InFlightBlockId,
id: String,
delta: String,
) {
let mut inner = self.lock();
if let Some(TrackedBlock::ToolCall { args, state, .. }) = inner.find_block_mut(block_id) {
args.push_str(&delta);
*state = InFlightToolCallState::StreamingArgs;
}
let _ = self
.event_tx
.send(Event::ToolCallArgsDelta { id, json: delta });
}
pub(crate) fn tool_call_done(&self, block_id: InFlightBlockId, id: String, args: String) {
let mut inner = self.lock();
let mut name = String::new();
if let Some(TrackedBlock::ToolCall {
name: current_name,
args: current,
state,
..
}) = inner.find_block_mut(block_id)
{
name = current_name.clone();
if current.is_empty() {
*current = args.clone();
}
*state = InFlightToolCallState::Done;
}
let _ = self.event_tx.send(Event::ToolCallDone {
id,
name,
arguments: args,
});
}
pub(crate) fn clear_for_committed_item(&self, item: &LoggedItem) {
let mut inner = self.lock();
match item {
LoggedItem::Message { role, content }
if matches!(role, session_store::LoggedRole::Assistant) =>
{
let text = content
.iter()
.filter_map(|part| match part {
LoggedContentPart::Text { text } => Some(text.as_str()),
LoggedContentPart::Refusal { refusal } => Some(refusal.as_str()),
})
.collect::<String>();
if !text.is_empty() {
inner.remove_first_text_matching(&text);
}
}
LoggedItem::Reasoning { text, .. } => {
inner.remove_first_thinking_matching(text);
}
LoggedItem::ToolCall { call_id, .. } => {
inner.remove_tool_call(call_id);
}
_ => {}
}
}
fn lock(&self) -> MutexGuard<'_, InFlightInner> {
self.inner.lock().expect("in-flight event mutex poisoned")
}
}
impl InFlightInner {
fn next_id(&mut self) -> InFlightBlockId {
let id = InFlightBlockId(self.next_block_id);
self.next_block_id = self.next_block_id.saturating_add(1);
id
}
fn find_block_mut(&mut self, block_id: InFlightBlockId) -> Option<&mut TrackedBlock> {
self.blocks
.iter_mut()
.find(|block| block.block_id() == block_id)
}
fn snapshot(&self) -> InFlightSnapshot {
InFlightSnapshot {
blocks: self
.blocks
.iter()
.filter_map(TrackedBlock::to_snapshot_block)
.collect(),
}
}
fn remove_first_text_matching(&mut self, committed: &str) {
if let Some(index) = self.blocks.iter().position(|block| match block {
TrackedBlock::Text { text, .. } => text == committed,
_ => false,
}) {
self.blocks.remove(index);
}
}
fn remove_first_thinking_matching(&mut self, committed: &str) {
if let Some(index) = self.blocks.iter().position(|block| match block {
TrackedBlock::Thinking { text, .. } => text == committed,
_ => false,
}) {
self.blocks.remove(index);
}
}
fn remove_tool_call(&mut self, call_id: &str) {
if let Some(index) = self.blocks.iter().position(|block| match block {
TrackedBlock::ToolCall { id, .. } => id == call_id,
_ => false,
}) {
self.blocks.remove(index);
}
}
}
impl TrackedBlock {
fn block_id(&self) -> InFlightBlockId {
match self {
TrackedBlock::Text { block_id, .. }
| TrackedBlock::Thinking { block_id, .. }
| TrackedBlock::ToolCall { block_id, .. } => *block_id,
}
}
fn to_snapshot_block(&self) -> Option<InFlightBlock> {
match self {
TrackedBlock::Text { text, finished, .. } => {
if text.is_empty() {
None
} else {
Some(InFlightBlock::Text {
text: text.clone(),
finished: *finished,
})
}
}
TrackedBlock::Thinking { text, finished, .. } => Some(InFlightBlock::Thinking {
text: text.clone(),
finished: *finished,
}),
TrackedBlock::ToolCall {
id,
name,
args,
state,
..
} => Some(InFlightBlock::ToolCall {
id: id.clone(),
name: name.clone(),
args: args.clone(),
state: *state,
}),
}
}
}
pub(crate) fn snapshot_from_guard(guard: &MutexGuard<'_, InFlightInner>) -> InFlightSnapshot {
guard.snapshot()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn snapshot_boundary_does_not_duplicate_or_gap_delta_sent_after_subscribe() {
let (event_tx, _) = broadcast::channel(16);
let in_flight = InFlightEvents::new(event_tx.clone());
let block_id = in_flight.start_text_block();
in_flight.text_delta(block_id, "hel".into());
let guard = in_flight.snapshot_guard();
let mut rx = event_tx.subscribe();
let snapshot = snapshot_from_guard(&guard);
drop(guard);
in_flight.text_delta(block_id, "lo".into());
assert_eq!(
snapshot.blocks,
vec![InFlightBlock::Text {
text: "hel".into(),
finished: false,
}]
);
assert!(matches!(
rx.try_recv().unwrap(),
Event::TextDelta { text } if text == "lo"
));
assert!(rx.try_recv().is_err());
}
#[test]
fn committed_item_clears_matching_in_flight_block() {
let (event_tx, _) = broadcast::channel(16);
let in_flight = InFlightEvents::new(event_tx);
let block_id = in_flight.start_text_block();
in_flight.text_delta(block_id, "done".into());
in_flight.clear_for_committed_item(&LoggedItem::Message {
role: session_store::LoggedRole::Assistant,
content: vec![LoggedContentPart::Text {
text: "done".into(),
}],
});
let guard = in_flight.snapshot_guard();
assert!(snapshot_from_guard(&guard).is_empty());
}
}

View File

@ -14,7 +14,9 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use protocol::{Alert, AlertLevel, AlertSource, Event}; use protocol::{Alert, AlertLevel, AlertSource, Event, InFlightSnapshot};
use crate::in_flight::{InFlightEvents, snapshot_from_guard};
/// Upper bound on buffered alerts. When exceeded, the oldest /// Upper bound on buffered alerts. When exceeded, the oldest
/// entries are discarded so a long-running session cannot leak /// entries are discarded so a long-running session cannot leak
@ -85,6 +87,22 @@ impl Alerter {
let snapshot: Vec<Alert> = buf.iter().cloned().collect(); let snapshot: Vec<Alert> = buf.iter().cloned().collect();
(snapshot, rx) (snapshot, rx)
} }
pub fn subscribe_with_alerts_and_in_flight_snapshot(
&self,
in_flight: &InFlightEvents,
) -> (Vec<Alert>, InFlightSnapshot, broadcast::Receiver<Event>) {
let buf = self
.inner
.buffer
.lock()
.expect("alerter buffer mutex poisoned");
let in_flight_guard = in_flight.snapshot_guard();
let rx = self.inner.event_tx.subscribe();
let alerts: Vec<Alert> = buf.iter().cloned().collect();
let in_flight = snapshot_from_guard(&in_flight_guard);
(alerts, in_flight, rx)
}
} }
fn now_ms() -> i64 { fn now_ms() -> i64 {

View File

@ -115,7 +115,9 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
// warnings emitted before this client connected are replayed // warnings emitted before this client connected are replayed
// exactly once — they appear in the snapshot, and any alert // exactly once — they appear in the snapshot, and any alert
// arriving afterwards reaches us through `rx`. // arriving afterwards reaches us through `rx`.
let (alert_snapshot, mut rx) = handle.alerter.subscribe_with_snapshot(); let (alert_snapshot, in_flight, mut rx) = handle
.alerter
.subscribe_with_alerts_and_in_flight_snapshot(&handle.in_flight);
for alert in alert_snapshot { for alert in alert_snapshot {
if writer.write(&Event::Alert(alert)).await.is_err() { if writer.write(&Event::Alert(alert)).await.is_err() {
return; return;
@ -131,6 +133,7 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
.collect(), .collect(),
greeting: handle.shared_state.greeting.clone(), greeting: handle.shared_state.greeting.clone(),
status: handle.shared_state.get_status(), status: handle.shared_state.get_status(),
in_flight,
}; };
if writer.write(&snapshot_event).await.is_err() { if writer.write(&snapshot_event).await.is_err() {
return; return;

View File

@ -6,6 +6,7 @@ pub mod entrypoint;
pub mod feature; pub mod feature;
pub mod fs_view; pub mod fs_view;
pub mod hook; pub mod hook;
pub(crate) mod in_flight;
pub mod ipc; pub mod ipc;
pub mod prompt; pub mod prompt;
pub mod runtime; pub mod runtime;

View File

@ -35,6 +35,7 @@ use crate::hook::{
Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest, Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest,
PreToolCall, PreToolCall,
}; };
use crate::in_flight::InFlightEvents;
use crate::ipc::alerter::Alerter; use crate::ipc::alerter::Alerter;
use crate::ipc::interceptor::PodInterceptor; use crate::ipc::interceptor::PodInterceptor;
use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::notify_buffer::NotifyBuffer;
@ -167,6 +168,7 @@ pub struct LogWriterHandle<St: Clone> {
pub store: St, pub store: St,
pub state: Arc<SegmentState>, pub state: Arc<SegmentState>,
pub sink: SegmentLogSink, pub sink: SegmentLogSink,
pub in_flight: Option<InFlightEvents>,
} }
impl<St> LogWriterHandle<St> impl<St> LogWriterHandle<St>
@ -181,6 +183,11 @@ where
let loc = self.state.location(); let loc = self.state.location();
self.store.append(loc.session_id, loc.segment_id, &entry)?; self.store.append(loc.session_id, loc.segment_id, &entry)?;
self.state.increment_entries(); self.state.increment_entries();
if let Some(in_flight) = &self.in_flight {
if let LogEntry::AssistantItem { item, .. } = &entry {
in_flight.clear_for_committed_item(item);
}
}
self.sink.publish(entry); self.sink.publish(entry);
Ok(()) Ok(())
} }
@ -296,6 +303,7 @@ pub struct Pod<C: LlmClient, St: Store> {
/// notifications, events sent here are NOT replayed to clients that /// notifications, events sent here are NOT replayed to clients that
/// connect after the fact — they are fire-and-forget broadcasts. /// connect after the fact — they are fire-and-forget broadcasts.
event_tx: Option<broadcast::Sender<Event>>, event_tx: Option<broadcast::Sender<Event>>,
in_flight: Option<InFlightEvents>,
/// Monotonic counter incremented by worker event bridges when an /// Monotonic counter incremented by worker event bridges when an
/// assistant-side execution artifact becomes visible to clients before /// assistant-side execution artifact becomes visible to clients before
/// it is necessarily committed to history (e.g. streaming text deltas). /// it is necessarily committed to history (e.g. streaming text deltas).
@ -449,6 +457,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
system_prompt_template: None, system_prompt_template: None,
alerter: self.alerter.clone(), alerter: self.alerter.clone(),
event_tx: self.event_tx.clone(), event_tx: self.event_tx.clone(),
in_flight: self.in_flight.clone(),
ai_activity_counter: self.ai_activity_counter.clone(), ai_activity_counter: self.ai_activity_counter.clone(),
pending_notifies: NotifyBuffer::new(), pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())), pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
@ -484,6 +493,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
store: self.store.clone(), store: self.store.clone(),
state: self.segment_state.clone(), state: self.segment_state.clone(),
sink: self.sink.clone(), sink: self.sink.clone(),
in_flight: self.in_flight.clone(),
} }
} }
@ -495,6 +505,10 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
self.log_writer = Some(writer); self.log_writer = Some(writer);
} }
pub fn attach_in_flight_events(&mut self, in_flight: InFlightEvents) {
self.in_flight = Some(in_flight);
}
/// Wire `Worker::on_history_append` to commit each appended item /// Wire `Worker::on_history_append` to commit each appended item
/// directly as a singular `LogEntry::AssistantItem` / `ToolResult` /// directly as a singular `LogEntry::AssistantItem` / `ToolResult`
/// through the writer. The controller calls this once per spawned /// through the writer. The controller calls this once per spawned
@ -633,6 +647,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
system_prompt_template: None, system_prompt_template: None,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
in_flight: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)), ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(), pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())), pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
@ -3842,6 +3857,7 @@ where
system_prompt_template: common.system_prompt_template, system_prompt_template: common.system_prompt_template,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
in_flight: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)), ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(), pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())), pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
@ -3951,6 +3967,7 @@ where
system_prompt_template: common.system_prompt_template, system_prompt_template: common.system_prompt_template,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
in_flight: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)), ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(), pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())), pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
@ -4187,6 +4204,7 @@ where
system_prompt_template: None, system_prompt_template: None,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
in_flight: None,
ai_activity_counter: Arc::new(AtomicUsize::new(0)), ai_activity_counter: Arc::new(AtomicUsize::new(0)),
pending_notifies: NotifyBuffer::new(), pending_notifies: NotifyBuffer::new(),
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())), pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),

View File

@ -515,6 +515,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
} }
} }

View File

@ -435,6 +435,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -457,6 +458,7 @@ mod tests {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();

View File

@ -129,6 +129,7 @@ fn empty_snapshot() -> Event {
context_tokens: 0, context_tokens: 0,
}, },
status: protocol::PodStatus::Idle, status: protocol::PodStatus::Idle,
in_flight: Default::default(),
} }
} }
@ -203,6 +204,7 @@ fn serve_history(listener: UnixListener, items: Vec<Item>) -> JoinHandle<()> {
context_tokens: 0, context_tokens: 0,
}, },
status: protocol::PodStatus::Idle, status: protocol::PodStatus::Idle,
in_flight: Default::default(),
}; };
let _ = writer.write(&event).await; let _ = writer.write(&event).await;
} }

View File

@ -91,6 +91,7 @@ fn empty_snapshot() -> Event {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
} }
} }

View File

@ -123,6 +123,7 @@ fn accept_one_method(listener: UnixListener) -> tokio::task::JoinHandle<Option<M
context_tokens: 0, context_tokens: 0,
}, },
status: protocol::PodStatus::Idle, status: protocol::PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.is_err() .is_err()

View File

@ -12,6 +12,10 @@ fn is_true(value: &bool) -> bool {
*value *value
} }
fn is_false(value: &bool) -> bool {
!*value
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Method (Client → Pod via Unix Socket) // Method (Client → Pod via Unix Socket)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -453,6 +457,10 @@ pub enum Event {
greeting: Greeting, greeting: Greeting,
#[serde(default)] #[serde(default)]
status: PodStatus, status: PodStatus,
/// Unfinished model output that has already streamed in the current
/// run but is not yet represented by committed snapshot entries.
#[serde(default, skip_serializing_if = "InFlightSnapshot::is_empty")]
in_flight: InFlightSnapshot,
}, },
/// Server-side segment log rotated to a fresh `SegmentStart`. /// Server-side segment log rotated to a fresh `SegmentStart`.
/// ///
@ -636,6 +644,62 @@ pub struct RewindSummary {
/// Built once in the Pod controller from the resolved manifest and /// Built once in the Pod controller from the resolved manifest and
/// transmitted alongside `Event::Snapshot` so clients don't need /// transmitted alongside `Event::Snapshot` so clients don't need
/// their own view of the manifest. /// their own view of the manifest.
/// Unfinished model output included in `Event::Snapshot` for clients that
/// attach while an LLM response is still streaming.
///
/// These blocks are presentation state only: they are reconstructed from the
/// active Pod controller and must not be treated as committed assistant
/// history. Finalized assistant items continue to come from ordinary snapshot
/// entries.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct InFlightSnapshot {
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub blocks: Vec<InFlightBlock>,
}
impl InFlightSnapshot {
pub fn is_empty(&self) -> bool {
self.blocks.is_empty()
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum InFlightBlock {
Text {
text: String,
#[serde(default, skip_serializing_if = "is_false")]
finished: bool,
},
Thinking {
text: String,
#[serde(default, skip_serializing_if = "is_false")]
finished: bool,
},
ToolCall {
id: String,
name: String,
args: String,
#[serde(default, skip_serializing_if = "InFlightToolCallState::is_pending")]
state: InFlightToolCallState,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum InFlightToolCallState {
#[default]
Pending,
StreamingArgs,
Done,
}
impl InFlightToolCallState {
pub fn is_pending(&self) -> bool {
matches!(self, Self::Pending)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Greeting { pub struct Greeting {
pub pod_name: String, pub pod_name: String,
@ -1129,6 +1193,7 @@ mod tests {
context_tokens: 42_000, context_tokens: 42_000,
}, },
status: PodStatus::Paused, status: PodStatus::Paused,
in_flight: InFlightSnapshot::default(),
}; };
let json = serde_json::to_string(&event).unwrap(); let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
@ -1142,6 +1207,62 @@ mod tests {
assert_eq!(parsed["data"]["status"], "paused"); assert_eq!(parsed["data"]["status"], "paused");
} }
#[test]
fn event_snapshot_in_flight_roundtrip_and_default() {
let inbound = r#"{"event":"snapshot","data":{"entries":[],"greeting":{"pod_name":"test","cwd":"/tmp","provider":"p","model":"m","scope_summary":"s","tools":[]},"status":"running"}}"#;
let decoded: Event = serde_json::from_str(inbound).unwrap();
match decoded {
Event::Snapshot { in_flight, .. } => assert!(in_flight.is_empty()),
other => panic!("expected Snapshot, got {other:?}"),
}
let event = Event::Snapshot {
entries: Vec::new(),
greeting: Greeting {
pod_name: "test".into(),
cwd: "/tmp".into(),
provider: "p".into(),
model: "m".into(),
scope_summary: "s".into(),
tools: Vec::new(),
context_window: 0,
context_tokens: 0,
},
status: PodStatus::Running,
in_flight: InFlightSnapshot {
blocks: vec![
InFlightBlock::Text {
text: "hel".into(),
finished: false,
},
InFlightBlock::Thinking {
text: "why".into(),
finished: true,
},
InFlightBlock::ToolCall {
id: "call_1".into(),
name: "Read".into(),
args: r#"{"file"#.into(),
state: InFlightToolCallState::StreamingArgs,
},
],
},
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["data"]["in_flight"]["blocks"][0]["text"], "hel");
assert_eq!(parsed["data"]["in_flight"]["blocks"][1]["finished"], true);
assert_eq!(
parsed["data"]["in_flight"]["blocks"][2]["state"],
"streaming_args"
);
match serde_json::from_str::<Event>(&json).unwrap() {
Event::Snapshot { in_flight, .. } => assert_eq!(in_flight.blocks.len(), 3),
other => panic!("expected Snapshot, got {other:?}"),
}
}
#[test] #[test]
fn event_segment_rotated_roundtrip() { fn event_segment_rotated_roundtrip() {
let event = Event::SegmentRotated { let event = Event::SegmentRotated {

View File

@ -3,8 +3,8 @@ use std::path::Path;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use protocol::{ use protocol::{
AlertLevel, AlertSource, CompletionEntry, CompletionKind, ErrorCode, Event, Method, PodStatus, AlertLevel, AlertSource, CompletionEntry, CompletionKind, ErrorCode, Event, InFlightBlock,
RewindTarget, RunResult, Segment, InFlightSnapshot, InFlightToolCallState, Method, PodStatus, RewindTarget, RunResult, Segment,
}; };
use crate::block::{ use crate::block::{
@ -1279,9 +1279,10 @@ impl App {
entries, entries,
greeting, greeting,
status, status,
in_flight,
} => { } => {
self.rewind_refresh_fence = false; self.rewind_refresh_fence = false;
self.restore_snapshot(&entries, greeting); self.restore_snapshot(&entries, greeting, in_flight);
self.set_pod_status(status); self.set_pod_status(status);
} }
Event::Status { status } => { Event::Status { status } => {
@ -1410,6 +1411,50 @@ impl App {
}); });
} }
fn apply_in_flight_snapshot(&mut self, snapshot: InFlightSnapshot) {
for block in snapshot.blocks {
match block {
InFlightBlock::Text { text, finished } => {
self.blocks.push(Block::AssistantText { text });
self.assistant_streaming = !finished;
}
InFlightBlock::Thinking { text, finished } => {
let state = if finished {
ThinkingState::Finished { elapsed_secs: None }
} else {
ThinkingState::Streaming {
started_at: Instant::now(),
}
};
self.blocks
.push(Block::Thinking(ThinkingBlock { text, state }));
}
InFlightBlock::ToolCall {
id,
name,
args,
state,
} => {
let (tool_state, arguments) = match state {
InFlightToolCallState::Pending => (ToolCallState::Pending, None),
InFlightToolCallState::StreamingArgs => (ToolCallState::Streaming, None),
InFlightToolCallState::Done => {
(ToolCallState::Executing, Some(args.clone()))
}
};
self.blocks.push(Block::ToolCall(ToolCallBlock {
id,
name,
args_stream: args,
arguments,
state: tool_state,
edit_snapshot: None,
}));
}
}
}
}
fn append_assistant_text(&mut self, text: &str) { fn append_assistant_text(&mut self, text: &str) {
if self.assistant_streaming { if self.assistant_streaming {
if let Some(Block::AssistantText { text: existing }) = self.blocks.last_mut() { if let Some(Block::AssistantText { text: existing }) = self.blocks.last_mut() {
@ -1913,11 +1958,17 @@ impl App {
/// LogEntry variant into the same blocks live events would have /// LogEntry variant into the same blocks live events would have
/// produced. Followed by `Event::Entry` updates for anything /// produced. Followed by `Event::Entry` updates for anything
/// committed after the snapshot. /// committed after the snapshot.
fn restore_snapshot(&mut self, entries: &[serde_json::Value], greeting: protocol::Greeting) { fn restore_snapshot(
&mut self,
entries: &[serde_json::Value],
greeting: protocol::Greeting,
in_flight: InFlightSnapshot,
) {
self.greeting = Some(greeting.clone()); self.greeting = Some(greeting.clone());
self.context_window = greeting.context_window; self.context_window = greeting.context_window;
self.session_context_tokens = greeting.context_tokens; self.session_context_tokens = greeting.context_tokens;
self.restore_entries(entries, Some(greeting)); self.restore_entries(entries, Some(greeting));
self.apply_in_flight_snapshot(in_flight);
} }
/// Restore after a successful destructive rewind. The Pod's /// Restore after a successful destructive rewind. The Pod's
@ -3151,6 +3202,7 @@ mod completion_flow_tests {
greeting: test_greeting(), greeting: test_greeting(),
entries: vec![session_start_value], entries: vec![session_start_value],
status: PodStatus::Running, status: PodStatus::Running,
in_flight: Default::default(),
}); });
assert!(matches!(app.pod_status, PodStatus::Running)); assert!(matches!(app.pod_status, PodStatus::Running));
@ -3161,6 +3213,54 @@ mod completion_flow_tests {
)); ));
} }
#[test]
fn snapshot_in_flight_blocks_continue_with_live_deltas() {
let mut app = App::new("test".into());
app.handle_pod_event(Event::Snapshot {
greeting: test_greeting(),
entries: Vec::new(),
status: PodStatus::Running,
in_flight: InFlightSnapshot {
blocks: vec![
InFlightBlock::Thinking {
text: "why".into(),
finished: false,
},
InFlightBlock::ToolCall {
id: "call_1".into(),
name: "Read".into(),
args: r#"{\"file"#.into(),
state: InFlightToolCallState::StreamingArgs,
},
InFlightBlock::Text {
text: "hel".into(),
finished: false,
},
],
},
});
app.handle_pod_event(Event::TextDelta { text: "lo".into() });
app.handle_pod_event(Event::ThinkingDelta { text: "?".into() });
app.handle_pod_event(Event::ToolCallArgsDelta {
id: "call_1".into(),
json: r#"\":\"src/lib.rs\"}"#.into(),
});
assert!(matches!(
app.blocks.iter().find(|block| matches!(block, Block::AssistantText { .. })),
Some(Block::AssistantText { text }) if text == "hello"
));
assert!(matches!(
app.blocks.iter().find(|block| matches!(block, Block::Thinking(_))),
Some(Block::Thinking(thinking)) if thinking.text == "why?"
));
assert!(matches!(
app.blocks.iter().find(|block| matches!(block, Block::ToolCall(_))),
Some(Block::ToolCall(call)) if call.args_stream == r#"{\"file\":\"src/lib.rs\"}"#
));
}
#[test] #[test]
fn live_system_item_workflow_appends_system_message_block() { fn live_system_item_workflow_appends_system_message_block() {
let mut app = App::new("test".into()); let mut app = App::new("test".into());
@ -3294,6 +3394,7 @@ mod completion_flow_tests {
entries: Vec::new(), entries: Vec::new(),
greeting, greeting,
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}); });
assert_eq!(app.context_window, 123_000); assert_eq!(app.context_window, 123_000);
@ -3492,6 +3593,7 @@ mod completion_flow_tests {
greeting: test_greeting(), greeting: test_greeting(),
entries: assistant_item_entries, entries: assistant_item_entries,
status: PodStatus::Running, status: PodStatus::Running,
in_flight: Default::default(),
}); });
let tasks = app.task_store.tasks(); let tasks = app.task_store.tasks();

View File

@ -1922,6 +1922,7 @@ mod tests {
greeting: test_greeting(), greeting: test_greeting(),
entries: vec![], entries: vec![],
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}); });
app.handle_pod_event(Event::RewindApplied { app.handle_pod_event(Event::RewindApplied {
entries: vec![], entries: vec![],
@ -1947,6 +1948,7 @@ mod tests {
greeting: test_greeting(), greeting: test_greeting(),
entries: vec![], entries: vec![],
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}); });
type_keys(&mut app, "draft"); type_keys(&mut app, "draft");

View File

@ -868,6 +868,7 @@ async fn ticket_queue_notification_sends_notify_when_socket_available() {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();
@ -908,6 +909,7 @@ async fn send_notify_only_can_deliver_weak_notification_without_auto_run() {
context_tokens: 0, context_tokens: 0,
}, },
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}) })
.await .await
.unwrap(); .unwrap();

View File

@ -819,6 +819,7 @@ mod tests {
entries: vec![], entries: vec![],
greeting: test_greeting(), greeting: test_greeting(),
status: PodStatus::Idle, status: PodStatus::Idle,
in_flight: Default::default(),
}, },
]; ];