tui: probe multi-pod statuses concurrently

This commit is contained in:
Keisuke Hirata 2026-05-30 14:40:53 +09:00
parent 45d2c67689
commit 1ba99cdf8a
No known key found for this signature in database
2 changed files with 168 additions and 7 deletions

View File

@ -656,7 +656,7 @@ fn row_status_label(entry: &PodListEntry) -> (&'static str, Style) {
.fg(Color::Cyan)
.add_modifier(Modifier::BOLD),
),
None => ("live unknown", Style::default().fg(Color::DarkGray)),
None => ("live", Style::default().fg(Color::DarkGray)),
};
}
if entry
@ -1194,6 +1194,31 @@ mod tests {
assert!(app.selected_send_disabled_reason().is_none());
}
#[test]
fn multi_status_label_for_live_without_reported_status_is_softened() {
let mut live = live_info("probing", PodStatus::Idle);
live.status = None;
let app = test_app(vec![live]);
let (label, _) = row_status_label(app.list.selected_entry().unwrap());
assert_eq!(label, "live");
}
#[test]
fn multi_status_labels_preserve_explicit_live_statuses() {
for (status, expected_label) in [
(PodStatus::Idle, "live idle"),
(PodStatus::Running, "live running"),
(PodStatus::Paused, "live paused"),
] {
let app = test_app(vec![live_info("pod", status)]);
let (label, _) = row_status_label(app.list.selected_entry().unwrap());
assert_eq!(label, expected_label);
}
}
#[test]
fn multi_running_paused_and_stopped_targets_are_direct_send_disabled() {
let mut app = test_app(vec![

View File

@ -291,19 +291,39 @@ pub(crate) async fn read_reachable_live_pod_infos(
store: &FsStore,
) -> Result<Vec<LivePodInfo>, io::Error> {
let records = read_live_pod_infos()?;
let mut reachable = Vec::new();
for mut record in records {
let Ok(status) = probe_live_status(&record.socket_path).await else {
probe_reachable_live_pod_infos(store, records).await
}
async fn probe_reachable_live_pod_infos(
store: &FsStore,
records: Vec<LivePodInfo>,
) -> Result<Vec<LivePodInfo>, io::Error> {
let mut handles = Vec::with_capacity(records.len());
for record in records {
handles.push(tokio::spawn(probe_live_pod_info(record)));
}
let mut reachable = Vec::with_capacity(handles.len());
for handle in handles {
let result = handle
.await
.map_err(|e| io::Error::other(format!("live status probe task failed: {e}")))?;
let Ok(mut record) = result else {
continue;
};
record.reachable = true;
record.status = status;
record.summary = summarize_live_pod(store, &record);
reachable.push(record);
}
Ok(reachable)
}
async fn probe_live_pod_info(mut record: LivePodInfo) -> Result<LivePodInfo, io::Error> {
let status = probe_live_status(&record.socket_path).await?;
record.reachable = true;
record.status = status;
Ok(record)
}
pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option<PathBuf> {
read_live_pod_infos()
.ok()?
@ -343,7 +363,7 @@ fn corrupt_stored_info(pod_name: String, message: String) -> StoredPodInfo {
}
}
const LIVE_STATUS_PROBE_TIMEOUT: Duration = Duration::from_millis(25);
const LIVE_STATUS_PROBE_TIMEOUT: Duration = Duration::from_millis(200);
async fn probe_live_status(socket_path: &Path) -> Result<Option<PodStatus>, io::Error> {
let mut client = PodClient::connect(socket_path).await?;
@ -561,11 +581,16 @@ fn trim_one_line(s: &str, max_chars: usize) -> String {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use llm_worker::llm_client::types::RequestConfig;
use pod_store::FsPodStore;
use pod_store::{PodActiveSegmentRef, PodMetadataStore};
use protocol::stream::JsonLineWriter;
use session_store::{new_segment_id, new_session_id};
use tempfile::tempdir;
use tokio::net::UnixListener;
use tokio::sync::Barrier;
const SOURCE: PodVisibilitySource = PodVisibilitySource::ResumePicker;
@ -752,6 +777,30 @@ mod tests {
);
}
#[test]
fn live_reachable_row_without_reported_status_can_open_but_not_send_now() {
let mut live = live_info("live", PodStatus::Idle);
live.status = None;
live.reachable = true;
let entry = single_entry(PodList::from_sources(SOURCE, vec![], vec![live], None, 10));
assert!(entry.actions.can_open);
assert!(!entry.actions.can_restore);
assert!(!entry.actions.can_send_now);
assert!(!entry.actions.can_queue_send);
assert_eq!(
entry.attach_socket_path(),
Some(Path::new("/tmp/live.sock"))
);
assert!(
!entry
.diagnostics
.iter()
.any(|diagnostic| diagnostic.kind == PodEntryDiagnosticKind::LiveUnreachable)
);
}
#[test]
fn live_running_reachable_row_can_open_but_not_send_now() {
let entry = single_entry(PodList::from_sources(
@ -811,6 +860,82 @@ mod tests {
assert_eq!(status, Some(PodStatus::Idle));
}
#[tokio::test]
async fn live_status_probes_run_concurrently() {
let store_dir = tempdir().unwrap();
let store = FsStore::new(store_dir.path()).unwrap();
let socket_dir = tempdir().unwrap();
let probe_count = 3;
let barrier = Arc::new(Barrier::new(probe_count));
let mut records = Vec::new();
let mut servers = Vec::new();
for index in 0..probe_count {
let pod_name = format!("pod-{index}");
let socket_path = socket_dir.path().join(format!("{pod_name}.sock"));
let listener = UnixListener::bind(&socket_path).unwrap();
let barrier = Arc::clone(&barrier);
servers.push(tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
barrier.wait().await;
let mut writer = JsonLineWriter::new(stream);
writer
.write(&Event::Status {
status: PodStatus::Idle,
})
.await
.unwrap();
}));
records.push(live_probe_record(&pod_name, socket_path));
}
let records = tokio::time::timeout(
LIVE_STATUS_PROBE_TIMEOUT * 3,
probe_reachable_live_pod_infos(&store, records),
)
.await
.expect("status probes should complete")
.unwrap();
assert_eq!(records.len(), probe_count);
assert!(records.iter().all(|record| record.reachable));
assert!(
records
.iter()
.all(|record| record.status == Some(PodStatus::Idle))
);
for server in servers {
server.await.unwrap();
}
}
#[tokio::test]
async fn live_status_probe_timeout_still_marks_socket_reachable() {
let store_dir = tempdir().unwrap();
let store = FsStore::new(store_dir.path()).unwrap();
let socket_dir = tempdir().unwrap();
let socket_path = socket_dir.path().join("silent.sock");
let listener = UnixListener::bind(&socket_path).unwrap();
let server = tokio::spawn(async move {
let (_stream, _) = listener.accept().await.unwrap();
std::future::pending::<()>().await;
});
let records = probe_reachable_live_pod_infos(
&store,
vec![live_probe_record("silent", socket_path.clone())],
)
.await
.unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0].pod_name, "silent");
assert!(records[0].reachable);
assert_eq!(records[0].status, None);
assert_eq!(records[0].socket_path, socket_path);
server.abort();
}
#[test]
fn corrupt_stored_metadata_has_diagnostic() {
let entry = single_entry(PodList::from_sources(
@ -985,6 +1110,17 @@ mod tests {
}
}
fn live_probe_record(pod_name: &str, socket_path: PathBuf) -> LivePodInfo {
LivePodInfo {
pod_name: pod_name.to_string(),
socket_path,
status: None,
reachable: false,
segment_id: None,
summary: PodEntrySummary::default(),
}
}
fn test_greeting() -> protocol::Greeting {
protocol::Greeting {
pod_name: "live".to_string(),