From 501dcc916fcc7de8bfa2ff7584df48c682044fe8 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 1 Jun 2026 11:13:28 +0900 Subject: [PATCH] fix: show initial TUI user message --- crates/pod/src/controller.rs | 24 ++++----- crates/pod/src/ipc/server.rs | 84 +++++++++++++++++------------ crates/pod/src/segment_log_sink.rs | 41 ++++++++++---- crates/pod/tests/controller_test.rs | 38 +++++++++---- crates/protocol/src/lib.rs | 14 ++--- crates/tui/src/app.rs | 37 +++++++++++-- 6 files changed, 157 insertions(+), 81 deletions(-) diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 32044460..03058aa1 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -693,21 +693,15 @@ async fn controller_loop( }); continue; } - // Broadcast the user message so every subscriber - // (including the submitter) can render the turn header - // + user line from a single source of truth. - // shared_state's `user_segments` is re-synced from - // `pod` after the run completes, so we don't push - // here. Workflow-invocation validation happens inside - // `Pod::run`; on failure the turn errors out via - // `Event::Error { InvalidRequest }` before any - // UserInput is committed. Paused→Run cleanup (orphan - // tool_result closure + interrupt system note) is - // applied inside `Pod::run` itself when the worker's - // `last_run_interrupted` flag is set. - let _ = event_tx.send(Event::UserMessage { - segments: input.clone(), - }); + // Stage the run without a speculative user-message echo. + // `Pod::run` validates the input, commits + // `LogEntry::UserInput`, and the session-log sink turns that + // committed entry into the live `Event::UserMessage`. That + // keeps every client ordered against `SegmentStart` replay and + // makes persisted history the single source of visible user + // input. Paused→Run cleanup (orphan tool_result closure + + // interrupt system note) is applied inside `Pod::run` itself + // when the worker's `last_run_interrupted` flag is set. pending = Some(PendingRun::Run(input)); } diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index 9ad57ace..0ebe0752 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -68,6 +68,37 @@ fn is_peer_disconnect_read_error(error: &io::Error) -> bool { ) } +fn live_entry_event(entry: session_store::LogEntry) -> Option { + match entry { + session_store::LogEntry::SegmentStart { .. } => { + let value = serde_json::to_value(&entry).expect("LogEntry is Serialize"); + Some(Event::SegmentRotated { entry: value }) + } + session_store::LogEntry::UserInput { segments, .. } => { + Some(Event::UserMessage { segments }) + } + session_store::LogEntry::SystemItem { item, .. } => { + let value = serde_json::to_value(&item).expect("SystemItem is Serialize"); + Some(Event::SystemItem { item: value }) + } + session_store::LogEntry::Invoke { trigger, .. } => { + Some(Event::InvokeStart { kind: trigger }) + } + other => { + // `SegmentLogSink::is_live_relevant` keeps non-live-relevant + // variants off the broadcast lane; reaching here means the two + // are out of sync and we silently dropped a wire event. Log so a + // future regression surfaces instead of vanishing. + tracing::error!( + entry_kind = ?std::mem::discriminant(&other), + "session-log broadcast emitted a non-live-relevant entry; \ + sink filter and IPC dispatch are out of sync" + ); + None + } + } +} + async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { let (reader, writer) = stream.into_split(); let mut reader = JsonLineReader::new(reader); @@ -108,43 +139,13 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { loop { tokio::select! { // Live session-log entries → dispatched as the role-specific - // wire events. The sink only broadcasts entries that the - // streaming-event lane doesn't cover; everything else is - // already on the wire via TextDelta / ToolCall* / etc., so we - // never see (and never need to forward) other variants here. + // wire events. `SegmentLogSink` only broadcasts committed log + // entries with live UI meaning; `UserInput` travels this lane so + // the visible user line is ordered with `SegmentStart` rotation. entry = entry_rx.recv() => { match entry { Ok(entry) => { - let outbound = match entry { - session_store::LogEntry::SegmentStart { .. } => { - let value = serde_json::to_value(&entry) - .expect("LogEntry is Serialize"); - Some(Event::SegmentRotated { entry: value }) - } - session_store::LogEntry::SystemItem { item, .. } => { - let value = serde_json::to_value(&item) - .expect("SystemItem is Serialize"); - Some(Event::SystemItem { item: value }) - } - session_store::LogEntry::Invoke { trigger, .. } => { - Some(Event::InvokeStart { kind: trigger }) - } - other => { - // `SegmentLogSink::is_live_relevant` keeps - // non-live-relevant variants off the - // broadcast lane; reaching here means the - // two are out of sync and we silently - // dropped a wire event. Log so a future - // regression surfaces instead of vanishing. - tracing::error!( - entry_kind = ?std::mem::discriminant(&other), - "session-log broadcast emitted a non-live-relevant entry; \ - sink filter and IPC dispatch are out of sync" - ); - None - } - }; - if let Some(event) = outbound { + if let Some(event) = live_entry_event(entry) { if writer.write(&event).await.is_err() { break; } @@ -261,4 +262,19 @@ mod tests { let error = io::Error::new(ErrorKind::InvalidData, "malformed method"); assert!(!is_peer_disconnect_read_error(&error)); } + + #[test] + fn user_input_log_entry_maps_to_user_message_event() { + let segments = vec![protocol::Segment::text("hello from log")]; + let event = live_entry_event(session_store::LogEntry::UserInput { + ts: session_store::segment_log::now_millis(), + segments: segments.clone(), + }) + .expect("UserInput must be live-relevant"); + + match event { + Event::UserMessage { segments: echoed } => assert_eq!(echoed, segments), + other => panic!("expected UserMessage, got {other:?}"), + } + } } diff --git a/crates/pod/src/segment_log_sink.rs b/crates/pod/src/segment_log_sink.rs index ccd536e9..42e6e6c4 100644 --- a/crates/pod/src/segment_log_sink.rs +++ b/crates/pod/src/segment_log_sink.rs @@ -87,18 +87,19 @@ impl SegmentLogSink { /// entry to the underlying `Store` — disk write is the gate. Failed /// disk writes must not call `publish`. /// - /// Live broadcast fires only for entries that the streaming-event - /// lane does not cover: + /// Live broadcast fires for committed session-log entries that + /// socket clients must see in log order: /// - `LogEntry::SegmentStart` → `Event::SegmentRotated` on the wire. + /// - `LogEntry::UserInput` → `Event::UserMessage`. /// - `LogEntry::SystemItem` → `Event::SystemItem`. /// - `LogEntry::Invoke` → `Event::InvokeStart`. - /// Everything else (AssistantItem, ToolResult, UserInput, TurnEnd, + /// Everything else (AssistantItem, ToolResult, TurnEnd, /// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is /// reflected in the mirror so reconnect snapshots stay accurate, /// but is not sent live — the streaming events (TextDelta / - /// ToolCallStart / ToolResult / UserMessage / TurnEnd / etc.) - /// already provide that data, and re-broadcasting it as a typed - /// entry would just double-render every block on the client side. + /// ToolCallStart / ToolResult / TurnEnd / etc.) already provide + /// that data, and re-broadcasting it as a typed entry would just + /// double-render every block on the client side. pub fn publish(&self, entry: LogEntry) { let mut mirror = self .inner @@ -119,7 +120,10 @@ impl SegmentLogSink { fn is_live_relevant(entry: &LogEntry) -> bool { matches!( entry, - LogEntry::SegmentStart { .. } | LogEntry::SystemItem { .. } | LogEntry::Invoke { .. } + LogEntry::SegmentStart { .. } + | LogEntry::UserInput { .. } + | LogEntry::SystemItem { .. } + | LogEntry::Invoke { .. } ) } @@ -227,6 +231,15 @@ mod tests { } } + fn user_input(text: &str) -> LogEntry { + LogEntry::UserInput { + ts: now_millis(), + segments: vec![protocol::Segment::Text { + content: text.to_owned(), + }], + } + } + #[test] fn publish_then_subscribe_returns_history_in_snapshot() { let sink = SegmentLogSink::new(); @@ -265,6 +278,16 @@ mod tests { sink.publish(turn_end(1)); assert!(rx.try_recv().is_err(), "TurnEnd must not be broadcast live"); + // UserInput is live-relevant because it is the persisted source + // for Event::UserMessage. + sink.publish(user_input("hi from log")); + match rx.try_recv() { + Ok(LogEntry::UserInput { segments, .. }) => { + assert_eq!(segments.len(), 1); + } + other => panic!("expected UserInput, got {other:?}"), + } + // SystemItem is live-relevant. sink.publish(notification_entry("hi")); match rx.try_recv() { @@ -272,9 +295,9 @@ mod tests { other => panic!("expected SystemItem, got {other:?}"), } - // Mirror still grew with both entries (snapshot completeness). + // Mirror still grew with all entries (snapshot completeness). let (after_snapshot, _) = sink.subscribe_with_snapshot(); - assert_eq!(after_snapshot.len(), 3); + assert_eq!(after_snapshot.len(), 4); } #[test] diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index fd7c90e7..54b91334 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -571,13 +571,14 @@ async fn run_with_paste_segment_inlines_content_and_emits_typed_user_message() { let client_for_assert = client.clone(); let pod = make_pod(client).await; let handle = spawn_controller(pod).await; - let mut rx = handle.subscribe(); + let (_snapshot, mut entry_rx) = handle.sink.subscribe_with_snapshot(); + let mut event_rx = handle.subscribe(); // Mixed input: plain text + a paste chip + trailing text. Pod must // flatten this into one user-message string (paste content inlined, - // no `[Clipboard ...]` label leaking to the LLM); the - // `Event::UserMessage` re-broadcast must carry the typed segments - // unchanged so other clients can re-render the chip. + // no `[Clipboard ...]` label leaking to the LLM); the committed + // `LogEntry::UserInput` must carry the typed segments unchanged so + // socket clients can derive `Event::UserMessage` and re-render the chip. let segments = vec![ protocol::Segment::text("see "), protocol::Segment::Paste { @@ -596,21 +597,36 @@ async fn run_with_paste_segment_inlines_content_and_emits_typed_user_message() { .unwrap(); let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); - let mut user_event_segments: Option> = None; + let mut saw_turn_end = false; + let mut user_input_segments: Option> = None; loop { tokio::select! { - event = rx.recv() => match event { - Ok(Event::UserMessage { segments }) => user_event_segments = Some(segments), - Ok(Event::TurnEnd { .. }) => break, + event = event_rx.recv() => match event { + Ok(Event::TurnEnd { .. }) => { + saw_turn_end = true; + if user_input_segments.is_some() { + break; + } + } + Err(_) => break, + _ => {} + }, + entry = entry_rx.recv() => match entry { + Ok(session_store::LogEntry::UserInput { segments, .. }) => { + user_input_segments = Some(segments); + if saw_turn_end { + break; + } + } Err(_) => break, _ => {} }, _ = tokio::time::sleep_until(deadline) => break, } } - let echoed = user_event_segments.expect("UserMessage event missing"); - assert_eq!(echoed.len(), 3, "all three segments must round-trip"); - assert!(matches!(echoed[1], protocol::Segment::Paste { id: 7, .. })); + assert!(saw_turn_end, "TurnEnd event missing"); + let echoed = user_input_segments.expect("committed UserInput entry missing"); + assert_eq!(echoed, segments, "typed segments must round-trip unchanged"); // The Worker received a single user message whose text is the // flattened body — paste content inlined, no chip label. diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index cce3da53..5d9fceeb 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -243,14 +243,14 @@ impl Method { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "event", content = "data", rename_all = "snake_case")] pub enum Event { - /// A user input message was accepted by the Pod and is about to - /// start a new turn. Broadcast to every subscribed client so - /// additional TUI / GUI instances show the same pending user line - /// that the submitter already sees — without this event, non- - /// submitting clients would see tool calls and assistant text - /// appear without any preceding user message. + /// A user input message was accepted, persisted as + /// `LogEntry::UserInput`, and is about to start a new turn. + /// Broadcast to every subscribed client so TUI / GUI instances show + /// the same user line that reconnect snapshots would replay from + /// history; clients must not synthesize a separate pending/fake + /// message for accepted runs. /// - /// Fires exactly once per accepted `Method::Run`, after + /// Fires exactly once per committed user input, after /// `InvokeStart { kind: UserSend }` and before the first /// `TurnStart`. Rejected runs (e.g. `AlreadyRunning`) do not emit. UserMessage { diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index eba58534..ee16cda7 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -568,11 +568,10 @@ impl App { } fn method_for_run(&mut self, segments: Vec) -> Method { - // TurnHeader / UserMessage blocks are pushed in response to - // `Event::UserMessage` (single source of truth, shared by every - // client subscribed to the Pod). Locally we only clear the - // input buffer and forward the method, while remembering enough - // local state to undo the visible submit if the Pod reports that + // TurnHeader / UserMessage blocks are pushed only after the Pod + // emits `Event::UserMessage` from a committed `LogEntry::UserInput`. + // Locally we only clear the input buffer and forward the method, + // while remembering enough local state to undo the visible submit if // the accepted run produced no assistant output and was rolled back. self.pending_submit_rollback = Some(RollbackSubmitState { text: Segment::flatten_to_text(&segments), @@ -2317,6 +2316,34 @@ mod completion_flow_tests { assert!(app.completion.as_ref().unwrap().entries.is_empty()); } + #[test] + fn committed_user_message_survives_fresh_segment_rotation() { + let mut app = App::new("test".into()); + let start = session_store::LogEntry::SegmentStart { + ts: session_store::segment_log::now_millis(), + session_id: uuid::Uuid::nil(), + system_prompt: None, + config: llm_worker::llm_client::RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + }; + + app.handle_pod_event(Event::SegmentRotated { + entry: serde_json::to_value(start).expect("LogEntry is Serialize"), + }); + app.handle_pod_event(Event::UserMessage { + segments: vec![Segment::text("first persisted message")], + }); + + assert_eq!(app.turn_index, 1); + assert!(app.blocks.iter().any(|b| matches!( + b, + Block::UserMessage { segments } + if Segment::flatten_to_text(segments) == "first persisted message" + ))); + } + #[test] fn rolled_back_run_restores_input_and_removes_submit_blocks() { let mut app = App::new("test".into());