merge: TUI pod list abstraction
This commit is contained in:
commit
601ce9f5ac
|
|
@ -5,6 +5,7 @@ mod command;
|
|||
mod input;
|
||||
mod markdown;
|
||||
mod picker;
|
||||
mod pod_list;
|
||||
mod scroll;
|
||||
mod spawn;
|
||||
mod task;
|
||||
|
|
|
|||
|
|
@ -4,15 +4,11 @@
|
|||
//! from the session store's name-keyed metadata. Picking a live row attaches to
|
||||
//! its socket; picking a stopped row restores via `pod --pod <name>`.
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use client::PodClient;
|
||||
use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers};
|
||||
use pod_registry::{LockFileGuard, default_registry_path};
|
||||
use ratatui::Terminal;
|
||||
use ratatui::backend::CrosstermBackend;
|
||||
use ratatui::layout::{Constraint, Layout};
|
||||
|
|
@ -20,8 +16,12 @@ use ratatui::style::{Color, Modifier, Style};
|
|||
use ratatui::text::{Line, Span};
|
||||
use ratatui::widgets::Paragraph;
|
||||
use ratatui::{Frame, TerminalOptions, Viewport};
|
||||
use session_store::{
|
||||
FsStore, LogEntry, LoggedContentPart, LoggedItem, PodMetadata, SegmentId, SessionId, Store,
|
||||
use session_store::FsStore;
|
||||
|
||||
use crate::pod_list::{
|
||||
PodList, PodListEntry, PodVisibilitySource, StoredMetadataState,
|
||||
live_socket_for_pod as pod_list_live_socket_for_pod, read_reachable_live_pod_infos,
|
||||
read_stored_pod_infos,
|
||||
};
|
||||
|
||||
const MAX_ROWS: usize = 10;
|
||||
|
|
@ -99,62 +99,45 @@ impl PodRowState {
|
|||
}
|
||||
}
|
||||
|
||||
/// One row in the Pod picker. The primary key is the Pod name; Session/Segment
|
||||
/// IDs are included only as debug context.
|
||||
#[derive(Debug, Clone)]
|
||||
struct Row {
|
||||
pod_name: String,
|
||||
state: PodRowState,
|
||||
updated_at: u64,
|
||||
active_session_id: Option<SessionId>,
|
||||
active_segment_id: Option<SegmentId>,
|
||||
preview: Option<String>,
|
||||
socket_path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PodStateRecord {
|
||||
pod_name: String,
|
||||
state: Result<PodMetadata, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct LivePodRecord {
|
||||
pub pod_name: String,
|
||||
pub socket_path: PathBuf,
|
||||
pub segment_id: Option<SegmentId>,
|
||||
}
|
||||
|
||||
pub async fn run() -> Result<PickerOutcome, PickerError> {
|
||||
let store_dir = default_store_dir()?;
|
||||
let store = FsStore::new(&store_dir)?;
|
||||
let pod_states = read_pod_state_records(&store_dir)?;
|
||||
let live_pods = read_reachable_live_pod_records().await.unwrap_or_default();
|
||||
let rows = build_rows(&store, pod_states, live_pods)?;
|
||||
if rows.is_empty() {
|
||||
let stored_pods = read_stored_pod_infos(&store_dir, &store)?;
|
||||
let live_pods = read_reachable_live_pod_infos(&store)
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let mut list = PodList::from_sources(
|
||||
PodVisibilitySource::ResumePicker,
|
||||
stored_pods,
|
||||
live_pods,
|
||||
None,
|
||||
MAX_ROWS,
|
||||
);
|
||||
if list.entries.is_empty() {
|
||||
return Err(PickerError::NoPods);
|
||||
}
|
||||
|
||||
let mut selected = 0usize;
|
||||
let mut terminal = make_inline_terminal()?;
|
||||
loop {
|
||||
terminal.draw(|f| draw(f, &rows, selected))?;
|
||||
terminal.draw(|f| draw(f, &list))?;
|
||||
match poll_event()? {
|
||||
None => continue,
|
||||
Some(Action::Up) => {
|
||||
selected = selected.saturating_sub(1);
|
||||
let selected = list.selected_index().saturating_sub(1);
|
||||
list.select_index(selected);
|
||||
}
|
||||
Some(Action::Down) => {
|
||||
if selected + 1 < rows.len() {
|
||||
selected += 1;
|
||||
let selected = list.selected_index();
|
||||
if selected + 1 < list.entries.len() {
|
||||
list.select_index(selected + 1);
|
||||
}
|
||||
}
|
||||
Some(Action::Submit) => {
|
||||
close_viewport(&mut terminal)?;
|
||||
let row = &rows[selected];
|
||||
let entry = list.selected_entry().expect("non-empty pod list");
|
||||
return Ok(PickerOutcome::Picked {
|
||||
pod_name: row.pod_name.clone(),
|
||||
socket_override: row.socket_path.clone(),
|
||||
pod_name: entry.name.clone(),
|
||||
socket_override: entry.attach_socket_path().map(PathBuf::from),
|
||||
});
|
||||
}
|
||||
Some(Action::Cancel) => {
|
||||
|
|
@ -189,288 +172,8 @@ fn default_store_dir() -> Result<PathBuf, PickerError> {
|
|||
})
|
||||
}
|
||||
|
||||
fn read_pod_state_records(store_dir: &Path) -> Result<Vec<PodStateRecord>, PickerError> {
|
||||
let pods_dir = store_dir.join("pods");
|
||||
let mut records = Vec::new();
|
||||
if !pods_dir.exists() {
|
||||
return Ok(records);
|
||||
}
|
||||
|
||||
for entry in fs::read_dir(pods_dir)? {
|
||||
let entry = entry?;
|
||||
if !entry.file_type()?.is_dir() {
|
||||
continue;
|
||||
}
|
||||
let pod_name = entry.file_name().to_string_lossy().to_string();
|
||||
let path = entry.path().join("metadata.json");
|
||||
let state = match fs::read_to_string(&path) {
|
||||
Ok(content) => serde_json::from_str::<PodMetadata>(&content).map_err(|e| e.to_string()),
|
||||
Err(e) => Err(e.to_string()),
|
||||
};
|
||||
records.push(PodStateRecord { pod_name, state });
|
||||
}
|
||||
Ok(records)
|
||||
}
|
||||
|
||||
fn read_live_pod_records() -> Result<Vec<LivePodRecord>, io::Error> {
|
||||
let path = default_registry_path()?;
|
||||
let guard = LockFileGuard::open(&path)?;
|
||||
Ok(guard
|
||||
.data()
|
||||
.allocations
|
||||
.iter()
|
||||
.map(|allocation| LivePodRecord {
|
||||
pod_name: allocation.pod_name.clone(),
|
||||
socket_path: allocation.socket.clone(),
|
||||
segment_id: allocation.segment_id,
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn read_reachable_live_pod_records() -> Result<Vec<LivePodRecord>, io::Error> {
|
||||
let records = read_live_pod_records()?;
|
||||
let mut reachable = Vec::new();
|
||||
for record in records {
|
||||
if PodClient::connect(&record.socket_path).await.is_ok() {
|
||||
reachable.push(record);
|
||||
}
|
||||
}
|
||||
Ok(reachable)
|
||||
}
|
||||
|
||||
pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option<PathBuf> {
|
||||
read_live_pod_records()
|
||||
.ok()?
|
||||
.into_iter()
|
||||
.find(|pod| pod.pod_name == pod_name)
|
||||
.map(|pod| pod.socket_path)
|
||||
}
|
||||
|
||||
fn build_rows(
|
||||
store: &FsStore,
|
||||
pod_states: Vec<PodStateRecord>,
|
||||
live_pods: Vec<LivePodRecord>,
|
||||
) -> Result<Vec<Row>, PickerError> {
|
||||
let mut rows_by_name: BTreeMap<String, Row> = BTreeMap::new();
|
||||
let mut live_by_name: HashMap<String, LivePodRecord> = HashMap::new();
|
||||
|
||||
for live in live_pods {
|
||||
let (active_session_id, active_segment_id, updated_at, preview) =
|
||||
summarize_live_pod(store, &live);
|
||||
rows_by_name.insert(
|
||||
live.pod_name.clone(),
|
||||
Row {
|
||||
pod_name: live.pod_name.clone(),
|
||||
state: PodRowState::Live,
|
||||
updated_at,
|
||||
active_session_id,
|
||||
active_segment_id,
|
||||
preview,
|
||||
socket_path: Some(live.socket_path.clone()),
|
||||
},
|
||||
);
|
||||
live_by_name.insert(live.pod_name.clone(), live);
|
||||
}
|
||||
|
||||
for record in pod_states {
|
||||
match record.state {
|
||||
Ok(metadata) => {
|
||||
let summary = summarize_metadata(store, &metadata);
|
||||
let state = if live_by_name.contains_key(&record.pod_name) {
|
||||
PodRowState::Live
|
||||
} else {
|
||||
PodRowState::Stopped
|
||||
};
|
||||
upsert_metadata_row(&mut rows_by_name, record.pod_name, metadata, summary, state);
|
||||
}
|
||||
Err(message) => {
|
||||
rows_by_name.entry(record.pod_name.clone()).or_insert(Row {
|
||||
pod_name: record.pod_name,
|
||||
state: PodRowState::Corrupt,
|
||||
updated_at: 0,
|
||||
active_session_id: None,
|
||||
active_segment_id: None,
|
||||
preview: Some(format!("metadata: {}", trim_one_line(&message, 48))),
|
||||
socket_path: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut rows: Vec<Row> = rows_by_name.into_values().collect();
|
||||
rows.sort_by(|a, b| {
|
||||
b.updated_at
|
||||
.cmp(&a.updated_at)
|
||||
.then_with(|| a.pod_name.cmp(&b.pod_name))
|
||||
});
|
||||
rows.truncate(MAX_ROWS);
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
fn upsert_metadata_row(
|
||||
rows_by_name: &mut BTreeMap<String, Row>,
|
||||
pod_name: String,
|
||||
metadata: PodMetadata,
|
||||
summary: SegmentSummary,
|
||||
state: PodRowState,
|
||||
) {
|
||||
let active = metadata.active;
|
||||
let active_session_id = active.as_ref().map(|a| a.session_id);
|
||||
let active_segment_id = active.as_ref().and_then(|a| a.segment_id);
|
||||
|
||||
match rows_by_name.get_mut(&pod_name) {
|
||||
Some(existing) => {
|
||||
existing.state = state;
|
||||
if summary.updated_at > existing.updated_at {
|
||||
existing.updated_at = summary.updated_at;
|
||||
}
|
||||
if existing.active_session_id.is_none() {
|
||||
existing.active_session_id = active_session_id;
|
||||
}
|
||||
if existing.active_segment_id.is_none() {
|
||||
existing.active_segment_id = active_segment_id;
|
||||
}
|
||||
if existing.preview.is_none() {
|
||||
existing.preview = summary.preview;
|
||||
}
|
||||
}
|
||||
None => {
|
||||
rows_by_name.insert(
|
||||
pod_name.clone(),
|
||||
Row {
|
||||
pod_name,
|
||||
state,
|
||||
updated_at: summary.updated_at,
|
||||
active_session_id,
|
||||
active_segment_id,
|
||||
preview: summary.preview,
|
||||
socket_path: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct SegmentSummary {
|
||||
updated_at: u64,
|
||||
preview: Option<String>,
|
||||
}
|
||||
|
||||
fn summarize_live_pod(
|
||||
store: &FsStore,
|
||||
live: &LivePodRecord,
|
||||
) -> (Option<SessionId>, Option<SegmentId>, u64, Option<String>) {
|
||||
let Some(segment_id) = live.segment_id else {
|
||||
return (None, None, 0, None);
|
||||
};
|
||||
let session_id = store.lookup_session_of(segment_id).ok().flatten();
|
||||
let Some(session_id) = session_id else {
|
||||
return (None, Some(segment_id), 0, None);
|
||||
};
|
||||
let summary = summarize_segment(store, session_id, segment_id);
|
||||
(
|
||||
Some(session_id),
|
||||
Some(segment_id),
|
||||
summary.updated_at,
|
||||
summary.preview,
|
||||
)
|
||||
}
|
||||
|
||||
fn summarize_metadata(store: &FsStore, metadata: &PodMetadata) -> SegmentSummary {
|
||||
let Some(active) = metadata.active.as_ref() else {
|
||||
return SegmentSummary {
|
||||
updated_at: 0,
|
||||
preview: None,
|
||||
};
|
||||
};
|
||||
let Some(segment_id) = active.segment_id else {
|
||||
return SegmentSummary {
|
||||
updated_at: 0,
|
||||
preview: Some("[pending segment]".to_string()),
|
||||
};
|
||||
};
|
||||
summarize_segment(store, active.session_id, segment_id)
|
||||
}
|
||||
|
||||
fn summarize_segment(
|
||||
store: &FsStore,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> SegmentSummary {
|
||||
match store.read_all(session_id, segment_id) {
|
||||
Ok(entries) => SegmentSummary {
|
||||
updated_at: last_entry_ts(&entries).unwrap_or(0),
|
||||
preview: last_message_preview(&entries).or_else(|| Some("[empty]".to_string())),
|
||||
},
|
||||
Err(_) => SegmentSummary {
|
||||
updated_at: 0,
|
||||
preview: Some("[corrupt segment]".to_string()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn last_entry_ts(entries: &[LogEntry]) -> Option<u64> {
|
||||
entries.iter().map(log_entry_ts).max()
|
||||
}
|
||||
|
||||
fn log_entry_ts(entry: &LogEntry) -> u64 {
|
||||
match entry {
|
||||
LogEntry::SegmentStart { ts, .. }
|
||||
| LogEntry::Invoke { ts, .. }
|
||||
| LogEntry::UserInput { ts, .. }
|
||||
| LogEntry::AssistantItem { ts, .. }
|
||||
| LogEntry::ToolResult { ts, .. }
|
||||
| LogEntry::SystemItem { ts, .. }
|
||||
| LogEntry::TurnEnd { ts, .. }
|
||||
| LogEntry::RunCompleted { ts, .. }
|
||||
| LogEntry::RunErrored { ts, .. }
|
||||
| LogEntry::ConfigChanged { ts, .. }
|
||||
| LogEntry::LlmUsage { ts, .. }
|
||||
| LogEntry::Extension { ts, .. } => *ts,
|
||||
}
|
||||
}
|
||||
|
||||
/// Walk the log from the tail looking for the most recent user-message or
|
||||
/// assistant-message entry, then render its first text fragment in a single line.
|
||||
fn last_message_preview(entries: &[LogEntry]) -> Option<String> {
|
||||
for entry in entries.iter().rev() {
|
||||
match entry {
|
||||
LogEntry::UserInput { segments, .. } => {
|
||||
let text = protocol::Segment::flatten_to_text(segments);
|
||||
if !text.is_empty() {
|
||||
return Some(format!("user: {}", trim_one_line(&text, 60)));
|
||||
}
|
||||
}
|
||||
LogEntry::AssistantItem { item, .. } => {
|
||||
if let Some(text) = first_text_logged(item) {
|
||||
return Some(format!("assistant: {}", trim_one_line(&text, 60)));
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn first_text_logged(item: &LoggedItem) -> Option<String> {
|
||||
match item {
|
||||
LoggedItem::Message { content, .. } => content.iter().find_map(|p| match p {
|
||||
LoggedContentPart::Text { text } => Some(text.clone()),
|
||||
_ => None,
|
||||
}),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn trim_one_line(s: &str, max_chars: usize) -> String {
|
||||
let collapsed: String = s.chars().map(|c| if c == '\n' { ' ' } else { c }).collect();
|
||||
if collapsed.chars().count() <= max_chars {
|
||||
collapsed
|
||||
} else {
|
||||
let truncated: String = collapsed.chars().take(max_chars - 1).collect();
|
||||
format!("{truncated}…")
|
||||
}
|
||||
pod_list_live_socket_for_pod(pod_name)
|
||||
}
|
||||
|
||||
fn make_inline_terminal() -> io::Result<Terminal<CrosstermBackend<io::Stdout>>> {
|
||||
|
|
@ -512,11 +215,11 @@ fn poll_event() -> io::Result<Option<Action>> {
|
|||
}
|
||||
}
|
||||
|
||||
fn draw(f: &mut Frame<'_>, rows: &[Row], selected: usize) {
|
||||
fn draw(f: &mut Frame<'_>, list: &PodList) {
|
||||
let area = f.area();
|
||||
let mut constraints: Vec<Constraint> = Vec::with_capacity(rows.len() + 3);
|
||||
let mut constraints: Vec<Constraint> = Vec::with_capacity(list.entries.len() + 3);
|
||||
constraints.push(Constraint::Length(1)); // title
|
||||
for _ in rows {
|
||||
for _ in &list.entries {
|
||||
constraints.push(Constraint::Length(1));
|
||||
}
|
||||
constraints.push(Constraint::Length(1)); // hint
|
||||
|
|
@ -531,8 +234,12 @@ fn draw(f: &mut Frame<'_>, rows: &[Row], selected: usize) {
|
|||
layout[0],
|
||||
);
|
||||
|
||||
for (i, row) in rows.iter().enumerate() {
|
||||
f.render_widget(Paragraph::new(row_line(row, i == selected)), layout[i + 1]);
|
||||
let selected = list.selected_index();
|
||||
for (i, entry) in list.entries.iter().enumerate() {
|
||||
f.render_widget(
|
||||
Paragraph::new(row_line(entry, i == selected)),
|
||||
layout[i + 1],
|
||||
);
|
||||
}
|
||||
|
||||
f.render_widget(
|
||||
|
|
@ -545,7 +252,7 @@ fn draw(f: &mut Frame<'_>, rows: &[Row], selected: usize) {
|
|||
Span::styled("[esc]", Style::default().fg(Color::Yellow)),
|
||||
Span::raw(" cancel"),
|
||||
])),
|
||||
layout[rows.len() + 1],
|
||||
layout[list.entries.len() + 1],
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -553,7 +260,7 @@ fn picker_title() -> &'static str {
|
|||
"resume pod pick a pod"
|
||||
}
|
||||
|
||||
fn row_line(row: &Row, selected: bool) -> Line<'_> {
|
||||
fn row_line(entry: &PodListEntry, selected: bool) -> Line<'_> {
|
||||
let marker = if selected { "▶ " } else { " " };
|
||||
let name_style = if selected {
|
||||
Style::default()
|
||||
|
|
@ -567,27 +274,44 @@ fn row_line(row: &Row, selected: bool) -> Line<'_> {
|
|||
} else {
|
||||
Style::default().fg(Color::DarkGray)
|
||||
};
|
||||
let state = row_state(entry);
|
||||
let _visibility = entry.visibility;
|
||||
let _source_kinds = &entry.source_kinds;
|
||||
|
||||
let mut spans = vec![
|
||||
Span::raw(marker),
|
||||
Span::styled(row.pod_name.as_str(), name_style),
|
||||
Span::styled(entry.name.as_str(), name_style),
|
||||
Span::raw(" "),
|
||||
Span::styled(format!("[{}]", row.state.label()), row.state.style()),
|
||||
Span::styled(format!("[{}]", state.label()), state.style()),
|
||||
Span::raw(" "),
|
||||
Span::styled(
|
||||
format_updated_at(row.updated_at),
|
||||
format_updated_at(entry.summary.updated_at),
|
||||
Style::default().fg(Color::DarkGray),
|
||||
),
|
||||
Span::raw(" "),
|
||||
Span::styled(debug_ids(row), Style::default().fg(Color::DarkGray)),
|
||||
Span::styled(debug_ids(entry), Style::default().fg(Color::DarkGray)),
|
||||
];
|
||||
if let Some(preview) = row.preview.as_ref() {
|
||||
if let Some(preview) = entry.summary.preview.as_ref() {
|
||||
spans.push(Span::raw(" "));
|
||||
spans.push(Span::styled(preview.as_str(), preview_style));
|
||||
}
|
||||
Line::from(spans)
|
||||
}
|
||||
|
||||
fn row_state(entry: &PodListEntry) -> PodRowState {
|
||||
if entry.live.as_ref().is_some_and(|live| live.reachable) {
|
||||
return PodRowState::Live;
|
||||
}
|
||||
if entry
|
||||
.stored
|
||||
.as_ref()
|
||||
.is_some_and(|stored| matches!(stored.metadata_state, StoredMetadataState::Corrupt(_)))
|
||||
{
|
||||
return PodRowState::Corrupt;
|
||||
}
|
||||
PodRowState::Stopped
|
||||
}
|
||||
|
||||
fn format_updated_at(updated_at: u64) -> String {
|
||||
if updated_at == 0 {
|
||||
"updated: —".to_string()
|
||||
|
|
@ -596,12 +320,14 @@ fn format_updated_at(updated_at: u64) -> String {
|
|||
}
|
||||
}
|
||||
|
||||
fn debug_ids(row: &Row) -> String {
|
||||
let session = row
|
||||
fn debug_ids(entry: &PodListEntry) -> String {
|
||||
let session = entry
|
||||
.summary
|
||||
.active_session_id
|
||||
.map(short_id)
|
||||
.unwrap_or_else(|| "--------".to_string());
|
||||
let segment = row
|
||||
let segment = entry
|
||||
.summary
|
||||
.active_segment_id
|
||||
.map(short_id)
|
||||
.unwrap_or_else(|| "--------".to_string());
|
||||
|
|
@ -615,198 +341,9 @@ fn short_id<T: ToString>(id: T) -> String {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use llm_worker::llm_client::types::RequestConfig;
|
||||
use session_store::{PodActiveSegmentRef, PodMetadataStore, new_segment_id, new_session_id};
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[test]
|
||||
fn pod_rows_are_sorted_by_active_segment_timestamp() {
|
||||
let dir = tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let earlier_session = new_session_id();
|
||||
let later_session = new_session_id();
|
||||
let earlier_segment = new_segment_id();
|
||||
let later_segment = new_segment_id();
|
||||
|
||||
append_start(&store, earlier_session, earlier_segment, 10);
|
||||
append_user(
|
||||
&store,
|
||||
earlier_session,
|
||||
earlier_segment,
|
||||
100,
|
||||
"old pod update",
|
||||
);
|
||||
append_start(&store, later_session, later_segment, 20);
|
||||
append_user(&store, later_session, later_segment, 200, "new pod update");
|
||||
|
||||
let records = vec![
|
||||
metadata_record("older", earlier_session, earlier_segment),
|
||||
metadata_record("newer", later_session, later_segment),
|
||||
];
|
||||
let rows = build_rows(&store, records, vec![]).unwrap();
|
||||
|
||||
assert_eq!(rows[0].pod_name, "newer");
|
||||
assert_eq!(rows[0].state, PodRowState::Stopped);
|
||||
assert_eq!(rows[0].updated_at, 200);
|
||||
assert_eq!(rows[0].preview.as_deref(), Some("user: new pod update"));
|
||||
assert_eq!(rows[1].pod_name, "older");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pod_rows_include_live_and_stopped_pods() {
|
||||
let dir = tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let stopped_session = new_session_id();
|
||||
let stopped_segment = new_segment_id();
|
||||
let live_session = new_session_id();
|
||||
let live_segment = new_segment_id();
|
||||
|
||||
append_start(&store, stopped_session, stopped_segment, 10);
|
||||
append_user(
|
||||
&store,
|
||||
stopped_session,
|
||||
stopped_segment,
|
||||
50,
|
||||
"stopped preview",
|
||||
);
|
||||
append_start(&store, live_session, live_segment, 20);
|
||||
append_user(&store, live_session, live_segment, 70, "live preview");
|
||||
|
||||
let rows = build_rows(
|
||||
&store,
|
||||
vec![metadata_record("stopped", stopped_session, stopped_segment)],
|
||||
vec![LivePodRecord {
|
||||
pod_name: "live".to_string(),
|
||||
socket_path: PathBuf::from("/tmp/live.sock"),
|
||||
segment_id: Some(live_segment),
|
||||
}],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let live = rows.iter().find(|row| row.pod_name == "live").unwrap();
|
||||
assert_eq!(live.state, PodRowState::Live);
|
||||
assert_eq!(live.active_session_id, Some(live_session));
|
||||
assert_eq!(
|
||||
live.socket_path.as_deref(),
|
||||
Some(Path::new("/tmp/live.sock"))
|
||||
);
|
||||
|
||||
let stopped = rows.iter().find(|row| row.pod_name == "stopped").unwrap();
|
||||
assert_eq!(stopped.state, PodRowState::Stopped);
|
||||
assert_eq!(stopped.socket_path, None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn corrupt_pod_state_is_rendered_as_corrupt_row() {
|
||||
let dir = tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let rows = build_rows(
|
||||
&store,
|
||||
vec![PodStateRecord {
|
||||
pod_name: "broken".to_string(),
|
||||
state: Err("expected value".to_string()),
|
||||
}],
|
||||
vec![],
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(rows.len(), 1);
|
||||
assert_eq!(rows[0].pod_name, "broken");
|
||||
assert_eq!(rows[0].state, PodRowState::Corrupt);
|
||||
assert!(
|
||||
rows[0]
|
||||
.preview
|
||||
.as_deref()
|
||||
.unwrap()
|
||||
.contains("expected value")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn picker_title_names_pods_not_sessions() {
|
||||
assert_eq!(picker_title(), "resume pod pick a pod");
|
||||
}
|
||||
|
||||
fn metadata_record(
|
||||
pod_name: &str,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> PodStateRecord {
|
||||
PodStateRecord {
|
||||
pod_name: pod_name.to_string(),
|
||||
state: Ok(PodMetadata::new(
|
||||
pod_name,
|
||||
Some(PodActiveSegmentRef::active_segment(session_id, segment_id)),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
fn append_start(store: &FsStore, session_id: SessionId, segment_id: SegmentId, ts: u64) {
|
||||
store
|
||||
.append(
|
||||
session_id,
|
||||
segment_id,
|
||||
&LogEntry::SegmentStart {
|
||||
ts,
|
||||
session_id,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn append_user(
|
||||
store: &FsStore,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
ts: u64,
|
||||
text: &str,
|
||||
) {
|
||||
store
|
||||
.append(
|
||||
session_id,
|
||||
segment_id,
|
||||
&LogEntry::UserInput {
|
||||
ts,
|
||||
segments: vec![protocol::Segment::text(text)],
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_pod_state_records_reports_corrupt_metadata() {
|
||||
let dir = tempdir().unwrap();
|
||||
let pod_dir = dir.path().join("pods").join("broken");
|
||||
fs::create_dir_all(&pod_dir).unwrap();
|
||||
fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap();
|
||||
|
||||
let records = read_pod_state_records(dir.path()).unwrap();
|
||||
assert_eq!(records.len(), 1);
|
||||
assert_eq!(records[0].pod_name, "broken");
|
||||
assert!(records[0].state.is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_pod_state_records_reads_metadata() {
|
||||
let dir = tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let session_id = new_session_id();
|
||||
let segment_id = new_segment_id();
|
||||
store
|
||||
.write(&PodMetadata::new(
|
||||
"agent",
|
||||
Some(PodActiveSegmentRef::active_segment(session_id, segment_id)),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let records = read_pod_state_records(dir.path()).unwrap();
|
||||
assert_eq!(records.len(), 1);
|
||||
assert_eq!(records[0].pod_name, "agent");
|
||||
assert!(records[0].state.is_ok());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
905
crates/tui/src/pod_list.rs
Normal file
905
crates/tui/src/pod_list.rs
Normal file
|
|
@ -0,0 +1,905 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
|
||||
use client::PodClient;
|
||||
use pod_registry::{LockFileGuard, default_registry_path};
|
||||
use protocol::{Event, PodStatus};
|
||||
use session_store::{
|
||||
FsStore, LogEntry, LoggedContentPart, LoggedItem, PodMetadata, SegmentId, SessionId, Store,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct PodList {
|
||||
pub entries: Vec<PodListEntry>,
|
||||
pub selected_name: Option<String>,
|
||||
}
|
||||
|
||||
impl PodList {
|
||||
pub(crate) fn from_sources(
|
||||
source: PodVisibilitySource,
|
||||
stored: Vec<StoredPodInfo>,
|
||||
live: Vec<LivePodInfo>,
|
||||
selected_name: Option<String>,
|
||||
max_entries: usize,
|
||||
) -> Self {
|
||||
let mut entries_by_name: BTreeMap<String, PodListEntry> = BTreeMap::new();
|
||||
|
||||
for live_info in live {
|
||||
let name = live_info.pod_name.clone();
|
||||
entries_by_name
|
||||
.entry(name.clone())
|
||||
.or_insert_with(|| PodListEntry::new(name, source))
|
||||
.merge_live(live_info);
|
||||
}
|
||||
|
||||
for stored_info in stored {
|
||||
let name = stored_info.pod_name.clone();
|
||||
entries_by_name
|
||||
.entry(name.clone())
|
||||
.or_insert_with(|| PodListEntry::new(name, source))
|
||||
.merge_stored(stored_info);
|
||||
}
|
||||
|
||||
let mut entries: Vec<PodListEntry> = entries_by_name.into_values().collect();
|
||||
for entry in &mut entries {
|
||||
entry.finalize();
|
||||
}
|
||||
entries.sort_by(|a, b| {
|
||||
b.summary
|
||||
.updated_at
|
||||
.cmp(&a.summary.updated_at)
|
||||
.then_with(|| a.name.cmp(&b.name))
|
||||
});
|
||||
entries.truncate(max_entries);
|
||||
|
||||
let selected_name = selected_name
|
||||
.filter(|name| entries.iter().any(|entry| entry.name == *name))
|
||||
.or_else(|| entries.first().map(|entry| entry.name.clone()));
|
||||
|
||||
Self {
|
||||
entries,
|
||||
selected_name,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn selected_index(&self) -> usize {
|
||||
self.selected_name
|
||||
.as_ref()
|
||||
.and_then(|name| self.entries.iter().position(|entry| entry.name == *name))
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
pub(crate) fn select_index(&mut self, index: usize) {
|
||||
self.selected_name = self.entries.get(index).map(|entry| entry.name.clone());
|
||||
}
|
||||
|
||||
pub(crate) fn selected_entry(&self) -> Option<&PodListEntry> {
|
||||
let index = self.selected_index();
|
||||
self.entries.get(index)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(crate) enum PodVisibilitySource {
|
||||
ResumePicker,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(crate) enum PodListSourceKind {
|
||||
RuntimeRegistry,
|
||||
StoredMetadata,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct PodListEntry {
|
||||
pub name: String,
|
||||
pub visibility: PodVisibilitySource,
|
||||
pub source_kinds: Vec<PodListSourceKind>,
|
||||
pub live: Option<LivePodInfo>,
|
||||
pub stored: Option<StoredPodInfo>,
|
||||
pub summary: PodEntrySummary,
|
||||
pub actions: PodEntryActions,
|
||||
pub diagnostics: Vec<PodEntryDiagnostic>,
|
||||
}
|
||||
|
||||
impl PodListEntry {
|
||||
fn new(name: String, visibility: PodVisibilitySource) -> Self {
|
||||
Self {
|
||||
name,
|
||||
visibility,
|
||||
source_kinds: Vec::new(),
|
||||
live: None,
|
||||
stored: None,
|
||||
summary: PodEntrySummary::default(),
|
||||
actions: PodEntryActions::default(),
|
||||
diagnostics: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn merge_live(&mut self, live: LivePodInfo) {
|
||||
if !self
|
||||
.source_kinds
|
||||
.contains(&PodListSourceKind::RuntimeRegistry)
|
||||
{
|
||||
self.source_kinds.push(PodListSourceKind::RuntimeRegistry);
|
||||
}
|
||||
if live.summary.updated_at > self.summary.updated_at {
|
||||
self.summary.updated_at = live.summary.updated_at;
|
||||
}
|
||||
if self.summary.active_session_id.is_none() {
|
||||
self.summary.active_session_id = live.summary.active_session_id;
|
||||
}
|
||||
if self.summary.active_segment_id.is_none() {
|
||||
self.summary.active_segment_id = live.summary.active_segment_id.or(live.segment_id);
|
||||
}
|
||||
if self.summary.preview.is_none() {
|
||||
self.summary.preview = live.summary.preview.clone();
|
||||
}
|
||||
self.live = Some(live);
|
||||
}
|
||||
|
||||
fn merge_stored(&mut self, stored: StoredPodInfo) {
|
||||
if !self
|
||||
.source_kinds
|
||||
.contains(&PodListSourceKind::StoredMetadata)
|
||||
{
|
||||
self.source_kinds.push(PodListSourceKind::StoredMetadata);
|
||||
}
|
||||
if stored.updated_at > self.summary.updated_at {
|
||||
self.summary.updated_at = stored.updated_at;
|
||||
}
|
||||
if self.summary.active_session_id.is_none() {
|
||||
self.summary.active_session_id = stored.active_session_id;
|
||||
}
|
||||
if self.summary.active_segment_id.is_none() {
|
||||
self.summary.active_segment_id = stored.active_segment_id;
|
||||
}
|
||||
if self.summary.preview.is_none() {
|
||||
self.summary.preview = stored.preview.clone();
|
||||
}
|
||||
self.stored = Some(stored);
|
||||
}
|
||||
|
||||
fn finalize(&mut self) {
|
||||
self.diagnostics = build_diagnostics(self);
|
||||
self.actions = build_actions(self);
|
||||
}
|
||||
|
||||
pub(crate) fn attach_socket_path(&self) -> Option<&Path> {
|
||||
self.live
|
||||
.as_ref()
|
||||
.filter(|live| live.reachable)
|
||||
.map(|live| live.socket_path.as_path())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct LivePodInfo {
|
||||
pub pod_name: String,
|
||||
pub socket_path: PathBuf,
|
||||
pub status: Option<PodStatus>,
|
||||
pub reachable: bool,
|
||||
pub segment_id: Option<SegmentId>,
|
||||
pub summary: PodEntrySummary,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct StoredPodInfo {
|
||||
pub pod_name: String,
|
||||
pub metadata_state: StoredMetadataState,
|
||||
pub active_session_id: Option<SessionId>,
|
||||
pub active_segment_id: Option<SegmentId>,
|
||||
pub updated_at: u64,
|
||||
pub preview: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) enum StoredMetadataState {
|
||||
Present,
|
||||
Corrupt(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct PodEntrySummary {
|
||||
pub active_session_id: Option<SessionId>,
|
||||
pub active_segment_id: Option<SegmentId>,
|
||||
pub updated_at: u64,
|
||||
pub preview: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq)]
|
||||
pub(crate) struct PodEntryActions {
|
||||
pub can_open: bool,
|
||||
pub can_restore: bool,
|
||||
pub can_send_now: bool,
|
||||
pub can_queue_send: bool,
|
||||
pub disabled_reason: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub(crate) struct PodEntryDiagnostic {
|
||||
pub kind: PodEntryDiagnosticKind,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub(crate) enum PodEntryDiagnosticKind {
|
||||
StoredMetadataCorrupt,
|
||||
LiveUnreachable,
|
||||
MissingStoredMetadata,
|
||||
MissingLiveStatus,
|
||||
}
|
||||
|
||||
pub(crate) fn read_stored_pod_infos(
|
||||
store_dir: &Path,
|
||||
store: &FsStore,
|
||||
) -> Result<Vec<StoredPodInfo>, io::Error> {
|
||||
let pods_dir = store_dir.join("pods");
|
||||
let mut records = Vec::new();
|
||||
if !pods_dir.exists() {
|
||||
return Ok(records);
|
||||
}
|
||||
|
||||
for entry in fs::read_dir(pods_dir)? {
|
||||
let entry = entry?;
|
||||
if !entry.file_type()?.is_dir() {
|
||||
continue;
|
||||
}
|
||||
let pod_name = entry.file_name().to_string_lossy().to_string();
|
||||
let path = entry.path().join("metadata.json");
|
||||
let info = match fs::read_to_string(&path) {
|
||||
Ok(content) => match serde_json::from_str::<PodMetadata>(&content) {
|
||||
Ok(metadata) => stored_info_from_metadata(store, pod_name, metadata),
|
||||
Err(e) => corrupt_stored_info(pod_name, e.to_string()),
|
||||
},
|
||||
Err(e) => corrupt_stored_info(pod_name, e.to_string()),
|
||||
};
|
||||
records.push(info);
|
||||
}
|
||||
Ok(records)
|
||||
}
|
||||
|
||||
pub(crate) fn read_live_pod_infos() -> Result<Vec<LivePodInfo>, io::Error> {
|
||||
let path = default_registry_path()?;
|
||||
let guard = LockFileGuard::open(&path)?;
|
||||
Ok(guard
|
||||
.data()
|
||||
.allocations
|
||||
.iter()
|
||||
.map(|allocation| LivePodInfo {
|
||||
pod_name: allocation.pod_name.clone(),
|
||||
socket_path: allocation.socket.clone(),
|
||||
status: None,
|
||||
reachable: false,
|
||||
segment_id: allocation.segment_id,
|
||||
summary: PodEntrySummary::default(),
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub(crate) async fn read_reachable_live_pod_infos(
|
||||
store: &FsStore,
|
||||
) -> Result<Vec<LivePodInfo>, io::Error> {
|
||||
let records = read_live_pod_infos()?;
|
||||
let mut reachable = Vec::new();
|
||||
for mut record in records {
|
||||
let Ok(status) = probe_live_status(&record.socket_path).await else {
|
||||
continue;
|
||||
};
|
||||
record.reachable = true;
|
||||
record.status = status;
|
||||
record.summary = summarize_live_pod(store, &record);
|
||||
reachable.push(record);
|
||||
}
|
||||
Ok(reachable)
|
||||
}
|
||||
|
||||
pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option<PathBuf> {
|
||||
read_live_pod_infos()
|
||||
.ok()?
|
||||
.into_iter()
|
||||
.find(|pod| pod.pod_name == pod_name)
|
||||
.map(|pod| pod.socket_path)
|
||||
}
|
||||
|
||||
fn stored_info_from_metadata(
|
||||
store: &FsStore,
|
||||
pod_name: String,
|
||||
metadata: PodMetadata,
|
||||
) -> StoredPodInfo {
|
||||
let active = metadata.active;
|
||||
let active_session_id = active.as_ref().map(|a| a.session_id);
|
||||
let active_segment_id = active.as_ref().and_then(|a| a.segment_id);
|
||||
let summary = summarize_metadata(store, active.as_ref());
|
||||
|
||||
StoredPodInfo {
|
||||
pod_name,
|
||||
metadata_state: StoredMetadataState::Present,
|
||||
active_session_id,
|
||||
active_segment_id,
|
||||
updated_at: summary.updated_at,
|
||||
preview: summary.preview,
|
||||
}
|
||||
}
|
||||
|
||||
fn corrupt_stored_info(pod_name: String, message: String) -> StoredPodInfo {
|
||||
StoredPodInfo {
|
||||
pod_name,
|
||||
metadata_state: StoredMetadataState::Corrupt(message.clone()),
|
||||
active_session_id: None,
|
||||
active_segment_id: None,
|
||||
updated_at: 0,
|
||||
preview: Some(format!("metadata: {}", trim_one_line(&message, 48))),
|
||||
}
|
||||
}
|
||||
|
||||
const LIVE_STATUS_PROBE_TIMEOUT: Duration = Duration::from_millis(25);
|
||||
|
||||
async fn probe_live_status(socket_path: &Path) -> Result<Option<PodStatus>, io::Error> {
|
||||
let mut client = PodClient::connect(socket_path).await?;
|
||||
let deadline = tokio::time::Instant::now() + LIVE_STATUS_PROBE_TIMEOUT;
|
||||
|
||||
loop {
|
||||
if tokio::time::Instant::now() >= deadline {
|
||||
return Ok(None);
|
||||
}
|
||||
match tokio::time::timeout_at(deadline, client.next_event()).await {
|
||||
Ok(Some(event)) => {
|
||||
if let Some(status) = status_from_event(&event) {
|
||||
return Ok(Some(status));
|
||||
}
|
||||
}
|
||||
Ok(None) | Err(_) => return Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn status_from_event(event: &Event) -> Option<PodStatus> {
|
||||
match event {
|
||||
Event::Snapshot { status, .. } | Event::Status { status } => Some(*status),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct SegmentSummary {
|
||||
updated_at: u64,
|
||||
preview: Option<String>,
|
||||
}
|
||||
|
||||
fn summarize_live_pod(store: &FsStore, live: &LivePodInfo) -> PodEntrySummary {
|
||||
let Some(segment_id) = live.segment_id else {
|
||||
return PodEntrySummary::default();
|
||||
};
|
||||
let session_id = store.lookup_session_of(segment_id).ok().flatten();
|
||||
let Some(session_id) = session_id else {
|
||||
return PodEntrySummary {
|
||||
active_session_id: None,
|
||||
active_segment_id: Some(segment_id),
|
||||
updated_at: 0,
|
||||
preview: None,
|
||||
};
|
||||
};
|
||||
let summary = summarize_segment(store, session_id, segment_id);
|
||||
PodEntrySummary {
|
||||
active_session_id: Some(session_id),
|
||||
active_segment_id: Some(segment_id),
|
||||
updated_at: summary.updated_at,
|
||||
preview: summary.preview,
|
||||
}
|
||||
}
|
||||
|
||||
fn summarize_metadata(
|
||||
store: &FsStore,
|
||||
active: Option<&session_store::PodActiveSegmentRef>,
|
||||
) -> SegmentSummary {
|
||||
let Some(active) = active else {
|
||||
return SegmentSummary {
|
||||
updated_at: 0,
|
||||
preview: None,
|
||||
};
|
||||
};
|
||||
let Some(segment_id) = active.segment_id else {
|
||||
return SegmentSummary {
|
||||
updated_at: 0,
|
||||
preview: Some("[pending segment]".to_string()),
|
||||
};
|
||||
};
|
||||
summarize_segment(store, active.session_id, segment_id)
|
||||
}
|
||||
|
||||
fn summarize_segment(
|
||||
store: &FsStore,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> SegmentSummary {
|
||||
match store.read_all(session_id, segment_id) {
|
||||
Ok(entries) => SegmentSummary {
|
||||
updated_at: last_entry_ts(&entries).unwrap_or(0),
|
||||
preview: last_message_preview(&entries).or_else(|| Some("[empty]".to_string())),
|
||||
},
|
||||
Err(_) => SegmentSummary {
|
||||
updated_at: 0,
|
||||
preview: Some("[corrupt segment]".to_string()),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn last_entry_ts(entries: &[LogEntry]) -> Option<u64> {
|
||||
entries.iter().map(log_entry_ts).max()
|
||||
}
|
||||
|
||||
fn log_entry_ts(entry: &LogEntry) -> u64 {
|
||||
match entry {
|
||||
LogEntry::SegmentStart { ts, .. }
|
||||
| LogEntry::Invoke { ts, .. }
|
||||
| LogEntry::UserInput { ts, .. }
|
||||
| LogEntry::AssistantItem { ts, .. }
|
||||
| LogEntry::ToolResult { ts, .. }
|
||||
| LogEntry::SystemItem { ts, .. }
|
||||
| LogEntry::TurnEnd { ts, .. }
|
||||
| LogEntry::RunCompleted { ts, .. }
|
||||
| LogEntry::RunErrored { ts, .. }
|
||||
| LogEntry::ConfigChanged { ts, .. }
|
||||
| LogEntry::LlmUsage { ts, .. }
|
||||
| LogEntry::Extension { ts, .. } => *ts,
|
||||
}
|
||||
}
|
||||
|
||||
fn last_message_preview(entries: &[LogEntry]) -> Option<String> {
|
||||
for entry in entries.iter().rev() {
|
||||
match entry {
|
||||
LogEntry::UserInput { segments, .. } => {
|
||||
let text = protocol::Segment::flatten_to_text(segments);
|
||||
if !text.is_empty() {
|
||||
return Some(format!("user: {}", trim_one_line(&text, 60)));
|
||||
}
|
||||
}
|
||||
LogEntry::AssistantItem { item, .. } => {
|
||||
if let Some(text) = first_text_logged(item) {
|
||||
return Some(format!("assistant: {}", trim_one_line(&text, 60)));
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn first_text_logged(item: &LoggedItem) -> Option<String> {
|
||||
match item {
|
||||
LoggedItem::Message { content, .. } => content.iter().find_map(|p| match p {
|
||||
LoggedContentPart::Text { text } => Some(text.clone()),
|
||||
_ => None,
|
||||
}),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn build_diagnostics(entry: &PodListEntry) -> Vec<PodEntryDiagnostic> {
|
||||
let mut diagnostics = Vec::new();
|
||||
|
||||
if let Some(stored) = entry.stored.as_ref() {
|
||||
if let StoredMetadataState::Corrupt(message) = &stored.metadata_state {
|
||||
diagnostics.push(PodEntryDiagnostic {
|
||||
kind: PodEntryDiagnosticKind::StoredMetadataCorrupt,
|
||||
message: format!("metadata: {}", trim_one_line(message, 80)),
|
||||
});
|
||||
}
|
||||
} else if entry.live.is_some() {
|
||||
diagnostics.push(PodEntryDiagnostic {
|
||||
kind: PodEntryDiagnosticKind::MissingStoredMetadata,
|
||||
message: "no stored pod metadata".to_string(),
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(live) = entry.live.as_ref() {
|
||||
if !live.reachable {
|
||||
diagnostics.push(PodEntryDiagnostic {
|
||||
kind: PodEntryDiagnosticKind::LiveUnreachable,
|
||||
message: format!("socket unreachable: {}", live.socket_path.display()),
|
||||
});
|
||||
} else if live.status.is_none() {
|
||||
diagnostics.push(PodEntryDiagnostic {
|
||||
kind: PodEntryDiagnosticKind::MissingLiveStatus,
|
||||
message: "live pod status was not reported".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
diagnostics
|
||||
}
|
||||
|
||||
fn build_actions(entry: &PodListEntry) -> PodEntryActions {
|
||||
let live_reachable = entry.live.as_ref().is_some_and(|live| live.reachable);
|
||||
let stored_restorable = entry
|
||||
.stored
|
||||
.as_ref()
|
||||
.is_some_and(|stored| matches!(stored.metadata_state, StoredMetadataState::Present));
|
||||
let live_status = entry.live.as_ref().and_then(|live| live.status);
|
||||
|
||||
let can_restore = stored_restorable && !live_reachable;
|
||||
let can_open = live_reachable || stored_restorable;
|
||||
let can_send_now = live_reachable && live_status == Some(PodStatus::Idle);
|
||||
let can_queue_send = live_reachable && live_status == Some(PodStatus::Running);
|
||||
let disabled_reason = if can_open {
|
||||
None
|
||||
} else if entry.live.is_some() {
|
||||
Some("live pod is unreachable".to_string())
|
||||
} else if entry.stored.is_some() {
|
||||
Some("stored pod metadata is corrupt".to_string())
|
||||
} else {
|
||||
Some("no live or stored pod state".to_string())
|
||||
};
|
||||
|
||||
PodEntryActions {
|
||||
can_open,
|
||||
can_restore,
|
||||
can_send_now,
|
||||
can_queue_send,
|
||||
disabled_reason,
|
||||
}
|
||||
}
|
||||
|
||||
fn trim_one_line(s: &str, max_chars: usize) -> String {
|
||||
let collapsed: String = s.chars().map(|c| if c == '\n' { ' ' } else { c }).collect();
|
||||
if collapsed.chars().count() <= max_chars {
|
||||
collapsed
|
||||
} else {
|
||||
let truncated: String = collapsed.chars().take(max_chars - 1).collect();
|
||||
format!("{truncated}…")
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use llm_worker::llm_client::types::RequestConfig;
|
||||
use session_store::{PodActiveSegmentRef, PodMetadataStore, new_segment_id, new_session_id};
|
||||
use tempfile::tempdir;
|
||||
|
||||
const SOURCE: PodVisibilitySource = PodVisibilitySource::ResumePicker;
|
||||
|
||||
#[test]
|
||||
fn pod_list_rows_are_sorted_by_active_segment_timestamp() {
|
||||
let dir = tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let earlier_session = new_session_id();
|
||||
let later_session = new_session_id();
|
||||
let earlier_segment = new_segment_id();
|
||||
let later_segment = new_segment_id();
|
||||
|
||||
append_start(&store, earlier_session, earlier_segment, 10);
|
||||
append_user(
|
||||
&store,
|
||||
earlier_session,
|
||||
earlier_segment,
|
||||
100,
|
||||
"old pod update",
|
||||
);
|
||||
append_start(&store, later_session, later_segment, 20);
|
||||
append_user(&store, later_session, later_segment, 200, "new pod update");
|
||||
|
||||
let entries = PodList::from_sources(
|
||||
SOURCE,
|
||||
vec![
|
||||
metadata_info(&store, "older", earlier_session, earlier_segment),
|
||||
metadata_info(&store, "newer", later_session, later_segment),
|
||||
],
|
||||
vec![],
|
||||
None,
|
||||
10,
|
||||
)
|
||||
.entries;
|
||||
|
||||
assert_eq!(entries[0].name, "newer");
|
||||
assert_eq!(entries[0].summary.updated_at, 200);
|
||||
assert_eq!(
|
||||
entries[0].summary.preview.as_deref(),
|
||||
Some("user: new pod update")
|
||||
);
|
||||
assert_eq!(entries[1].name, "older");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stored_only_row_can_restore_and_open_but_not_direct_send() {
|
||||
let dir = tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let session_id = new_session_id();
|
||||
let segment_id = new_segment_id();
|
||||
append_start(&store, session_id, segment_id, 10);
|
||||
|
||||
let entry = single_entry(PodList::from_sources(
|
||||
SOURCE,
|
||||
vec![metadata_info(&store, "stored", session_id, segment_id)],
|
||||
vec![],
|
||||
None,
|
||||
10,
|
||||
));
|
||||
|
||||
assert_eq!(entry.name, "stored");
|
||||
assert_eq!(entry.visibility, SOURCE);
|
||||
assert_eq!(entry.source_kinds, vec![PodListSourceKind::StoredMetadata]);
|
||||
assert!(entry.live.is_none());
|
||||
assert!(entry.stored.is_some());
|
||||
assert!(entry.actions.can_open);
|
||||
assert!(entry.actions.can_restore);
|
||||
assert!(!entry.actions.can_send_now);
|
||||
assert!(!entry.actions.can_queue_send);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn live_idle_reachable_row_can_open_and_send_now() {
|
||||
let entry = single_entry(PodList::from_sources(
|
||||
SOURCE,
|
||||
vec![],
|
||||
vec![live_info("live", PodStatus::Idle)],
|
||||
None,
|
||||
10,
|
||||
));
|
||||
|
||||
assert_eq!(entry.name, "live");
|
||||
assert_eq!(entry.visibility, SOURCE);
|
||||
assert_eq!(entry.source_kinds, vec![PodListSourceKind::RuntimeRegistry]);
|
||||
assert!(entry.actions.can_open);
|
||||
assert!(!entry.actions.can_restore);
|
||||
assert!(entry.actions.can_send_now);
|
||||
assert!(!entry.actions.can_queue_send);
|
||||
assert_eq!(
|
||||
entry.attach_socket_path(),
|
||||
Some(Path::new("/tmp/live.sock"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn live_running_reachable_row_can_open_but_not_send_now() {
|
||||
let entry = single_entry(PodList::from_sources(
|
||||
SOURCE,
|
||||
vec![],
|
||||
vec![live_info("live", PodStatus::Running)],
|
||||
None,
|
||||
10,
|
||||
));
|
||||
|
||||
assert!(entry.actions.can_open);
|
||||
assert!(!entry.actions.can_restore);
|
||||
assert!(!entry.actions.can_send_now);
|
||||
assert!(entry.actions.can_queue_send);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn live_unreachable_row_has_diagnostic_and_cannot_open() {
|
||||
let mut live = live_info("live", PodStatus::Idle);
|
||||
live.reachable = false;
|
||||
live.status = None;
|
||||
|
||||
let entry = single_entry(PodList::from_sources(SOURCE, vec![], vec![live], None, 10));
|
||||
|
||||
assert!(!entry.actions.can_open);
|
||||
assert!(!entry.actions.can_restore);
|
||||
assert!(!entry.actions.can_send_now);
|
||||
assert!(!entry.actions.can_queue_send);
|
||||
assert_eq!(
|
||||
entry.actions.disabled_reason.as_deref(),
|
||||
Some("live pod is unreachable")
|
||||
);
|
||||
assert_eq!(entry.attach_socket_path(), None);
|
||||
assert!(entry.diagnostics.iter().any(|diagnostic| {
|
||||
diagnostic.kind == PodEntryDiagnosticKind::LiveUnreachable
|
||||
&& diagnostic.message.contains("/tmp/live.sock")
|
||||
}));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn status_extraction_skips_alert_before_snapshot() {
|
||||
let events = [
|
||||
Event::Alert(protocol::Alert {
|
||||
level: protocol::AlertLevel::Warn,
|
||||
source: protocol::AlertSource::Pod,
|
||||
message: "warming up".to_string(),
|
||||
timestamp_ms: 0,
|
||||
}),
|
||||
Event::Snapshot {
|
||||
entries: vec![],
|
||||
greeting: test_greeting(),
|
||||
status: PodStatus::Idle,
|
||||
},
|
||||
];
|
||||
|
||||
let status = events.iter().find_map(status_from_event);
|
||||
assert_eq!(status, Some(PodStatus::Idle));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn corrupt_stored_metadata_has_diagnostic() {
|
||||
let entry = single_entry(PodList::from_sources(
|
||||
SOURCE,
|
||||
vec![corrupt_stored_info(
|
||||
"broken".to_string(),
|
||||
"expected value".to_string(),
|
||||
)],
|
||||
vec![],
|
||||
None,
|
||||
10,
|
||||
));
|
||||
|
||||
assert_eq!(entry.name, "broken");
|
||||
assert!(!entry.actions.can_open);
|
||||
assert!(entry.diagnostics.iter().any(|diagnostic| {
|
||||
diagnostic.kind == PodEntryDiagnosticKind::StoredMetadataCorrupt
|
||||
&& diagnostic.message.contains("expected value")
|
||||
}));
|
||||
assert!(
|
||||
entry
|
||||
.summary
|
||||
.preview
|
||||
.as_deref()
|
||||
.unwrap()
|
||||
.contains("expected value")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn selected_pod_name_is_kept_after_rebuild() {
|
||||
let first = PodList::from_sources(
|
||||
SOURCE,
|
||||
vec![],
|
||||
vec![
|
||||
live_info("alpha", PodStatus::Idle),
|
||||
live_info("beta", PodStatus::Idle),
|
||||
],
|
||||
Some("alpha".to_string()),
|
||||
10,
|
||||
);
|
||||
assert_eq!(first.selected_entry().unwrap().name, "alpha");
|
||||
|
||||
let rebuilt = PodList::from_sources(
|
||||
SOURCE,
|
||||
vec![],
|
||||
vec![
|
||||
live_info_with_updated_at("beta", PodStatus::Idle, 20),
|
||||
live_info_with_updated_at("alpha", PodStatus::Idle, 10),
|
||||
],
|
||||
first.selected_name.clone(),
|
||||
10,
|
||||
);
|
||||
|
||||
assert_eq!(rebuilt.entries[0].name, "beta");
|
||||
assert_eq!(rebuilt.selected_entry().unwrap().name, "alpha");
|
||||
assert_eq!(rebuilt.selected_index(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_stored_pod_infos_reports_corrupt_metadata() {
|
||||
let dir = tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let pod_dir = dir.path().join("pods").join("broken");
|
||||
fs::create_dir_all(&pod_dir).unwrap();
|
||||
fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap();
|
||||
|
||||
let records = read_stored_pod_infos(dir.path(), &store).unwrap();
|
||||
assert_eq!(records.len(), 1);
|
||||
assert_eq!(records[0].pod_name, "broken");
|
||||
assert!(matches!(
|
||||
records[0].metadata_state,
|
||||
StoredMetadataState::Corrupt(_)
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_stored_pod_infos_reads_metadata() {
|
||||
let dir = tempdir().unwrap();
|
||||
let store = FsStore::new(dir.path()).unwrap();
|
||||
let session_id = new_session_id();
|
||||
let segment_id = new_segment_id();
|
||||
store
|
||||
.write(&PodMetadata::new(
|
||||
"agent",
|
||||
Some(PodActiveSegmentRef::active_segment(session_id, segment_id)),
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let records = read_stored_pod_infos(dir.path(), &store).unwrap();
|
||||
assert_eq!(records.len(), 1);
|
||||
assert_eq!(records[0].pod_name, "agent");
|
||||
assert_eq!(records[0].metadata_state, StoredMetadataState::Present);
|
||||
}
|
||||
|
||||
fn single_entry(list: PodList) -> PodListEntry {
|
||||
assert_eq!(list.entries.len(), 1);
|
||||
list.entries.into_iter().next().unwrap()
|
||||
}
|
||||
|
||||
fn metadata_info(
|
||||
store: &FsStore,
|
||||
pod_name: &str,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
) -> StoredPodInfo {
|
||||
stored_info_from_metadata(
|
||||
store,
|
||||
pod_name.to_string(),
|
||||
PodMetadata::new(
|
||||
pod_name,
|
||||
Some(PodActiveSegmentRef::active_segment(session_id, segment_id)),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn live_info(pod_name: &str, status: PodStatus) -> LivePodInfo {
|
||||
live_info_with_updated_at(pod_name, status, 0)
|
||||
}
|
||||
|
||||
fn live_info_with_updated_at(
|
||||
pod_name: &str,
|
||||
status: PodStatus,
|
||||
updated_at: u64,
|
||||
) -> LivePodInfo {
|
||||
LivePodInfo {
|
||||
pod_name: pod_name.to_string(),
|
||||
socket_path: PathBuf::from(format!("/tmp/{pod_name}.sock")),
|
||||
status: Some(status),
|
||||
reachable: true,
|
||||
segment_id: None,
|
||||
summary: PodEntrySummary {
|
||||
active_session_id: None,
|
||||
active_segment_id: None,
|
||||
updated_at,
|
||||
preview: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn test_greeting() -> protocol::Greeting {
|
||||
protocol::Greeting {
|
||||
pod_name: "live".to_string(),
|
||||
cwd: "/tmp".to_string(),
|
||||
provider: "test".to_string(),
|
||||
model: "test".to_string(),
|
||||
scope_summary: "test".to_string(),
|
||||
tools: vec![],
|
||||
context_window: 0,
|
||||
context_tokens: 0,
|
||||
}
|
||||
}
|
||||
|
||||
fn append_start(store: &FsStore, session_id: SessionId, segment_id: SegmentId, ts: u64) {
|
||||
store
|
||||
.append(
|
||||
session_id,
|
||||
segment_id,
|
||||
&LogEntry::SegmentStart {
|
||||
ts,
|
||||
session_id,
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn append_user(
|
||||
store: &FsStore,
|
||||
session_id: SessionId,
|
||||
segment_id: SegmentId,
|
||||
ts: u64,
|
||||
text: &str,
|
||||
) {
|
||||
store
|
||||
.append(
|
||||
session_id,
|
||||
segment_id,
|
||||
&LogEntry::UserInput {
|
||||
ts,
|
||||
segments: vec![protocol::Segment::text(text)],
|
||||
},
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user