diff --git a/crates/tui/src/multi_pod.rs b/crates/tui/src/multi_pod.rs index af525b67..8da33598 100644 --- a/crates/tui/src/multi_pod.rs +++ b/crates/tui/src/multi_pod.rs @@ -126,6 +126,7 @@ pub(crate) async fn run( } let mut pending_reload = PendingReload::default(); + let mut pending_queue_attention_notice = PendingQueueAttentionNotice::default(); if let Some(mode) = app.enter_reload.take() { if pending_reload.start(mode) { app.refreshing = true; @@ -134,11 +135,13 @@ pub(crate) async fn run( let mut next_poll = Instant::now() + MULTI_POD_POLL_INTERVAL; loop { + if let Some(result) = pending_queue_attention_notice.finish_if_ready().await { + app.finish_orchestrator_queue_attention_notice(result); + } if let Some(result) = pending_reload.finish_if_ready().await { app.apply_reload_result(result); if let Some(request) = app.prepare_orchestrator_queue_attention_notice() { - let result = dispatch_orchestrator_queue_attention_notice(request).await; - app.finish_orchestrator_queue_attention_notice(result); + pending_queue_attention_notice.start(request); } } @@ -159,7 +162,13 @@ pub(crate) async fn run( match read()? { TermEvent::Key(key) => match app.handle_key(key) { MultiPodAction::None => {} - MultiPodAction::Quit => return Ok(MultiPodOutcome::Quit), + MultiPodAction::Quit => { + abort_panel_background_work_for_quit( + &mut pending_reload, + &mut pending_queue_attention_notice, + ); + return Ok(MultiPodOutcome::Quit); + } MultiPodAction::Open => { if let Some(request) = app.prepare_open() { terminal.draw(|f| draw(f, app))?; @@ -168,6 +177,7 @@ pub(crate) async fn run( } MultiPodAction::DispatchTicketAction(request) => { pending_reload.abort(); + pending_queue_attention_notice.abort(); terminal.draw(|f| draw(f, app))?; let result = dispatch_ticket_action(request).await; app.finish_ticket_action_dispatch(result); @@ -178,6 +188,7 @@ pub(crate) async fn run( } MultiPodAction::LaunchIntake(request) => { pending_reload.abort(); + pending_queue_attention_notice.abort(); terminal.draw(|f| draw(f, app))?; let result = launch_intake_with_handoff(request).await; app.finish_intake_launch(result); @@ -188,6 +199,7 @@ pub(crate) async fn run( } MultiPodAction::SendCompanion(request) => { pending_reload.abort(); + pending_queue_attention_notice.abort(); terminal.draw(|f| draw(f, app))?; let result = dispatch_companion_message(request).await; app.finish_companion_send(result); @@ -267,6 +279,75 @@ impl Drop for PendingReload { } } +struct PendingQueueAttentionNotice { + handle: Option>, +} + +impl PendingQueueAttentionNotice { + fn start(&mut self, request: OrchestratorQueueAttentionNoticeRequest) -> bool { + if self.handle.is_some() { + return false; + } + self.handle = Some(tokio::spawn(async move { + dispatch_orchestrator_queue_attention_notice(request).await + })); + true + } + + #[cfg(test)] + fn start_with_handle( + &mut self, + handle: tokio::task::JoinHandle, + ) -> bool { + if self.handle.is_some() { + handle.abort(); + return false; + } + self.handle = Some(handle); + true + } + + async fn finish_if_ready(&mut self) -> Option { + if !self.handle.as_ref()?.is_finished() { + return None; + } + let handle = self.handle.take()?; + Some(match handle.await { + Ok(result) => result, + Err(e) => OrchestratorQueueAttentionNoticeResult::failed( + String::new(), + format!("queue-attention notice task failed: {e}"), + ), + }) + } + + fn abort(&mut self) { + if let Some(handle) = self.handle.take() { + handle.abort(); + } + } +} + +impl Default for PendingQueueAttentionNotice { + fn default() -> Self { + Self { handle: None } + } +} + +impl Drop for PendingQueueAttentionNotice { + fn drop(&mut self) { + self.abort(); + } +} + +fn abort_panel_background_work_for_quit( + pending_reload: &mut PendingReload, + pending_queue_attention_notice: &mut PendingQueueAttentionNotice, +) { + pending_reload.abort(); + pending_queue_attention_notice.abort(); +} + fn default_store_dir() -> Result { manifest::paths::sessions_dir().ok_or_else(|| { MultiPodError::Io(io::Error::new( @@ -4875,6 +4956,10 @@ fn truncate_with_ellipsis(s: &str, max_width: usize) -> String { #[cfg(test)] mod tests { use super::*; + use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }; #[test] fn orchestration_worktree_layout_is_stable_under_original_workspace_root() { let root = Path::new("/tmp/Yoi Workspace"); @@ -6074,6 +6159,65 @@ mod tests { assert!(app.notice.as_deref().unwrap().contains("Refresh failed")); } + #[tokio::test] + async fn multi_quit_aborts_background_reload_and_notice_without_waiting() { + struct DropFlag(Arc); + impl Drop for DropFlag { + fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + } + } + + let reload_cancelled = Arc::new(AtomicBool::new(false)); + let notice_cancelled = Arc::new(AtomicBool::new(false)); + let mut pending_reload = PendingReload::default(); + let mut pending_notice = PendingQueueAttentionNotice::default(); + let (reload_started_tx, reload_started_rx) = tokio::sync::oneshot::channel(); + let (notice_started_tx, notice_started_rx) = tokio::sync::oneshot::channel(); + + let reload_flag = Arc::clone(&reload_cancelled); + assert!(pending_reload.start_with_handle(tokio::spawn(async move { + let _drop_flag = DropFlag(reload_flag); + let _ = reload_started_tx.send(()); + std::future::pending::<()>().await; + Err(MultiPodError::Io(io::Error::other( + "unreachable reload completion", + ))) + }))); + + let notice_flag = Arc::clone(¬ice_cancelled); + assert!(pending_notice.start_with_handle(tokio::spawn(async move { + let _drop_flag = DropFlag(notice_flag); + let _ = notice_started_tx.send(()); + std::future::pending::<()>().await; + OrchestratorQueueAttentionNoticeResult::failed( + "unreachable".to_string(), + "unreachable notice completion", + ) + }))); + reload_started_rx.await.expect("reload task should start"); + notice_started_rx.await.expect("notice task should start"); + + tokio::time::timeout(Duration::from_millis(20), async { + abort_panel_background_work_for_quit(&mut pending_reload, &mut pending_notice); + }) + .await + .expect("quit abort should not wait for background task completion"); + + tokio::time::timeout(Duration::from_millis(100), async { + while !(reload_cancelled.load(Ordering::SeqCst) + && notice_cancelled.load(Ordering::SeqCst)) + { + tokio::task::yield_now().await; + } + }) + .await + .expect("quit abort should cancel reload and notice tasks"); + + assert!(pending_reload.finish_if_ready().await.is_none()); + assert!(pending_notice.finish_if_ready().await.is_none()); + } + #[test] fn multi_idle_live_selected_target_is_open_eligible() { let app = test_app(vec![live_info("idle", PodStatus::Idle)]);