merge: TUI first message display fix

This commit is contained in:
Keisuke Hirata 2026-06-01 11:21:55 +09:00
commit ff279177c4
No known key found for this signature in database
6 changed files with 157 additions and 81 deletions

View File

@ -693,21 +693,15 @@ async fn controller_loop<C, St>(
}); });
continue; continue;
} }
// Broadcast the user message so every subscriber // Stage the run without a speculative user-message echo.
// (including the submitter) can render the turn header // `Pod::run` validates the input, commits
// + user line from a single source of truth. // `LogEntry::UserInput`, and the session-log sink turns that
// shared_state's `user_segments` is re-synced from // committed entry into the live `Event::UserMessage`. That
// `pod` after the run completes, so we don't push // keeps every client ordered against `SegmentStart` replay and
// here. Workflow-invocation validation happens inside // makes persisted history the single source of visible user
// `Pod::run`; on failure the turn errors out via // input. Paused→Run cleanup (orphan tool_result closure +
// `Event::Error { InvalidRequest }` before any // interrupt system note) is applied inside `Pod::run` itself
// UserInput is committed. Paused→Run cleanup (orphan // when the worker's `last_run_interrupted` flag is set.
// tool_result closure + interrupt system note) is
// applied inside `Pod::run` itself when the worker's
// `last_run_interrupted` flag is set.
let _ = event_tx.send(Event::UserMessage {
segments: input.clone(),
});
pending = Some(PendingRun::Run(input)); pending = Some(PendingRun::Run(input));
} }

View File

@ -68,6 +68,37 @@ fn is_peer_disconnect_read_error(error: &io::Error) -> bool {
) )
} }
fn live_entry_event(entry: session_store::LogEntry) -> Option<Event> {
match entry {
session_store::LogEntry::SegmentStart { .. } => {
let value = serde_json::to_value(&entry).expect("LogEntry is Serialize");
Some(Event::SegmentRotated { entry: value })
}
session_store::LogEntry::UserInput { segments, .. } => {
Some(Event::UserMessage { segments })
}
session_store::LogEntry::SystemItem { item, .. } => {
let value = serde_json::to_value(&item).expect("SystemItem is Serialize");
Some(Event::SystemItem { item: value })
}
session_store::LogEntry::Invoke { trigger, .. } => {
Some(Event::InvokeStart { kind: trigger })
}
other => {
// `SegmentLogSink::is_live_relevant` keeps non-live-relevant
// variants off the broadcast lane; reaching here means the two
// are out of sync and we silently dropped a wire event. Log so a
// future regression surfaces instead of vanishing.
tracing::error!(
entry_kind = ?std::mem::discriminant(&other),
"session-log broadcast emitted a non-live-relevant entry; \
sink filter and IPC dispatch are out of sync"
);
None
}
}
}
async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
let (reader, writer) = stream.into_split(); let (reader, writer) = stream.into_split();
let mut reader = JsonLineReader::new(reader); let mut reader = JsonLineReader::new(reader);
@ -108,43 +139,13 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
loop { loop {
tokio::select! { tokio::select! {
// Live session-log entries → dispatched as the role-specific // Live session-log entries → dispatched as the role-specific
// wire events. The sink only broadcasts entries that the // wire events. `SegmentLogSink` only broadcasts committed log
// streaming-event lane doesn't cover; everything else is // entries with live UI meaning; `UserInput` travels this lane so
// already on the wire via TextDelta / ToolCall* / etc., so we // the visible user line is ordered with `SegmentStart` rotation.
// never see (and never need to forward) other variants here.
entry = entry_rx.recv() => { entry = entry_rx.recv() => {
match entry { match entry {
Ok(entry) => { Ok(entry) => {
let outbound = match entry { if let Some(event) = live_entry_event(entry) {
session_store::LogEntry::SegmentStart { .. } => {
let value = serde_json::to_value(&entry)
.expect("LogEntry is Serialize");
Some(Event::SegmentRotated { entry: value })
}
session_store::LogEntry::SystemItem { item, .. } => {
let value = serde_json::to_value(&item)
.expect("SystemItem is Serialize");
Some(Event::SystemItem { item: value })
}
session_store::LogEntry::Invoke { trigger, .. } => {
Some(Event::InvokeStart { kind: trigger })
}
other => {
// `SegmentLogSink::is_live_relevant` keeps
// non-live-relevant variants off the
// broadcast lane; reaching here means the
// two are out of sync and we silently
// dropped a wire event. Log so a future
// regression surfaces instead of vanishing.
tracing::error!(
entry_kind = ?std::mem::discriminant(&other),
"session-log broadcast emitted a non-live-relevant entry; \
sink filter and IPC dispatch are out of sync"
);
None
}
};
if let Some(event) = outbound {
if writer.write(&event).await.is_err() { if writer.write(&event).await.is_err() {
break; break;
} }
@ -261,4 +262,19 @@ mod tests {
let error = io::Error::new(ErrorKind::InvalidData, "malformed method"); let error = io::Error::new(ErrorKind::InvalidData, "malformed method");
assert!(!is_peer_disconnect_read_error(&error)); assert!(!is_peer_disconnect_read_error(&error));
} }
#[test]
fn user_input_log_entry_maps_to_user_message_event() {
let segments = vec![protocol::Segment::text("hello from log")];
let event = live_entry_event(session_store::LogEntry::UserInput {
ts: session_store::segment_log::now_millis(),
segments: segments.clone(),
})
.expect("UserInput must be live-relevant");
match event {
Event::UserMessage { segments: echoed } => assert_eq!(echoed, segments),
other => panic!("expected UserMessage, got {other:?}"),
}
}
} }

View File

@ -87,18 +87,19 @@ impl SegmentLogSink {
/// entry to the underlying `Store` — disk write is the gate. Failed /// entry to the underlying `Store` — disk write is the gate. Failed
/// disk writes must not call `publish`. /// disk writes must not call `publish`.
/// ///
/// Live broadcast fires only for entries that the streaming-event /// Live broadcast fires for committed session-log entries that
/// lane does not cover: /// socket clients must see in log order:
/// - `LogEntry::SegmentStart` → `Event::SegmentRotated` on the wire. /// - `LogEntry::SegmentStart` → `Event::SegmentRotated` on the wire.
/// - `LogEntry::UserInput` → `Event::UserMessage`.
/// - `LogEntry::SystemItem` → `Event::SystemItem`. /// - `LogEntry::SystemItem` → `Event::SystemItem`.
/// - `LogEntry::Invoke` → `Event::InvokeStart`. /// - `LogEntry::Invoke` → `Event::InvokeStart`.
/// Everything else (AssistantItem, ToolResult, UserInput, TurnEnd, /// Everything else (AssistantItem, ToolResult, TurnEnd,
/// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is /// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is
/// reflected in the mirror so reconnect snapshots stay accurate, /// reflected in the mirror so reconnect snapshots stay accurate,
/// but is not sent live — the streaming events (TextDelta / /// but is not sent live — the streaming events (TextDelta /
/// ToolCallStart / ToolResult / UserMessage / TurnEnd / etc.) /// ToolCallStart / ToolResult / TurnEnd / etc.) already provide
/// already provide that data, and re-broadcasting it as a typed /// that data, and re-broadcasting it as a typed entry would just
/// entry would just double-render every block on the client side. /// double-render every block on the client side.
pub fn publish(&self, entry: LogEntry) { pub fn publish(&self, entry: LogEntry) {
let mut mirror = self let mut mirror = self
.inner .inner
@ -119,7 +120,10 @@ impl SegmentLogSink {
fn is_live_relevant(entry: &LogEntry) -> bool { fn is_live_relevant(entry: &LogEntry) -> bool {
matches!( matches!(
entry, entry,
LogEntry::SegmentStart { .. } | LogEntry::SystemItem { .. } | LogEntry::Invoke { .. } LogEntry::SegmentStart { .. }
| LogEntry::UserInput { .. }
| LogEntry::SystemItem { .. }
| LogEntry::Invoke { .. }
) )
} }
@ -227,6 +231,15 @@ mod tests {
} }
} }
fn user_input(text: &str) -> LogEntry {
LogEntry::UserInput {
ts: now_millis(),
segments: vec![protocol::Segment::Text {
content: text.to_owned(),
}],
}
}
#[test] #[test]
fn publish_then_subscribe_returns_history_in_snapshot() { fn publish_then_subscribe_returns_history_in_snapshot() {
let sink = SegmentLogSink::new(); let sink = SegmentLogSink::new();
@ -265,6 +278,16 @@ mod tests {
sink.publish(turn_end(1)); sink.publish(turn_end(1));
assert!(rx.try_recv().is_err(), "TurnEnd must not be broadcast live"); assert!(rx.try_recv().is_err(), "TurnEnd must not be broadcast live");
// UserInput is live-relevant because it is the persisted source
// for Event::UserMessage.
sink.publish(user_input("hi from log"));
match rx.try_recv() {
Ok(LogEntry::UserInput { segments, .. }) => {
assert_eq!(segments.len(), 1);
}
other => panic!("expected UserInput, got {other:?}"),
}
// SystemItem is live-relevant. // SystemItem is live-relevant.
sink.publish(notification_entry("hi")); sink.publish(notification_entry("hi"));
match rx.try_recv() { match rx.try_recv() {
@ -272,9 +295,9 @@ mod tests {
other => panic!("expected SystemItem, got {other:?}"), other => panic!("expected SystemItem, got {other:?}"),
} }
// Mirror still grew with both entries (snapshot completeness). // Mirror still grew with all entries (snapshot completeness).
let (after_snapshot, _) = sink.subscribe_with_snapshot(); let (after_snapshot, _) = sink.subscribe_with_snapshot();
assert_eq!(after_snapshot.len(), 3); assert_eq!(after_snapshot.len(), 4);
} }
#[test] #[test]

View File

@ -571,13 +571,14 @@ async fn run_with_paste_segment_inlines_content_and_emits_typed_user_message() {
let client_for_assert = client.clone(); let client_for_assert = client.clone();
let pod = make_pod(client).await; let pod = make_pod(client).await;
let handle = spawn_controller(pod).await; let handle = spawn_controller(pod).await;
let mut rx = handle.subscribe(); let (_snapshot, mut entry_rx) = handle.sink.subscribe_with_snapshot();
let mut event_rx = handle.subscribe();
// Mixed input: plain text + a paste chip + trailing text. Pod must // Mixed input: plain text + a paste chip + trailing text. Pod must
// flatten this into one user-message string (paste content inlined, // flatten this into one user-message string (paste content inlined,
// no `[Clipboard ...]` label leaking to the LLM); the // no `[Clipboard ...]` label leaking to the LLM); the committed
// `Event::UserMessage` re-broadcast must carry the typed segments // `LogEntry::UserInput` must carry the typed segments unchanged so
// unchanged so other clients can re-render the chip. // socket clients can derive `Event::UserMessage` and re-render the chip.
let segments = vec![ let segments = vec![
protocol::Segment::text("see "), protocol::Segment::text("see "),
protocol::Segment::Paste { protocol::Segment::Paste {
@ -596,21 +597,36 @@ async fn run_with_paste_segment_inlines_content_and_emits_typed_user_message() {
.unwrap(); .unwrap();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
let mut user_event_segments: Option<Vec<protocol::Segment>> = None; let mut saw_turn_end = false;
let mut user_input_segments: Option<Vec<protocol::Segment>> = None;
loop { loop {
tokio::select! { tokio::select! {
event = rx.recv() => match event { event = event_rx.recv() => match event {
Ok(Event::UserMessage { segments }) => user_event_segments = Some(segments), Ok(Event::TurnEnd { .. }) => {
Ok(Event::TurnEnd { .. }) => break, saw_turn_end = true;
if user_input_segments.is_some() {
break;
}
}
Err(_) => break,
_ => {}
},
entry = entry_rx.recv() => match entry {
Ok(session_store::LogEntry::UserInput { segments, .. }) => {
user_input_segments = Some(segments);
if saw_turn_end {
break;
}
}
Err(_) => break, Err(_) => break,
_ => {} _ => {}
}, },
_ = tokio::time::sleep_until(deadline) => break, _ = tokio::time::sleep_until(deadline) => break,
} }
} }
let echoed = user_event_segments.expect("UserMessage event missing"); assert!(saw_turn_end, "TurnEnd event missing");
assert_eq!(echoed.len(), 3, "all three segments must round-trip"); let echoed = user_input_segments.expect("committed UserInput entry missing");
assert!(matches!(echoed[1], protocol::Segment::Paste { id: 7, .. })); assert_eq!(echoed, segments, "typed segments must round-trip unchanged");
// The Worker received a single user message whose text is the // The Worker received a single user message whose text is the
// flattened body — paste content inlined, no chip label. // flattened body — paste content inlined, no chip label.

View File

@ -243,14 +243,14 @@ impl Method {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", content = "data", rename_all = "snake_case")] #[serde(tag = "event", content = "data", rename_all = "snake_case")]
pub enum Event { pub enum Event {
/// A user input message was accepted by the Pod and is about to /// A user input message was accepted, persisted as
/// start a new turn. Broadcast to every subscribed client so /// `LogEntry::UserInput`, and is about to start a new turn.
/// additional TUI / GUI instances show the same pending user line /// Broadcast to every subscribed client so TUI / GUI instances show
/// that the submitter already sees — without this event, non- /// the same user line that reconnect snapshots would replay from
/// submitting clients would see tool calls and assistant text /// history; clients must not synthesize a separate pending/fake
/// appear without any preceding user message. /// message for accepted runs.
/// ///
/// Fires exactly once per accepted `Method::Run`, after /// Fires exactly once per committed user input, after
/// `InvokeStart { kind: UserSend }` and before the first /// `InvokeStart { kind: UserSend }` and before the first
/// `TurnStart`. Rejected runs (e.g. `AlreadyRunning`) do not emit. /// `TurnStart`. Rejected runs (e.g. `AlreadyRunning`) do not emit.
UserMessage { UserMessage {

View File

@ -568,11 +568,10 @@ impl App {
} }
fn method_for_run(&mut self, segments: Vec<Segment>) -> Method { fn method_for_run(&mut self, segments: Vec<Segment>) -> Method {
// TurnHeader / UserMessage blocks are pushed in response to // TurnHeader / UserMessage blocks are pushed only after the Pod
// `Event::UserMessage` (single source of truth, shared by every // emits `Event::UserMessage` from a committed `LogEntry::UserInput`.
// client subscribed to the Pod). Locally we only clear the // Locally we only clear the input buffer and forward the method,
// input buffer and forward the method, while remembering enough // while remembering enough local state to undo the visible submit if
// local state to undo the visible submit if the Pod reports that
// the accepted run produced no assistant output and was rolled back. // the accepted run produced no assistant output and was rolled back.
self.pending_submit_rollback = Some(RollbackSubmitState { self.pending_submit_rollback = Some(RollbackSubmitState {
text: Segment::flatten_to_text(&segments), text: Segment::flatten_to_text(&segments),
@ -2317,6 +2316,34 @@ mod completion_flow_tests {
assert!(app.completion.as_ref().unwrap().entries.is_empty()); assert!(app.completion.as_ref().unwrap().entries.is_empty());
} }
#[test]
fn committed_user_message_survives_fresh_segment_rotation() {
let mut app = App::new("test".into());
let start = session_store::LogEntry::SegmentStart {
ts: session_store::segment_log::now_millis(),
session_id: uuid::Uuid::nil(),
system_prompt: None,
config: llm_worker::llm_client::RequestConfig::default(),
history: vec![],
forked_from: None,
compacted_from: None,
};
app.handle_pod_event(Event::SegmentRotated {
entry: serde_json::to_value(start).expect("LogEntry is Serialize"),
});
app.handle_pod_event(Event::UserMessage {
segments: vec![Segment::text("first persisted message")],
});
assert_eq!(app.turn_index, 1);
assert!(app.blocks.iter().any(|b| matches!(
b,
Block::UserMessage { segments }
if Segment::flatten_to_text(segments) == "first persisted message"
)));
}
#[test] #[test]
fn rolled_back_run_restores_input_and_removes_submit_blocks() { fn rolled_back_run_restores_input_and_removes_submit_blocks() {
let mut app = App::new("test".into()); let mut app = App::new("test".into());