diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index ce79a65c..16473320 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -5,6 +5,7 @@ mod command; mod input; mod markdown; mod picker; +mod pod_list; mod scroll; mod spawn; mod task; diff --git a/crates/tui/src/picker.rs b/crates/tui/src/picker.rs index dad9fb1e..c8526f07 100644 --- a/crates/tui/src/picker.rs +++ b/crates/tui/src/picker.rs @@ -4,15 +4,11 @@ //! from the session store's name-keyed metadata. Picking a live row attaches to //! its socket; picking a stopped row restores via `pod --pod `. -use std::collections::{BTreeMap, HashMap}; -use std::fs; use std::io; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::time::Duration; -use client::PodClient; use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers}; -use pod_registry::{LockFileGuard, default_registry_path}; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use ratatui::layout::{Constraint, Layout}; @@ -20,8 +16,12 @@ use ratatui::style::{Color, Modifier, Style}; use ratatui::text::{Line, Span}; use ratatui::widgets::Paragraph; use ratatui::{Frame, TerminalOptions, Viewport}; -use session_store::{ - FsStore, LogEntry, LoggedContentPart, LoggedItem, PodMetadata, SegmentId, SessionId, Store, +use session_store::FsStore; + +use crate::pod_list::{ + PodList, PodListEntry, PodVisibilitySource, StoredMetadataState, + live_socket_for_pod as pod_list_live_socket_for_pod, read_reachable_live_pod_infos, + read_stored_pod_infos, }; const MAX_ROWS: usize = 10; @@ -99,62 +99,45 @@ impl PodRowState { } } -/// One row in the Pod picker. The primary key is the Pod name; Session/Segment -/// IDs are included only as debug context. -#[derive(Debug, Clone)] -struct Row { - pod_name: String, - state: PodRowState, - updated_at: u64, - active_session_id: Option, - active_segment_id: Option, - preview: Option, - socket_path: Option, -} - -#[derive(Debug)] -struct PodStateRecord { - pod_name: String, - state: Result, -} - -#[derive(Debug, Clone)] -pub(crate) struct LivePodRecord { - pub pod_name: String, - pub socket_path: PathBuf, - pub segment_id: Option, -} - pub async fn run() -> Result { let store_dir = default_store_dir()?; let store = FsStore::new(&store_dir)?; - let pod_states = read_pod_state_records(&store_dir)?; - let live_pods = read_reachable_live_pod_records().await.unwrap_or_default(); - let rows = build_rows(&store, pod_states, live_pods)?; - if rows.is_empty() { + let stored_pods = read_stored_pod_infos(&store_dir, &store)?; + let live_pods = read_reachable_live_pod_infos(&store) + .await + .unwrap_or_default(); + let mut list = PodList::from_sources( + PodVisibilitySource::ResumePicker, + stored_pods, + live_pods, + None, + MAX_ROWS, + ); + if list.entries.is_empty() { return Err(PickerError::NoPods); } - let mut selected = 0usize; let mut terminal = make_inline_terminal()?; loop { - terminal.draw(|f| draw(f, &rows, selected))?; + terminal.draw(|f| draw(f, &list))?; match poll_event()? { None => continue, Some(Action::Up) => { - selected = selected.saturating_sub(1); + let selected = list.selected_index().saturating_sub(1); + list.select_index(selected); } Some(Action::Down) => { - if selected + 1 < rows.len() { - selected += 1; + let selected = list.selected_index(); + if selected + 1 < list.entries.len() { + list.select_index(selected + 1); } } Some(Action::Submit) => { close_viewport(&mut terminal)?; - let row = &rows[selected]; + let entry = list.selected_entry().expect("non-empty pod list"); return Ok(PickerOutcome::Picked { - pod_name: row.pod_name.clone(), - socket_override: row.socket_path.clone(), + pod_name: entry.name.clone(), + socket_override: entry.attach_socket_path().map(PathBuf::from), }); } Some(Action::Cancel) => { @@ -189,288 +172,8 @@ fn default_store_dir() -> Result { }) } -fn read_pod_state_records(store_dir: &Path) -> Result, PickerError> { - let pods_dir = store_dir.join("pods"); - let mut records = Vec::new(); - if !pods_dir.exists() { - return Ok(records); - } - - for entry in fs::read_dir(pods_dir)? { - let entry = entry?; - if !entry.file_type()?.is_dir() { - continue; - } - let pod_name = entry.file_name().to_string_lossy().to_string(); - let path = entry.path().join("metadata.json"); - let state = match fs::read_to_string(&path) { - Ok(content) => serde_json::from_str::(&content).map_err(|e| e.to_string()), - Err(e) => Err(e.to_string()), - }; - records.push(PodStateRecord { pod_name, state }); - } - Ok(records) -} - -fn read_live_pod_records() -> Result, io::Error> { - let path = default_registry_path()?; - let guard = LockFileGuard::open(&path)?; - Ok(guard - .data() - .allocations - .iter() - .map(|allocation| LivePodRecord { - pod_name: allocation.pod_name.clone(), - socket_path: allocation.socket.clone(), - segment_id: allocation.segment_id, - }) - .collect()) -} - -async fn read_reachable_live_pod_records() -> Result, io::Error> { - let records = read_live_pod_records()?; - let mut reachable = Vec::new(); - for record in records { - if PodClient::connect(&record.socket_path).await.is_ok() { - reachable.push(record); - } - } - Ok(reachable) -} - pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option { - read_live_pod_records() - .ok()? - .into_iter() - .find(|pod| pod.pod_name == pod_name) - .map(|pod| pod.socket_path) -} - -fn build_rows( - store: &FsStore, - pod_states: Vec, - live_pods: Vec, -) -> Result, PickerError> { - let mut rows_by_name: BTreeMap = BTreeMap::new(); - let mut live_by_name: HashMap = HashMap::new(); - - for live in live_pods { - let (active_session_id, active_segment_id, updated_at, preview) = - summarize_live_pod(store, &live); - rows_by_name.insert( - live.pod_name.clone(), - Row { - pod_name: live.pod_name.clone(), - state: PodRowState::Live, - updated_at, - active_session_id, - active_segment_id, - preview, - socket_path: Some(live.socket_path.clone()), - }, - ); - live_by_name.insert(live.pod_name.clone(), live); - } - - for record in pod_states { - match record.state { - Ok(metadata) => { - let summary = summarize_metadata(store, &metadata); - let state = if live_by_name.contains_key(&record.pod_name) { - PodRowState::Live - } else { - PodRowState::Stopped - }; - upsert_metadata_row(&mut rows_by_name, record.pod_name, metadata, summary, state); - } - Err(message) => { - rows_by_name.entry(record.pod_name.clone()).or_insert(Row { - pod_name: record.pod_name, - state: PodRowState::Corrupt, - updated_at: 0, - active_session_id: None, - active_segment_id: None, - preview: Some(format!("metadata: {}", trim_one_line(&message, 48))), - socket_path: None, - }); - } - } - } - - let mut rows: Vec = rows_by_name.into_values().collect(); - rows.sort_by(|a, b| { - b.updated_at - .cmp(&a.updated_at) - .then_with(|| a.pod_name.cmp(&b.pod_name)) - }); - rows.truncate(MAX_ROWS); - Ok(rows) -} - -fn upsert_metadata_row( - rows_by_name: &mut BTreeMap, - pod_name: String, - metadata: PodMetadata, - summary: SegmentSummary, - state: PodRowState, -) { - let active = metadata.active; - let active_session_id = active.as_ref().map(|a| a.session_id); - let active_segment_id = active.as_ref().and_then(|a| a.segment_id); - - match rows_by_name.get_mut(&pod_name) { - Some(existing) => { - existing.state = state; - if summary.updated_at > existing.updated_at { - existing.updated_at = summary.updated_at; - } - if existing.active_session_id.is_none() { - existing.active_session_id = active_session_id; - } - if existing.active_segment_id.is_none() { - existing.active_segment_id = active_segment_id; - } - if existing.preview.is_none() { - existing.preview = summary.preview; - } - } - None => { - rows_by_name.insert( - pod_name.clone(), - Row { - pod_name, - state, - updated_at: summary.updated_at, - active_session_id, - active_segment_id, - preview: summary.preview, - socket_path: None, - }, - ); - } - } -} - -#[derive(Debug, Clone)] -struct SegmentSummary { - updated_at: u64, - preview: Option, -} - -fn summarize_live_pod( - store: &FsStore, - live: &LivePodRecord, -) -> (Option, Option, u64, Option) { - let Some(segment_id) = live.segment_id else { - return (None, None, 0, None); - }; - let session_id = store.lookup_session_of(segment_id).ok().flatten(); - let Some(session_id) = session_id else { - return (None, Some(segment_id), 0, None); - }; - let summary = summarize_segment(store, session_id, segment_id); - ( - Some(session_id), - Some(segment_id), - summary.updated_at, - summary.preview, - ) -} - -fn summarize_metadata(store: &FsStore, metadata: &PodMetadata) -> SegmentSummary { - let Some(active) = metadata.active.as_ref() else { - return SegmentSummary { - updated_at: 0, - preview: None, - }; - }; - let Some(segment_id) = active.segment_id else { - return SegmentSummary { - updated_at: 0, - preview: Some("[pending segment]".to_string()), - }; - }; - summarize_segment(store, active.session_id, segment_id) -} - -fn summarize_segment( - store: &FsStore, - session_id: SessionId, - segment_id: SegmentId, -) -> SegmentSummary { - match store.read_all(session_id, segment_id) { - Ok(entries) => SegmentSummary { - updated_at: last_entry_ts(&entries).unwrap_or(0), - preview: last_message_preview(&entries).or_else(|| Some("[empty]".to_string())), - }, - Err(_) => SegmentSummary { - updated_at: 0, - preview: Some("[corrupt segment]".to_string()), - }, - } -} - -fn last_entry_ts(entries: &[LogEntry]) -> Option { - entries.iter().map(log_entry_ts).max() -} - -fn log_entry_ts(entry: &LogEntry) -> u64 { - match entry { - LogEntry::SegmentStart { ts, .. } - | LogEntry::Invoke { ts, .. } - | LogEntry::UserInput { ts, .. } - | LogEntry::AssistantItem { ts, .. } - | LogEntry::ToolResult { ts, .. } - | LogEntry::SystemItem { ts, .. } - | LogEntry::TurnEnd { ts, .. } - | LogEntry::RunCompleted { ts, .. } - | LogEntry::RunErrored { ts, .. } - | LogEntry::ConfigChanged { ts, .. } - | LogEntry::LlmUsage { ts, .. } - | LogEntry::Extension { ts, .. } => *ts, - } -} - -/// Walk the log from the tail looking for the most recent user-message or -/// assistant-message entry, then render its first text fragment in a single line. -fn last_message_preview(entries: &[LogEntry]) -> Option { - for entry in entries.iter().rev() { - match entry { - LogEntry::UserInput { segments, .. } => { - let text = protocol::Segment::flatten_to_text(segments); - if !text.is_empty() { - return Some(format!("user: {}", trim_one_line(&text, 60))); - } - } - LogEntry::AssistantItem { item, .. } => { - if let Some(text) = first_text_logged(item) { - return Some(format!("assistant: {}", trim_one_line(&text, 60))); - } - } - _ => {} - } - } - None -} - -fn first_text_logged(item: &LoggedItem) -> Option { - match item { - LoggedItem::Message { content, .. } => content.iter().find_map(|p| match p { - LoggedContentPart::Text { text } => Some(text.clone()), - _ => None, - }), - _ => None, - } -} - -fn trim_one_line(s: &str, max_chars: usize) -> String { - let collapsed: String = s.chars().map(|c| if c == '\n' { ' ' } else { c }).collect(); - if collapsed.chars().count() <= max_chars { - collapsed - } else { - let truncated: String = collapsed.chars().take(max_chars - 1).collect(); - format!("{truncated}…") - } + pod_list_live_socket_for_pod(pod_name) } fn make_inline_terminal() -> io::Result>> { @@ -512,11 +215,11 @@ fn poll_event() -> io::Result> { } } -fn draw(f: &mut Frame<'_>, rows: &[Row], selected: usize) { +fn draw(f: &mut Frame<'_>, list: &PodList) { let area = f.area(); - let mut constraints: Vec = Vec::with_capacity(rows.len() + 3); + let mut constraints: Vec = Vec::with_capacity(list.entries.len() + 3); constraints.push(Constraint::Length(1)); // title - for _ in rows { + for _ in &list.entries { constraints.push(Constraint::Length(1)); } constraints.push(Constraint::Length(1)); // hint @@ -531,8 +234,12 @@ fn draw(f: &mut Frame<'_>, rows: &[Row], selected: usize) { layout[0], ); - for (i, row) in rows.iter().enumerate() { - f.render_widget(Paragraph::new(row_line(row, i == selected)), layout[i + 1]); + let selected = list.selected_index(); + for (i, entry) in list.entries.iter().enumerate() { + f.render_widget( + Paragraph::new(row_line(entry, i == selected)), + layout[i + 1], + ); } f.render_widget( @@ -545,7 +252,7 @@ fn draw(f: &mut Frame<'_>, rows: &[Row], selected: usize) { Span::styled("[esc]", Style::default().fg(Color::Yellow)), Span::raw(" cancel"), ])), - layout[rows.len() + 1], + layout[list.entries.len() + 1], ); } @@ -553,7 +260,7 @@ fn picker_title() -> &'static str { "resume pod pick a pod" } -fn row_line(row: &Row, selected: bool) -> Line<'_> { +fn row_line(entry: &PodListEntry, selected: bool) -> Line<'_> { let marker = if selected { "▶ " } else { " " }; let name_style = if selected { Style::default() @@ -567,27 +274,44 @@ fn row_line(row: &Row, selected: bool) -> Line<'_> { } else { Style::default().fg(Color::DarkGray) }; + let state = row_state(entry); + let _visibility = entry.visibility; + let _source_kinds = &entry.source_kinds; let mut spans = vec![ Span::raw(marker), - Span::styled(row.pod_name.as_str(), name_style), + Span::styled(entry.name.as_str(), name_style), Span::raw(" "), - Span::styled(format!("[{}]", row.state.label()), row.state.style()), + Span::styled(format!("[{}]", state.label()), state.style()), Span::raw(" "), Span::styled( - format_updated_at(row.updated_at), + format_updated_at(entry.summary.updated_at), Style::default().fg(Color::DarkGray), ), Span::raw(" "), - Span::styled(debug_ids(row), Style::default().fg(Color::DarkGray)), + Span::styled(debug_ids(entry), Style::default().fg(Color::DarkGray)), ]; - if let Some(preview) = row.preview.as_ref() { + if let Some(preview) = entry.summary.preview.as_ref() { spans.push(Span::raw(" ")); spans.push(Span::styled(preview.as_str(), preview_style)); } Line::from(spans) } +fn row_state(entry: &PodListEntry) -> PodRowState { + if entry.live.as_ref().is_some_and(|live| live.reachable) { + return PodRowState::Live; + } + if entry + .stored + .as_ref() + .is_some_and(|stored| matches!(stored.metadata_state, StoredMetadataState::Corrupt(_))) + { + return PodRowState::Corrupt; + } + PodRowState::Stopped +} + fn format_updated_at(updated_at: u64) -> String { if updated_at == 0 { "updated: —".to_string() @@ -596,12 +320,14 @@ fn format_updated_at(updated_at: u64) -> String { } } -fn debug_ids(row: &Row) -> String { - let session = row +fn debug_ids(entry: &PodListEntry) -> String { + let session = entry + .summary .active_session_id .map(short_id) .unwrap_or_else(|| "--------".to_string()); - let segment = row + let segment = entry + .summary .active_segment_id .map(short_id) .unwrap_or_else(|| "--------".to_string()); @@ -615,198 +341,9 @@ fn short_id(id: T) -> String { #[cfg(test)] mod tests { use super::*; - use llm_worker::llm_client::types::RequestConfig; - use session_store::{PodActiveSegmentRef, PodMetadataStore, new_segment_id, new_session_id}; - use tempfile::tempdir; - - #[test] - fn pod_rows_are_sorted_by_active_segment_timestamp() { - let dir = tempdir().unwrap(); - let store = FsStore::new(dir.path()).unwrap(); - let earlier_session = new_session_id(); - let later_session = new_session_id(); - let earlier_segment = new_segment_id(); - let later_segment = new_segment_id(); - - append_start(&store, earlier_session, earlier_segment, 10); - append_user( - &store, - earlier_session, - earlier_segment, - 100, - "old pod update", - ); - append_start(&store, later_session, later_segment, 20); - append_user(&store, later_session, later_segment, 200, "new pod update"); - - let records = vec![ - metadata_record("older", earlier_session, earlier_segment), - metadata_record("newer", later_session, later_segment), - ]; - let rows = build_rows(&store, records, vec![]).unwrap(); - - assert_eq!(rows[0].pod_name, "newer"); - assert_eq!(rows[0].state, PodRowState::Stopped); - assert_eq!(rows[0].updated_at, 200); - assert_eq!(rows[0].preview.as_deref(), Some("user: new pod update")); - assert_eq!(rows[1].pod_name, "older"); - } - - #[test] - fn pod_rows_include_live_and_stopped_pods() { - let dir = tempdir().unwrap(); - let store = FsStore::new(dir.path()).unwrap(); - let stopped_session = new_session_id(); - let stopped_segment = new_segment_id(); - let live_session = new_session_id(); - let live_segment = new_segment_id(); - - append_start(&store, stopped_session, stopped_segment, 10); - append_user( - &store, - stopped_session, - stopped_segment, - 50, - "stopped preview", - ); - append_start(&store, live_session, live_segment, 20); - append_user(&store, live_session, live_segment, 70, "live preview"); - - let rows = build_rows( - &store, - vec![metadata_record("stopped", stopped_session, stopped_segment)], - vec![LivePodRecord { - pod_name: "live".to_string(), - socket_path: PathBuf::from("/tmp/live.sock"), - segment_id: Some(live_segment), - }], - ) - .unwrap(); - - let live = rows.iter().find(|row| row.pod_name == "live").unwrap(); - assert_eq!(live.state, PodRowState::Live); - assert_eq!(live.active_session_id, Some(live_session)); - assert_eq!( - live.socket_path.as_deref(), - Some(Path::new("/tmp/live.sock")) - ); - - let stopped = rows.iter().find(|row| row.pod_name == "stopped").unwrap(); - assert_eq!(stopped.state, PodRowState::Stopped); - assert_eq!(stopped.socket_path, None); - } - - #[test] - fn corrupt_pod_state_is_rendered_as_corrupt_row() { - let dir = tempdir().unwrap(); - let store = FsStore::new(dir.path()).unwrap(); - let rows = build_rows( - &store, - vec![PodStateRecord { - pod_name: "broken".to_string(), - state: Err("expected value".to_string()), - }], - vec![], - ) - .unwrap(); - - assert_eq!(rows.len(), 1); - assert_eq!(rows[0].pod_name, "broken"); - assert_eq!(rows[0].state, PodRowState::Corrupt); - assert!( - rows[0] - .preview - .as_deref() - .unwrap() - .contains("expected value") - ); - } #[test] fn picker_title_names_pods_not_sessions() { assert_eq!(picker_title(), "resume pod pick a pod"); } - - fn metadata_record( - pod_name: &str, - session_id: SessionId, - segment_id: SegmentId, - ) -> PodStateRecord { - PodStateRecord { - pod_name: pod_name.to_string(), - state: Ok(PodMetadata::new( - pod_name, - Some(PodActiveSegmentRef::active_segment(session_id, segment_id)), - )), - } - } - - fn append_start(store: &FsStore, session_id: SessionId, segment_id: SegmentId, ts: u64) { - store - .append( - session_id, - segment_id, - &LogEntry::SegmentStart { - ts, - session_id, - system_prompt: None, - config: RequestConfig::default(), - history: vec![], - forked_from: None, - compacted_from: None, - }, - ) - .unwrap(); - } - - fn append_user( - store: &FsStore, - session_id: SessionId, - segment_id: SegmentId, - ts: u64, - text: &str, - ) { - store - .append( - session_id, - segment_id, - &LogEntry::UserInput { - ts, - segments: vec![protocol::Segment::text(text)], - }, - ) - .unwrap(); - } - - #[test] - fn read_pod_state_records_reports_corrupt_metadata() { - let dir = tempdir().unwrap(); - let pod_dir = dir.path().join("pods").join("broken"); - fs::create_dir_all(&pod_dir).unwrap(); - fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap(); - - let records = read_pod_state_records(dir.path()).unwrap(); - assert_eq!(records.len(), 1); - assert_eq!(records[0].pod_name, "broken"); - assert!(records[0].state.is_err()); - } - - #[test] - fn read_pod_state_records_reads_metadata() { - let dir = tempdir().unwrap(); - let store = FsStore::new(dir.path()).unwrap(); - let session_id = new_session_id(); - let segment_id = new_segment_id(); - store - .write(&PodMetadata::new( - "agent", - Some(PodActiveSegmentRef::active_segment(session_id, segment_id)), - )) - .unwrap(); - - let records = read_pod_state_records(dir.path()).unwrap(); - assert_eq!(records.len(), 1); - assert_eq!(records[0].pod_name, "agent"); - assert!(records[0].state.is_ok()); - } } diff --git a/crates/tui/src/pod_list.rs b/crates/tui/src/pod_list.rs new file mode 100644 index 00000000..81e84f7e --- /dev/null +++ b/crates/tui/src/pod_list.rs @@ -0,0 +1,905 @@ +use std::collections::BTreeMap; +use std::fs; +use std::io; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use client::PodClient; +use pod_registry::{LockFileGuard, default_registry_path}; +use protocol::{Event, PodStatus}; +use session_store::{ + FsStore, LogEntry, LoggedContentPart, LoggedItem, PodMetadata, SegmentId, SessionId, Store, +}; + +#[derive(Debug, Clone)] +pub(crate) struct PodList { + pub entries: Vec, + pub selected_name: Option, +} + +impl PodList { + pub(crate) fn from_sources( + source: PodVisibilitySource, + stored: Vec, + live: Vec, + selected_name: Option, + max_entries: usize, + ) -> Self { + let mut entries_by_name: BTreeMap = BTreeMap::new(); + + for live_info in live { + let name = live_info.pod_name.clone(); + entries_by_name + .entry(name.clone()) + .or_insert_with(|| PodListEntry::new(name, source)) + .merge_live(live_info); + } + + for stored_info in stored { + let name = stored_info.pod_name.clone(); + entries_by_name + .entry(name.clone()) + .or_insert_with(|| PodListEntry::new(name, source)) + .merge_stored(stored_info); + } + + let mut entries: Vec = entries_by_name.into_values().collect(); + for entry in &mut entries { + entry.finalize(); + } + entries.sort_by(|a, b| { + b.summary + .updated_at + .cmp(&a.summary.updated_at) + .then_with(|| a.name.cmp(&b.name)) + }); + entries.truncate(max_entries); + + let selected_name = selected_name + .filter(|name| entries.iter().any(|entry| entry.name == *name)) + .or_else(|| entries.first().map(|entry| entry.name.clone())); + + Self { + entries, + selected_name, + } + } + + pub(crate) fn selected_index(&self) -> usize { + self.selected_name + .as_ref() + .and_then(|name| self.entries.iter().position(|entry| entry.name == *name)) + .unwrap_or(0) + } + + pub(crate) fn select_index(&mut self, index: usize) { + self.selected_name = self.entries.get(index).map(|entry| entry.name.clone()); + } + + pub(crate) fn selected_entry(&self) -> Option<&PodListEntry> { + let index = self.selected_index(); + self.entries.get(index) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum PodVisibilitySource { + ResumePicker, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum PodListSourceKind { + RuntimeRegistry, + StoredMetadata, +} + +#[derive(Debug, Clone)] +pub(crate) struct PodListEntry { + pub name: String, + pub visibility: PodVisibilitySource, + pub source_kinds: Vec, + pub live: Option, + pub stored: Option, + pub summary: PodEntrySummary, + pub actions: PodEntryActions, + pub diagnostics: Vec, +} + +impl PodListEntry { + fn new(name: String, visibility: PodVisibilitySource) -> Self { + Self { + name, + visibility, + source_kinds: Vec::new(), + live: None, + stored: None, + summary: PodEntrySummary::default(), + actions: PodEntryActions::default(), + diagnostics: Vec::new(), + } + } + + fn merge_live(&mut self, live: LivePodInfo) { + if !self + .source_kinds + .contains(&PodListSourceKind::RuntimeRegistry) + { + self.source_kinds.push(PodListSourceKind::RuntimeRegistry); + } + if live.summary.updated_at > self.summary.updated_at { + self.summary.updated_at = live.summary.updated_at; + } + if self.summary.active_session_id.is_none() { + self.summary.active_session_id = live.summary.active_session_id; + } + if self.summary.active_segment_id.is_none() { + self.summary.active_segment_id = live.summary.active_segment_id.or(live.segment_id); + } + if self.summary.preview.is_none() { + self.summary.preview = live.summary.preview.clone(); + } + self.live = Some(live); + } + + fn merge_stored(&mut self, stored: StoredPodInfo) { + if !self + .source_kinds + .contains(&PodListSourceKind::StoredMetadata) + { + self.source_kinds.push(PodListSourceKind::StoredMetadata); + } + if stored.updated_at > self.summary.updated_at { + self.summary.updated_at = stored.updated_at; + } + if self.summary.active_session_id.is_none() { + self.summary.active_session_id = stored.active_session_id; + } + if self.summary.active_segment_id.is_none() { + self.summary.active_segment_id = stored.active_segment_id; + } + if self.summary.preview.is_none() { + self.summary.preview = stored.preview.clone(); + } + self.stored = Some(stored); + } + + fn finalize(&mut self) { + self.diagnostics = build_diagnostics(self); + self.actions = build_actions(self); + } + + pub(crate) fn attach_socket_path(&self) -> Option<&Path> { + self.live + .as_ref() + .filter(|live| live.reachable) + .map(|live| live.socket_path.as_path()) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct LivePodInfo { + pub pod_name: String, + pub socket_path: PathBuf, + pub status: Option, + pub reachable: bool, + pub segment_id: Option, + pub summary: PodEntrySummary, +} + +#[derive(Debug, Clone)] +pub(crate) struct StoredPodInfo { + pub pod_name: String, + pub metadata_state: StoredMetadataState, + pub active_session_id: Option, + pub active_segment_id: Option, + pub updated_at: u64, + pub preview: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) enum StoredMetadataState { + Present, + Corrupt(String), +} + +#[derive(Debug, Clone, Default)] +pub(crate) struct PodEntrySummary { + pub active_session_id: Option, + pub active_segment_id: Option, + pub updated_at: u64, + pub preview: Option, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub(crate) struct PodEntryActions { + pub can_open: bool, + pub can_restore: bool, + pub can_send_now: bool, + pub can_queue_send: bool, + pub disabled_reason: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct PodEntryDiagnostic { + pub kind: PodEntryDiagnosticKind, + pub message: String, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum PodEntryDiagnosticKind { + StoredMetadataCorrupt, + LiveUnreachable, + MissingStoredMetadata, + MissingLiveStatus, +} + +pub(crate) fn read_stored_pod_infos( + store_dir: &Path, + store: &FsStore, +) -> Result, io::Error> { + let pods_dir = store_dir.join("pods"); + let mut records = Vec::new(); + if !pods_dir.exists() { + return Ok(records); + } + + for entry in fs::read_dir(pods_dir)? { + let entry = entry?; + if !entry.file_type()?.is_dir() { + continue; + } + let pod_name = entry.file_name().to_string_lossy().to_string(); + let path = entry.path().join("metadata.json"); + let info = match fs::read_to_string(&path) { + Ok(content) => match serde_json::from_str::(&content) { + Ok(metadata) => stored_info_from_metadata(store, pod_name, metadata), + Err(e) => corrupt_stored_info(pod_name, e.to_string()), + }, + Err(e) => corrupt_stored_info(pod_name, e.to_string()), + }; + records.push(info); + } + Ok(records) +} + +pub(crate) fn read_live_pod_infos() -> Result, io::Error> { + let path = default_registry_path()?; + let guard = LockFileGuard::open(&path)?; + Ok(guard + .data() + .allocations + .iter() + .map(|allocation| LivePodInfo { + pod_name: allocation.pod_name.clone(), + socket_path: allocation.socket.clone(), + status: None, + reachable: false, + segment_id: allocation.segment_id, + summary: PodEntrySummary::default(), + }) + .collect()) +} + +pub(crate) async fn read_reachable_live_pod_infos( + store: &FsStore, +) -> Result, io::Error> { + let records = read_live_pod_infos()?; + let mut reachable = Vec::new(); + for mut record in records { + let Ok(status) = probe_live_status(&record.socket_path).await else { + continue; + }; + record.reachable = true; + record.status = status; + record.summary = summarize_live_pod(store, &record); + reachable.push(record); + } + Ok(reachable) +} + +pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option { + read_live_pod_infos() + .ok()? + .into_iter() + .find(|pod| pod.pod_name == pod_name) + .map(|pod| pod.socket_path) +} + +fn stored_info_from_metadata( + store: &FsStore, + pod_name: String, + metadata: PodMetadata, +) -> StoredPodInfo { + let active = metadata.active; + let active_session_id = active.as_ref().map(|a| a.session_id); + let active_segment_id = active.as_ref().and_then(|a| a.segment_id); + let summary = summarize_metadata(store, active.as_ref()); + + StoredPodInfo { + pod_name, + metadata_state: StoredMetadataState::Present, + active_session_id, + active_segment_id, + updated_at: summary.updated_at, + preview: summary.preview, + } +} + +fn corrupt_stored_info(pod_name: String, message: String) -> StoredPodInfo { + StoredPodInfo { + pod_name, + metadata_state: StoredMetadataState::Corrupt(message.clone()), + active_session_id: None, + active_segment_id: None, + updated_at: 0, + preview: Some(format!("metadata: {}", trim_one_line(&message, 48))), + } +} + +const LIVE_STATUS_PROBE_TIMEOUT: Duration = Duration::from_millis(25); + +async fn probe_live_status(socket_path: &Path) -> Result, io::Error> { + let mut client = PodClient::connect(socket_path).await?; + let deadline = tokio::time::Instant::now() + LIVE_STATUS_PROBE_TIMEOUT; + + loop { + if tokio::time::Instant::now() >= deadline { + return Ok(None); + } + match tokio::time::timeout_at(deadline, client.next_event()).await { + Ok(Some(event)) => { + if let Some(status) = status_from_event(&event) { + return Ok(Some(status)); + } + } + Ok(None) | Err(_) => return Ok(None), + } + } +} + +fn status_from_event(event: &Event) -> Option { + match event { + Event::Snapshot { status, .. } | Event::Status { status } => Some(*status), + _ => None, + } +} + +#[derive(Debug, Clone)] +struct SegmentSummary { + updated_at: u64, + preview: Option, +} + +fn summarize_live_pod(store: &FsStore, live: &LivePodInfo) -> PodEntrySummary { + let Some(segment_id) = live.segment_id else { + return PodEntrySummary::default(); + }; + let session_id = store.lookup_session_of(segment_id).ok().flatten(); + let Some(session_id) = session_id else { + return PodEntrySummary { + active_session_id: None, + active_segment_id: Some(segment_id), + updated_at: 0, + preview: None, + }; + }; + let summary = summarize_segment(store, session_id, segment_id); + PodEntrySummary { + active_session_id: Some(session_id), + active_segment_id: Some(segment_id), + updated_at: summary.updated_at, + preview: summary.preview, + } +} + +fn summarize_metadata( + store: &FsStore, + active: Option<&session_store::PodActiveSegmentRef>, +) -> SegmentSummary { + let Some(active) = active else { + return SegmentSummary { + updated_at: 0, + preview: None, + }; + }; + let Some(segment_id) = active.segment_id else { + return SegmentSummary { + updated_at: 0, + preview: Some("[pending segment]".to_string()), + }; + }; + summarize_segment(store, active.session_id, segment_id) +} + +fn summarize_segment( + store: &FsStore, + session_id: SessionId, + segment_id: SegmentId, +) -> SegmentSummary { + match store.read_all(session_id, segment_id) { + Ok(entries) => SegmentSummary { + updated_at: last_entry_ts(&entries).unwrap_or(0), + preview: last_message_preview(&entries).or_else(|| Some("[empty]".to_string())), + }, + Err(_) => SegmentSummary { + updated_at: 0, + preview: Some("[corrupt segment]".to_string()), + }, + } +} + +fn last_entry_ts(entries: &[LogEntry]) -> Option { + entries.iter().map(log_entry_ts).max() +} + +fn log_entry_ts(entry: &LogEntry) -> u64 { + match entry { + LogEntry::SegmentStart { ts, .. } + | LogEntry::Invoke { ts, .. } + | LogEntry::UserInput { ts, .. } + | LogEntry::AssistantItem { ts, .. } + | LogEntry::ToolResult { ts, .. } + | LogEntry::SystemItem { ts, .. } + | LogEntry::TurnEnd { ts, .. } + | LogEntry::RunCompleted { ts, .. } + | LogEntry::RunErrored { ts, .. } + | LogEntry::ConfigChanged { ts, .. } + | LogEntry::LlmUsage { ts, .. } + | LogEntry::Extension { ts, .. } => *ts, + } +} + +fn last_message_preview(entries: &[LogEntry]) -> Option { + for entry in entries.iter().rev() { + match entry { + LogEntry::UserInput { segments, .. } => { + let text = protocol::Segment::flatten_to_text(segments); + if !text.is_empty() { + return Some(format!("user: {}", trim_one_line(&text, 60))); + } + } + LogEntry::AssistantItem { item, .. } => { + if let Some(text) = first_text_logged(item) { + return Some(format!("assistant: {}", trim_one_line(&text, 60))); + } + } + _ => {} + } + } + None +} + +fn first_text_logged(item: &LoggedItem) -> Option { + match item { + LoggedItem::Message { content, .. } => content.iter().find_map(|p| match p { + LoggedContentPart::Text { text } => Some(text.clone()), + _ => None, + }), + _ => None, + } +} + +fn build_diagnostics(entry: &PodListEntry) -> Vec { + let mut diagnostics = Vec::new(); + + if let Some(stored) = entry.stored.as_ref() { + if let StoredMetadataState::Corrupt(message) = &stored.metadata_state { + diagnostics.push(PodEntryDiagnostic { + kind: PodEntryDiagnosticKind::StoredMetadataCorrupt, + message: format!("metadata: {}", trim_one_line(message, 80)), + }); + } + } else if entry.live.is_some() { + diagnostics.push(PodEntryDiagnostic { + kind: PodEntryDiagnosticKind::MissingStoredMetadata, + message: "no stored pod metadata".to_string(), + }); + } + + if let Some(live) = entry.live.as_ref() { + if !live.reachable { + diagnostics.push(PodEntryDiagnostic { + kind: PodEntryDiagnosticKind::LiveUnreachable, + message: format!("socket unreachable: {}", live.socket_path.display()), + }); + } else if live.status.is_none() { + diagnostics.push(PodEntryDiagnostic { + kind: PodEntryDiagnosticKind::MissingLiveStatus, + message: "live pod status was not reported".to_string(), + }); + } + } + + diagnostics +} + +fn build_actions(entry: &PodListEntry) -> PodEntryActions { + let live_reachable = entry.live.as_ref().is_some_and(|live| live.reachable); + let stored_restorable = entry + .stored + .as_ref() + .is_some_and(|stored| matches!(stored.metadata_state, StoredMetadataState::Present)); + let live_status = entry.live.as_ref().and_then(|live| live.status); + + let can_restore = stored_restorable && !live_reachable; + let can_open = live_reachable || stored_restorable; + let can_send_now = live_reachable && live_status == Some(PodStatus::Idle); + let can_queue_send = live_reachable && live_status == Some(PodStatus::Running); + let disabled_reason = if can_open { + None + } else if entry.live.is_some() { + Some("live pod is unreachable".to_string()) + } else if entry.stored.is_some() { + Some("stored pod metadata is corrupt".to_string()) + } else { + Some("no live or stored pod state".to_string()) + }; + + PodEntryActions { + can_open, + can_restore, + can_send_now, + can_queue_send, + disabled_reason, + } +} + +fn trim_one_line(s: &str, max_chars: usize) -> String { + let collapsed: String = s.chars().map(|c| if c == '\n' { ' ' } else { c }).collect(); + if collapsed.chars().count() <= max_chars { + collapsed + } else { + let truncated: String = collapsed.chars().take(max_chars - 1).collect(); + format!("{truncated}…") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use llm_worker::llm_client::types::RequestConfig; + use session_store::{PodActiveSegmentRef, PodMetadataStore, new_segment_id, new_session_id}; + use tempfile::tempdir; + + const SOURCE: PodVisibilitySource = PodVisibilitySource::ResumePicker; + + #[test] + fn pod_list_rows_are_sorted_by_active_segment_timestamp() { + let dir = tempdir().unwrap(); + let store = FsStore::new(dir.path()).unwrap(); + let earlier_session = new_session_id(); + let later_session = new_session_id(); + let earlier_segment = new_segment_id(); + let later_segment = new_segment_id(); + + append_start(&store, earlier_session, earlier_segment, 10); + append_user( + &store, + earlier_session, + earlier_segment, + 100, + "old pod update", + ); + append_start(&store, later_session, later_segment, 20); + append_user(&store, later_session, later_segment, 200, "new pod update"); + + let entries = PodList::from_sources( + SOURCE, + vec![ + metadata_info(&store, "older", earlier_session, earlier_segment), + metadata_info(&store, "newer", later_session, later_segment), + ], + vec![], + None, + 10, + ) + .entries; + + assert_eq!(entries[0].name, "newer"); + assert_eq!(entries[0].summary.updated_at, 200); + assert_eq!( + entries[0].summary.preview.as_deref(), + Some("user: new pod update") + ); + assert_eq!(entries[1].name, "older"); + } + + #[test] + fn stored_only_row_can_restore_and_open_but_not_direct_send() { + let dir = tempdir().unwrap(); + let store = FsStore::new(dir.path()).unwrap(); + let session_id = new_session_id(); + let segment_id = new_segment_id(); + append_start(&store, session_id, segment_id, 10); + + let entry = single_entry(PodList::from_sources( + SOURCE, + vec![metadata_info(&store, "stored", session_id, segment_id)], + vec![], + None, + 10, + )); + + assert_eq!(entry.name, "stored"); + assert_eq!(entry.visibility, SOURCE); + assert_eq!(entry.source_kinds, vec![PodListSourceKind::StoredMetadata]); + assert!(entry.live.is_none()); + assert!(entry.stored.is_some()); + assert!(entry.actions.can_open); + assert!(entry.actions.can_restore); + assert!(!entry.actions.can_send_now); + assert!(!entry.actions.can_queue_send); + } + + #[test] + fn live_idle_reachable_row_can_open_and_send_now() { + let entry = single_entry(PodList::from_sources( + SOURCE, + vec![], + vec![live_info("live", PodStatus::Idle)], + None, + 10, + )); + + assert_eq!(entry.name, "live"); + assert_eq!(entry.visibility, SOURCE); + assert_eq!(entry.source_kinds, vec![PodListSourceKind::RuntimeRegistry]); + assert!(entry.actions.can_open); + assert!(!entry.actions.can_restore); + assert!(entry.actions.can_send_now); + assert!(!entry.actions.can_queue_send); + assert_eq!( + entry.attach_socket_path(), + Some(Path::new("/tmp/live.sock")) + ); + } + + #[test] + fn live_running_reachable_row_can_open_but_not_send_now() { + let entry = single_entry(PodList::from_sources( + SOURCE, + vec![], + vec![live_info("live", PodStatus::Running)], + None, + 10, + )); + + assert!(entry.actions.can_open); + assert!(!entry.actions.can_restore); + assert!(!entry.actions.can_send_now); + assert!(entry.actions.can_queue_send); + } + + #[test] + fn live_unreachable_row_has_diagnostic_and_cannot_open() { + let mut live = live_info("live", PodStatus::Idle); + live.reachable = false; + live.status = None; + + let entry = single_entry(PodList::from_sources(SOURCE, vec![], vec![live], None, 10)); + + assert!(!entry.actions.can_open); + assert!(!entry.actions.can_restore); + assert!(!entry.actions.can_send_now); + assert!(!entry.actions.can_queue_send); + assert_eq!( + entry.actions.disabled_reason.as_deref(), + Some("live pod is unreachable") + ); + assert_eq!(entry.attach_socket_path(), None); + assert!(entry.diagnostics.iter().any(|diagnostic| { + diagnostic.kind == PodEntryDiagnosticKind::LiveUnreachable + && diagnostic.message.contains("/tmp/live.sock") + })); + } + + #[test] + fn status_extraction_skips_alert_before_snapshot() { + let events = [ + Event::Alert(protocol::Alert { + level: protocol::AlertLevel::Warn, + source: protocol::AlertSource::Pod, + message: "warming up".to_string(), + timestamp_ms: 0, + }), + Event::Snapshot { + entries: vec![], + greeting: test_greeting(), + status: PodStatus::Idle, + }, + ]; + + let status = events.iter().find_map(status_from_event); + assert_eq!(status, Some(PodStatus::Idle)); + } + + #[test] + fn corrupt_stored_metadata_has_diagnostic() { + let entry = single_entry(PodList::from_sources( + SOURCE, + vec![corrupt_stored_info( + "broken".to_string(), + "expected value".to_string(), + )], + vec![], + None, + 10, + )); + + assert_eq!(entry.name, "broken"); + assert!(!entry.actions.can_open); + assert!(entry.diagnostics.iter().any(|diagnostic| { + diagnostic.kind == PodEntryDiagnosticKind::StoredMetadataCorrupt + && diagnostic.message.contains("expected value") + })); + assert!( + entry + .summary + .preview + .as_deref() + .unwrap() + .contains("expected value") + ); + } + + #[test] + fn selected_pod_name_is_kept_after_rebuild() { + let first = PodList::from_sources( + SOURCE, + vec![], + vec![ + live_info("alpha", PodStatus::Idle), + live_info("beta", PodStatus::Idle), + ], + Some("alpha".to_string()), + 10, + ); + assert_eq!(first.selected_entry().unwrap().name, "alpha"); + + let rebuilt = PodList::from_sources( + SOURCE, + vec![], + vec![ + live_info_with_updated_at("beta", PodStatus::Idle, 20), + live_info_with_updated_at("alpha", PodStatus::Idle, 10), + ], + first.selected_name.clone(), + 10, + ); + + assert_eq!(rebuilt.entries[0].name, "beta"); + assert_eq!(rebuilt.selected_entry().unwrap().name, "alpha"); + assert_eq!(rebuilt.selected_index(), 1); + } + + #[test] + fn read_stored_pod_infos_reports_corrupt_metadata() { + let dir = tempdir().unwrap(); + let store = FsStore::new(dir.path()).unwrap(); + let pod_dir = dir.path().join("pods").join("broken"); + fs::create_dir_all(&pod_dir).unwrap(); + fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap(); + + let records = read_stored_pod_infos(dir.path(), &store).unwrap(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].pod_name, "broken"); + assert!(matches!( + records[0].metadata_state, + StoredMetadataState::Corrupt(_) + )); + } + + #[test] + fn read_stored_pod_infos_reads_metadata() { + let dir = tempdir().unwrap(); + let store = FsStore::new(dir.path()).unwrap(); + let session_id = new_session_id(); + let segment_id = new_segment_id(); + store + .write(&PodMetadata::new( + "agent", + Some(PodActiveSegmentRef::active_segment(session_id, segment_id)), + )) + .unwrap(); + + let records = read_stored_pod_infos(dir.path(), &store).unwrap(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].pod_name, "agent"); + assert_eq!(records[0].metadata_state, StoredMetadataState::Present); + } + + fn single_entry(list: PodList) -> PodListEntry { + assert_eq!(list.entries.len(), 1); + list.entries.into_iter().next().unwrap() + } + + fn metadata_info( + store: &FsStore, + pod_name: &str, + session_id: SessionId, + segment_id: SegmentId, + ) -> StoredPodInfo { + stored_info_from_metadata( + store, + pod_name.to_string(), + PodMetadata::new( + pod_name, + Some(PodActiveSegmentRef::active_segment(session_id, segment_id)), + ), + ) + } + + fn live_info(pod_name: &str, status: PodStatus) -> LivePodInfo { + live_info_with_updated_at(pod_name, status, 0) + } + + fn live_info_with_updated_at( + pod_name: &str, + status: PodStatus, + updated_at: u64, + ) -> LivePodInfo { + LivePodInfo { + pod_name: pod_name.to_string(), + socket_path: PathBuf::from(format!("/tmp/{pod_name}.sock")), + status: Some(status), + reachable: true, + segment_id: None, + summary: PodEntrySummary { + active_session_id: None, + active_segment_id: None, + updated_at, + preview: None, + }, + } + } + + fn test_greeting() -> protocol::Greeting { + protocol::Greeting { + pod_name: "live".to_string(), + cwd: "/tmp".to_string(), + provider: "test".to_string(), + model: "test".to_string(), + scope_summary: "test".to_string(), + tools: vec![], + context_window: 0, + context_tokens: 0, + } + } + + fn append_start(store: &FsStore, session_id: SessionId, segment_id: SegmentId, ts: u64) { + store + .append( + session_id, + segment_id, + &LogEntry::SegmentStart { + ts, + session_id, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + }, + ) + .unwrap(); + } + + fn append_user( + store: &FsStore, + session_id: SessionId, + segment_id: SegmentId, + ts: u64, + text: &str, + ) { + store + .append( + session_id, + segment_id, + &LogEntry::UserInput { + ts, + segments: vec![protocol::Segment::text(text)], + }, + ) + .unwrap(); + } +}