merge: tui-streaming-input-loss

This commit is contained in:
Keisuke Hirata 2026-05-23 07:15:55 +09:00
commit 7c9abb37ad
No known key found for this signature in database

View File

@ -10,9 +10,16 @@ mod task;
mod tool;
mod ui;
use std::future::Future;
use std::io;
use std::path::PathBuf;
use std::process::ExitCode;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::thread;
use std::time::Duration;
use crossterm::event::{
self, DisableBracketedPaste, DisableMouseCapture, EnableBracketedPaste, EnableMouseCapture,
@ -26,6 +33,7 @@ use protocol::{Method, PodStatus};
use ratatui::Terminal;
use ratatui::backend::CrosstermBackend;
use session_store::SegmentId;
use tokio::sync::mpsc;
use client::PodClient;
@ -350,11 +358,137 @@ async fn run(
Ok(())
}
type TerminalEventResult = io::Result<TermEvent>;
const TERMINAL_POLL_INTERVAL: Duration = Duration::from_millis(50);
const TERMINAL_EVENT_DRAIN_LIMIT: usize = 64;
const POD_EVENT_DRAIN_LIMIT: usize = 32;
struct TerminalEventReader {
stop: Arc<AtomicBool>,
_thread: thread::JoinHandle<()>,
}
impl TerminalEventReader {
fn spawn() -> io::Result<(Self, mpsc::UnboundedReceiver<TerminalEventResult>)> {
let (tx, rx) = mpsc::unbounded_channel();
let stop = Arc::new(AtomicBool::new(false));
let thread_stop = Arc::clone(&stop);
let thread = thread::Builder::new()
.name("insomnia-tui-terminal-reader".to_string())
.spawn(move || read_terminal_events(thread_stop, tx))?;
Ok((
Self {
stop,
_thread: thread,
},
rx,
))
}
}
impl Drop for TerminalEventReader {
fn drop(&mut self) {
self.stop.store(true, Ordering::Relaxed);
}
}
fn read_terminal_events(stop: Arc<AtomicBool>, tx: mpsc::UnboundedSender<TerminalEventResult>) {
while !stop.load(Ordering::Relaxed) {
match event::poll(TERMINAL_POLL_INTERVAL) {
Ok(false) => {}
Ok(true) => {
let event = event::read();
let should_stop = event.is_err();
if tx.send(event).is_err() || should_stop {
break;
}
}
Err(e) => {
let _ = tx.send(Err(e));
break;
}
}
}
}
enum LoopInput<P> {
Terminal(TerminalEventResult),
Pod(Option<P>),
}
async fn next_loop_input<P, F>(
term_rx: &mut mpsc::UnboundedReceiver<TerminalEventResult>,
connected: bool,
pod_next: F,
) -> LoopInput<P>
where
F: Future<Output = Option<P>>,
{
tokio::select! {
biased;
term_event = term_rx.recv() => {
LoopInput::Terminal(term_event.unwrap_or_else(|| {
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"terminal event reader stopped",
))
}))
}
event = pod_next, if connected => LoopInput::Pod(event),
}
}
async fn drain_terminal_events(
app: &mut App,
client: &mut PodClient,
term_rx: &mut mpsc::UnboundedReceiver<TerminalEventResult>,
) -> Result<bool, Box<dyn std::error::Error>> {
let mut handled = false;
for _ in 0..TERMINAL_EVENT_DRAIN_LIMIT {
match term_rx.try_recv() {
Ok(event) => {
handled = true;
handle_terminal_event(app, client, event?).await?;
if app.quit {
break;
}
}
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Disconnected) => {
return Err(Box::new(io::Error::new(
io::ErrorKind::UnexpectedEof,
"terminal event reader stopped",
)));
}
}
}
Ok(handled)
}
fn drain_pod_events(app: &mut App, client: &mut PodClient) -> bool {
let mut handled = false;
for _ in 0..POD_EVENT_DRAIN_LIMIT {
match client.try_next_event() {
Some(ev) => {
handled = true;
app.handle_pod_event(ev);
}
None => break,
}
}
handled
}
async fn run_loop(
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
app: &mut App,
mut client: PodClient,
) -> Result<(), Box<dyn std::error::Error>> {
let (_terminal_reader, mut term_rx) = TerminalEventReader::spawn()?;
terminal.draw(|f| ui::draw(f, app))?;
loop {
@ -362,56 +496,28 @@ async fn run_loop(
break;
}
// Drain any already-buffered Pod events in a bounded batch before
// polling the terminal. This keeps status fresh without letting a
// busy event stream starve Ctrl-C / Ctrl-X input.
for _ in 0..32 {
match client.try_next_event() {
Some(ev) => app.handle_pod_event(ev),
None => break,
}
}
// Always give the terminal queue a non-blocking pass each frame.
// The awaited select below only waits after this pass found nothing.
let mut handled_term_event = false;
while event::poll(std::time::Duration::ZERO)? {
handled_term_event = true;
handle_terminal_event(app, &mut client, event::read()?).await?;
if app.quit {
break;
}
}
let handled_term_event = drain_terminal_events(app, &mut client, &mut term_rx).await?;
if app.quit {
break;
}
if handled_term_event {
let handled_pod_event = drain_pod_events(app, &mut client);
if handled_term_event || handled_pod_event {
terminal.draw(|f| ui::draw(f, app))?;
continue;
}
tokio::select! {
term_event = tokio::task::spawn_blocking(|| {
if event::poll(std::time::Duration::from_millis(50))? {
event::read().map(Some)
} else {
Ok(None)
}
}) => {
if let Some(term_event) = term_event?? {
handle_terminal_event(app, &mut client, term_event).await?;
}
match next_loop_input(&mut term_rx, app.connected, client.next_event()).await {
LoopInput::Terminal(term_event) => {
handle_terminal_event(app, &mut client, term_event?).await?;
}
event = client.next_event(), if app.connected => {
match event {
Some(ev) => app.handle_pod_event(ev),
None => {
app.connected = false;
app.mark_orphan_compacts_incomplete();
app.push_error("Connection lost");
}
LoopInput::Pod(event) => match event {
Some(ev) => app.handle_pod_event(ev),
None => {
app.connected = false;
app.mark_orphan_compacts_incomplete();
app.push_error("Connection lost");
}
}
},
}
terminal.draw(|f| ui::draw(f, app))?;
@ -716,4 +822,78 @@ mod tests {
"--pod and --session are mutually exclusive"
);
}
#[tokio::test]
async fn terminal_event_is_selected_before_ready_pod_event() {
let (tx, mut rx) = mpsc::unbounded_channel();
tx.send(Ok(TermEvent::Key(KeyEvent::new(
KeyCode::Char('x'),
KeyModifiers::NONE,
))))
.unwrap();
match next_loop_input(&mut rx, true, std::future::ready(Some(()))).await {
LoopInput::Terminal(Ok(TermEvent::Key(key))) => {
assert_eq!(key.code, KeyCode::Char('x'));
}
_ => panic!("ready terminal input should win over a ready Pod event"),
}
}
#[tokio::test]
async fn terminal_event_is_preserved_after_pod_event_wins() {
let (tx, mut rx) = mpsc::unbounded_channel();
match next_loop_input(&mut rx, true, std::future::ready(Some(1_u8))).await {
LoopInput::Pod(Some(1)) => {}
_ => panic!("expected the first ready Pod event to win before any terminal input"),
}
tx.send(Ok(TermEvent::Key(KeyEvent::new(
KeyCode::Char('y'),
KeyModifiers::NONE,
))))
.unwrap();
match next_loop_input(&mut rx, true, std::future::ready(Some(2_u8))).await {
LoopInput::Terminal(Ok(TermEvent::Key(key))) => {
assert_eq!(key.code, KeyCode::Char('y'));
}
_ => panic!("queued terminal input should not be lost to subsequent Pod events"),
}
}
#[test]
fn running_status_still_allows_text_editing() {
let mut app = App::new("agent".to_string());
app.set_pod_status(PodStatus::Running);
assert!(
handle_key(
&mut app,
KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE)
)
.is_none()
);
assert!(
handle_key(
&mut app,
KeyEvent::new(KeyCode::Char('c'), KeyModifiers::NONE)
)
.is_none()
);
assert!(handle_key(&mut app, KeyEvent::new(KeyCode::Left, KeyModifiers::NONE)).is_none());
assert!(
handle_key(
&mut app,
KeyEvent::new(KeyCode::Char('b'), KeyModifiers::NONE)
)
.is_none()
);
assert_eq!(
protocol::Segment::flatten_to_text(&app.input.submit_segments()),
"abc"
);
}
}