diff --git a/crates/tui/src/multi_pod.rs b/crates/tui/src/multi_pod.rs index ac0dfccc..e9dc7f91 100644 --- a/crates/tui/src/multi_pod.rs +++ b/crates/tui/src/multi_pod.rs @@ -656,7 +656,7 @@ fn row_status_label(entry: &PodListEntry) -> (&'static str, Style) { .fg(Color::Cyan) .add_modifier(Modifier::BOLD), ), - None => ("live unknown", Style::default().fg(Color::DarkGray)), + None => ("live", Style::default().fg(Color::DarkGray)), }; } if entry @@ -1194,6 +1194,31 @@ mod tests { assert!(app.selected_send_disabled_reason().is_none()); } + #[test] + fn multi_status_label_for_live_without_reported_status_is_softened() { + let mut live = live_info("probing", PodStatus::Idle); + live.status = None; + let app = test_app(vec![live]); + + let (label, _) = row_status_label(app.list.selected_entry().unwrap()); + + assert_eq!(label, "live"); + } + + #[test] + fn multi_status_labels_preserve_explicit_live_statuses() { + for (status, expected_label) in [ + (PodStatus::Idle, "live idle"), + (PodStatus::Running, "live running"), + (PodStatus::Paused, "live paused"), + ] { + let app = test_app(vec![live_info("pod", status)]); + let (label, _) = row_status_label(app.list.selected_entry().unwrap()); + + assert_eq!(label, expected_label); + } + } + #[test] fn multi_running_paused_and_stopped_targets_are_direct_send_disabled() { let mut app = test_app(vec![ diff --git a/crates/tui/src/pod_list.rs b/crates/tui/src/pod_list.rs index f8a0e850..a4842571 100644 --- a/crates/tui/src/pod_list.rs +++ b/crates/tui/src/pod_list.rs @@ -291,19 +291,39 @@ pub(crate) async fn read_reachable_live_pod_infos( store: &FsStore, ) -> Result, io::Error> { let records = read_live_pod_infos()?; - let mut reachable = Vec::new(); - for mut record in records { - let Ok(status) = probe_live_status(&record.socket_path).await else { + probe_reachable_live_pod_infos(store, records).await +} + +async fn probe_reachable_live_pod_infos( + store: &FsStore, + records: Vec, +) -> Result, io::Error> { + let mut handles = Vec::with_capacity(records.len()); + for record in records { + handles.push(tokio::spawn(probe_live_pod_info(record))); + } + + let mut reachable = Vec::with_capacity(handles.len()); + for handle in handles { + let result = handle + .await + .map_err(|e| io::Error::other(format!("live status probe task failed: {e}")))?; + let Ok(mut record) = result else { continue; }; - record.reachable = true; - record.status = status; record.summary = summarize_live_pod(store, &record); reachable.push(record); } Ok(reachable) } +async fn probe_live_pod_info(mut record: LivePodInfo) -> Result { + let status = probe_live_status(&record.socket_path).await?; + record.reachable = true; + record.status = status; + Ok(record) +} + pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option { read_live_pod_infos() .ok()? @@ -343,7 +363,7 @@ fn corrupt_stored_info(pod_name: String, message: String) -> StoredPodInfo { } } -const LIVE_STATUS_PROBE_TIMEOUT: Duration = Duration::from_millis(25); +const LIVE_STATUS_PROBE_TIMEOUT: Duration = Duration::from_millis(200); async fn probe_live_status(socket_path: &Path) -> Result, io::Error> { let mut client = PodClient::connect(socket_path).await?; @@ -561,11 +581,16 @@ fn trim_one_line(s: &str, max_chars: usize) -> String { #[cfg(test)] mod tests { use super::*; + use std::sync::Arc; + use llm_worker::llm_client::types::RequestConfig; use pod_store::FsPodStore; use pod_store::{PodActiveSegmentRef, PodMetadataStore}; + use protocol::stream::JsonLineWriter; use session_store::{new_segment_id, new_session_id}; use tempfile::tempdir; + use tokio::net::UnixListener; + use tokio::sync::Barrier; const SOURCE: PodVisibilitySource = PodVisibilitySource::ResumePicker; @@ -752,6 +777,30 @@ mod tests { ); } + #[test] + fn live_reachable_row_without_reported_status_can_open_but_not_send_now() { + let mut live = live_info("live", PodStatus::Idle); + live.status = None; + live.reachable = true; + + let entry = single_entry(PodList::from_sources(SOURCE, vec![], vec![live], None, 10)); + + assert!(entry.actions.can_open); + assert!(!entry.actions.can_restore); + assert!(!entry.actions.can_send_now); + assert!(!entry.actions.can_queue_send); + assert_eq!( + entry.attach_socket_path(), + Some(Path::new("/tmp/live.sock")) + ); + assert!( + !entry + .diagnostics + .iter() + .any(|diagnostic| diagnostic.kind == PodEntryDiagnosticKind::LiveUnreachable) + ); + } + #[test] fn live_running_reachable_row_can_open_but_not_send_now() { let entry = single_entry(PodList::from_sources( @@ -811,6 +860,82 @@ mod tests { assert_eq!(status, Some(PodStatus::Idle)); } + #[tokio::test] + async fn live_status_probes_run_concurrently() { + let store_dir = tempdir().unwrap(); + let store = FsStore::new(store_dir.path()).unwrap(); + let socket_dir = tempdir().unwrap(); + let probe_count = 3; + let barrier = Arc::new(Barrier::new(probe_count)); + let mut records = Vec::new(); + let mut servers = Vec::new(); + + for index in 0..probe_count { + let pod_name = format!("pod-{index}"); + let socket_path = socket_dir.path().join(format!("{pod_name}.sock")); + let listener = UnixListener::bind(&socket_path).unwrap(); + let barrier = Arc::clone(&barrier); + servers.push(tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + barrier.wait().await; + let mut writer = JsonLineWriter::new(stream); + writer + .write(&Event::Status { + status: PodStatus::Idle, + }) + .await + .unwrap(); + })); + records.push(live_probe_record(&pod_name, socket_path)); + } + + let records = tokio::time::timeout( + LIVE_STATUS_PROBE_TIMEOUT * 3, + probe_reachable_live_pod_infos(&store, records), + ) + .await + .expect("status probes should complete") + .unwrap(); + + assert_eq!(records.len(), probe_count); + assert!(records.iter().all(|record| record.reachable)); + assert!( + records + .iter() + .all(|record| record.status == Some(PodStatus::Idle)) + ); + for server in servers { + server.await.unwrap(); + } + } + + #[tokio::test] + async fn live_status_probe_timeout_still_marks_socket_reachable() { + let store_dir = tempdir().unwrap(); + let store = FsStore::new(store_dir.path()).unwrap(); + let socket_dir = tempdir().unwrap(); + let socket_path = socket_dir.path().join("silent.sock"); + let listener = UnixListener::bind(&socket_path).unwrap(); + let server = tokio::spawn(async move { + let (_stream, _) = listener.accept().await.unwrap(); + std::future::pending::<()>().await; + }); + + let records = probe_reachable_live_pod_infos( + &store, + vec![live_probe_record("silent", socket_path.clone())], + ) + .await + .unwrap(); + + assert_eq!(records.len(), 1); + assert_eq!(records[0].pod_name, "silent"); + assert!(records[0].reachable); + assert_eq!(records[0].status, None); + assert_eq!(records[0].socket_path, socket_path); + server.abort(); + } + #[test] fn corrupt_stored_metadata_has_diagnostic() { let entry = single_entry(PodList::from_sources( @@ -985,6 +1110,17 @@ mod tests { } } + fn live_probe_record(pod_name: &str, socket_path: PathBuf) -> LivePodInfo { + LivePodInfo { + pod_name: pod_name.to_string(), + socket_path, + status: None, + reachable: false, + segment_id: None, + summary: PodEntrySummary::default(), + } + } + fn test_greeting() -> protocol::Greeting { protocol::Greeting { pod_name: "live".to_string(),