merge: multi pod status probes
This commit is contained in:
commit
d8051af226
|
|
@ -656,7 +656,7 @@ fn row_status_label(entry: &PodListEntry) -> (&'static str, Style) {
|
|||
.fg(Color::Cyan)
|
||||
.add_modifier(Modifier::BOLD),
|
||||
),
|
||||
None => ("live unknown", Style::default().fg(Color::DarkGray)),
|
||||
None => ("live", Style::default().fg(Color::DarkGray)),
|
||||
};
|
||||
}
|
||||
if entry
|
||||
|
|
@ -1194,6 +1194,31 @@ mod tests {
|
|||
assert!(app.selected_send_disabled_reason().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multi_status_label_for_live_without_reported_status_is_softened() {
|
||||
let mut live = live_info("probing", PodStatus::Idle);
|
||||
live.status = None;
|
||||
let app = test_app(vec![live]);
|
||||
|
||||
let (label, _) = row_status_label(app.list.selected_entry().unwrap());
|
||||
|
||||
assert_eq!(label, "live");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multi_status_labels_preserve_explicit_live_statuses() {
|
||||
for (status, expected_label) in [
|
||||
(PodStatus::Idle, "live idle"),
|
||||
(PodStatus::Running, "live running"),
|
||||
(PodStatus::Paused, "live paused"),
|
||||
] {
|
||||
let app = test_app(vec![live_info("pod", status)]);
|
||||
let (label, _) = row_status_label(app.list.selected_entry().unwrap());
|
||||
|
||||
assert_eq!(label, expected_label);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multi_running_paused_and_stopped_targets_are_direct_send_disabled() {
|
||||
let mut app = test_app(vec![
|
||||
|
|
|
|||
|
|
@ -291,19 +291,39 @@ pub(crate) async fn read_reachable_live_pod_infos(
|
|||
store: &FsStore,
|
||||
) -> Result<Vec<LivePodInfo>, io::Error> {
|
||||
let records = read_live_pod_infos()?;
|
||||
let mut reachable = Vec::new();
|
||||
for mut record in records {
|
||||
let Ok(status) = probe_live_status(&record.socket_path).await else {
|
||||
probe_reachable_live_pod_infos(store, records).await
|
||||
}
|
||||
|
||||
async fn probe_reachable_live_pod_infos(
|
||||
store: &FsStore,
|
||||
records: Vec<LivePodInfo>,
|
||||
) -> Result<Vec<LivePodInfo>, io::Error> {
|
||||
let mut handles = Vec::with_capacity(records.len());
|
||||
for record in records {
|
||||
handles.push(tokio::spawn(probe_live_pod_info(record)));
|
||||
}
|
||||
|
||||
let mut reachable = Vec::with_capacity(handles.len());
|
||||
for handle in handles {
|
||||
let result = handle
|
||||
.await
|
||||
.map_err(|e| io::Error::other(format!("live status probe task failed: {e}")))?;
|
||||
let Ok(mut record) = result else {
|
||||
continue;
|
||||
};
|
||||
record.reachable = true;
|
||||
record.status = status;
|
||||
record.summary = summarize_live_pod(store, &record);
|
||||
reachable.push(record);
|
||||
}
|
||||
Ok(reachable)
|
||||
}
|
||||
|
||||
async fn probe_live_pod_info(mut record: LivePodInfo) -> Result<LivePodInfo, io::Error> {
|
||||
let status = probe_live_status(&record.socket_path).await?;
|
||||
record.reachable = true;
|
||||
record.status = status;
|
||||
Ok(record)
|
||||
}
|
||||
|
||||
pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option<PathBuf> {
|
||||
read_live_pod_infos()
|
||||
.ok()?
|
||||
|
|
@ -343,7 +363,7 @@ fn corrupt_stored_info(pod_name: String, message: String) -> StoredPodInfo {
|
|||
}
|
||||
}
|
||||
|
||||
const LIVE_STATUS_PROBE_TIMEOUT: Duration = Duration::from_millis(25);
|
||||
const LIVE_STATUS_PROBE_TIMEOUT: Duration = Duration::from_millis(200);
|
||||
|
||||
async fn probe_live_status(socket_path: &Path) -> Result<Option<PodStatus>, io::Error> {
|
||||
let mut client = PodClient::connect(socket_path).await?;
|
||||
|
|
@ -561,11 +581,16 @@ fn trim_one_line(s: &str, max_chars: usize) -> String {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::Arc;
|
||||
|
||||
use llm_worker::llm_client::types::RequestConfig;
|
||||
use pod_store::FsPodStore;
|
||||
use pod_store::{PodActiveSegmentRef, PodMetadataStore};
|
||||
use protocol::stream::JsonLineWriter;
|
||||
use session_store::{new_segment_id, new_session_id};
|
||||
use tempfile::tempdir;
|
||||
use tokio::net::UnixListener;
|
||||
use tokio::sync::Barrier;
|
||||
|
||||
const SOURCE: PodVisibilitySource = PodVisibilitySource::ResumePicker;
|
||||
|
||||
|
|
@ -752,6 +777,30 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn live_reachable_row_without_reported_status_can_open_but_not_send_now() {
|
||||
let mut live = live_info("live", PodStatus::Idle);
|
||||
live.status = None;
|
||||
live.reachable = true;
|
||||
|
||||
let entry = single_entry(PodList::from_sources(SOURCE, vec![], vec![live], None, 10));
|
||||
|
||||
assert!(entry.actions.can_open);
|
||||
assert!(!entry.actions.can_restore);
|
||||
assert!(!entry.actions.can_send_now);
|
||||
assert!(!entry.actions.can_queue_send);
|
||||
assert_eq!(
|
||||
entry.attach_socket_path(),
|
||||
Some(Path::new("/tmp/live.sock"))
|
||||
);
|
||||
assert!(
|
||||
!entry
|
||||
.diagnostics
|
||||
.iter()
|
||||
.any(|diagnostic| diagnostic.kind == PodEntryDiagnosticKind::LiveUnreachable)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn live_running_reachable_row_can_open_but_not_send_now() {
|
||||
let entry = single_entry(PodList::from_sources(
|
||||
|
|
@ -811,6 +860,82 @@ mod tests {
|
|||
assert_eq!(status, Some(PodStatus::Idle));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn live_status_probes_run_concurrently() {
|
||||
let store_dir = tempdir().unwrap();
|
||||
let store = FsStore::new(store_dir.path()).unwrap();
|
||||
let socket_dir = tempdir().unwrap();
|
||||
let probe_count = 3;
|
||||
let barrier = Arc::new(Barrier::new(probe_count));
|
||||
let mut records = Vec::new();
|
||||
let mut servers = Vec::new();
|
||||
|
||||
for index in 0..probe_count {
|
||||
let pod_name = format!("pod-{index}");
|
||||
let socket_path = socket_dir.path().join(format!("{pod_name}.sock"));
|
||||
let listener = UnixListener::bind(&socket_path).unwrap();
|
||||
let barrier = Arc::clone(&barrier);
|
||||
servers.push(tokio::spawn(async move {
|
||||
let (stream, _) = listener.accept().await.unwrap();
|
||||
barrier.wait().await;
|
||||
let mut writer = JsonLineWriter::new(stream);
|
||||
writer
|
||||
.write(&Event::Status {
|
||||
status: PodStatus::Idle,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}));
|
||||
records.push(live_probe_record(&pod_name, socket_path));
|
||||
}
|
||||
|
||||
let records = tokio::time::timeout(
|
||||
LIVE_STATUS_PROBE_TIMEOUT * 3,
|
||||
probe_reachable_live_pod_infos(&store, records),
|
||||
)
|
||||
.await
|
||||
.expect("status probes should complete")
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(records.len(), probe_count);
|
||||
assert!(records.iter().all(|record| record.reachable));
|
||||
assert!(
|
||||
records
|
||||
.iter()
|
||||
.all(|record| record.status == Some(PodStatus::Idle))
|
||||
);
|
||||
for server in servers {
|
||||
server.await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn live_status_probe_timeout_still_marks_socket_reachable() {
|
||||
let store_dir = tempdir().unwrap();
|
||||
let store = FsStore::new(store_dir.path()).unwrap();
|
||||
let socket_dir = tempdir().unwrap();
|
||||
let socket_path = socket_dir.path().join("silent.sock");
|
||||
let listener = UnixListener::bind(&socket_path).unwrap();
|
||||
let server = tokio::spawn(async move {
|
||||
let (_stream, _) = listener.accept().await.unwrap();
|
||||
std::future::pending::<()>().await;
|
||||
});
|
||||
|
||||
let records = probe_reachable_live_pod_infos(
|
||||
&store,
|
||||
vec![live_probe_record("silent", socket_path.clone())],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(records.len(), 1);
|
||||
assert_eq!(records[0].pod_name, "silent");
|
||||
assert!(records[0].reachable);
|
||||
assert_eq!(records[0].status, None);
|
||||
assert_eq!(records[0].socket_path, socket_path);
|
||||
server.abort();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn corrupt_stored_metadata_has_diagnostic() {
|
||||
let entry = single_entry(PodList::from_sources(
|
||||
|
|
@ -985,6 +1110,17 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
fn live_probe_record(pod_name: &str, socket_path: PathBuf) -> LivePodInfo {
|
||||
LivePodInfo {
|
||||
pod_name: pod_name.to_string(),
|
||||
socket_path,
|
||||
status: None,
|
||||
reachable: false,
|
||||
segment_id: None,
|
||||
summary: PodEntrySummary::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn test_greeting() -> protocol::Greeting {
|
||||
protocol::Greeting {
|
||||
pod_name: "live".to_string(),
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user