From e29861f78786c10c9a227650512247e7d72a4d71 Mon Sep 17 00:00:00 2001 From: Hare Date: Sat, 23 May 2026 07:15:39 +0900 Subject: [PATCH] fix: preserve tui input during streaming --- crates/tui/src/main.rs | 262 ++++++++++++++++++++++++++++++++++------- 1 file changed, 221 insertions(+), 41 deletions(-) diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index 4bc61da9..85195aa1 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -10,9 +10,16 @@ mod task; mod tool; mod ui; +use std::future::Future; use std::io; use std::path::PathBuf; use std::process::ExitCode; +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; +use std::thread; +use std::time::Duration; use crossterm::event::{ self, DisableBracketedPaste, DisableMouseCapture, EnableBracketedPaste, EnableMouseCapture, @@ -26,6 +33,7 @@ use protocol::{Method, PodStatus}; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use session_store::SegmentId; +use tokio::sync::mpsc; use client::PodClient; @@ -350,11 +358,137 @@ async fn run( Ok(()) } +type TerminalEventResult = io::Result; + +const TERMINAL_POLL_INTERVAL: Duration = Duration::from_millis(50); +const TERMINAL_EVENT_DRAIN_LIMIT: usize = 64; +const POD_EVENT_DRAIN_LIMIT: usize = 32; + +struct TerminalEventReader { + stop: Arc, + _thread: thread::JoinHandle<()>, +} + +impl TerminalEventReader { + fn spawn() -> io::Result<(Self, mpsc::UnboundedReceiver)> { + let (tx, rx) = mpsc::unbounded_channel(); + let stop = Arc::new(AtomicBool::new(false)); + let thread_stop = Arc::clone(&stop); + let thread = thread::Builder::new() + .name("insomnia-tui-terminal-reader".to_string()) + .spawn(move || read_terminal_events(thread_stop, tx))?; + + Ok(( + Self { + stop, + _thread: thread, + }, + rx, + )) + } +} + +impl Drop for TerminalEventReader { + fn drop(&mut self) { + self.stop.store(true, Ordering::Relaxed); + } +} + +fn read_terminal_events(stop: Arc, tx: mpsc::UnboundedSender) { + while !stop.load(Ordering::Relaxed) { + match event::poll(TERMINAL_POLL_INTERVAL) { + Ok(false) => {} + Ok(true) => { + let event = event::read(); + let should_stop = event.is_err(); + if tx.send(event).is_err() || should_stop { + break; + } + } + Err(e) => { + let _ = tx.send(Err(e)); + break; + } + } + } +} + +enum LoopInput

{ + Terminal(TerminalEventResult), + Pod(Option

), +} + +async fn next_loop_input( + term_rx: &mut mpsc::UnboundedReceiver, + connected: bool, + pod_next: F, +) -> LoopInput

+where + F: Future>, +{ + tokio::select! { + biased; + + term_event = term_rx.recv() => { + LoopInput::Terminal(term_event.unwrap_or_else(|| { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "terminal event reader stopped", + )) + })) + } + event = pod_next, if connected => LoopInput::Pod(event), + } +} + +async fn drain_terminal_events( + app: &mut App, + client: &mut PodClient, + term_rx: &mut mpsc::UnboundedReceiver, +) -> Result> { + let mut handled = false; + for _ in 0..TERMINAL_EVENT_DRAIN_LIMIT { + match term_rx.try_recv() { + Ok(event) => { + handled = true; + handle_terminal_event(app, client, event?).await?; + if app.quit { + break; + } + } + Err(mpsc::error::TryRecvError::Empty) => break, + Err(mpsc::error::TryRecvError::Disconnected) => { + return Err(Box::new(io::Error::new( + io::ErrorKind::UnexpectedEof, + "terminal event reader stopped", + ))); + } + } + } + Ok(handled) +} + +fn drain_pod_events(app: &mut App, client: &mut PodClient) -> bool { + let mut handled = false; + for _ in 0..POD_EVENT_DRAIN_LIMIT { + match client.try_next_event() { + Some(ev) => { + handled = true; + app.handle_pod_event(ev); + } + None => break, + } + } + handled +} + async fn run_loop( terminal: &mut Terminal>, app: &mut App, mut client: PodClient, ) -> Result<(), Box> { + let (_terminal_reader, mut term_rx) = TerminalEventReader::spawn()?; + terminal.draw(|f| ui::draw(f, app))?; loop { @@ -362,56 +496,28 @@ async fn run_loop( break; } - // Drain any already-buffered Pod events in a bounded batch before - // polling the terminal. This keeps status fresh without letting a - // busy event stream starve Ctrl-C / Ctrl-X input. - for _ in 0..32 { - match client.try_next_event() { - Some(ev) => app.handle_pod_event(ev), - None => break, - } - } - - // Always give the terminal queue a non-blocking pass each frame. - // The awaited select below only waits after this pass found nothing. - let mut handled_term_event = false; - while event::poll(std::time::Duration::ZERO)? { - handled_term_event = true; - handle_terminal_event(app, &mut client, event::read()?).await?; - if app.quit { - break; - } - } + let handled_term_event = drain_terminal_events(app, &mut client, &mut term_rx).await?; if app.quit { break; } - if handled_term_event { + let handled_pod_event = drain_pod_events(app, &mut client); + if handled_term_event || handled_pod_event { terminal.draw(|f| ui::draw(f, app))?; continue; } - tokio::select! { - term_event = tokio::task::spawn_blocking(|| { - if event::poll(std::time::Duration::from_millis(50))? { - event::read().map(Some) - } else { - Ok(None) - } - }) => { - if let Some(term_event) = term_event?? { - handle_terminal_event(app, &mut client, term_event).await?; - } + match next_loop_input(&mut term_rx, app.connected, client.next_event()).await { + LoopInput::Terminal(term_event) => { + handle_terminal_event(app, &mut client, term_event?).await?; } - event = client.next_event(), if app.connected => { - match event { - Some(ev) => app.handle_pod_event(ev), - None => { - app.connected = false; - app.mark_orphan_compacts_incomplete(); - app.push_error("Connection lost"); - } + LoopInput::Pod(event) => match event { + Some(ev) => app.handle_pod_event(ev), + None => { + app.connected = false; + app.mark_orphan_compacts_incomplete(); + app.push_error("Connection lost"); } - } + }, } terminal.draw(|f| ui::draw(f, app))?; @@ -716,4 +822,78 @@ mod tests { "--pod and --session are mutually exclusive" ); } + + #[tokio::test] + async fn terminal_event_is_selected_before_ready_pod_event() { + let (tx, mut rx) = mpsc::unbounded_channel(); + tx.send(Ok(TermEvent::Key(KeyEvent::new( + KeyCode::Char('x'), + KeyModifiers::NONE, + )))) + .unwrap(); + + match next_loop_input(&mut rx, true, std::future::ready(Some(()))).await { + LoopInput::Terminal(Ok(TermEvent::Key(key))) => { + assert_eq!(key.code, KeyCode::Char('x')); + } + _ => panic!("ready terminal input should win over a ready Pod event"), + } + } + + #[tokio::test] + async fn terminal_event_is_preserved_after_pod_event_wins() { + let (tx, mut rx) = mpsc::unbounded_channel(); + + match next_loop_input(&mut rx, true, std::future::ready(Some(1_u8))).await { + LoopInput::Pod(Some(1)) => {} + _ => panic!("expected the first ready Pod event to win before any terminal input"), + } + + tx.send(Ok(TermEvent::Key(KeyEvent::new( + KeyCode::Char('y'), + KeyModifiers::NONE, + )))) + .unwrap(); + + match next_loop_input(&mut rx, true, std::future::ready(Some(2_u8))).await { + LoopInput::Terminal(Ok(TermEvent::Key(key))) => { + assert_eq!(key.code, KeyCode::Char('y')); + } + _ => panic!("queued terminal input should not be lost to subsequent Pod events"), + } + } + + #[test] + fn running_status_still_allows_text_editing() { + let mut app = App::new("agent".to_string()); + app.set_pod_status(PodStatus::Running); + + assert!( + handle_key( + &mut app, + KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE) + ) + .is_none() + ); + assert!( + handle_key( + &mut app, + KeyEvent::new(KeyCode::Char('c'), KeyModifiers::NONE) + ) + .is_none() + ); + assert!(handle_key(&mut app, KeyEvent::new(KeyCode::Left, KeyModifiers::NONE)).is_none()); + assert!( + handle_key( + &mut app, + KeyEvent::new(KeyCode::Char('b'), KeyModifiers::NONE) + ) + .is_none() + ); + + assert_eq!( + protocol::Segment::flatten_to_text(&app.input.submit_segments()), + "abc" + ); + } }