merge: multi-pod polling

This commit is contained in:
Keisuke Hirata 2026-05-29 18:17:17 +09:00
commit dc986dc8e5
No known key found for this signature in database
2 changed files with 215 additions and 8 deletions

View File

@ -439,7 +439,7 @@ async fn run_multi() -> Result<(), Box<dyn std::error::Error>> {
return Err(error);
}
}
app.reload().await?;
app.reload_or_notice().await;
}
}
}

View File

@ -1,8 +1,8 @@
use std::io;
use std::path::{Path, PathBuf};
use std::time::Duration;
use std::time::{Duration, Instant};
use crossterm::event::{Event as TermEvent, KeyCode, KeyEvent, KeyModifiers, read};
use crossterm::event::{Event as TermEvent, KeyCode, KeyEvent, KeyModifiers, poll, read};
use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{ErrorCode, Event, InvokeKind, Method, PodStatus, Segment};
use ratatui::Frame;
@ -25,6 +25,8 @@ use crate::pod_list::{
const MAX_ENTRIES: usize = 50;
const CLOSED_VISIBLE_ROWS: usize = 3;
const SOCKET_OP_TIMEOUT: Duration = Duration::from_secs(3);
const MULTI_POD_POLL_INTERVAL: Duration = Duration::from_millis(1_500);
const TERMINAL_EVENT_POLL_INTERVAL: Duration = Duration::from_millis(100);
#[derive(Debug)]
pub(crate) enum MultiPodError {
@ -83,8 +85,28 @@ pub(crate) async fn run(
return Err(MultiPodError::NoPods);
}
let mut pending_reload = PendingReload::default();
let mut next_poll = Instant::now() + MULTI_POD_POLL_INTERVAL;
loop {
if let Some(result) = pending_reload.finish_if_ready().await {
app.apply_reload_result(result);
}
terminal.draw(|f| draw(f, app))?;
let now = Instant::now();
if now >= next_poll {
pending_reload.start();
next_poll = now + MULTI_POD_POLL_INTERVAL;
continue;
}
let event_wait = TERMINAL_EVENT_POLL_INTERVAL.min(next_poll.saturating_duration_since(now));
if !poll(event_wait)? {
continue;
}
match read()? {
TermEvent::Key(key) => match app.handle_key(key) {
MultiPodAction::None => {}
@ -94,12 +116,18 @@ pub(crate) async fn run(
return Ok(MultiPodOutcome::Open(request));
}
}
MultiPodAction::Refresh => app.reload().await?,
MultiPodAction::Refresh => {
if !pending_reload.start() {
app.notice = Some("Refresh already in progress.".to_string());
}
}
MultiPodAction::Send(request) => {
pending_reload.abort();
terminal.draw(|f| draw(f, app))?;
let result = send_run_and_confirm(&request.socket_path, request.segments).await;
app.finish_send(result);
let _ = app.reload().await;
app.reload_or_notice().await;
next_poll = Instant::now() + MULTI_POD_POLL_INTERVAL;
}
},
TermEvent::Paste(text) => app.input.insert_paste(text),
@ -109,6 +137,64 @@ pub(crate) async fn run(
}
}
struct PendingReload {
handle: Option<tokio::task::JoinHandle<Result<PodList, MultiPodError>>>,
}
impl PendingReload {
fn start(&mut self) -> bool {
if self.handle.is_some() {
return false;
}
self.handle = Some(tokio::spawn(async { load_pod_list(None).await }));
true
}
#[cfg(test)]
fn start_with_handle(
&mut self,
handle: tokio::task::JoinHandle<Result<PodList, MultiPodError>>,
) -> bool {
if self.handle.is_some() {
handle.abort();
return false;
}
self.handle = Some(handle);
true
}
async fn finish_if_ready(&mut self) -> Option<Result<PodList, MultiPodError>> {
if !self.handle.as_ref()?.is_finished() {
return None;
}
let handle = self.handle.take()?;
Some(match handle.await {
Ok(result) => result,
Err(e) => Err(MultiPodError::Io(io::Error::other(format!(
"reload task failed: {e}"
)))),
})
}
fn abort(&mut self) {
if let Some(handle) = self.handle.take() {
handle.abort();
}
}
}
impl Default for PendingReload {
fn default() -> Self {
Self { handle: None }
}
}
impl Drop for PendingReload {
fn drop(&mut self) {
self.abort();
}
}
fn default_store_dir() -> Result<PathBuf, MultiPodError> {
manifest::paths::sessions_dir().ok_or_else(|| {
MultiPodError::Io(io::Error::new(
@ -151,10 +237,29 @@ impl MultiPodApp {
Ok(app)
}
pub(crate) async fn reload(&mut self) -> Result<(), MultiPodError> {
self.list = load_pod_list(self.list.selected_name.clone()).await?;
pub(crate) async fn reload_or_notice(&mut self) {
let result = load_pod_list(None).await;
self.apply_reload_result(result);
}
fn apply_reload_result(&mut self, result: Result<PodList, MultiPodError>) {
match result {
Ok(list) => self.apply_reloaded_list(list),
Err(error) => {
self.notice = Some(format!("Refresh failed: {error}"));
}
}
}
fn apply_reloaded_list(&mut self, mut list: PodList) {
list.selected_name = self
.list
.selected_name
.clone()
.filter(|name| list.entries.iter().any(|entry| entry.name == *name))
.or_else(|| list.entries.first().map(|entry| entry.name.clone()));
self.list = list;
self.ensure_selection_visible();
Ok(())
}
#[cfg(test)]
@ -967,6 +1072,108 @@ mod tests {
assert_eq!(app.list.selected_entry().unwrap().name, "beta");
}
#[test]
fn multi_poll_reload_preserves_selection_composer_and_notice() {
let mut app = test_app(vec![
live_info_with_updated_at("alpha", PodStatus::Idle, 10),
live_info_with_updated_at("beta", PodStatus::Idle, 20),
]);
app.select_next();
assert_eq!(app.list.selected_entry().unwrap().name, "alpha");
app.input.insert_str("draft survives polling");
app.notice = Some("keep this notice".to_string());
let refreshed = PodList::from_sources(
PodVisibilitySource::ResumePicker,
vec![],
vec![
live_info_with_updated_at("gamma", PodStatus::Idle, 60),
live_info_with_updated_at("alpha", PodStatus::Running, 50),
live_info_with_updated_at("beta", PodStatus::Idle, 40),
],
None,
10,
);
app.apply_reloaded_list(refreshed);
assert_eq!(app.list.selected_entry().unwrap().name, "alpha");
assert_eq!(
app.list
.selected_entry()
.unwrap()
.live
.as_ref()
.unwrap()
.status,
Some(PodStatus::Running)
);
assert_eq!(input_text(&app), "draft survives polling");
assert_eq!(app.notice.as_deref(), Some("keep this notice"));
}
#[test]
fn multi_poll_reload_falls_back_when_selected_pod_disappears() {
let mut app = test_app(vec![
live_info_with_updated_at("alpha", PodStatus::Idle, 10),
live_info_with_updated_at("beta", PodStatus::Running, 20),
]);
assert_eq!(app.list.selected_entry().unwrap().name, "beta");
let refreshed = PodList::from_sources(
PodVisibilitySource::ResumePicker,
vec![stopped_info_with_updated_at("closed", 30)],
vec![live_info_with_updated_at("alpha", PodStatus::Idle, 40)],
None,
10,
);
app.apply_reloaded_list(refreshed);
assert_eq!(app.list.selected_entry().unwrap().name, "alpha");
assert_eq!(visible_entry_indices(&app.list), vec![0, 1]);
}
#[test]
fn multi_poll_reload_error_keeps_previous_list_and_composer() {
let mut app = test_app(vec![live_info("alpha", PodStatus::Idle)]);
app.input.insert_str("keep draft");
app.apply_reload_result(Err(MultiPodError::Io(io::Error::other("boom"))));
assert_eq!(app.list.selected_entry().unwrap().name, "alpha");
assert_eq!(input_text(&app), "keep draft");
let notice = app.notice.as_deref().unwrap();
assert!(notice.contains("Refresh failed"));
assert!(notice.contains("boom"));
}
#[tokio::test]
async fn multi_poll_reload_does_not_overlap_in_flight_reload() {
let mut app = test_app(vec![live_info("alpha", PodStatus::Idle)]);
let mut pending = PendingReload::default();
assert!(pending.start_with_handle(tokio::spawn(async {
tokio::time::sleep(Duration::from_millis(10)).await;
Err(MultiPodError::Io(io::Error::other("boom")))
})));
assert!(!pending.start_with_handle(tokio::spawn(async {
Ok(PodList::from_sources(
PodVisibilitySource::ResumePicker,
vec![],
vec![live_info("beta", PodStatus::Idle)],
None,
10,
))
})));
assert!(pending.finish_if_ready().await.is_none());
tokio::time::sleep(Duration::from_millis(20)).await;
let result = pending.finish_if_ready().await.unwrap();
app.apply_reload_result(result);
assert_eq!(app.list.selected_entry().unwrap().name, "alpha");
assert!(app.notice.as_deref().unwrap().contains("Refresh failed"));
}
#[test]
fn multi_idle_live_selected_target_is_send_eligible() {
let app = test_app(vec![live_info("idle", PodStatus::Idle)]);