fix: close in-flight snapshot commit race

This commit is contained in:
Keisuke Hirata 2026-06-21 20:51:48 +09:00
parent 74aca6f6c5
commit 061136d798
No known key found for this signature in database
5 changed files with 164 additions and 68 deletions

View File

@ -191,31 +191,14 @@ impl InFlightEvents {
}); });
} }
pub(crate) fn clear_for_committed_item(&self, item: &LoggedItem) { pub(crate) fn clear_for_committed_item_then<R>(
&self,
item: &LoggedItem,
f: impl FnOnce() -> R,
) -> R {
let mut inner = self.lock(); let mut inner = self.lock();
match item { inner.clear_for_committed_item(item);
LoggedItem::Message { role, content } f()
if matches!(role, session_store::LoggedRole::Assistant) =>
{
let text = content
.iter()
.filter_map(|part| match part {
LoggedContentPart::Text { text } => Some(text.as_str()),
LoggedContentPart::Refusal { refusal } => Some(refusal.as_str()),
})
.collect::<String>();
if !text.is_empty() {
inner.remove_first_text_matching(&text);
}
}
LoggedItem::Reasoning { text, .. } => {
inner.remove_first_thinking_matching(text);
}
LoggedItem::ToolCall { call_id, .. } => {
inner.remove_tool_call(call_id);
}
_ => {}
}
} }
fn lock(&self) -> MutexGuard<'_, InFlightInner> { fn lock(&self) -> MutexGuard<'_, InFlightInner> {
@ -236,6 +219,32 @@ impl InFlightInner {
.find(|block| block.block_id() == block_id) .find(|block| block.block_id() == block_id)
} }
fn clear_for_committed_item(&mut self, item: &LoggedItem) {
match item {
LoggedItem::Message { role, content }
if matches!(role, session_store::LoggedRole::Assistant) =>
{
let text = content
.iter()
.filter_map(|part| match part {
LoggedContentPart::Text { text } => Some(text.as_str()),
LoggedContentPart::Refusal { refusal } => Some(refusal.as_str()),
})
.collect::<String>();
if !text.is_empty() {
self.remove_first_text_matching(&text);
}
}
LoggedItem::Reasoning { text, .. } => {
self.remove_first_thinking_matching(text);
}
LoggedItem::ToolCall { call_id, .. } => {
self.remove_tool_call(call_id);
}
_ => {}
}
}
fn snapshot(&self) -> InFlightSnapshot { fn snapshot(&self) -> InFlightSnapshot {
InFlightSnapshot { InFlightSnapshot {
blocks: self blocks: self
@ -351,18 +360,116 @@ mod tests {
assert!(rx.try_recv().is_err()); assert!(rx.try_recv().is_err());
} }
#[test]
fn session_log_and_in_flight_snapshot_prevents_mirror_only_assistant_gap() {
use std::sync::mpsc;
use std::thread;
use crate::segment_log_sink::SegmentLogSink;
use session_store::{LogEntry, LoggedRole};
let (event_tx, _) = broadcast::channel(16);
let sink = SegmentLogSink::new();
let in_flight = InFlightEvents::new(event_tx);
let block_id = in_flight.start_text_block();
in_flight.text_delta(block_id, "done".into());
in_flight.text_done(block_id, "done".into());
let assistant_item = LoggedItem::Message {
role: LoggedRole::Assistant,
content: vec![LoggedContentPart::Text {
text: "done".into(),
}],
};
let assistant_entry = LogEntry::AssistantItem {
ts: 1,
item: assistant_item.clone(),
};
let in_flight_guard = in_flight.snapshot_guard();
let in_flight_for_commit = in_flight.clone();
let sink_for_commit = sink.clone();
let (committed_tx, committed_rx) = mpsc::channel();
let commit_thread = thread::spawn(move || {
// This mirrors Pod::append_entry ordering: clear in-flight first,
// then publish the finalized AssistantItem. AssistantItem entries
// are mirror-only and are not delivered as live entry events.
in_flight_for_commit.clear_for_committed_item_then(&assistant_item, || {
sink_for_commit.publish(assistant_entry);
});
committed_tx.send(()).unwrap();
});
let (entries_snapshot, mut entry_rx) = sink.subscribe_with_snapshot();
let in_flight_snapshot = snapshot_from_guard(&in_flight_guard);
drop(in_flight_guard);
committed_rx.recv().unwrap();
commit_thread.join().unwrap();
assert!(entries_snapshot.is_empty());
assert!(matches!(
in_flight_snapshot.blocks.as_slice(),
[InFlightBlock::Text { text, finished: true }] if text == "done"
));
assert!(entry_rx.try_recv().is_err());
let post_commit_guard = in_flight.snapshot_guard();
assert!(snapshot_from_guard(&post_commit_guard).is_empty());
}
#[test]
fn committed_assistant_snapshot_does_not_duplicate_in_flight_block() {
use crate::segment_log_sink::SegmentLogSink;
use session_store::{LogEntry, LoggedRole};
let (event_tx, _) = broadcast::channel(16);
let sink = SegmentLogSink::new();
let in_flight = InFlightEvents::new(event_tx);
let block_id = in_flight.start_text_block();
in_flight.text_delta(block_id, "done".into());
in_flight.text_done(block_id, "done".into());
let assistant_item = LoggedItem::Message {
role: LoggedRole::Assistant,
content: vec![LoggedContentPart::Text {
text: "done".into(),
}],
};
let assistant_entry = LogEntry::AssistantItem {
ts: 1,
item: assistant_item.clone(),
};
in_flight.clear_for_committed_item_then(&assistant_item, || {
sink.publish(assistant_entry);
});
let in_flight_guard = in_flight.snapshot_guard();
let (entries_snapshot, _entry_rx) = sink.subscribe_with_snapshot();
let in_flight_snapshot = snapshot_from_guard(&in_flight_guard);
assert!(matches!(
entries_snapshot.as_slice(),
[LogEntry::AssistantItem { item, .. }] if item == &assistant_item
));
assert!(in_flight_snapshot.is_empty());
}
#[test] #[test]
fn committed_item_clears_matching_in_flight_block() { fn committed_item_clears_matching_in_flight_block() {
let (event_tx, _) = broadcast::channel(16); let (event_tx, _) = broadcast::channel(16);
let in_flight = InFlightEvents::new(event_tx); let in_flight = InFlightEvents::new(event_tx);
let block_id = in_flight.start_text_block(); let block_id = in_flight.start_text_block();
in_flight.text_delta(block_id, "done".into()); in_flight.text_delta(block_id, "done".into());
in_flight.clear_for_committed_item(&LoggedItem::Message { in_flight.clear_for_committed_item_then(
role: session_store::LoggedRole::Assistant, &LoggedItem::Message {
content: vec![LoggedContentPart::Text { role: session_store::LoggedRole::Assistant,
text: "done".into(), content: vec![LoggedContentPart::Text {
}], text: "done".into(),
}); }],
},
|| (),
);
let guard = in_flight.snapshot_guard(); let guard = in_flight.snapshot_guard();
assert!(snapshot_from_guard(&guard).is_empty()); assert!(snapshot_from_guard(&guard).is_empty());

View File

@ -14,9 +14,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use protocol::{Alert, AlertLevel, AlertSource, Event, InFlightSnapshot}; use protocol::{Alert, AlertLevel, AlertSource, Event};
use crate::in_flight::{InFlightEvents, snapshot_from_guard};
/// Upper bound on buffered alerts. When exceeded, the oldest /// Upper bound on buffered alerts. When exceeded, the oldest
/// entries are discarded so a long-running session cannot leak /// entries are discarded so a long-running session cannot leak
@ -87,22 +85,6 @@ impl Alerter {
let snapshot: Vec<Alert> = buf.iter().cloned().collect(); let snapshot: Vec<Alert> = buf.iter().cloned().collect();
(snapshot, rx) (snapshot, rx)
} }
pub fn subscribe_with_alerts_and_in_flight_snapshot(
&self,
in_flight: &InFlightEvents,
) -> (Vec<Alert>, InFlightSnapshot, broadcast::Receiver<Event>) {
let buf = self
.inner
.buffer
.lock()
.expect("alerter buffer mutex poisoned");
let in_flight_guard = in_flight.snapshot_guard();
let rx = self.inner.event_tx.subscribe();
let alerts: Vec<Alert> = buf.iter().cloned().collect();
let in_flight = snapshot_from_guard(&in_flight_guard);
(alerts, in_flight, rx)
}
} }
fn now_ms() -> i64 { fn now_ms() -> i64 {

View File

@ -7,6 +7,7 @@ use tokio::net::UnixListener;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::controller::PodHandle; use crate::controller::PodHandle;
use crate::in_flight::snapshot_from_guard;
use protocol::{Event, Method}; use protocol::{Event, Method};
/// Unix socket server for Pod Protocol. /// Unix socket server for Pod Protocol.
@ -104,20 +105,22 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
let mut reader = JsonLineReader::new(reader); let mut reader = JsonLineReader::new(reader);
let mut writer = JsonLineWriter::new(writer); let mut writer = JsonLineWriter::new(writer);
// Atomically subscribe to the session-log mirror first. The // Hold the in-flight stream lock while taking the session-log mirror
// returned (snapshot, rx) pair partitions the entry timeline: // snapshot. `LogEntry::AssistantItem` is mirror-only for live clients,
// entries committed before this call appear in `entries`, every // so a finalized assistant block must be observed either as an already
// entry after lands on `entry_rx`. Doing this before the alert // committed entry or as the still-present in-flight block. This lock
// snapshot keeps both ordering pairs internally consistent. // order matches `append_entry` (in-flight clear before sink publish) and
let (entries_snapshot, mut entry_rx) = handle.sink.subscribe_with_snapshot(); // keeps the snapshot/live boundary gap-free.
let (entries_snapshot, mut entry_rx, alert_snapshot, mut rx, in_flight) = {
let in_flight_guard = handle.in_flight.snapshot_guard();
let (entries_snapshot, entry_rx) = handle.sink.subscribe_with_snapshot();
// Atomically subscribe and snapshot buffered alerts so that // Atomically subscribe and snapshot buffered alerts so that warnings
// warnings emitted before this client connected are replayed // emitted before this client connected are replayed exactly once.
// exactly once — they appear in the snapshot, and any alert let (alert_snapshot, rx) = handle.alerter.subscribe_with_snapshot();
// arriving afterwards reaches us through `rx`. let in_flight = snapshot_from_guard(&in_flight_guard);
let (alert_snapshot, in_flight, mut rx) = handle (entries_snapshot, entry_rx, alert_snapshot, rx, in_flight)
.alerter };
.subscribe_with_alerts_and_in_flight_snapshot(&handle.in_flight);
for alert in alert_snapshot { for alert in alert_snapshot {
if writer.write(&Event::Alert(alert)).await.is_err() { if writer.write(&Event::Alert(alert)).await.is_err() {
return; return;

View File

@ -185,7 +185,11 @@ where
self.state.increment_entries(); self.state.increment_entries();
if let Some(in_flight) = &self.in_flight { if let Some(in_flight) = &self.in_flight {
if let LogEntry::AssistantItem { item, .. } = &entry { if let LogEntry::AssistantItem { item, .. } = &entry {
in_flight.clear_for_committed_item(item); let item_for_clear = item.clone();
in_flight.clear_for_committed_item_then(&item_for_clear, || {
self.sink.publish(entry);
});
return Ok(());
} }
} }
self.sink.publish(entry); self.sink.publish(entry);

View File

@ -639,11 +639,6 @@ pub struct RewindSummary {
pub tool_side_effect_warning: bool, pub tool_side_effect_warning: bool,
} }
/// Pod self-description rendered by the TUI when a session starts empty.
///
/// Built once in the Pod controller from the resolved manifest and
/// transmitted alongside `Event::Snapshot` so clients don't need
/// their own view of the manifest.
/// Unfinished model output included in `Event::Snapshot` for clients that /// Unfinished model output included in `Event::Snapshot` for clients that
/// attach while an LLM response is still streaming. /// attach while an LLM response is still streaming.
/// ///
@ -700,6 +695,11 @@ impl InFlightToolCallState {
} }
} }
/// Pod self-description rendered by the TUI when a session starts empty.
///
/// Built once in the Pod controller from the resolved manifest and
/// transmitted alongside `Event::Snapshot` so clients don't need
/// their own view of the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Greeting { pub struct Greeting {
pub pod_name: String, pub pod_name: String,