merge: tui-streaming-input-loss
This commit is contained in:
commit
0ff39f33bb
|
|
@ -10,9 +10,16 @@ mod task;
|
||||||
mod tool;
|
mod tool;
|
||||||
mod ui;
|
mod ui;
|
||||||
|
|
||||||
|
use std::future::Future;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::process::ExitCode;
|
use std::process::ExitCode;
|
||||||
|
use std::sync::{
|
||||||
|
Arc,
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
};
|
||||||
|
use std::thread;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use crossterm::event::{
|
use crossterm::event::{
|
||||||
self, DisableBracketedPaste, DisableMouseCapture, EnableBracketedPaste, EnableMouseCapture,
|
self, DisableBracketedPaste, DisableMouseCapture, EnableBracketedPaste, EnableMouseCapture,
|
||||||
|
|
@ -26,6 +33,7 @@ use protocol::{Method, PodStatus};
|
||||||
use ratatui::Terminal;
|
use ratatui::Terminal;
|
||||||
use ratatui::backend::CrosstermBackend;
|
use ratatui::backend::CrosstermBackend;
|
||||||
use session_store::SegmentId;
|
use session_store::SegmentId;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use client::PodClient;
|
use client::PodClient;
|
||||||
|
|
||||||
|
|
@ -350,11 +358,137 @@ async fn run(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type TerminalEventResult = io::Result<TermEvent>;
|
||||||
|
|
||||||
|
const TERMINAL_POLL_INTERVAL: Duration = Duration::from_millis(50);
|
||||||
|
const TERMINAL_EVENT_DRAIN_LIMIT: usize = 64;
|
||||||
|
const POD_EVENT_DRAIN_LIMIT: usize = 32;
|
||||||
|
|
||||||
|
struct TerminalEventReader {
|
||||||
|
stop: Arc<AtomicBool>,
|
||||||
|
_thread: thread::JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TerminalEventReader {
|
||||||
|
fn spawn() -> io::Result<(Self, mpsc::UnboundedReceiver<TerminalEventResult>)> {
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
let stop = Arc::new(AtomicBool::new(false));
|
||||||
|
let thread_stop = Arc::clone(&stop);
|
||||||
|
let thread = thread::Builder::new()
|
||||||
|
.name("insomnia-tui-terminal-reader".to_string())
|
||||||
|
.spawn(move || read_terminal_events(thread_stop, tx))?;
|
||||||
|
|
||||||
|
Ok((
|
||||||
|
Self {
|
||||||
|
stop,
|
||||||
|
_thread: thread,
|
||||||
|
},
|
||||||
|
rx,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for TerminalEventReader {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.stop.store(true, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_terminal_events(stop: Arc<AtomicBool>, tx: mpsc::UnboundedSender<TerminalEventResult>) {
|
||||||
|
while !stop.load(Ordering::Relaxed) {
|
||||||
|
match event::poll(TERMINAL_POLL_INTERVAL) {
|
||||||
|
Ok(false) => {}
|
||||||
|
Ok(true) => {
|
||||||
|
let event = event::read();
|
||||||
|
let should_stop = event.is_err();
|
||||||
|
if tx.send(event).is_err() || should_stop {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
let _ = tx.send(Err(e));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum LoopInput<P> {
|
||||||
|
Terminal(TerminalEventResult),
|
||||||
|
Pod(Option<P>),
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn next_loop_input<P, F>(
|
||||||
|
term_rx: &mut mpsc::UnboundedReceiver<TerminalEventResult>,
|
||||||
|
connected: bool,
|
||||||
|
pod_next: F,
|
||||||
|
) -> LoopInput<P>
|
||||||
|
where
|
||||||
|
F: Future<Output = Option<P>>,
|
||||||
|
{
|
||||||
|
tokio::select! {
|
||||||
|
biased;
|
||||||
|
|
||||||
|
term_event = term_rx.recv() => {
|
||||||
|
LoopInput::Terminal(term_event.unwrap_or_else(|| {
|
||||||
|
Err(io::Error::new(
|
||||||
|
io::ErrorKind::UnexpectedEof,
|
||||||
|
"terminal event reader stopped",
|
||||||
|
))
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
event = pod_next, if connected => LoopInput::Pod(event),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn drain_terminal_events(
|
||||||
|
app: &mut App,
|
||||||
|
client: &mut PodClient,
|
||||||
|
term_rx: &mut mpsc::UnboundedReceiver<TerminalEventResult>,
|
||||||
|
) -> Result<bool, Box<dyn std::error::Error>> {
|
||||||
|
let mut handled = false;
|
||||||
|
for _ in 0..TERMINAL_EVENT_DRAIN_LIMIT {
|
||||||
|
match term_rx.try_recv() {
|
||||||
|
Ok(event) => {
|
||||||
|
handled = true;
|
||||||
|
handle_terminal_event(app, client, event?).await?;
|
||||||
|
if app.quit {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(mpsc::error::TryRecvError::Empty) => break,
|
||||||
|
Err(mpsc::error::TryRecvError::Disconnected) => {
|
||||||
|
return Err(Box::new(io::Error::new(
|
||||||
|
io::ErrorKind::UnexpectedEof,
|
||||||
|
"terminal event reader stopped",
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(handled)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn drain_pod_events(app: &mut App, client: &mut PodClient) -> bool {
|
||||||
|
let mut handled = false;
|
||||||
|
for _ in 0..POD_EVENT_DRAIN_LIMIT {
|
||||||
|
match client.try_next_event() {
|
||||||
|
Some(ev) => {
|
||||||
|
handled = true;
|
||||||
|
app.handle_pod_event(ev);
|
||||||
|
}
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
handled
|
||||||
|
}
|
||||||
|
|
||||||
async fn run_loop(
|
async fn run_loop(
|
||||||
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
|
terminal: &mut Terminal<CrosstermBackend<io::Stdout>>,
|
||||||
app: &mut App,
|
app: &mut App,
|
||||||
mut client: PodClient,
|
mut client: PodClient,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let (_terminal_reader, mut term_rx) = TerminalEventReader::spawn()?;
|
||||||
|
|
||||||
terminal.draw(|f| ui::draw(f, app))?;
|
terminal.draw(|f| ui::draw(f, app))?;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
|
@ -362,56 +496,28 @@ async fn run_loop(
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drain any already-buffered Pod events in a bounded batch before
|
let handled_term_event = drain_terminal_events(app, &mut client, &mut term_rx).await?;
|
||||||
// polling the terminal. This keeps status fresh without letting a
|
|
||||||
// busy event stream starve Ctrl-C / Ctrl-X input.
|
|
||||||
for _ in 0..32 {
|
|
||||||
match client.try_next_event() {
|
|
||||||
Some(ev) => app.handle_pod_event(ev),
|
|
||||||
None => break,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Always give the terminal queue a non-blocking pass each frame.
|
|
||||||
// The awaited select below only waits after this pass found nothing.
|
|
||||||
let mut handled_term_event = false;
|
|
||||||
while event::poll(std::time::Duration::ZERO)? {
|
|
||||||
handled_term_event = true;
|
|
||||||
handle_terminal_event(app, &mut client, event::read()?).await?;
|
|
||||||
if app.quit {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if app.quit {
|
if app.quit {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if handled_term_event {
|
let handled_pod_event = drain_pod_events(app, &mut client);
|
||||||
|
if handled_term_event || handled_pod_event {
|
||||||
terminal.draw(|f| ui::draw(f, app))?;
|
terminal.draw(|f| ui::draw(f, app))?;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::select! {
|
match next_loop_input(&mut term_rx, app.connected, client.next_event()).await {
|
||||||
term_event = tokio::task::spawn_blocking(|| {
|
LoopInput::Terminal(term_event) => {
|
||||||
if event::poll(std::time::Duration::from_millis(50))? {
|
handle_terminal_event(app, &mut client, term_event?).await?;
|
||||||
event::read().map(Some)
|
|
||||||
} else {
|
|
||||||
Ok(None)
|
|
||||||
}
|
|
||||||
}) => {
|
|
||||||
if let Some(term_event) = term_event?? {
|
|
||||||
handle_terminal_event(app, &mut client, term_event).await?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
event = client.next_event(), if app.connected => {
|
LoopInput::Pod(event) => match event {
|
||||||
match event {
|
Some(ev) => app.handle_pod_event(ev),
|
||||||
Some(ev) => app.handle_pod_event(ev),
|
None => {
|
||||||
None => {
|
app.connected = false;
|
||||||
app.connected = false;
|
app.mark_orphan_compacts_incomplete();
|
||||||
app.mark_orphan_compacts_incomplete();
|
app.push_error("Connection lost");
|
||||||
app.push_error("Connection lost");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
terminal.draw(|f| ui::draw(f, app))?;
|
terminal.draw(|f| ui::draw(f, app))?;
|
||||||
|
|
@ -716,4 +822,78 @@ mod tests {
|
||||||
"--pod and --session are mutually exclusive"
|
"--pod and --session are mutually exclusive"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn terminal_event_is_selected_before_ready_pod_event() {
|
||||||
|
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||||
|
tx.send(Ok(TermEvent::Key(KeyEvent::new(
|
||||||
|
KeyCode::Char('x'),
|
||||||
|
KeyModifiers::NONE,
|
||||||
|
))))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
match next_loop_input(&mut rx, true, std::future::ready(Some(()))).await {
|
||||||
|
LoopInput::Terminal(Ok(TermEvent::Key(key))) => {
|
||||||
|
assert_eq!(key.code, KeyCode::Char('x'));
|
||||||
|
}
|
||||||
|
_ => panic!("ready terminal input should win over a ready Pod event"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn terminal_event_is_preserved_after_pod_event_wins() {
|
||||||
|
let (tx, mut rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
match next_loop_input(&mut rx, true, std::future::ready(Some(1_u8))).await {
|
||||||
|
LoopInput::Pod(Some(1)) => {}
|
||||||
|
_ => panic!("expected the first ready Pod event to win before any terminal input"),
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.send(Ok(TermEvent::Key(KeyEvent::new(
|
||||||
|
KeyCode::Char('y'),
|
||||||
|
KeyModifiers::NONE,
|
||||||
|
))))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
match next_loop_input(&mut rx, true, std::future::ready(Some(2_u8))).await {
|
||||||
|
LoopInput::Terminal(Ok(TermEvent::Key(key))) => {
|
||||||
|
assert_eq!(key.code, KeyCode::Char('y'));
|
||||||
|
}
|
||||||
|
_ => panic!("queued terminal input should not be lost to subsequent Pod events"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn running_status_still_allows_text_editing() {
|
||||||
|
let mut app = App::new("agent".to_string());
|
||||||
|
app.set_pod_status(PodStatus::Running);
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
handle_key(
|
||||||
|
&mut app,
|
||||||
|
KeyEvent::new(KeyCode::Char('a'), KeyModifiers::NONE)
|
||||||
|
)
|
||||||
|
.is_none()
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
handle_key(
|
||||||
|
&mut app,
|
||||||
|
KeyEvent::new(KeyCode::Char('c'), KeyModifiers::NONE)
|
||||||
|
)
|
||||||
|
.is_none()
|
||||||
|
);
|
||||||
|
assert!(handle_key(&mut app, KeyEvent::new(KeyCode::Left, KeyModifiers::NONE)).is_none());
|
||||||
|
assert!(
|
||||||
|
handle_key(
|
||||||
|
&mut app,
|
||||||
|
KeyEvent::new(KeyCode::Char('b'), KeyModifiers::NONE)
|
||||||
|
)
|
||||||
|
.is_none()
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
protocol::Segment::flatten_to_text(&app.input.submit_segments()),
|
||||||
|
"abc"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user