fix: preserve tui input during streaming
This commit is contained in:
parent
7315114b20
commit
e29861f787
|
|
@ -10,9 +10,16 @@ mod task;
|
|||
mod tool;
|
||||
mod ui;
|
||||
|
||||
use std::future::Future;
|
||||
use std::io;
|
||||
use std::path::PathBuf;
|
||||
use std::process::ExitCode;
|
||||
use std::sync::{
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use crossterm::event::{
|
||||
self, DisableBracketedPaste, DisableMouseCapture, EnableBracketedPaste, EnableMouseCapture,
|
||||
|
|
@ -26,6 +33,7 @@ use protocol::{Method, PodStatus};
|
|||
use ratatui::Terminal;
|
||||
use ratatui::backend::CrosstermBackend;
|
||||
use session_store::SegmentId;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use client::PodClient;
|
||||
|
||||
|
|
@ -350,11 +358,137 @@ async fn run(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
type TerminalEventResult = io::Result<TermEvent>;
|
||||
|
||||
const TERMINAL_POLL_INTERVAL: Duration = Duration::from_millis(50);
|
||||
const TERMINAL_EVENT_DRAIN_LIMIT: usize = 64;
|
||||
const POD_EVENT_DRAIN_LIMIT: usize = 32;
|
||||
|
||||
struct TerminalEventReader {
|
||||
stop: Arc<AtomicBool>,
|
||||
_thread: thread::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl TerminalEventReader {
|
||||
fn spawn() -> io::Result<(Self, mpsc::UnboundedReceiver<TerminalEventResult>)> {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let stop = Arc::new(AtomicBool::new(false));
|
||||
let thread_stop = Arc::clone(&stop);
|
||||
let thread = thread::Builder::new()
|
||||
.name("insomnia-tui-terminal-reader".to_string())
|
||||
.spawn(move || read_terminal_events(thread_stop, tx))?;
|
||||
|
||||
Ok((
|
||||
Self {
|
||||
stop,
|
||||
_thread: thread,
|
||||
},
|
||||
rx,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TerminalEventReader {
|
||||
fn drop(&mut self) {
|
||||
self.stop.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
fn read_terminal_events(stop: Arc<AtomicBool>, tx: mpsc::UnboundedSender<TerminalEventResult>) {
|
||||
while !stop.load(Ordering::Relaxed) {
|
||||
match event::poll(TERMINAL_POLL_INTERVAL) {
|
||||
Ok(false) => {}
|
||||
Ok(true) => {
|
||||
let event = event::read();
|
||||
let should_stop = event.is_err();
|
||||
if tx.send(event).is_err() || should_stop {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = tx.send(Err(e));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum LoopInput<P> {
|
||||
Terminal(TerminalEventResult),
|
||||
Pod(Option<P>),
|
||||
}
|
||||
|
||||
async fn next_loop_input<P, F>(
|
||||
term_rx: &mut mpsc::UnboundedReceiver<TerminalEventResult>,
|
||||
connected: bool,
|
||||
pod_next: F,
|
||||
) -> LoopInput<P>
|
||||
where
|
||||
F: Future<Output = Option<P>>,
|
||||
{
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
term_event = term_rx.recv() => {
|
||||
LoopInput::Terminal(term_event.unwrap_or_else(|| {
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
"terminal event reader stopped",
|
||||
))
|
||||
}))
|
||||
}
|
||||
event = pod_next, if connected => LoopInput::Pod(event),
|
||||
}
|
||||
}
|
||||
|
||||
async fn drain_terminal_events(
|
||||
app: &mut App,
|
||||
client: &mut PodClient,
|
||||
term_rx: &mut mpsc::UnboundedReceiver<TerminalEventResult>,
|
||||
) -> Result<bool, Box<dyn std::error::Error>> {
|
||||
let mut handled = false;
|
||||
for _ in 0..TERMINAL_EVENT_DRAIN_LIMIT {
|
||||
match term_rx.try_recv() {
|
||||
Ok(event) => {
|
||||
handled = true;
|
||||
handle_terminal_event(app, client, event?).await?;
|
||||
if app.quit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(mpsc::error::TryRecvError::Empty) => break,
|
||||
Err(mpsc::error::TryRecvError::Disconnected) => {
|
||||
return Err(Box::new(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
"terminal event reader stopped",
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(handled)
|
||||
}
|
||||
|
||||
fn drain_pod_events(app: &mut App, client: &mut PodClient) -> bool {
|
||||
let mut handled = false;
|
||||
for _ in 0..POD_EVENT_DRAIN_LIMIT {
|
||||
match client.try_next_event() {
|
||||
Some(ev) => {
|
||||
handled = true;
|
||||
app.handle_pod_event(ev);
|
||||
}
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
handled
|
||||
}
|
||||
|
||||
async fn run_loop(
|
||||
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
|
||||
app: &mut App,
|
||||
mut client: PodClient,
|
||||
) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let (_terminal_reader, mut term_rx) = TerminalEventReader::spawn()?;
|
||||
|
||||
terminal.draw(|f| ui::draw(f, app))?;
|
||||
|
||||
loop {
|
||||
|
|
@ -362,56 +496,28 @@ async fn run_loop(
|
|||
break;
|
||||
}
|
||||
|
||||
// Drain any already-buffered Pod events in a bounded batch before
|
||||
// polling the terminal. This keeps status fresh without letting a
|
||||
// busy event stream starve Ctrl-C / Ctrl-X input.
|
||||
for _ in 0..32 {
|
||||
match client.try_next_event() {
|
||||
Some(ev) => app.handle_pod_event(ev),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
// Always give the terminal queue a non-blocking pass each frame.
|
||||
// The awaited select below only waits after this pass found nothing.
|
||||
let mut handled_term_event = false;
|
||||
while event::poll(std::time::Duration::ZERO)? {
|
||||
handled_term_event = true;
|
||||
handle_terminal_event(app, &mut client, event::read()?).await?;
|
||||
if app.quit {
|
||||
break;
|
||||
}
|
||||
}
|
||||
let handled_term_event = drain_terminal_events(app, &mut client, &mut term_rx).await?;
|
||||
if app.quit {
|
||||
break;
|
||||
}
|
||||
if handled_term_event {
|
||||
let handled_pod_event = drain_pod_events(app, &mut client);
|
||||
if handled_term_event || handled_pod_event {
|
||||
terminal.draw(|f| ui::draw(f, app))?;
|
||||
continue;
|
||||
}
|
||||
|
||||
tokio::select! {
|
||||
term_event = tokio::task::spawn_blocking(|| {
|
||||
if event::poll(std::time::Duration::from_millis(50))? {
|
||||
event::read().map(Some)
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}) => {
|
||||
if let Some(term_event) = term_event?? {
|
||||
handle_terminal_event(app, &mut client, term_event).await?;
|
||||
}
|
||||
match next_loop_input(&mut term_rx, app.connected, client.next_event()).await {
|
||||
LoopInput::Terminal(term_event) => {
|
||||
handle_terminal_event(app, &mut client, term_event?).await?;
|
||||
}
|
||||
event = client.next_event(), if app.connected => {
|
||||
match event {
|
||||
Some(ev) => app.handle_pod_event(ev),
|
||||
None => {
|
||||
app.connected = false;
|
||||
app.mark_orphan_compacts_incomplete();
|
||||
app.push_error("Connection lost");
|
||||
}
|
||||
LoopInput::Pod(event) => match event {
|
||||
Some(ev) => app.handle_pod_event(ev),
|
||||
None => {
|
||||
app.connected = false;
|
||||
app.mark_orphan_compacts_incomplete();
|
||||
app.push_error("Connection lost");
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
terminal.draw(|f| ui::draw(f, app))?;
|
||||
|
|
@ -716,4 +822,78 @@ mod tests {
|
|||
"--pod and --session are mutually exclusive"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminal_event_is_selected_before_ready_pod_event() {
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
tx.send(Ok(TermEvent::Key(KeyEvent::new(
|
||||
KeyCode::Char('x'),
|
||||
KeyModifiers::NONE,
|
||||
))))
|
||||
.unwrap();
|
||||
|
||||
match next_loop_input(&mut rx, true, std::future::ready(Some(()))).await {
|
||||
LoopInput::Terminal(Ok(TermEvent::Key(key))) => {
|
||||
assert_eq!(key.code, KeyCode::Char('x'));
|
||||
}
|
||||
_ => panic!("ready terminal input should win over a ready Pod event"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminal_event_is_preserved_after_pod_event_wins() {
|
||||
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||
|
||||
match next_loop_input(&mut rx, true, std::future::ready(Some(1_u8))).await {
|
||||
LoopInput::Pod(Some(1)) => {}
|
||||
_ => panic!("expected the first ready Pod event to win before any terminal input"),
|
||||
}
|
||||
|
||||
tx.send(Ok(TermEvent::Key(KeyEvent::new(
|
||||
KeyCode::Char('y'),
|
||||
KeyModifiers::NONE,
|
||||
))))
|
||||
.unwrap();
|
||||
|
||||
match next_loop_input(&mut rx, true, std::future::ready(Some(2_u8))).await {
|
||||
LoopInput::Terminal(Ok(TermEvent::Key(key))) => {
|
||||
assert_eq!(key.code, KeyCode::Char('y'));
|
||||
}
|
||||
_ => panic!("queued terminal input should not be lost to subsequent Pod events"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn running_status_still_allows_text_editing() {
|
||||
let mut app = App::new("agent".to_string());
|
||||
app.set_pod_status(PodStatus::Running);
|
||||
|
||||
assert!(
|
||||
handle_key(
|
||||
&mut app,
|
||||
KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE)
|
||||
)
|
||||
.is_none()
|
||||
);
|
||||
assert!(
|
||||
handle_key(
|
||||
&mut app,
|
||||
KeyEvent::new(KeyCode::Char('c'), KeyModifiers::NONE)
|
||||
)
|
||||
.is_none()
|
||||
);
|
||||
assert!(handle_key(&mut app, KeyEvent::new(KeyCode::Left, KeyModifiers::NONE)).is_none());
|
||||
assert!(
|
||||
handle_key(
|
||||
&mut app,
|
||||
KeyEvent::new(KeyCode::Char('b'), KeyModifiers::NONE)
|
||||
)
|
||||
.is_none()
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
protocol::Segment::flatten_to_text(&app.input.submit_segments()),
|
||||
"abc"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user