merge: inflight reconnect snapshot
This commit is contained in:
commit
b21638f56c
|
|
@ -14,6 +14,7 @@ use tracing::{debug, warn};
|
|||
|
||||
use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool, send_to_peer_pod_tool};
|
||||
use crate::feature::FeatureRegistryBuilder;
|
||||
use crate::in_flight::InFlightEvents;
|
||||
use crate::ipc::alerter::Alerter;
|
||||
use crate::ipc::notify_buffer::NotifyBuffer;
|
||||
use crate::ipc::server::SocketServer;
|
||||
|
|
@ -47,6 +48,7 @@ pub struct PodHandle {
|
|||
pub shared_state: Arc<PodSharedState>,
|
||||
pub runtime_dir: Arc<RuntimeDir>,
|
||||
pub alerter: Alerter,
|
||||
pub in_flight: InFlightEvents,
|
||||
/// Segment-log mirror + broadcast handle. The IPC server snapshots
|
||||
/// it on every new connection (Event::Snapshot) and forwards
|
||||
/// subsequent commits (Event::Entry) on the receiver.
|
||||
|
|
@ -159,6 +161,8 @@ impl PodController {
|
|||
let (method_tx, method_rx) = mpsc::channel::<Method>(32);
|
||||
let (event_tx, _) = broadcast::channel::<Event>(256);
|
||||
let alerter = Alerter::new(event_tx.clone());
|
||||
let in_flight = InFlightEvents::new(event_tx.clone());
|
||||
pod.attach_in_flight_events(in_flight.clone());
|
||||
|
||||
// Runtime directory is created before tool registration because
|
||||
// the spawn-tool factories need its socket path, and before the
|
||||
|
|
@ -225,7 +229,7 @@ impl PodController {
|
|||
pod.wire_history_persistence();
|
||||
|
||||
// === 2. Worker event bridge wiring ===
|
||||
wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter);
|
||||
wire_event_bridges_on_worker(&mut pod, &event_tx, &alerter, &in_flight);
|
||||
|
||||
// === 3. Tool registration (builtin / memory / spawn-orchestration) ===
|
||||
let fs_for_view = register_pod_tools(
|
||||
|
|
@ -289,6 +293,7 @@ impl PodController {
|
|||
shared_state: shared_state.clone(),
|
||||
runtime_dir: runtime_dir.clone(),
|
||||
alerter: alerter.clone(),
|
||||
in_flight: in_flight.clone(),
|
||||
sink: pod.sink(),
|
||||
};
|
||||
|
||||
|
|
@ -333,6 +338,7 @@ fn wire_event_bridges_on_worker<C, St>(
|
|||
pod: &mut Pod<C, St>,
|
||||
event_tx: &broadcast::Sender<Event>,
|
||||
alerter: &Alerter,
|
||||
in_flight: &InFlightEvents,
|
||||
) where
|
||||
C: LlmClient + Clone + 'static,
|
||||
St: Store + PodMetadataStore + Clone + 'static,
|
||||
|
|
@ -386,83 +392,66 @@ fn wire_event_bridges_on_worker<C, St>(
|
|||
});
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
let in_flight_text = in_flight.clone();
|
||||
let activity = ai_activity.clone();
|
||||
worker.on_text_block(move |block| {
|
||||
let tx_d = tx.clone();
|
||||
let block_id = in_flight_text.start_text_block();
|
||||
let in_flight_d = in_flight_text.clone();
|
||||
let activity_d = activity.clone();
|
||||
block.on_delta(move |text| {
|
||||
activity_d.fetch_add(1, Ordering::SeqCst);
|
||||
let _ = tx_d.send(Event::TextDelta {
|
||||
text: text.to_owned(),
|
||||
});
|
||||
in_flight_d.text_delta(block_id, text.to_owned());
|
||||
});
|
||||
let tx_s = tx.clone();
|
||||
let in_flight_s = in_flight_text.clone();
|
||||
let activity_s = activity.clone();
|
||||
block.on_stop(move |text| {
|
||||
if !text.is_empty() {
|
||||
activity_s.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
let _ = tx_s.send(Event::TextDone {
|
||||
text: text.to_owned(),
|
||||
});
|
||||
in_flight_s.text_done(block_id, text.to_owned());
|
||||
});
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
let in_flight_thinking = in_flight.clone();
|
||||
let activity = ai_activity.clone();
|
||||
worker.on_thinking_block(move |block| {
|
||||
// Start fires unconditionally so the TUI can show "Thinking..."
|
||||
// even when the provider doesn't emit plaintext deltas.
|
||||
activity.fetch_add(1, Ordering::SeqCst);
|
||||
let _ = tx.send(Event::ThinkingStart);
|
||||
let tx_d = tx.clone();
|
||||
let block_id = in_flight_thinking.thinking_start();
|
||||
let in_flight_d = in_flight_thinking.clone();
|
||||
let activity_d = activity.clone();
|
||||
block.on_delta(move |text| {
|
||||
activity_d.fetch_add(1, Ordering::SeqCst);
|
||||
let _ = tx_d.send(Event::ThinkingDelta {
|
||||
text: text.to_owned(),
|
||||
});
|
||||
in_flight_d.thinking_delta(block_id, text.to_owned());
|
||||
});
|
||||
let tx_s = tx.clone();
|
||||
let in_flight_s = in_flight_thinking.clone();
|
||||
let activity_s = activity.clone();
|
||||
block.on_stop(move |text| {
|
||||
if !text.is_empty() {
|
||||
activity_s.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
let _ = tx_s.send(Event::ThinkingDone {
|
||||
text: text.to_owned(),
|
||||
});
|
||||
in_flight_s.thinking_done(block_id, text.to_owned());
|
||||
});
|
||||
});
|
||||
|
||||
let tx = event_tx.clone();
|
||||
let in_flight_tool = in_flight.clone();
|
||||
let activity = ai_activity.clone();
|
||||
worker.on_tool_use_block(move |start, block| {
|
||||
activity.fetch_add(1, Ordering::SeqCst);
|
||||
let _ = tx.send(Event::ToolCallStart {
|
||||
id: start.id.clone(),
|
||||
name: start.name.clone(),
|
||||
});
|
||||
let block_id = in_flight_tool.tool_call_start(start.id.clone(), start.name.clone());
|
||||
let id_for_delta = start.id.clone();
|
||||
let tx_d = tx.clone();
|
||||
let in_flight_d = in_flight_tool.clone();
|
||||
let activity_d = activity.clone();
|
||||
block.on_delta(move |json| {
|
||||
activity_d.fetch_add(1, Ordering::SeqCst);
|
||||
let _ = tx_d.send(Event::ToolCallArgsDelta {
|
||||
id: id_for_delta.clone(),
|
||||
json: json.to_owned(),
|
||||
});
|
||||
in_flight_d.tool_call_args_delta(block_id, id_for_delta.clone(), json.to_owned());
|
||||
});
|
||||
let tx_s = tx.clone();
|
||||
let in_flight_s = in_flight_tool.clone();
|
||||
let activity_s = activity.clone();
|
||||
block.on_stop(move |call| {
|
||||
activity_s.fetch_add(1, Ordering::SeqCst);
|
||||
let _ = tx_s.send(Event::ToolCallDone {
|
||||
id: call.id.clone(),
|
||||
name: call.name.clone(),
|
||||
arguments: call.input.to_string(),
|
||||
});
|
||||
in_flight_s.tool_call_done(block_id, call.id.clone(), call.input.to_string());
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -1535,6 +1524,7 @@ mod tests {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.ok()?;
|
||||
|
|
|
|||
|
|
@ -1463,6 +1463,7 @@ mod tests {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
@ -1494,6 +1495,7 @@ mod tests {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
@ -1579,6 +1581,7 @@ mod tests {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
@ -1601,6 +1604,7 @@ mod tests {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
@ -1700,6 +1704,7 @@ mod tests {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Paused,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
@ -1748,6 +1753,7 @@ mod tests {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await;
|
||||
});
|
||||
|
|
|
|||
477
crates/pod/src/in_flight.rs
Normal file
477
crates/pod/src/in_flight.rs
Normal file
|
|
@ -0,0 +1,477 @@
|
|||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
|
||||
use protocol::{Event, InFlightBlock, InFlightSnapshot, InFlightToolCallState};
|
||||
use session_store::{LoggedContentPart, LoggedItem};
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct InFlightBlockId(u64);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct InFlightEvents {
|
||||
inner: Arc<Mutex<InFlightInner>>,
|
||||
event_tx: broadcast::Sender<Event>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct InFlightInner {
|
||||
next_block_id: u64,
|
||||
blocks: Vec<TrackedBlock>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum TrackedBlock {
|
||||
Text {
|
||||
block_id: InFlightBlockId,
|
||||
text: String,
|
||||
finished: bool,
|
||||
},
|
||||
Thinking {
|
||||
block_id: InFlightBlockId,
|
||||
text: String,
|
||||
finished: bool,
|
||||
},
|
||||
ToolCall {
|
||||
block_id: InFlightBlockId,
|
||||
id: String,
|
||||
name: String,
|
||||
args: String,
|
||||
state: InFlightToolCallState,
|
||||
},
|
||||
}
|
||||
|
||||
impl InFlightEvents {
|
||||
pub(crate) fn new(event_tx: broadcast::Sender<Event>) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(InFlightInner {
|
||||
next_block_id: 1,
|
||||
blocks: Vec::new(),
|
||||
})),
|
||||
event_tx,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn snapshot_guard(&self) -> MutexGuard<'_, InFlightInner> {
|
||||
self.inner.lock().expect("in-flight event mutex poisoned")
|
||||
}
|
||||
|
||||
pub(crate) fn start_text_block(&self) -> InFlightBlockId {
|
||||
let mut inner = self.lock();
|
||||
let block_id = inner.next_id();
|
||||
inner.blocks.push(TrackedBlock::Text {
|
||||
block_id,
|
||||
text: String::new(),
|
||||
finished: false,
|
||||
});
|
||||
block_id
|
||||
}
|
||||
|
||||
pub(crate) fn text_delta(&self, block_id: InFlightBlockId, text: String) {
|
||||
let mut inner = self.lock();
|
||||
if let Some(TrackedBlock::Text {
|
||||
text: current,
|
||||
finished,
|
||||
..
|
||||
}) = inner.find_block_mut(block_id)
|
||||
{
|
||||
current.push_str(&text);
|
||||
*finished = false;
|
||||
}
|
||||
let _ = self.event_tx.send(Event::TextDelta { text });
|
||||
}
|
||||
|
||||
pub(crate) fn text_done(&self, block_id: InFlightBlockId, text: String) {
|
||||
let mut inner = self.lock();
|
||||
if let Some(TrackedBlock::Text {
|
||||
text: current,
|
||||
finished,
|
||||
..
|
||||
}) = inner.find_block_mut(block_id)
|
||||
{
|
||||
if current.is_empty() {
|
||||
*current = text.clone();
|
||||
}
|
||||
*finished = true;
|
||||
}
|
||||
let _ = self.event_tx.send(Event::TextDone { text });
|
||||
}
|
||||
|
||||
pub(crate) fn thinking_start(&self) -> InFlightBlockId {
|
||||
let mut inner = self.lock();
|
||||
let block_id = inner.next_id();
|
||||
inner.blocks.push(TrackedBlock::Thinking {
|
||||
block_id,
|
||||
text: String::new(),
|
||||
finished: false,
|
||||
});
|
||||
let _ = self.event_tx.send(Event::ThinkingStart);
|
||||
block_id
|
||||
}
|
||||
|
||||
pub(crate) fn thinking_delta(&self, block_id: InFlightBlockId, text: String) {
|
||||
let mut inner = self.lock();
|
||||
if let Some(TrackedBlock::Thinking {
|
||||
text: current,
|
||||
finished,
|
||||
..
|
||||
}) = inner.find_block_mut(block_id)
|
||||
{
|
||||
current.push_str(&text);
|
||||
*finished = false;
|
||||
}
|
||||
let _ = self.event_tx.send(Event::ThinkingDelta { text });
|
||||
}
|
||||
|
||||
pub(crate) fn thinking_done(&self, block_id: InFlightBlockId, text: String) {
|
||||
let mut inner = self.lock();
|
||||
if let Some(TrackedBlock::Thinking {
|
||||
text: current,
|
||||
finished,
|
||||
..
|
||||
}) = inner.find_block_mut(block_id)
|
||||
{
|
||||
if current.is_empty() {
|
||||
*current = text.clone();
|
||||
}
|
||||
*finished = true;
|
||||
}
|
||||
let _ = self.event_tx.send(Event::ThinkingDone { text });
|
||||
}
|
||||
|
||||
pub(crate) fn tool_call_start(&self, id: String, name: String) -> InFlightBlockId {
|
||||
let mut inner = self.lock();
|
||||
let block_id = inner.next_id();
|
||||
inner.blocks.push(TrackedBlock::ToolCall {
|
||||
block_id,
|
||||
id: id.clone(),
|
||||
name: name.clone(),
|
||||
args: String::new(),
|
||||
state: InFlightToolCallState::Pending,
|
||||
});
|
||||
let _ = self.event_tx.send(Event::ToolCallStart { id, name });
|
||||
block_id
|
||||
}
|
||||
|
||||
pub(crate) fn tool_call_args_delta(
|
||||
&self,
|
||||
block_id: InFlightBlockId,
|
||||
id: String,
|
||||
delta: String,
|
||||
) {
|
||||
let mut inner = self.lock();
|
||||
if let Some(TrackedBlock::ToolCall { args, state, .. }) = inner.find_block_mut(block_id) {
|
||||
args.push_str(&delta);
|
||||
*state = InFlightToolCallState::StreamingArgs;
|
||||
}
|
||||
let _ = self
|
||||
.event_tx
|
||||
.send(Event::ToolCallArgsDelta { id, json: delta });
|
||||
}
|
||||
|
||||
pub(crate) fn tool_call_done(&self, block_id: InFlightBlockId, id: String, args: String) {
|
||||
let mut inner = self.lock();
|
||||
let mut name = String::new();
|
||||
if let Some(TrackedBlock::ToolCall {
|
||||
name: current_name,
|
||||
args: current,
|
||||
state,
|
||||
..
|
||||
}) = inner.find_block_mut(block_id)
|
||||
{
|
||||
name = current_name.clone();
|
||||
if current.is_empty() {
|
||||
*current = args.clone();
|
||||
}
|
||||
*state = InFlightToolCallState::Done;
|
||||
}
|
||||
let _ = self.event_tx.send(Event::ToolCallDone {
|
||||
id,
|
||||
name,
|
||||
arguments: args,
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn clear_for_committed_item_then<R>(
|
||||
&self,
|
||||
item: &LoggedItem,
|
||||
f: impl FnOnce() -> R,
|
||||
) -> R {
|
||||
let mut inner = self.lock();
|
||||
inner.clear_for_committed_item(item);
|
||||
f()
|
||||
}
|
||||
|
||||
fn lock(&self) -> MutexGuard<'_, InFlightInner> {
|
||||
self.inner.lock().expect("in-flight event mutex poisoned")
|
||||
}
|
||||
}
|
||||
|
||||
impl InFlightInner {
|
||||
fn next_id(&mut self) -> InFlightBlockId {
|
||||
let id = InFlightBlockId(self.next_block_id);
|
||||
self.next_block_id = self.next_block_id.saturating_add(1);
|
||||
id
|
||||
}
|
||||
|
||||
fn find_block_mut(&mut self, block_id: InFlightBlockId) -> Option<&mut TrackedBlock> {
|
||||
self.blocks
|
||||
.iter_mut()
|
||||
.find(|block| block.block_id() == block_id)
|
||||
}
|
||||
|
||||
fn clear_for_committed_item(&mut self, item: &LoggedItem) {
|
||||
match item {
|
||||
LoggedItem::Message { role, content }
|
||||
if matches!(role, session_store::LoggedRole::Assistant) =>
|
||||
{
|
||||
let text = content
|
||||
.iter()
|
||||
.filter_map(|part| match part {
|
||||
LoggedContentPart::Text { text } => Some(text.as_str()),
|
||||
LoggedContentPart::Refusal { refusal } => Some(refusal.as_str()),
|
||||
})
|
||||
.collect::<String>();
|
||||
if !text.is_empty() {
|
||||
self.remove_first_text_matching(&text);
|
||||
}
|
||||
}
|
||||
LoggedItem::Reasoning { text, .. } => {
|
||||
self.remove_first_thinking_matching(text);
|
||||
}
|
||||
LoggedItem::ToolCall { call_id, .. } => {
|
||||
self.remove_tool_call(call_id);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn snapshot(&self) -> InFlightSnapshot {
|
||||
InFlightSnapshot {
|
||||
blocks: self
|
||||
.blocks
|
||||
.iter()
|
||||
.filter_map(TrackedBlock::to_snapshot_block)
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_first_text_matching(&mut self, committed: &str) {
|
||||
if let Some(index) = self.blocks.iter().position(|block| match block {
|
||||
TrackedBlock::Text { text, .. } => text == committed,
|
||||
_ => false,
|
||||
}) {
|
||||
self.blocks.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_first_thinking_matching(&mut self, committed: &str) {
|
||||
if let Some(index) = self.blocks.iter().position(|block| match block {
|
||||
TrackedBlock::Thinking { text, .. } => text == committed,
|
||||
_ => false,
|
||||
}) {
|
||||
self.blocks.remove(index);
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_tool_call(&mut self, call_id: &str) {
|
||||
if let Some(index) = self.blocks.iter().position(|block| match block {
|
||||
TrackedBlock::ToolCall { id, .. } => id == call_id,
|
||||
_ => false,
|
||||
}) {
|
||||
self.blocks.remove(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TrackedBlock {
|
||||
fn block_id(&self) -> InFlightBlockId {
|
||||
match self {
|
||||
TrackedBlock::Text { block_id, .. }
|
||||
| TrackedBlock::Thinking { block_id, .. }
|
||||
| TrackedBlock::ToolCall { block_id, .. } => *block_id,
|
||||
}
|
||||
}
|
||||
|
||||
fn to_snapshot_block(&self) -> Option<InFlightBlock> {
|
||||
match self {
|
||||
TrackedBlock::Text { text, finished, .. } => {
|
||||
if text.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(InFlightBlock::Text {
|
||||
text: text.clone(),
|
||||
finished: *finished,
|
||||
})
|
||||
}
|
||||
}
|
||||
TrackedBlock::Thinking { text, finished, .. } => Some(InFlightBlock::Thinking {
|
||||
text: text.clone(),
|
||||
finished: *finished,
|
||||
}),
|
||||
TrackedBlock::ToolCall {
|
||||
id,
|
||||
name,
|
||||
args,
|
||||
state,
|
||||
..
|
||||
} => Some(InFlightBlock::ToolCall {
|
||||
id: id.clone(),
|
||||
name: name.clone(),
|
||||
args: args.clone(),
|
||||
state: *state,
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn snapshot_from_guard(guard: &MutexGuard<'_, InFlightInner>) -> InFlightSnapshot {
|
||||
guard.snapshot()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn snapshot_boundary_does_not_duplicate_or_gap_delta_sent_after_subscribe() {
|
||||
let (event_tx, _) = broadcast::channel(16);
|
||||
let in_flight = InFlightEvents::new(event_tx.clone());
|
||||
let block_id = in_flight.start_text_block();
|
||||
in_flight.text_delta(block_id, "hel".into());
|
||||
|
||||
let guard = in_flight.snapshot_guard();
|
||||
let mut rx = event_tx.subscribe();
|
||||
let snapshot = snapshot_from_guard(&guard);
|
||||
drop(guard);
|
||||
|
||||
in_flight.text_delta(block_id, "lo".into());
|
||||
|
||||
assert_eq!(
|
||||
snapshot.blocks,
|
||||
vec![InFlightBlock::Text {
|
||||
text: "hel".into(),
|
||||
finished: false,
|
||||
}]
|
||||
);
|
||||
assert!(matches!(
|
||||
rx.try_recv().unwrap(),
|
||||
Event::TextDelta { text } if text == "lo"
|
||||
));
|
||||
assert!(rx.try_recv().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap() {
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
|
||||
use crate::segment_log_sink::SegmentLogSink;
|
||||
use session_store::{LogEntry, LoggedRole};
|
||||
|
||||
let (event_tx, _) = broadcast::channel(16);
|
||||
let sink = SegmentLogSink::new();
|
||||
let in_flight = InFlightEvents::new(event_tx);
|
||||
let block_id = in_flight.start_text_block();
|
||||
in_flight.text_delta(block_id, "done".into());
|
||||
in_flight.text_done(block_id, "done".into());
|
||||
|
||||
let assistant_item = LoggedItem::Message {
|
||||
role: LoggedRole::Assistant,
|
||||
content: vec![LoggedContentPart::Text {
|
||||
text: "done".into(),
|
||||
}],
|
||||
};
|
||||
let assistant_entry = LogEntry::AssistantItem {
|
||||
ts: 1,
|
||||
item: assistant_item.clone(),
|
||||
};
|
||||
|
||||
let in_flight_guard = in_flight.snapshot_guard();
|
||||
let in_flight_for_commit = in_flight.clone();
|
||||
let sink_for_commit = sink.clone();
|
||||
let (committed_tx, committed_rx) = mpsc::channel();
|
||||
let commit_thread = thread::spawn(move || {
|
||||
// This mirrors Pod::append_entry ordering: clear in-flight first,
|
||||
// then publish the finalized AssistantItem. AssistantItem entries
|
||||
// are mirror-only and are not delivered as live entry events.
|
||||
in_flight_for_commit.clear_for_committed_item_then(&assistant_item, || {
|
||||
sink_for_commit.publish(assistant_entry);
|
||||
});
|
||||
committed_tx.send(()).unwrap();
|
||||
});
|
||||
|
||||
let (entries_snapshot, mut entry_rx) = sink.subscribe_with_snapshot();
|
||||
let in_flight_snapshot = snapshot_from_guard(&in_flight_guard);
|
||||
drop(in_flight_guard);
|
||||
|
||||
committed_rx.recv().unwrap();
|
||||
commit_thread.join().unwrap();
|
||||
|
||||
assert!(entries_snapshot.is_empty());
|
||||
assert!(matches!(
|
||||
in_flight_snapshot.blocks.as_slice(),
|
||||
[InFlightBlock::Text { text, finished: true }] if text == "done"
|
||||
));
|
||||
assert!(entry_rx.try_recv().is_err());
|
||||
let post_commit_guard = in_flight.snapshot_guard();
|
||||
assert!(snapshot_from_guard(&post_commit_guard).is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn committed_assistant_snapshot_does_not_duplicate_in_flight_block() {
|
||||
use crate::segment_log_sink::SegmentLogSink;
|
||||
use session_store::{LogEntry, LoggedRole};
|
||||
|
||||
let (event_tx, _) = broadcast::channel(16);
|
||||
let sink = SegmentLogSink::new();
|
||||
let in_flight = InFlightEvents::new(event_tx);
|
||||
let block_id = in_flight.start_text_block();
|
||||
in_flight.text_delta(block_id, "done".into());
|
||||
in_flight.text_done(block_id, "done".into());
|
||||
|
||||
let assistant_item = LoggedItem::Message {
|
||||
role: LoggedRole::Assistant,
|
||||
content: vec![LoggedContentPart::Text {
|
||||
text: "done".into(),
|
||||
}],
|
||||
};
|
||||
let assistant_entry = LogEntry::AssistantItem {
|
||||
ts: 1,
|
||||
item: assistant_item.clone(),
|
||||
};
|
||||
|
||||
in_flight.clear_for_committed_item_then(&assistant_item, || {
|
||||
sink.publish(assistant_entry);
|
||||
});
|
||||
|
||||
let in_flight_guard = in_flight.snapshot_guard();
|
||||
let (entries_snapshot, _entry_rx) = sink.subscribe_with_snapshot();
|
||||
let in_flight_snapshot = snapshot_from_guard(&in_flight_guard);
|
||||
|
||||
assert!(matches!(
|
||||
entries_snapshot.as_slice(),
|
||||
[LogEntry::AssistantItem { item, .. }] if item == &assistant_item
|
||||
));
|
||||
assert!(in_flight_snapshot.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn committed_item_clears_matching_in_flight_block() {
|
||||
let (event_tx, _) = broadcast::channel(16);
|
||||
let in_flight = InFlightEvents::new(event_tx);
|
||||
let block_id = in_flight.start_text_block();
|
||||
in_flight.text_delta(block_id, "done".into());
|
||||
in_flight.clear_for_committed_item_then(
|
||||
&LoggedItem::Message {
|
||||
role: session_store::LoggedRole::Assistant,
|
||||
content: vec![LoggedContentPart::Text {
|
||||
text: "done".into(),
|
||||
}],
|
||||
},
|
||||
|| (),
|
||||
);
|
||||
|
||||
let guard = in_flight.snapshot_guard();
|
||||
assert!(snapshot_from_guard(&guard).is_empty());
|
||||
}
|
||||
}
|
||||
|
|
@ -7,6 +7,7 @@ use tokio::net::UnixListener;
|
|||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::controller::PodHandle;
|
||||
use crate::in_flight::snapshot_from_guard;
|
||||
use protocol::{Event, Method};
|
||||
|
||||
/// Unix socket server for Pod Protocol.
|
||||
|
|
@ -104,18 +105,22 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
|||
let mut reader = JsonLineReader::new(reader);
|
||||
let mut writer = JsonLineWriter::new(writer);
|
||||
|
||||
// Atomically subscribe to the session-log mirror first. The
|
||||
// returned (snapshot, rx) pair partitions the entry timeline:
|
||||
// entries committed before this call appear in `entries`, every
|
||||
// entry after lands on `entry_rx`. Doing this before the alert
|
||||
// snapshot keeps both ordering pairs internally consistent.
|
||||
let (entries_snapshot, mut entry_rx) = handle.sink.subscribe_with_snapshot();
|
||||
// Hold the in-flight stream lock while taking the session-log mirror
|
||||
// snapshot. `LogEntry::AssistantItem` is mirror-only for live clients,
|
||||
// so a finalized assistant block must be observed either as an already
|
||||
// committed entry or as the still-present in-flight block. This lock
|
||||
// order matches `append_entry` (in-flight clear before sink publish) and
|
||||
// keeps the snapshot/live boundary gap-free.
|
||||
let (entries_snapshot, mut entry_rx, alert_snapshot, mut rx, in_flight) = {
|
||||
let in_flight_guard = handle.in_flight.snapshot_guard();
|
||||
let (entries_snapshot, entry_rx) = handle.sink.subscribe_with_snapshot();
|
||||
|
||||
// Atomically subscribe and snapshot buffered alerts so that
|
||||
// warnings emitted before this client connected are replayed
|
||||
// exactly once — they appear in the snapshot, and any alert
|
||||
// arriving afterwards reaches us through `rx`.
|
||||
let (alert_snapshot, mut rx) = handle.alerter.subscribe_with_snapshot();
|
||||
// Atomically subscribe and snapshot buffered alerts so that warnings
|
||||
// emitted before this client connected are replayed exactly once.
|
||||
let (alert_snapshot, rx) = handle.alerter.subscribe_with_snapshot();
|
||||
let in_flight = snapshot_from_guard(&in_flight_guard);
|
||||
(entries_snapshot, entry_rx, alert_snapshot, rx, in_flight)
|
||||
};
|
||||
for alert in alert_snapshot {
|
||||
if writer.write(&Event::Alert(alert)).await.is_err() {
|
||||
return;
|
||||
|
|
@ -131,6 +136,7 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
|
|||
.collect(),
|
||||
greeting: handle.shared_state.greeting.clone(),
|
||||
status: handle.shared_state.get_status(),
|
||||
in_flight,
|
||||
};
|
||||
if writer.write(&snapshot_event).await.is_err() {
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ pub mod entrypoint;
|
|||
pub mod feature;
|
||||
pub mod fs_view;
|
||||
pub mod hook;
|
||||
pub(crate) mod in_flight;
|
||||
pub mod ipc;
|
||||
pub mod prompt;
|
||||
pub mod runtime;
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ use crate::hook::{
|
|||
Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest,
|
||||
PreToolCall,
|
||||
};
|
||||
use crate::in_flight::InFlightEvents;
|
||||
use crate::ipc::alerter::Alerter;
|
||||
use crate::ipc::interceptor::PodInterceptor;
|
||||
use crate::ipc::notify_buffer::NotifyBuffer;
|
||||
|
|
@ -167,6 +168,7 @@ pub struct LogWriterHandle<St: Clone> {
|
|||
pub store: St,
|
||||
pub state: Arc<SegmentState>,
|
||||
pub sink: SegmentLogSink,
|
||||
pub in_flight: Option<InFlightEvents>,
|
||||
}
|
||||
|
||||
impl<St> LogWriterHandle<St>
|
||||
|
|
@ -181,6 +183,15 @@ where
|
|||
let loc = self.state.location();
|
||||
self.store.append(loc.session_id, loc.segment_id, &entry)?;
|
||||
self.state.increment_entries();
|
||||
if let Some(in_flight) = &self.in_flight {
|
||||
if let LogEntry::AssistantItem { item, .. } = &entry {
|
||||
let item_for_clear = item.clone();
|
||||
in_flight.clear_for_committed_item_then(&item_for_clear, || {
|
||||
self.sink.publish(entry);
|
||||
});
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
self.sink.publish(entry);
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -296,6 +307,7 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
/// notifications, events sent here are NOT replayed to clients that
|
||||
/// connect after the fact — they are fire-and-forget broadcasts.
|
||||
event_tx: Option<broadcast::Sender<Event>>,
|
||||
in_flight: Option<InFlightEvents>,
|
||||
/// Monotonic counter incremented by worker event bridges when an
|
||||
/// assistant-side execution artifact becomes visible to clients before
|
||||
/// it is necessarily committed to history (e.g. streaming text deltas).
|
||||
|
|
@ -449,6 +461,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
|||
system_prompt_template: None,
|
||||
alerter: self.alerter.clone(),
|
||||
event_tx: self.event_tx.clone(),
|
||||
in_flight: self.in_flight.clone(),
|
||||
ai_activity_counter: self.ai_activity_counter.clone(),
|
||||
pending_notifies: NotifyBuffer::new(),
|
||||
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
|
||||
|
|
@ -484,6 +497,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
|||
store: self.store.clone(),
|
||||
state: self.segment_state.clone(),
|
||||
sink: self.sink.clone(),
|
||||
in_flight: self.in_flight.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -495,6 +509,10 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
|||
self.log_writer = Some(writer);
|
||||
}
|
||||
|
||||
pub fn attach_in_flight_events(&mut self, in_flight: InFlightEvents) {
|
||||
self.in_flight = Some(in_flight);
|
||||
}
|
||||
|
||||
/// Wire `Worker::on_history_append` to commit each appended item
|
||||
/// directly as a singular `LogEntry::AssistantItem` / `ToolResult`
|
||||
/// through the writer. The controller calls this once per spawned
|
||||
|
|
@ -633,6 +651,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
system_prompt_template: None,
|
||||
alerter: None,
|
||||
event_tx: None,
|
||||
in_flight: None,
|
||||
ai_activity_counter: Arc::new(AtomicUsize::new(0)),
|
||||
pending_notifies: NotifyBuffer::new(),
|
||||
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
|
||||
|
|
@ -3842,6 +3861,7 @@ where
|
|||
system_prompt_template: common.system_prompt_template,
|
||||
alerter: None,
|
||||
event_tx: None,
|
||||
in_flight: None,
|
||||
ai_activity_counter: Arc::new(AtomicUsize::new(0)),
|
||||
pending_notifies: NotifyBuffer::new(),
|
||||
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
|
||||
|
|
@ -3951,6 +3971,7 @@ where
|
|||
system_prompt_template: common.system_prompt_template,
|
||||
alerter: None,
|
||||
event_tx: None,
|
||||
in_flight: None,
|
||||
ai_activity_counter: Arc::new(AtomicUsize::new(0)),
|
||||
pending_notifies: NotifyBuffer::new(),
|
||||
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
|
||||
|
|
@ -4187,6 +4208,7 @@ where
|
|||
system_prompt_template: None,
|
||||
alerter: None,
|
||||
event_tx: None,
|
||||
in_flight: None,
|
||||
ai_activity_counter: Arc::new(AtomicUsize::new(0)),
|
||||
pending_notifies: NotifyBuffer::new(),
|
||||
pending_attachments: Arc::new(Mutex::new(Vec::<SystemItem>::new())),
|
||||
|
|
|
|||
|
|
@ -515,6 +515,7 @@ mod tests {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -435,6 +435,7 @@ mod tests {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
@ -457,6 +458,7 @@ mod tests {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
|
|||
|
|
@ -129,6 +129,7 @@ fn empty_snapshot() -> Event {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: protocol::PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -203,6 +204,7 @@ fn serve_history(listener: UnixListener, items: Vec<Item>) -> JoinHandle<()> {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: protocol::PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
};
|
||||
let _ = writer.write(&event).await;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,6 +91,7 @@ fn empty_snapshot() -> Event {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -123,6 +123,7 @@ fn accept_one_method(listener: UnixListener) -> tokio::task::JoinHandle<Option<M
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: protocol::PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.is_err()
|
||||
|
|
|
|||
|
|
@ -12,6 +12,10 @@ fn is_true(value: &bool) -> bool {
|
|||
*value
|
||||
}
|
||||
|
||||
fn is_false(value: &bool) -> bool {
|
||||
!*value
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Method (Client → Pod via Unix Socket)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -453,6 +457,10 @@ pub enum Event {
|
|||
greeting: Greeting,
|
||||
#[serde(default)]
|
||||
status: PodStatus,
|
||||
/// Unfinished model output that has already streamed in the current
|
||||
/// run but is not yet represented by committed snapshot entries.
|
||||
#[serde(default, skip_serializing_if = "InFlightSnapshot::is_empty")]
|
||||
in_flight: InFlightSnapshot,
|
||||
},
|
||||
/// Server-side segment log rotated to a fresh `SegmentStart`.
|
||||
///
|
||||
|
|
@ -631,6 +639,62 @@ pub struct RewindSummary {
|
|||
pub tool_side_effect_warning: bool,
|
||||
}
|
||||
|
||||
/// Unfinished model output included in `Event::Snapshot` for clients that
|
||||
/// attach while an LLM response is still streaming.
|
||||
///
|
||||
/// These blocks are presentation state only: they are reconstructed from the
|
||||
/// active Pod controller and must not be treated as committed assistant
|
||||
/// history. Finalized assistant items continue to come from ordinary snapshot
|
||||
/// entries.
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct InFlightSnapshot {
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub blocks: Vec<InFlightBlock>,
|
||||
}
|
||||
|
||||
impl InFlightSnapshot {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.blocks.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||
pub enum InFlightBlock {
|
||||
Text {
|
||||
text: String,
|
||||
#[serde(default, skip_serializing_if = "is_false")]
|
||||
finished: bool,
|
||||
},
|
||||
Thinking {
|
||||
text: String,
|
||||
#[serde(default, skip_serializing_if = "is_false")]
|
||||
finished: bool,
|
||||
},
|
||||
ToolCall {
|
||||
id: String,
|
||||
name: String,
|
||||
args: String,
|
||||
#[serde(default, skip_serializing_if = "InFlightToolCallState::is_pending")]
|
||||
state: InFlightToolCallState,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum InFlightToolCallState {
|
||||
#[default]
|
||||
Pending,
|
||||
StreamingArgs,
|
||||
Done,
|
||||
}
|
||||
|
||||
impl InFlightToolCallState {
|
||||
pub fn is_pending(&self) -> bool {
|
||||
matches!(self, Self::Pending)
|
||||
}
|
||||
}
|
||||
|
||||
/// Pod self-description rendered by the TUI when a session starts empty.
|
||||
///
|
||||
/// Built once in the Pod controller from the resolved manifest and
|
||||
|
|
@ -1129,6 +1193,7 @@ mod tests {
|
|||
context_tokens: 42_000,
|
||||
},
|
||||
status: PodStatus::Paused,
|
||||
in_flight: InFlightSnapshot::default(),
|
||||
};
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
|
|
@ -1142,6 +1207,62 @@ mod tests {
|
|||
assert_eq!(parsed["data"]["status"], "paused");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_snapshot_in_flight_roundtrip_and_default() {
|
||||
let inbound = r#"{"event":"snapshot","data":{"entries":[],"greeting":{"pod_name":"test","cwd":"/tmp","provider":"p","model":"m","scope_summary":"s","tools":[]},"status":"running"}}"#;
|
||||
let decoded: Event = serde_json::from_str(inbound).unwrap();
|
||||
match decoded {
|
||||
Event::Snapshot { in_flight, .. } => assert!(in_flight.is_empty()),
|
||||
other => panic!("expected Snapshot, got {other:?}"),
|
||||
}
|
||||
|
||||
let event = Event::Snapshot {
|
||||
entries: Vec::new(),
|
||||
greeting: Greeting {
|
||||
pod_name: "test".into(),
|
||||
cwd: "/tmp".into(),
|
||||
provider: "p".into(),
|
||||
model: "m".into(),
|
||||
scope_summary: "s".into(),
|
||||
tools: Vec::new(),
|
||||
context_window: 0,
|
||||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Running,
|
||||
in_flight: InFlightSnapshot {
|
||||
blocks: vec![
|
||||
InFlightBlock::Text {
|
||||
text: "hel".into(),
|
||||
finished: false,
|
||||
},
|
||||
InFlightBlock::Thinking {
|
||||
text: "why".into(),
|
||||
finished: true,
|
||||
},
|
||||
InFlightBlock::ToolCall {
|
||||
id: "call_1".into(),
|
||||
name: "Read".into(),
|
||||
args: r#"{"file"#.into(),
|
||||
state: InFlightToolCallState::StreamingArgs,
|
||||
},
|
||||
],
|
||||
},
|
||||
};
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(parsed["data"]["in_flight"]["blocks"][0]["text"], "hel");
|
||||
assert_eq!(parsed["data"]["in_flight"]["blocks"][1]["finished"], true);
|
||||
assert_eq!(
|
||||
parsed["data"]["in_flight"]["blocks"][2]["state"],
|
||||
"streaming_args"
|
||||
);
|
||||
|
||||
match serde_json::from_str::<Event>(&json).unwrap() {
|
||||
Event::Snapshot { in_flight, .. } => assert_eq!(in_flight.blocks.len(), 3),
|
||||
other => panic!("expected Snapshot, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_segment_rotated_roundtrip() {
|
||||
let event = Event::SegmentRotated {
|
||||
|
|
|
|||
|
|
@ -3,8 +3,8 @@ use std::path::Path;
|
|||
use std::time::{Duration, Instant};
|
||||
|
||||
use protocol::{
|
||||
AlertLevel, AlertSource, CompletionEntry, CompletionKind, ErrorCode, Event, Method, PodStatus,
|
||||
RewindTarget, RunResult, Segment,
|
||||
AlertLevel, AlertSource, CompletionEntry, CompletionKind, ErrorCode, Event, InFlightBlock,
|
||||
InFlightSnapshot, InFlightToolCallState, Method, PodStatus, RewindTarget, RunResult, Segment,
|
||||
};
|
||||
|
||||
use crate::block::{
|
||||
|
|
@ -1279,9 +1279,10 @@ impl App {
|
|||
entries,
|
||||
greeting,
|
||||
status,
|
||||
in_flight,
|
||||
} => {
|
||||
self.rewind_refresh_fence = false;
|
||||
self.restore_snapshot(&entries, greeting);
|
||||
self.restore_snapshot(&entries, greeting, in_flight);
|
||||
self.set_pod_status(status);
|
||||
}
|
||||
Event::Status { status } => {
|
||||
|
|
@ -1410,6 +1411,50 @@ impl App {
|
|||
});
|
||||
}
|
||||
|
||||
fn apply_in_flight_snapshot(&mut self, snapshot: InFlightSnapshot) {
|
||||
for block in snapshot.blocks {
|
||||
match block {
|
||||
InFlightBlock::Text { text, finished } => {
|
||||
self.blocks.push(Block::AssistantText { text });
|
||||
self.assistant_streaming = !finished;
|
||||
}
|
||||
InFlightBlock::Thinking { text, finished } => {
|
||||
let state = if finished {
|
||||
ThinkingState::Finished { elapsed_secs: None }
|
||||
} else {
|
||||
ThinkingState::Streaming {
|
||||
started_at: Instant::now(),
|
||||
}
|
||||
};
|
||||
self.blocks
|
||||
.push(Block::Thinking(ThinkingBlock { text, state }));
|
||||
}
|
||||
InFlightBlock::ToolCall {
|
||||
id,
|
||||
name,
|
||||
args,
|
||||
state,
|
||||
} => {
|
||||
let (tool_state, arguments) = match state {
|
||||
InFlightToolCallState::Pending => (ToolCallState::Pending, None),
|
||||
InFlightToolCallState::StreamingArgs => (ToolCallState::Streaming, None),
|
||||
InFlightToolCallState::Done => {
|
||||
(ToolCallState::Executing, Some(args.clone()))
|
||||
}
|
||||
};
|
||||
self.blocks.push(Block::ToolCall(ToolCallBlock {
|
||||
id,
|
||||
name,
|
||||
args_stream: args,
|
||||
arguments,
|
||||
state: tool_state,
|
||||
edit_snapshot: None,
|
||||
}));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn append_assistant_text(&mut self, text: &str) {
|
||||
if self.assistant_streaming {
|
||||
if let Some(Block::AssistantText { text: existing }) = self.blocks.last_mut() {
|
||||
|
|
@ -1913,11 +1958,17 @@ impl App {
|
|||
/// LogEntry variant into the same blocks live events would have
|
||||
/// produced. Followed by `Event::Entry` updates for anything
|
||||
/// committed after the snapshot.
|
||||
fn restore_snapshot(&mut self, entries: &[serde_json::Value], greeting: protocol::Greeting) {
|
||||
fn restore_snapshot(
|
||||
&mut self,
|
||||
entries: &[serde_json::Value],
|
||||
greeting: protocol::Greeting,
|
||||
in_flight: InFlightSnapshot,
|
||||
) {
|
||||
self.greeting = Some(greeting.clone());
|
||||
self.context_window = greeting.context_window;
|
||||
self.session_context_tokens = greeting.context_tokens;
|
||||
self.restore_entries(entries, Some(greeting));
|
||||
self.apply_in_flight_snapshot(in_flight);
|
||||
}
|
||||
|
||||
/// Restore after a successful destructive rewind. The Pod's
|
||||
|
|
@ -3151,6 +3202,7 @@ mod completion_flow_tests {
|
|||
greeting: test_greeting(),
|
||||
entries: vec![session_start_value],
|
||||
status: PodStatus::Running,
|
||||
in_flight: Default::default(),
|
||||
});
|
||||
|
||||
assert!(matches!(app.pod_status, PodStatus::Running));
|
||||
|
|
@ -3161,6 +3213,54 @@ mod completion_flow_tests {
|
|||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn snapshot_in_flight_blocks_continue_with_live_deltas() {
|
||||
let mut app = App::new("test".into());
|
||||
app.handle_pod_event(Event::Snapshot {
|
||||
greeting: test_greeting(),
|
||||
entries: Vec::new(),
|
||||
status: PodStatus::Running,
|
||||
in_flight: InFlightSnapshot {
|
||||
blocks: vec![
|
||||
InFlightBlock::Thinking {
|
||||
text: "why".into(),
|
||||
finished: false,
|
||||
},
|
||||
InFlightBlock::ToolCall {
|
||||
id: "call_1".into(),
|
||||
name: "Read".into(),
|
||||
args: r#"{\"file"#.into(),
|
||||
state: InFlightToolCallState::StreamingArgs,
|
||||
},
|
||||
InFlightBlock::Text {
|
||||
text: "hel".into(),
|
||||
finished: false,
|
||||
},
|
||||
],
|
||||
},
|
||||
});
|
||||
|
||||
app.handle_pod_event(Event::TextDelta { text: "lo".into() });
|
||||
app.handle_pod_event(Event::ThinkingDelta { text: "?".into() });
|
||||
app.handle_pod_event(Event::ToolCallArgsDelta {
|
||||
id: "call_1".into(),
|
||||
json: r#"\":\"src/lib.rs\"}"#.into(),
|
||||
});
|
||||
|
||||
assert!(matches!(
|
||||
app.blocks.iter().find(|block| matches!(block, Block::AssistantText { .. })),
|
||||
Some(Block::AssistantText { text }) if text == "hello"
|
||||
));
|
||||
assert!(matches!(
|
||||
app.blocks.iter().find(|block| matches!(block, Block::Thinking(_))),
|
||||
Some(Block::Thinking(thinking)) if thinking.text == "why?"
|
||||
));
|
||||
assert!(matches!(
|
||||
app.blocks.iter().find(|block| matches!(block, Block::ToolCall(_))),
|
||||
Some(Block::ToolCall(call)) if call.args_stream == r#"{\"file\":\"src/lib.rs\"}"#
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn live_system_item_workflow_appends_system_message_block() {
|
||||
let mut app = App::new("test".into());
|
||||
|
|
@ -3294,6 +3394,7 @@ mod completion_flow_tests {
|
|||
entries: Vec::new(),
|
||||
greeting,
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
});
|
||||
|
||||
assert_eq!(app.context_window, 123_000);
|
||||
|
|
@ -3492,6 +3593,7 @@ mod completion_flow_tests {
|
|||
greeting: test_greeting(),
|
||||
entries: assistant_item_entries,
|
||||
status: PodStatus::Running,
|
||||
in_flight: Default::default(),
|
||||
});
|
||||
|
||||
let tasks = app.task_store.tasks();
|
||||
|
|
|
|||
|
|
@ -1922,6 +1922,7 @@ mod tests {
|
|||
greeting: test_greeting(),
|
||||
entries: vec![],
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
});
|
||||
app.handle_pod_event(Event::RewindApplied {
|
||||
entries: vec![],
|
||||
|
|
@ -1947,6 +1948,7 @@ mod tests {
|
|||
greeting: test_greeting(),
|
||||
entries: vec![],
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
});
|
||||
type_keys(&mut app, "draft");
|
||||
|
||||
|
|
|
|||
|
|
@ -868,6 +868,7 @@ async fn ticket_queue_notification_sends_notify_when_socket_available() {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
@ -908,6 +909,7 @@ async fn send_notify_only_can_deliver_weak_notification_without_auto_run() {
|
|||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
|
|||
|
|
@ -819,6 +819,7 @@ mod tests {
|
|||
entries: vec![],
|
||||
greeting: test_greeting(),
|
||||
status: PodStatus::Idle,
|
||||
in_flight: Default::default(),
|
||||
},
|
||||
];
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user