merge: pod-discovery-restore-tools
This commit is contained in:
commit
044032ef2b
|
|
@ -6,6 +6,9 @@ use llm_worker::llm_client::client::LlmClient;
|
||||||
use session_store::{PodMetadataStore, Store};
|
use session_store::{PodMetadataStore, Store};
|
||||||
use tokio::sync::{broadcast, mpsc, oneshot};
|
use tokio::sync::{broadcast, mpsc, oneshot};
|
||||||
|
|
||||||
|
use crate::discovery::{
|
||||||
|
PodDiscovery, attach_or_restore_pod_tool, inspect_pod_tool, list_visible_pods_tool,
|
||||||
|
};
|
||||||
use crate::ipc::alerter::Alerter;
|
use crate::ipc::alerter::Alerter;
|
||||||
use crate::ipc::notify_buffer::NotifyBuffer;
|
use crate::ipc::notify_buffer::NotifyBuffer;
|
||||||
use crate::ipc::server::SocketServer;
|
use crate::ipc::server::SocketServer;
|
||||||
|
|
@ -78,7 +81,7 @@ async fn finish_controller_run<C, St>(
|
||||||
new_status: PodStatus,
|
new_status: PodStatus,
|
||||||
) where
|
) where
|
||||||
C: LlmClient + Clone + 'static,
|
C: LlmClient + Clone + 'static,
|
||||||
St: Store + Clone + 'static,
|
St: Store + PodMetadataStore + Clone + 'static,
|
||||||
{
|
{
|
||||||
// history / user_segments are no longer mirrored on PodSharedState —
|
// history / user_segments are no longer mirrored on PodSharedState —
|
||||||
// clients reconstruct them from `Event::Snapshot` + live
|
// clients reconstruct them from `Event::Snapshot` + live
|
||||||
|
|
@ -298,7 +301,7 @@ fn wire_event_bridges_on_worker<C, St>(
|
||||||
alerter: &Alerter,
|
alerter: &Alerter,
|
||||||
) where
|
) where
|
||||||
C: LlmClient + Clone + 'static,
|
C: LlmClient + Clone + 'static,
|
||||||
St: Store + Clone + 'static,
|
St: Store + PodMetadataStore + Clone + 'static,
|
||||||
{
|
{
|
||||||
let worker = pod.worker_mut();
|
let worker = pod.worker_mut();
|
||||||
|
|
||||||
|
|
@ -436,7 +439,7 @@ fn register_pod_tools<C, St>(
|
||||||
) -> tools::ScopedFs
|
) -> tools::ScopedFs
|
||||||
where
|
where
|
||||||
C: LlmClient + Clone + 'static,
|
C: LlmClient + Clone + 'static,
|
||||||
St: Store + Clone + 'static,
|
St: Store + PodMetadataStore + Clone + 'static,
|
||||||
{
|
{
|
||||||
// Pod-immutable snapshots taken before the mutable worker borrow
|
// Pod-immutable snapshots taken before the mutable worker borrow
|
||||||
// below so the worker borrow doesn't conflict with reads on `pod`.
|
// below so the worker borrow doesn't conflict with reads on `pod`.
|
||||||
|
|
@ -448,6 +451,7 @@ where
|
||||||
let memory_config = pod.manifest().memory.clone();
|
let memory_config = pod.manifest().memory.clone();
|
||||||
let spawner_name = pod.manifest().pod.name.clone();
|
let spawner_name = pod.manifest().pod.name.clone();
|
||||||
let spawner_model = pod.manifest().model.clone();
|
let spawner_model = pod.manifest().model.clone();
|
||||||
|
let pod_store = pod.store().clone();
|
||||||
let self_parent_socket = pod.callback_socket().cloned();
|
let self_parent_socket = pod.callback_socket().cloned();
|
||||||
|
|
||||||
let worker = pod.worker_mut();
|
let worker = pod.worker_mut();
|
||||||
|
|
@ -492,10 +496,10 @@ where
|
||||||
// the Pod-scoped `SpawnedPodRegistry` (also consumed by the main
|
// the Pod-scoped `SpawnedPodRegistry` (also consumed by the main
|
||||||
// loop's `PodEvent` handler).
|
// loop's `PodEvent` handler).
|
||||||
worker.register_tool(spawn_pod_tool(
|
worker.register_tool(spawn_pod_tool(
|
||||||
spawner_name,
|
spawner_name.clone(),
|
||||||
spawner_socket,
|
spawner_socket,
|
||||||
runtime_base,
|
runtime_base.clone(),
|
||||||
pwd,
|
pwd.clone(),
|
||||||
spawned_registry.clone(),
|
spawned_registry.clone(),
|
||||||
self_parent_socket,
|
self_parent_socket,
|
||||||
spawner_model,
|
spawner_model,
|
||||||
|
|
@ -505,7 +509,12 @@ where
|
||||||
worker.register_tool(send_to_pod_tool(spawned_registry.clone()));
|
worker.register_tool(send_to_pod_tool(spawned_registry.clone()));
|
||||||
worker.register_tool(read_pod_output_tool(spawned_registry.clone()));
|
worker.register_tool(read_pod_output_tool(spawned_registry.clone()));
|
||||||
worker.register_tool(stop_pod_tool(spawned_registry.clone()));
|
worker.register_tool(stop_pod_tool(spawned_registry.clone()));
|
||||||
worker.register_tool(list_pods_tool(spawned_registry));
|
worker.register_tool(list_pods_tool(spawned_registry.clone()));
|
||||||
|
|
||||||
|
let discovery = PodDiscovery::new(pod_store, spawner_name, runtime_base, pwd, spawned_registry);
|
||||||
|
worker.register_tool(list_visible_pods_tool(discovery.clone()));
|
||||||
|
worker.register_tool(inspect_pod_tool(discovery.clone()));
|
||||||
|
worker.register_tool(attach_or_restore_pod_tool(discovery));
|
||||||
pod.attach_tracker(tracker);
|
pod.attach_tracker(tracker);
|
||||||
fs_for_view
|
fs_for_view
|
||||||
}
|
}
|
||||||
|
|
@ -532,11 +541,23 @@ async fn controller_loop<C, St>(
|
||||||
socket_server: SocketServer,
|
socket_server: SocketServer,
|
||||||
) where
|
) where
|
||||||
C: LlmClient + Clone + 'static,
|
C: LlmClient + Clone + 'static,
|
||||||
St: Store + Clone + 'static,
|
St: Store + PodMetadataStore + Clone + 'static,
|
||||||
{
|
{
|
||||||
// Hold socket server alive for the lifetime of the controller task.
|
// Hold socket server alive for the lifetime of the controller task.
|
||||||
let _socket_server = socket_server;
|
let _socket_server = socket_server;
|
||||||
|
|
||||||
|
let discovery_runtime_base = runtime_dir
|
||||||
|
.path()
|
||||||
|
.parent()
|
||||||
|
.map(PathBuf::from)
|
||||||
|
.unwrap_or_else(|| runtime_dir.path().to_path_buf());
|
||||||
|
let discovery = PodDiscovery::new(
|
||||||
|
pod.store().clone(),
|
||||||
|
spawner_name.clone(),
|
||||||
|
discovery_runtime_base,
|
||||||
|
pod.pwd().to_path_buf(),
|
||||||
|
spawned_registry.clone(),
|
||||||
|
);
|
||||||
let mut pending: Option<PendingRun> = None;
|
let mut pending: Option<PendingRun> = None;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
|
@ -691,6 +712,66 @@ async fn controller_loop<C, St>(
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Method::ListVisiblePods => match discovery.list_visible().await {
|
||||||
|
Ok(pods) => match serde_json::to_value(pods) {
|
||||||
|
Ok(pods) => {
|
||||||
|
let _ = event_tx.send(Event::VisiblePods { pods });
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
let _ = event_tx.send(Event::Error {
|
||||||
|
code: ErrorCode::Internal,
|
||||||
|
message: format!("serialize visible pods: {error}"),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(error) => {
|
||||||
|
let _ = event_tx.send(Event::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: error.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
Method::InspectPod { name } => match discovery.inspect(&name).await {
|
||||||
|
Ok(pod) => match serde_json::to_value(pod) {
|
||||||
|
Ok(pod) => {
|
||||||
|
let _ = event_tx.send(Event::PodInspection { pod });
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
let _ = event_tx.send(Event::Error {
|
||||||
|
code: ErrorCode::Internal,
|
||||||
|
message: format!("serialize pod inspection: {error}"),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(error) => {
|
||||||
|
let _ = event_tx.send(Event::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: error.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
Method::AttachOrRestorePod { name } => match discovery.attach_or_restore(&name).await {
|
||||||
|
Ok(result) => match serde_json::to_value(result) {
|
||||||
|
Ok(result) => {
|
||||||
|
let _ = event_tx.send(Event::PodAttachRestore { result });
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
let _ = event_tx.send(Event::Error {
|
||||||
|
code: ErrorCode::Internal,
|
||||||
|
message: format!("serialize pod attach/restore result: {error}"),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(error) => {
|
||||||
|
let _ = event_tx.send(Event::Error {
|
||||||
|
code: ErrorCode::InvalidRequest,
|
||||||
|
message: error.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
// ListCompletions is handled at the socket layer (direct
|
// ListCompletions is handled at the socket layer (direct
|
||||||
// response). If it reaches the controller, ignore it.
|
// response). If it reaches the controller, ignore it.
|
||||||
Method::ListCompletions { .. } => {}
|
Method::ListCompletions { .. } => {}
|
||||||
|
|
@ -865,6 +946,17 @@ where
|
||||||
notify_buffer.push_notify(message);
|
notify_buffer.push_notify(message);
|
||||||
}
|
}
|
||||||
Some(Method::ListCompletions { .. }) => {}
|
Some(Method::ListCompletions { .. }) => {}
|
||||||
|
Some(
|
||||||
|
Method::ListVisiblePods
|
||||||
|
| Method::InspectPod { .. }
|
||||||
|
| Method::AttachOrRestorePod { .. },
|
||||||
|
) => {
|
||||||
|
let _ = event_tx.send(Event::Error {
|
||||||
|
code: ErrorCode::AlreadyRunning,
|
||||||
|
message: "Pod discovery requests are only handled while the Pod is idle or paused"
|
||||||
|
.into(),
|
||||||
|
});
|
||||||
|
}
|
||||||
Some(Method::PodEvent(event)) => {
|
Some(Method::PodEvent(event)) => {
|
||||||
// mpsc is consume-once, so we cannot defer this
|
// mpsc is consume-once, so we cannot defer this
|
||||||
// to the next main-loop iteration — drop here
|
// to the next main-loop iteration — drop here
|
||||||
|
|
|
||||||
973
crates/pod/src/discovery.rs
Normal file
973
crates/pod/src/discovery.rs
Normal file
|
|
@ -0,0 +1,973 @@
|
||||||
|
//! Pod-state-backed discovery and restore/attach tools.
|
||||||
|
//!
|
||||||
|
//! This surface deliberately does not enumerate every Pod on the host. The
|
||||||
|
//! listing path starts from the caller's visibility set (the caller itself and
|
||||||
|
//! Pods it spawned according to durable Pod state) and only then reads each
|
||||||
|
//! Pod's own state. Name-targeted operations distinguish missing state from
|
||||||
|
//! state that exists but is outside that visibility set.
|
||||||
|
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::io;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::process::Stdio;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
||||||
|
use protocol::stream::JsonLineReader;
|
||||||
|
use protocol::{Event, PodStatus};
|
||||||
|
use schemars::JsonSchema;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use session_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, SegmentId, SessionId};
|
||||||
|
use tokio::net::UnixStream;
|
||||||
|
use tokio::process::Command;
|
||||||
|
|
||||||
|
use crate::runtime::pod_registry;
|
||||||
|
use crate::spawn::registry::SpawnedPodRegistry;
|
||||||
|
|
||||||
|
const PROBE_TIMEOUT: Duration = Duration::from_millis(500);
|
||||||
|
const RESTORE_START_TIMEOUT: Duration = Duration::from_secs(20);
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct PodDiscovery<St> {
|
||||||
|
store: St,
|
||||||
|
self_pod_name: String,
|
||||||
|
runtime_base: PathBuf,
|
||||||
|
cwd: PathBuf,
|
||||||
|
store_dir: Option<PathBuf>,
|
||||||
|
spawned_registry: Arc<SpawnedPodRegistry>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<St> PodDiscovery<St>
|
||||||
|
where
|
||||||
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
pub fn new(
|
||||||
|
store: St,
|
||||||
|
self_pod_name: String,
|
||||||
|
runtime_base: PathBuf,
|
||||||
|
cwd: PathBuf,
|
||||||
|
spawned_registry: Arc<SpawnedPodRegistry>,
|
||||||
|
) -> Self {
|
||||||
|
let store_dir = store.root_dir();
|
||||||
|
Self {
|
||||||
|
store,
|
||||||
|
self_pod_name,
|
||||||
|
runtime_base,
|
||||||
|
cwd,
|
||||||
|
store_dir,
|
||||||
|
spawned_registry,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn list_visible(&self) -> Result<Vec<VisiblePodItem>, PodDiscoveryError> {
|
||||||
|
let visibility = self.visibility().await?;
|
||||||
|
let mut items = Vec::with_capacity(visibility.visible.len());
|
||||||
|
for pod_name in visibility.visible.keys() {
|
||||||
|
items.push(
|
||||||
|
self.build_item_for_visible_name(pod_name, &visibility)
|
||||||
|
.await,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Ok(items)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn inspect(&self, pod_name: &str) -> Result<PodDetail, PodDiscoveryError> {
|
||||||
|
let visibility = self.visibility().await?;
|
||||||
|
let known_names = self.store.list_names()?;
|
||||||
|
let state_exists = known_names.iter().any(|n| n == pod_name);
|
||||||
|
if !state_exists {
|
||||||
|
return Err(PodDiscoveryError::StateMissing {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if !visibility.visible.contains_key(pod_name) {
|
||||||
|
return Err(PodDiscoveryError::NotVisible {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
match self.store.read_by_name(pod_name)? {
|
||||||
|
Some(metadata) => Ok(self.detail_from_metadata(metadata, &visibility).await),
|
||||||
|
None => Err(PodDiscoveryError::StateMissing {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn attach_or_restore(
|
||||||
|
&self,
|
||||||
|
pod_name: &str,
|
||||||
|
) -> Result<AttachRestoreResult, PodDiscoveryError> {
|
||||||
|
match self.plan_attach_or_restore(pod_name).await? {
|
||||||
|
AttachRestorePlan::Attach {
|
||||||
|
pod_name,
|
||||||
|
socket_path,
|
||||||
|
status,
|
||||||
|
} => Ok(AttachRestoreResult::Attached {
|
||||||
|
pod_name,
|
||||||
|
socket_path,
|
||||||
|
status,
|
||||||
|
}),
|
||||||
|
AttachRestorePlan::Restore {
|
||||||
|
pod_name,
|
||||||
|
socket_path,
|
||||||
|
} => {
|
||||||
|
self.spawn_restore_process(&pod_name, &socket_path).await?;
|
||||||
|
Ok(AttachRestoreResult::Restored {
|
||||||
|
pod_name,
|
||||||
|
socket_path,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn plan_attach_or_restore(
|
||||||
|
&self,
|
||||||
|
pod_name: &str,
|
||||||
|
) -> Result<AttachRestorePlan, PodDiscoveryError> {
|
||||||
|
let detail = self.inspect(pod_name).await?;
|
||||||
|
if detail.live.reachable {
|
||||||
|
return Ok(AttachRestorePlan::Attach {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
socket_path: detail.live.socket_path,
|
||||||
|
status: detail.live.status,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let active = detail
|
||||||
|
.active
|
||||||
|
.ok_or_else(|| PodDiscoveryError::NotRestorable {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
reason: "pod state has no active session".into(),
|
||||||
|
})?;
|
||||||
|
let segment_id = active
|
||||||
|
.segment_id
|
||||||
|
.ok_or_else(|| PodDiscoveryError::NotRestorable {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
reason: "pod state has an active session but no active segment yet".into(),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if let Some(lock) = lookup_segment_lock(segment_id)? {
|
||||||
|
let lock_live = probe_socket(&lock.socket).await;
|
||||||
|
return if lock_live.reachable {
|
||||||
|
Ok(AttachRestorePlan::Attach {
|
||||||
|
pod_name: lock.pod_name,
|
||||||
|
socket_path: lock.socket,
|
||||||
|
status: lock_live.status,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Err(PodDiscoveryError::LockConflict {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
segment_id,
|
||||||
|
owner_pod: lock.pod_name,
|
||||||
|
socket_path: lock.socket,
|
||||||
|
pid: lock.pid,
|
||||||
|
})
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(AttachRestorePlan::Restore {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
socket_path: self.default_socket_path(pod_name),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn visibility(&self) -> Result<VisibilitySet, PodDiscoveryError> {
|
||||||
|
let mut visible = BTreeMap::new();
|
||||||
|
let mut child_sockets = BTreeMap::new();
|
||||||
|
visible.insert(self.self_pod_name.clone(), VisibilityReason::SelfPod);
|
||||||
|
|
||||||
|
// Durable parent -> child state is the primary visibility source.
|
||||||
|
if let Some(metadata) = self.store.read_by_name(&self.self_pod_name)? {
|
||||||
|
for child in metadata.spawned_children {
|
||||||
|
visible
|
||||||
|
.entry(child.pod_name.clone())
|
||||||
|
.or_insert(VisibilityReason::SpawnedChild);
|
||||||
|
child_sockets.insert(child.pod_name, child.socket_path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The live in-memory registry covers just-spawned children even if a
|
||||||
|
// state write failed after the process became reachable. It is an
|
||||||
|
// additive visibility hint, not the source of Pod metadata.
|
||||||
|
for record in self.spawned_registry.list().await {
|
||||||
|
visible
|
||||||
|
.entry(record.pod_name.clone())
|
||||||
|
.or_insert(VisibilityReason::SpawnedChild);
|
||||||
|
child_sockets.insert(record.pod_name, record.socket_path);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(VisibilitySet {
|
||||||
|
visible,
|
||||||
|
child_sockets,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn build_item_for_visible_name(
|
||||||
|
&self,
|
||||||
|
pod_name: &str,
|
||||||
|
visibility: &VisibilitySet,
|
||||||
|
) -> VisiblePodItem {
|
||||||
|
let visibility_reason = visibility.reason_for(pod_name);
|
||||||
|
match self.store.read_by_name(pod_name) {
|
||||||
|
Ok(Some(metadata)) => {
|
||||||
|
let detail = self.detail_from_metadata(metadata, visibility).await;
|
||||||
|
VisiblePodItem {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
visibility: visibility_reason,
|
||||||
|
state: PodStateStatus::Readable,
|
||||||
|
active: detail.active,
|
||||||
|
live: detail.live,
|
||||||
|
restore: detail.restore,
|
||||||
|
spawned_children: detail.spawned_children,
|
||||||
|
error: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(None) => VisiblePodItem {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
visibility: visibility_reason,
|
||||||
|
state: PodStateStatus::Missing,
|
||||||
|
active: None,
|
||||||
|
live: self.live_for_name(pod_name, None).await,
|
||||||
|
restore: RestoreInfo::not_possible("pod state missing"),
|
||||||
|
spawned_children: SpawnedChildrenSummary::default(),
|
||||||
|
error: None,
|
||||||
|
},
|
||||||
|
Err(error) => VisiblePodItem {
|
||||||
|
pod_name: pod_name.to_string(),
|
||||||
|
visibility: visibility_reason,
|
||||||
|
state: PodStateStatus::Corrupt,
|
||||||
|
active: None,
|
||||||
|
live: self.live_for_name(pod_name, None).await,
|
||||||
|
restore: RestoreInfo::not_possible("pod state is unreadable"),
|
||||||
|
spawned_children: SpawnedChildrenSummary::default(),
|
||||||
|
error: Some(error.to_string()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn detail_from_metadata(
|
||||||
|
&self,
|
||||||
|
metadata: PodMetadata,
|
||||||
|
visibility: &VisibilitySet,
|
||||||
|
) -> PodDetail {
|
||||||
|
let child_socket = visibility.child_socket_for(&metadata.pod_name);
|
||||||
|
let live = self
|
||||||
|
.live_for_name(&metadata.pod_name, child_socket.as_deref())
|
||||||
|
.await;
|
||||||
|
let restore = self
|
||||||
|
.restore_info(&metadata.pod_name, metadata.active.as_ref())
|
||||||
|
.await;
|
||||||
|
let spawned_children = summarize_spawned_children(&metadata.spawned_children).await;
|
||||||
|
PodDetail {
|
||||||
|
pod_name: metadata.pod_name.clone(),
|
||||||
|
visibility: visibility.reason_for(&metadata.pod_name),
|
||||||
|
active: metadata.active.map(ActivePointer::from),
|
||||||
|
live,
|
||||||
|
restore,
|
||||||
|
spawned_children,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn restore_info(
|
||||||
|
&self,
|
||||||
|
pod_name: &str,
|
||||||
|
active: Option<&PodActiveSegmentRef>,
|
||||||
|
) -> RestoreInfo {
|
||||||
|
let Some(active) = active else {
|
||||||
|
return RestoreInfo::not_possible("pod state has no active session");
|
||||||
|
};
|
||||||
|
let Some(segment_id) = active.segment_id else {
|
||||||
|
return RestoreInfo::not_possible("active segment is not known yet");
|
||||||
|
};
|
||||||
|
match lookup_segment_lock(segment_id) {
|
||||||
|
Ok(Some(lock)) => RestoreInfo {
|
||||||
|
possible: false,
|
||||||
|
restore_name: Some(pod_name.to_string()),
|
||||||
|
reason: Some(format!(
|
||||||
|
"segment is currently locked by `{}` (pid {})",
|
||||||
|
lock.pod_name, lock.pid
|
||||||
|
)),
|
||||||
|
},
|
||||||
|
Ok(None) => RestoreInfo {
|
||||||
|
possible: true,
|
||||||
|
restore_name: Some(pod_name.to_string()),
|
||||||
|
reason: None,
|
||||||
|
},
|
||||||
|
Err(error) => RestoreInfo {
|
||||||
|
possible: false,
|
||||||
|
restore_name: Some(pod_name.to_string()),
|
||||||
|
reason: Some(format!("lock lookup failed: {error}")),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn live_for_name(&self, pod_name: &str, socket_override: Option<&Path>) -> LiveInfo {
|
||||||
|
let socket_path = socket_override
|
||||||
|
.map(Path::to_path_buf)
|
||||||
|
.unwrap_or_else(|| self.default_socket_path(pod_name));
|
||||||
|
probe_socket(&socket_path).await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_socket_path(&self, pod_name: &str) -> PathBuf {
|
||||||
|
self.runtime_base.join(pod_name).join("sock")
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn spawn_restore_process(
|
||||||
|
&self,
|
||||||
|
pod_name: &str,
|
||||||
|
socket_path: &Path,
|
||||||
|
) -> Result<(), PodDiscoveryError> {
|
||||||
|
let mut command = Command::new(resolve_pod_command());
|
||||||
|
command
|
||||||
|
.arg("--pod")
|
||||||
|
.arg(pod_name)
|
||||||
|
.arg("--require-pod-state")
|
||||||
|
.current_dir(&self.cwd)
|
||||||
|
.stdin(Stdio::null())
|
||||||
|
.stdout(Stdio::null())
|
||||||
|
.stderr(Stdio::null())
|
||||||
|
.process_group(0);
|
||||||
|
if let Some(store_dir) = &self.store_dir {
|
||||||
|
command.arg("--store").arg(store_dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut child = command.spawn().map_err(PodDiscoveryError::RestoreSpawn)?;
|
||||||
|
let deadline = tokio::time::Instant::now() + RESTORE_START_TIMEOUT;
|
||||||
|
loop {
|
||||||
|
if probe_socket(socket_path).await.reachable {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _ = child.wait().await;
|
||||||
|
});
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
if let Some(status) = child.try_wait().map_err(PodDiscoveryError::RestoreSpawn)? {
|
||||||
|
return Err(PodDiscoveryError::RestoreExited { status });
|
||||||
|
}
|
||||||
|
if tokio::time::Instant::now() >= deadline {
|
||||||
|
let _ = child.start_kill();
|
||||||
|
let _ = child.wait().await;
|
||||||
|
return Err(PodDiscoveryError::RestoreTimeout);
|
||||||
|
}
|
||||||
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum VisibilityReason {
|
||||||
|
SelfPod,
|
||||||
|
SpawnedChild,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum PodStateStatus {
|
||||||
|
Readable,
|
||||||
|
Missing,
|
||||||
|
Corrupt,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub struct ActivePointer {
|
||||||
|
pub session_id: SessionId,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub segment_id: Option<SegmentId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<PodActiveSegmentRef> for ActivePointer {
|
||||||
|
fn from(value: PodActiveSegmentRef) -> Self {
|
||||||
|
Self {
|
||||||
|
session_id: value.session_id,
|
||||||
|
segment_id: value.segment_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub struct LiveInfo {
|
||||||
|
pub socket_path: PathBuf,
|
||||||
|
pub reachable: bool,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub status: Option<PodStatus>,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub struct RestoreInfo {
|
||||||
|
pub possible: bool,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub restore_name: Option<String>,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub reason: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RestoreInfo {
|
||||||
|
fn not_possible(reason: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
possible: false,
|
||||||
|
restore_name: None,
|
||||||
|
reason: Some(reason.into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub struct SpawnedChildrenSummary {
|
||||||
|
pub count: usize,
|
||||||
|
pub reachable: usize,
|
||||||
|
pub unreachable: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub struct VisiblePodItem {
|
||||||
|
pub pod_name: String,
|
||||||
|
pub visibility: VisibilityReason,
|
||||||
|
pub state: PodStateStatus,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub active: Option<ActivePointer>,
|
||||||
|
pub live: LiveInfo,
|
||||||
|
pub restore: RestoreInfo,
|
||||||
|
pub spawned_children: SpawnedChildrenSummary,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub error: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub struct PodDetail {
|
||||||
|
pub pod_name: String,
|
||||||
|
pub visibility: VisibilityReason,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub active: Option<ActivePointer>,
|
||||||
|
pub live: LiveInfo,
|
||||||
|
pub restore: RestoreInfo,
|
||||||
|
pub spawned_children: SpawnedChildrenSummary,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(tag = "action", rename_all = "snake_case")]
|
||||||
|
pub enum AttachRestorePlan {
|
||||||
|
Attach {
|
||||||
|
pod_name: String,
|
||||||
|
socket_path: PathBuf,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
status: Option<PodStatus>,
|
||||||
|
},
|
||||||
|
Restore {
|
||||||
|
pod_name: String,
|
||||||
|
socket_path: PathBuf,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
#[serde(tag = "action", rename_all = "snake_case")]
|
||||||
|
pub enum AttachRestoreResult {
|
||||||
|
Attached {
|
||||||
|
pod_name: String,
|
||||||
|
socket_path: PathBuf,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
status: Option<PodStatus>,
|
||||||
|
},
|
||||||
|
Restored {
|
||||||
|
pod_name: String,
|
||||||
|
socket_path: PathBuf,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum PodDiscoveryError {
|
||||||
|
#[error("pod state missing for `{pod_name}`")]
|
||||||
|
StateMissing { pod_name: String },
|
||||||
|
#[error("pod `{pod_name}` is not visible to this Pod")]
|
||||||
|
NotVisible { pod_name: String },
|
||||||
|
#[error("pod `{pod_name}` is not restorable: {reason}")]
|
||||||
|
NotRestorable { pod_name: String, reason: String },
|
||||||
|
#[error(
|
||||||
|
"pod `{pod_name}` segment {segment_id} is locked by `{owner_pod}` pid {pid} at {socket_path}"
|
||||||
|
)]
|
||||||
|
LockConflict {
|
||||||
|
pod_name: String,
|
||||||
|
segment_id: SegmentId,
|
||||||
|
owner_pod: String,
|
||||||
|
socket_path: PathBuf,
|
||||||
|
pid: u32,
|
||||||
|
},
|
||||||
|
#[error("store error: {0}")]
|
||||||
|
Store(#[from] session_store::StoreError),
|
||||||
|
#[error("scope lock error: {0}")]
|
||||||
|
ScopeLock(#[from] pod_registry::ScopeLockError),
|
||||||
|
#[error("failed to launch restore process: {0}")]
|
||||||
|
RestoreSpawn(io::Error),
|
||||||
|
#[error("restore process exited before socket became reachable: {status}")]
|
||||||
|
RestoreExited { status: std::process::ExitStatus },
|
||||||
|
#[error("restore process did not become reachable before timeout")]
|
||||||
|
RestoreTimeout,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct VisibilitySet {
|
||||||
|
visible: BTreeMap<String, VisibilityReason>,
|
||||||
|
child_sockets: BTreeMap<String, PathBuf>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl VisibilitySet {
|
||||||
|
fn reason_for(&self, pod_name: &str) -> VisibilityReason {
|
||||||
|
self.visible
|
||||||
|
.get(pod_name)
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or(VisibilityReason::SpawnedChild)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn child_socket_for(&self, pod_name: &str) -> Option<PathBuf> {
|
||||||
|
self.child_sockets.get(pod_name).cloned()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn summarize_spawned_children(
|
||||||
|
children: &[session_store::PodSpawnedChild],
|
||||||
|
) -> SpawnedChildrenSummary {
|
||||||
|
let mut summary = SpawnedChildrenSummary {
|
||||||
|
count: children.len(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
for child in children {
|
||||||
|
if probe_socket(&child.socket_path).await.reachable {
|
||||||
|
summary.reachable += 1;
|
||||||
|
} else {
|
||||||
|
summary.unreachable += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
summary
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn probe_socket(socket_path: &Path) -> LiveInfo {
|
||||||
|
match tokio::time::timeout(PROBE_TIMEOUT, UnixStream::connect(socket_path)).await {
|
||||||
|
Ok(Ok(stream)) => {
|
||||||
|
let (r, _w) = stream.into_split();
|
||||||
|
let mut reader = JsonLineReader::new(r);
|
||||||
|
let status = match tokio::time::timeout(PROBE_TIMEOUT, reader.next::<Event>()).await {
|
||||||
|
Ok(Ok(Some(Event::Snapshot { status, .. }))) => Some(status),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
LiveInfo {
|
||||||
|
socket_path: socket_path.to_path_buf(),
|
||||||
|
reachable: true,
|
||||||
|
status,
|
||||||
|
error: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Err(error)) => LiveInfo {
|
||||||
|
socket_path: socket_path.to_path_buf(),
|
||||||
|
reachable: false,
|
||||||
|
status: None,
|
||||||
|
error: Some(error.to_string()),
|
||||||
|
},
|
||||||
|
Err(_) => LiveInfo {
|
||||||
|
socket_path: socket_path.to_path_buf(),
|
||||||
|
reachable: false,
|
||||||
|
status: None,
|
||||||
|
error: Some("connect timed out".into()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn lookup_segment_lock(
|
||||||
|
segment_id: SegmentId,
|
||||||
|
) -> Result<Option<pod_registry::SegmentLockInfo>, pod_registry::ScopeLockError> {
|
||||||
|
pod_registry::lookup_segment(segment_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn resolve_pod_command() -> PathBuf {
|
||||||
|
if let Ok(cmd) = std::env::var("INSOMNIA_POD_COMMAND")
|
||||||
|
&& !cmd.is_empty()
|
||||||
|
{
|
||||||
|
return PathBuf::from(cmd);
|
||||||
|
}
|
||||||
|
PathBuf::from("pod")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, JsonSchema)]
|
||||||
|
struct PodNameInput {
|
||||||
|
/// Pod name to inspect, attach, or restore.
|
||||||
|
name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ListVisiblePodsTool<St> {
|
||||||
|
discovery: PodDiscovery<St>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<St> Tool for ListVisiblePodsTool<St>
|
||||||
|
where
|
||||||
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
async fn execute(&self, _input_json: &str) -> Result<ToolOutput, ToolError> {
|
||||||
|
let items = self
|
||||||
|
.discovery
|
||||||
|
.list_visible()
|
||||||
|
.await
|
||||||
|
.map_err(discovery_error_to_tool_error)?;
|
||||||
|
let summary = format!("{} visible pod(s)", items.len());
|
||||||
|
Ok(ToolOutput {
|
||||||
|
summary,
|
||||||
|
content: Some(json_content(&items)?),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct InspectPodTool<St> {
|
||||||
|
discovery: PodDiscovery<St>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<St> Tool for InspectPodTool<St>
|
||||||
|
where
|
||||||
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
|
||||||
|
let input: PodNameInput = serde_json::from_str(input_json)
|
||||||
|
.map_err(|e| ToolError::InvalidArgument(format!("invalid InspectPod input: {e}")))?;
|
||||||
|
let detail = self
|
||||||
|
.discovery
|
||||||
|
.inspect(&input.name)
|
||||||
|
.await
|
||||||
|
.map_err(discovery_error_to_tool_error)?;
|
||||||
|
Ok(ToolOutput {
|
||||||
|
summary: format!("pod `{}` inspected", detail.pod_name),
|
||||||
|
content: Some(json_content(&detail)?),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct AttachOrRestorePodTool<St> {
|
||||||
|
discovery: PodDiscovery<St>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<St> Tool for AttachOrRestorePodTool<St>
|
||||||
|
where
|
||||||
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
|
||||||
|
let input: PodNameInput = serde_json::from_str(input_json).map_err(|e| {
|
||||||
|
ToolError::InvalidArgument(format!("invalid AttachOrRestorePod input: {e}"))
|
||||||
|
})?;
|
||||||
|
let result = self
|
||||||
|
.discovery
|
||||||
|
.attach_or_restore(&input.name)
|
||||||
|
.await
|
||||||
|
.map_err(discovery_error_to_tool_error)?;
|
||||||
|
let summary = match &result {
|
||||||
|
AttachRestoreResult::Attached { pod_name, .. } => {
|
||||||
|
format!("pod `{pod_name}` is live; attached to existing socket")
|
||||||
|
}
|
||||||
|
AttachRestoreResult::Restored { pod_name, .. } => {
|
||||||
|
format!("pod `{pod_name}` restored from pod state")
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Ok(ToolOutput {
|
||||||
|
summary,
|
||||||
|
content: Some(json_content(&result)?),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn list_visible_pods_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
|
||||||
|
where
|
||||||
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
Arc::new(move || {
|
||||||
|
let meta = ToolMeta::new("ListVisiblePods")
|
||||||
|
.description(
|
||||||
|
"List Pod state entries visible to this Pod. This is state-backed and does not expose the host-wide Pod universe.",
|
||||||
|
)
|
||||||
|
.input_schema(serde_json::to_value(schemars::schema_for!(())).unwrap());
|
||||||
|
let tool: Arc<dyn Tool> = Arc::new(ListVisiblePodsTool {
|
||||||
|
discovery: discovery.clone(),
|
||||||
|
});
|
||||||
|
(meta, tool)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn inspect_pod_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
|
||||||
|
where
|
||||||
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
Arc::new(move || {
|
||||||
|
let meta = ToolMeta::new("InspectPod")
|
||||||
|
.description(
|
||||||
|
"Inspect one visible Pod by name from durable Pod state, distinguishing missing state from not-visible Pods.",
|
||||||
|
)
|
||||||
|
.input_schema(serde_json::to_value(schemars::schema_for!(PodNameInput)).unwrap());
|
||||||
|
let tool: Arc<dyn Tool> = Arc::new(InspectPodTool {
|
||||||
|
discovery: discovery.clone(),
|
||||||
|
});
|
||||||
|
(meta, tool)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn attach_or_restore_pod_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
|
||||||
|
where
|
||||||
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
Arc::new(move || {
|
||||||
|
let meta = ToolMeta::new("AttachOrRestorePod")
|
||||||
|
.description(
|
||||||
|
"Attach to a visible live Pod, or restore it from Pod state when no live socket is reachable. Missing state is an error.",
|
||||||
|
)
|
||||||
|
.input_schema(serde_json::to_value(schemars::schema_for!(PodNameInput)).unwrap());
|
||||||
|
let tool: Arc<dyn Tool> = Arc::new(AttachOrRestorePodTool {
|
||||||
|
discovery: discovery.clone(),
|
||||||
|
});
|
||||||
|
(meta, tool)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn json_content<T: Serialize>(value: &T) -> Result<String, ToolError> {
|
||||||
|
serde_json::to_string_pretty(value)
|
||||||
|
.map_err(|e| ToolError::Internal(format!("serialize pod discovery output: {e}")))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn discovery_error_to_tool_error(error: PodDiscoveryError) -> ToolError {
|
||||||
|
match error {
|
||||||
|
PodDiscoveryError::StateMissing { .. }
|
||||||
|
| PodDiscoveryError::NotVisible { .. }
|
||||||
|
| PodDiscoveryError::NotRestorable { .. } => ToolError::InvalidArgument(error.to_string()),
|
||||||
|
PodDiscoveryError::LockConflict { .. }
|
||||||
|
| PodDiscoveryError::Store(_)
|
||||||
|
| PodDiscoveryError::ScopeLock(_)
|
||||||
|
| PodDiscoveryError::RestoreSpawn(_)
|
||||||
|
| PodDiscoveryError::RestoreExited { .. }
|
||||||
|
| PodDiscoveryError::RestoreTimeout => ToolError::ExecutionFailed(error.to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
use manifest::{Permission, ScopeRule};
|
||||||
|
use protocol::Greeting;
|
||||||
|
use protocol::stream::JsonLineWriter;
|
||||||
|
use session_store::{
|
||||||
|
FsStore, PodSpawnedChild, PodSpawnedScopeRule, new_segment_id, new_session_id,
|
||||||
|
};
|
||||||
|
use tempfile::TempDir;
|
||||||
|
use tokio::net::UnixListener;
|
||||||
|
|
||||||
|
use crate::runtime::dir::RuntimeDir;
|
||||||
|
|
||||||
|
static ENV_LOCK: Mutex<()> = Mutex::new(());
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "current_thread")]
|
||||||
|
async fn state_backed_visibility_and_attach_restore_planning() {
|
||||||
|
let _env = ENV_LOCK.lock().unwrap();
|
||||||
|
let root = TempDir::new().unwrap();
|
||||||
|
let store_dir = root.path().join("store");
|
||||||
|
let runtime_base = root.path().join("runtime");
|
||||||
|
std::fs::create_dir_all(&runtime_base).unwrap();
|
||||||
|
unsafe {
|
||||||
|
std::env::set_var("INSOMNIA_RUNTIME_DIR", &runtime_base);
|
||||||
|
}
|
||||||
|
|
||||||
|
let store = FsStore::new(&store_dir).unwrap();
|
||||||
|
let session_id = new_session_id();
|
||||||
|
let active_child_segment = new_segment_id();
|
||||||
|
let pending_session_id = new_session_id();
|
||||||
|
let live_socket = runtime_base.join("child-live").join("sock");
|
||||||
|
std::fs::create_dir_all(live_socket.parent().unwrap()).unwrap();
|
||||||
|
let live_listener = spawn_snapshot_socket(&live_socket).await;
|
||||||
|
|
||||||
|
let stale_socket = runtime_base.join("child-stale").join("sock");
|
||||||
|
let pending_socket = runtime_base.join("child-pending").join("sock");
|
||||||
|
let parent = PodMetadata {
|
||||||
|
pod_name: "parent".into(),
|
||||||
|
active: None,
|
||||||
|
spawned_children: vec![
|
||||||
|
child("child-live", &live_socket),
|
||||||
|
child("child-stale", &stale_socket),
|
||||||
|
child("child-pending", &pending_socket),
|
||||||
|
],
|
||||||
|
};
|
||||||
|
store.write(&parent).unwrap();
|
||||||
|
store
|
||||||
|
.write(&PodMetadata {
|
||||||
|
pod_name: "child-live".into(),
|
||||||
|
active: Some(PodActiveSegmentRef::active_segment(
|
||||||
|
session_id,
|
||||||
|
active_child_segment,
|
||||||
|
)),
|
||||||
|
spawned_children: Vec::new(),
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
store
|
||||||
|
.write(&PodMetadata {
|
||||||
|
pod_name: "child-stale".into(),
|
||||||
|
active: Some(PodActiveSegmentRef::active_segment(
|
||||||
|
session_id,
|
||||||
|
active_child_segment,
|
||||||
|
)),
|
||||||
|
spawned_children: Vec::new(),
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
store
|
||||||
|
.write(&PodMetadata {
|
||||||
|
pod_name: "child-pending".into(),
|
||||||
|
active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)),
|
||||||
|
spawned_children: Vec::new(),
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
store
|
||||||
|
.write(&PodMetadata {
|
||||||
|
pod_name: "hidden".into(),
|
||||||
|
active: Some(PodActiveSegmentRef::active_segment(
|
||||||
|
session_id,
|
||||||
|
new_segment_id(),
|
||||||
|
)),
|
||||||
|
spawned_children: Vec::new(),
|
||||||
|
})
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// RuntimeDir creates parent runtime files; discovery must still use
|
||||||
|
// Pod state when spawned_pods.json is absent.
|
||||||
|
let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "parent").await.unwrap());
|
||||||
|
let runtime_file = runtime_dir.path().join("spawned_pods.json");
|
||||||
|
assert!(!runtime_file.exists());
|
||||||
|
let registry = SpawnedPodRegistry::new(runtime_dir);
|
||||||
|
let discovery = PodDiscovery::new(
|
||||||
|
store.clone(),
|
||||||
|
"parent".into(),
|
||||||
|
runtime_base.clone(),
|
||||||
|
root.path().to_path_buf(),
|
||||||
|
registry,
|
||||||
|
);
|
||||||
|
|
||||||
|
let list = discovery.list_visible().await.unwrap();
|
||||||
|
let names: Vec<_> = list.iter().map(|p| p.pod_name.as_str()).collect();
|
||||||
|
assert_eq!(
|
||||||
|
names,
|
||||||
|
vec!["child-live", "child-pending", "child-stale", "parent"]
|
||||||
|
);
|
||||||
|
assert!(!names.contains(&"hidden"));
|
||||||
|
assert!(
|
||||||
|
list.iter()
|
||||||
|
.find(|p| p.pod_name == "child-live")
|
||||||
|
.unwrap()
|
||||||
|
.live
|
||||||
|
.reachable
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
!list
|
||||||
|
.iter()
|
||||||
|
.find(|p| p.pod_name == "child-stale")
|
||||||
|
.unwrap()
|
||||||
|
.live
|
||||||
|
.reachable
|
||||||
|
);
|
||||||
|
|
||||||
|
let pending = list.iter().find(|p| p.pod_name == "child-pending").unwrap();
|
||||||
|
assert_eq!(pending.state, PodStateStatus::Readable);
|
||||||
|
assert_eq!(
|
||||||
|
pending.active.as_ref().unwrap().session_id,
|
||||||
|
pending_session_id
|
||||||
|
);
|
||||||
|
assert_eq!(pending.active.as_ref().unwrap().segment_id, None);
|
||||||
|
assert!(!pending.restore.possible);
|
||||||
|
|
||||||
|
let hidden_err = discovery.inspect("hidden").await.unwrap_err();
|
||||||
|
assert!(matches!(hidden_err, PodDiscoveryError::NotVisible { .. }));
|
||||||
|
let missing_err = discovery.inspect("missing").await.unwrap_err();
|
||||||
|
assert!(matches!(
|
||||||
|
missing_err,
|
||||||
|
PodDiscoveryError::StateMissing { .. }
|
||||||
|
));
|
||||||
|
let hidden_restore_err = discovery
|
||||||
|
.plan_attach_or_restore("hidden")
|
||||||
|
.await
|
||||||
|
.unwrap_err();
|
||||||
|
assert!(matches!(
|
||||||
|
hidden_restore_err,
|
||||||
|
PodDiscoveryError::NotVisible { .. }
|
||||||
|
));
|
||||||
|
|
||||||
|
let attach_plan = discovery
|
||||||
|
.plan_attach_or_restore("child-live")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(matches!(attach_plan, AttachRestorePlan::Attach { .. }));
|
||||||
|
let restore_plan = discovery
|
||||||
|
.plan_attach_or_restore("child-stale")
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(matches!(restore_plan, AttachRestorePlan::Restore { .. }));
|
||||||
|
|
||||||
|
let lock_socket = runtime_base.join("lock-owner.sock");
|
||||||
|
let _guard = pod_registry::install_top_level(
|
||||||
|
"lock-owner".into(),
|
||||||
|
std::process::id(),
|
||||||
|
lock_socket.clone(),
|
||||||
|
vec![ScopeRule {
|
||||||
|
target: root.path().to_path_buf(),
|
||||||
|
permission: Permission::Read,
|
||||||
|
recursive: true,
|
||||||
|
}],
|
||||||
|
active_child_segment,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
let locked_err = discovery
|
||||||
|
.plan_attach_or_restore("child-stale")
|
||||||
|
.await
|
||||||
|
.unwrap_err();
|
||||||
|
assert!(matches!(locked_err, PodDiscoveryError::LockConflict { .. }));
|
||||||
|
|
||||||
|
live_listener.abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn child(name: &str, socket_path: &Path) -> PodSpawnedChild {
|
||||||
|
PodSpawnedChild {
|
||||||
|
pod_name: name.to_string(),
|
||||||
|
socket_path: socket_path.to_path_buf(),
|
||||||
|
scope_delegated: vec![PodSpawnedScopeRule {
|
||||||
|
target: PathBuf::from("/tmp"),
|
||||||
|
permission: "read".into(),
|
||||||
|
recursive: true,
|
||||||
|
}],
|
||||||
|
callback_address: PathBuf::from("/tmp/parent.sock"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn spawn_snapshot_socket(socket_path: &Path) -> tokio::task::JoinHandle<()> {
|
||||||
|
let _ = std::fs::remove_file(socket_path);
|
||||||
|
let listener = UnixListener::bind(socket_path).unwrap();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
let Ok((stream, _)) = listener.accept().await else {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut writer = JsonLineWriter::new(stream);
|
||||||
|
let _ = writer
|
||||||
|
.write(&Event::Snapshot {
|
||||||
|
entries: Vec::new(),
|
||||||
|
greeting: Greeting {
|
||||||
|
pod_name: "child-live".into(),
|
||||||
|
cwd: "/tmp".into(),
|
||||||
|
provider: "test".into(),
|
||||||
|
model: "test".into(),
|
||||||
|
scope_summary: String::new(),
|
||||||
|
tools: Vec::new(),
|
||||||
|
context_window: 0,
|
||||||
|
context_tokens: 0,
|
||||||
|
},
|
||||||
|
status: PodStatus::Idle,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
pub mod compact;
|
pub mod compact;
|
||||||
pub mod controller;
|
pub mod controller;
|
||||||
|
pub mod discovery;
|
||||||
pub mod fs_view;
|
pub mod fs_view;
|
||||||
pub mod hook;
|
pub mod hook;
|
||||||
pub mod ipc;
|
pub mod ipc;
|
||||||
|
|
|
||||||
|
|
@ -53,6 +53,11 @@ struct Cli {
|
||||||
#[arg(long, value_name = "NAME", conflicts_with_all = ["session", "adopt"])]
|
#[arg(long, value_name = "NAME", conflicts_with_all = ["session", "adopt"])]
|
||||||
pod: Option<String>,
|
pod: Option<String>,
|
||||||
|
|
||||||
|
/// Require `--pod` to restore existing Pod state instead of creating a
|
||||||
|
/// fresh Pod when no state exists. Used by Pod discovery restore flows.
|
||||||
|
#[arg(long, requires = "pod")]
|
||||||
|
require_pod_state: bool,
|
||||||
|
|
||||||
/// Restore a Pod from an existing session. The Pod re-uses the
|
/// Restore a Pod from an existing session. The Pod re-uses the
|
||||||
/// given session id and appends new turns to the same jsonl;
|
/// given session id and appends new turns to the same jsonl;
|
||||||
/// concurrent writers are prevented by the pod-registry.
|
/// concurrent writers are prevented by the pod-registry.
|
||||||
|
|
@ -269,6 +274,10 @@ async fn main() -> ExitCode {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(None) if cli.require_pod_state => {
|
||||||
|
eprintln!("error: pod state missing for {pod_name}");
|
||||||
|
return ExitCode::FAILURE;
|
||||||
|
}
|
||||||
Ok(None) => match Pod::from_manifest(manifest, store, loader).await {
|
Ok(None) => match Pod::from_manifest(manifest, store, loader).await {
|
||||||
Ok(p) => p,
|
Ok(p) => p,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|
|
||||||
|
|
@ -129,7 +129,11 @@ impl SpawnedPodRegistry {
|
||||||
|
|
||||||
runtime_dir.write_spawned_pods(&records).await?;
|
runtime_dir.write_spawned_pods(&records).await?;
|
||||||
let state_writer = pod_state_writer(store, pod_name.clone());
|
let state_writer = pod_state_writer(store, pod_name.clone());
|
||||||
if pruned || metadata.is_some() {
|
// Runtime spawned-pod records are a live registry for ListPods and
|
||||||
|
// cursor/scope cleanup; durable Pod state remains the discovery source
|
||||||
|
// for later attach/restore, so do not delete unreachable children from
|
||||||
|
// Pod state just because their sockets are gone.
|
||||||
|
if metadata.is_none() || !pruned {
|
||||||
state_writer(&records)?;
|
state_writer(&records)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -579,7 +579,7 @@ async fn restored_registry_uses_pod_state_without_runtime_file() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn load_from_pod_state_prunes_children_with_missing_sockets() {
|
async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state() {
|
||||||
let runtime_tmp = TempDir::new().unwrap();
|
let runtime_tmp = TempDir::new().unwrap();
|
||||||
let store_tmp = TempDir::new().unwrap();
|
let store_tmp = TempDir::new().unwrap();
|
||||||
let store = FsStore::new(store_tmp.path()).unwrap();
|
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||||
|
|
@ -615,12 +615,23 @@ async fn load_from_pod_state_prunes_children_with_missing_sockets() {
|
||||||
.read_by_name("spawner")
|
.read_by_name("spawner")
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.expect("spawner metadata should be written");
|
.expect("spawner metadata should be written");
|
||||||
assert_eq!(metadata.spawned_children.len(), 1);
|
assert_eq!(metadata.spawned_children.len(), 2);
|
||||||
assert_eq!(metadata.spawned_children[0].pod_name, "alive");
|
assert!(
|
||||||
|
metadata
|
||||||
|
.spawned_children
|
||||||
|
.iter()
|
||||||
|
.any(|c| c.pod_name == "alive")
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
metadata
|
||||||
|
.spawned_children
|
||||||
|
.iter()
|
||||||
|
.any(|c| c.pod_name == "missing")
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn load_from_pod_state_reclaims_pruned_child_scope_and_registry_deny() {
|
async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_state() {
|
||||||
let _env = EnvGuard::acquire();
|
let _env = EnvGuard::acquire();
|
||||||
let runtime_tmp = TempDir::new().unwrap();
|
let runtime_tmp = TempDir::new().unwrap();
|
||||||
let store_tmp = TempDir::new().unwrap();
|
let store_tmp = TempDir::new().unwrap();
|
||||||
|
|
@ -705,7 +716,8 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_and_registry_deny() {
|
||||||
.read_by_name("spawner")
|
.read_by_name("spawner")
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.expect("spawner metadata should remain");
|
.expect("spawner metadata should remain");
|
||||||
assert!(metadata.spawned_children.is_empty());
|
assert_eq!(metadata.spawned_children.len(), 1);
|
||||||
|
assert_eq!(metadata.spawned_children[0].pod_name, "missing");
|
||||||
let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap();
|
let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap();
|
||||||
let runtime_records: Vec<SpawnedPodRecord> = serde_json::from_str(&runtime_contents).unwrap();
|
let runtime_records: Vec<SpawnedPodRecord> = serde_json::from_str(&runtime_contents).unwrap();
|
||||||
assert!(runtime_records.is_empty());
|
assert!(runtime_records.is_empty());
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,18 @@ pub enum Method {
|
||||||
kind: CompletionKind,
|
kind: CompletionKind,
|
||||||
prefix: String,
|
prefix: String,
|
||||||
},
|
},
|
||||||
|
/// List Pods visible to this Pod from durable Pod state. This is not a
|
||||||
|
/// host-wide Pod universe query.
|
||||||
|
ListVisiblePods,
|
||||||
|
/// Inspect one Pod by name if its state exists and it is visible to this Pod.
|
||||||
|
InspectPod {
|
||||||
|
name: String,
|
||||||
|
},
|
||||||
|
/// Attach to a visible live Pod, or restore it from durable Pod state when
|
||||||
|
/// it is not live. Missing state and not-visible state are distinct errors.
|
||||||
|
AttachOrRestorePod {
|
||||||
|
name: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Typed lifecycle events sent from a child Pod to its parent.
|
/// Typed lifecycle events sent from a child Pod to its parent.
|
||||||
|
|
@ -393,6 +405,20 @@ pub enum Event {
|
||||||
kind: CompletionKind,
|
kind: CompletionKind,
|
||||||
entries: Vec<CompletionEntry>,
|
entries: Vec<CompletionEntry>,
|
||||||
},
|
},
|
||||||
|
/// Reply to `Method::ListVisiblePods`. Payload is a stable JSON value so
|
||||||
|
/// the Pod crate can evolve discovery fields without introducing a protocol
|
||||||
|
/// dependency on session-store.
|
||||||
|
VisiblePods {
|
||||||
|
pods: serde_json::Value,
|
||||||
|
},
|
||||||
|
/// Reply to `Method::InspectPod`.
|
||||||
|
PodInspection {
|
||||||
|
pod: serde_json::Value,
|
||||||
|
},
|
||||||
|
/// Reply to `Method::AttachOrRestorePod`.
|
||||||
|
PodAttachRestore {
|
||||||
|
result: serde_json::Value,
|
||||||
|
},
|
||||||
Alert(Alert),
|
Alert(Alert),
|
||||||
/// Pod has started compacting the current session.
|
/// Pod has started compacting the current session.
|
||||||
///
|
///
|
||||||
|
|
@ -1215,4 +1241,59 @@ mod tests {
|
||||||
other => panic!("expected UserMessage, got {other:?}"),
|
other => panic!("expected UserMessage, got {other:?}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pod_discovery_methods_roundtrip() {
|
||||||
|
let methods = [
|
||||||
|
Method::ListVisiblePods,
|
||||||
|
Method::InspectPod {
|
||||||
|
name: "child".into(),
|
||||||
|
},
|
||||||
|
Method::AttachOrRestorePod {
|
||||||
|
name: "child".into(),
|
||||||
|
},
|
||||||
|
];
|
||||||
|
for method in methods {
|
||||||
|
let json = serde_json::to_string(&method).unwrap();
|
||||||
|
let decoded: Method = serde_json::from_str(&json).unwrap();
|
||||||
|
match (decoded, method) {
|
||||||
|
(Method::ListVisiblePods, Method::ListVisiblePods)
|
||||||
|
| (Method::InspectPod { .. }, Method::InspectPod { .. })
|
||||||
|
| (Method::AttachOrRestorePod { .. }, Method::AttachOrRestorePod { .. }) => {}
|
||||||
|
(decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pod_discovery_events_roundtrip() {
|
||||||
|
let events = [
|
||||||
|
Event::VisiblePods {
|
||||||
|
pods: serde_json::json!([{ "pod_name": "child" }]),
|
||||||
|
},
|
||||||
|
Event::PodInspection {
|
||||||
|
pod: serde_json::json!({ "pod_name": "child" }),
|
||||||
|
},
|
||||||
|
Event::PodAttachRestore {
|
||||||
|
result: serde_json::json!({ "action": "attach" }),
|
||||||
|
},
|
||||||
|
];
|
||||||
|
for event in events {
|
||||||
|
let json = serde_json::to_string(&event).unwrap();
|
||||||
|
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||||
|
match (decoded, event) {
|
||||||
|
(Event::VisiblePods { pods }, Event::VisiblePods { pods: expected }) => {
|
||||||
|
assert_eq!(pods, expected)
|
||||||
|
}
|
||||||
|
(Event::PodInspection { pod }, Event::PodInspection { pod: expected }) => {
|
||||||
|
assert_eq!(pod, expected)
|
||||||
|
}
|
||||||
|
(
|
||||||
|
Event::PodAttachRestore { result },
|
||||||
|
Event::PodAttachRestore { result: expected },
|
||||||
|
) => assert_eq!(result, expected),
|
||||||
|
(decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -123,6 +123,35 @@ impl PodMetadataStore for FsStore {
|
||||||
Ok(Some(serde_json::from_str(&content)?))
|
Ok(Some(serde_json::from_str(&content)?))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn list_names(&self) -> Result<Vec<String>, StoreError> {
|
||||||
|
let dir = self.pods_dir();
|
||||||
|
let mut names = Vec::new();
|
||||||
|
if !dir.exists() {
|
||||||
|
return Ok(names);
|
||||||
|
}
|
||||||
|
for entry in fs::read_dir(dir)? {
|
||||||
|
let entry = entry?;
|
||||||
|
if !entry.file_type()?.is_dir() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if !entry.path().join("metadata.json").exists() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
if validate_pod_name(&name).is_ok() {
|
||||||
|
names.push(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
names.sort();
|
||||||
|
Ok(names)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn root_dir(&self) -> Option<PathBuf> {
|
||||||
|
Some(self.root.clone())
|
||||||
|
}
|
||||||
|
|
||||||
fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError> {
|
fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError> {
|
||||||
let path = self.pod_metadata_path(pod_name)?;
|
let path = self.pod_metadata_path(pod_name)?;
|
||||||
match fs::remove_file(&path) {
|
match fs::remove_file(&path) {
|
||||||
|
|
|
||||||
|
|
@ -92,6 +92,16 @@ pub trait PodMetadataStore: Send + Sync {
|
||||||
/// Read metadata by Pod name. Returns `None` when no metadata exists.
|
/// Read metadata by Pod name. Returns `None` when no metadata exists.
|
||||||
fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, StoreError>;
|
fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, StoreError>;
|
||||||
|
|
||||||
|
/// List persisted Pod metadata keys. Implementations return names only;
|
||||||
|
/// callers can then read each item independently so a corrupt metadata
|
||||||
|
/// file does not make the whole discovery result fail.
|
||||||
|
fn list_names(&self) -> Result<Vec<String>, StoreError>;
|
||||||
|
|
||||||
|
/// Return the metadata root directory when this backend is path-backed.
|
||||||
|
fn root_dir(&self) -> Option<PathBuf> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
/// Delete metadata by Pod name. Missing metadata is a successful no-op.
|
/// Delete metadata by Pod name. Missing metadata is a successful no-op.
|
||||||
fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError>;
|
fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError>;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -250,6 +250,7 @@ fn pod_metadata_minimal_crud() {
|
||||||
|
|
||||||
let pending = PodMetadata::new(pod_name, Some(PodActiveSegmentRef::pending_segment(sid)));
|
let pending = PodMetadata::new(pod_name, Some(PodActiveSegmentRef::pending_segment(sid)));
|
||||||
store.write(&pending).unwrap();
|
store.write(&pending).unwrap();
|
||||||
|
assert_eq!(store.list_names().unwrap(), vec![pod_name.to_string()]);
|
||||||
assert_eq!(store.read_by_name(pod_name).unwrap(), Some(pending.clone()));
|
assert_eq!(store.read_by_name(pod_name).unwrap(), Some(pending.clone()));
|
||||||
assert!(
|
assert!(
|
||||||
dir.path()
|
dir.path()
|
||||||
|
|
|
||||||
|
|
@ -758,6 +758,9 @@ impl App {
|
||||||
state.selected = 0;
|
state.selected = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Event::VisiblePods { .. }
|
||||||
|
| Event::PodInspection { .. }
|
||||||
|
| Event::PodAttachRestore { .. } => {}
|
||||||
Event::Shutdown => {
|
Event::Shutdown => {
|
||||||
self.mark_orphan_compacts_incomplete();
|
self.mark_orphan_compacts_incomplete();
|
||||||
self.quit = true;
|
self.quit = true;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user