diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 8335c257..d2fc319b 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -6,6 +6,9 @@ use llm_worker::llm_client::client::LlmClient; use session_store::{PodMetadataStore, Store}; use tokio::sync::{broadcast, mpsc, oneshot}; +use crate::discovery::{ + PodDiscovery, attach_or_restore_pod_tool, inspect_pod_tool, list_visible_pods_tool, +}; use crate::ipc::alerter::Alerter; use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::server::SocketServer; @@ -78,7 +81,7 @@ async fn finish_controller_run( new_status: PodStatus, ) where C: LlmClient + Clone + 'static, - St: Store + Clone + 'static, + St: Store + PodMetadataStore + Clone + 'static, { // history / user_segments are no longer mirrored on PodSharedState — // clients reconstruct them from `Event::Snapshot` + live @@ -298,7 +301,7 @@ fn wire_event_bridges_on_worker( alerter: &Alerter, ) where C: LlmClient + Clone + 'static, - St: Store + Clone + 'static, + St: Store + PodMetadataStore + Clone + 'static, { let worker = pod.worker_mut(); @@ -436,7 +439,7 @@ fn register_pod_tools( ) -> tools::ScopedFs where C: LlmClient + Clone + 'static, - St: Store + Clone + 'static, + St: Store + PodMetadataStore + Clone + 'static, { // Pod-immutable snapshots taken before the mutable worker borrow // below so the worker borrow doesn't conflict with reads on `pod`. @@ -448,6 +451,7 @@ where let memory_config = pod.manifest().memory.clone(); let spawner_name = pod.manifest().pod.name.clone(); let spawner_model = pod.manifest().model.clone(); + let pod_store = pod.store().clone(); let self_parent_socket = pod.callback_socket().cloned(); let worker = pod.worker_mut(); @@ -492,10 +496,10 @@ where // the Pod-scoped `SpawnedPodRegistry` (also consumed by the main // loop's `PodEvent` handler). 
worker.register_tool(spawn_pod_tool( - spawner_name, + spawner_name.clone(), spawner_socket, - runtime_base, - pwd, + runtime_base.clone(), + pwd.clone(), spawned_registry.clone(), self_parent_socket, spawner_model, @@ -505,7 +509,12 @@ where worker.register_tool(send_to_pod_tool(spawned_registry.clone())); worker.register_tool(read_pod_output_tool(spawned_registry.clone())); worker.register_tool(stop_pod_tool(spawned_registry.clone())); - worker.register_tool(list_pods_tool(spawned_registry)); + worker.register_tool(list_pods_tool(spawned_registry.clone())); + + let discovery = PodDiscovery::new(pod_store, spawner_name, runtime_base, pwd, spawned_registry); + worker.register_tool(list_visible_pods_tool(discovery.clone())); + worker.register_tool(inspect_pod_tool(discovery.clone())); + worker.register_tool(attach_or_restore_pod_tool(discovery)); pod.attach_tracker(tracker); fs_for_view } @@ -532,11 +541,23 @@ async fn controller_loop( socket_server: SocketServer, ) where C: LlmClient + Clone + 'static, - St: Store + Clone + 'static, + St: Store + PodMetadataStore + Clone + 'static, { // Hold socket server alive for the lifetime of the controller task. 
let _socket_server = socket_server; + let discovery_runtime_base = runtime_dir + .path() + .parent() + .map(PathBuf::from) + .unwrap_or_else(|| runtime_dir.path().to_path_buf()); + let discovery = PodDiscovery::new( + pod.store().clone(), + spawner_name.clone(), + discovery_runtime_base, + pod.pwd().to_path_buf(), + spawned_registry.clone(), + ); let mut pending: Option = None; loop { @@ -691,6 +712,66 @@ async fn controller_loop( break; } + Method::ListVisiblePods => match discovery.list_visible().await { + Ok(pods) => match serde_json::to_value(pods) { + Ok(pods) => { + let _ = event_tx.send(Event::VisiblePods { pods }); + } + Err(error) => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::Internal, + message: format!("serialize visible pods: {error}"), + }); + } + }, + Err(error) => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::InvalidRequest, + message: error.to_string(), + }); + } + }, + + Method::InspectPod { name } => match discovery.inspect(&name).await { + Ok(pod) => match serde_json::to_value(pod) { + Ok(pod) => { + let _ = event_tx.send(Event::PodInspection { pod }); + } + Err(error) => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::Internal, + message: format!("serialize pod inspection: {error}"), + }); + } + }, + Err(error) => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::InvalidRequest, + message: error.to_string(), + }); + } + }, + + Method::AttachOrRestorePod { name } => match discovery.attach_or_restore(&name).await { + Ok(result) => match serde_json::to_value(result) { + Ok(result) => { + let _ = event_tx.send(Event::PodAttachRestore { result }); + } + Err(error) => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::Internal, + message: format!("serialize pod attach/restore result: {error}"), + }); + } + }, + Err(error) => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::InvalidRequest, + message: error.to_string(), + }); + } + }, + // ListCompletions is handled at the socket 
layer (direct // response). If it reaches the controller, ignore it. Method::ListCompletions { .. } => {} @@ -865,6 +946,17 @@ where notify_buffer.push_notify(message); } Some(Method::ListCompletions { .. }) => {} + Some( + Method::ListVisiblePods + | Method::InspectPod { .. } + | Method::AttachOrRestorePod { .. }, + ) => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::AlreadyRunning, + message: "Pod discovery requests are only handled while the Pod is idle or paused" + .into(), + }); + } Some(Method::PodEvent(event)) => { // mpsc is consume-once, so we cannot defer this // to the next main-loop iteration — drop here diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs new file mode 100644 index 00000000..8a69e9cc --- /dev/null +++ b/crates/pod/src/discovery.rs @@ -0,0 +1,973 @@ +//! Pod-state-backed discovery and restore/attach tools. +//! +//! This surface deliberately does not enumerate every Pod on the host. The +//! listing path starts from the caller's visibility set (the caller itself and +//! Pods it spawned according to durable Pod state) and only then reads each +//! Pod's own state. Name-targeted operations distinguish missing state from +//! state that exists but is outside that visibility set. 
+ +use std::collections::BTreeMap; +use std::io; +use std::path::{Path, PathBuf}; +use std::process::Stdio; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use protocol::stream::JsonLineReader; +use protocol::{Event, PodStatus}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use session_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, SegmentId, SessionId}; +use tokio::net::UnixStream; +use tokio::process::Command; + +use crate::runtime::pod_registry; +use crate::spawn::registry::SpawnedPodRegistry; + +const PROBE_TIMEOUT: Duration = Duration::from_millis(500); +const RESTORE_START_TIMEOUT: Duration = Duration::from_secs(20); + +#[derive(Clone)] +pub struct PodDiscovery { + store: St, + self_pod_name: String, + runtime_base: PathBuf, + cwd: PathBuf, + store_dir: Option, + spawned_registry: Arc, +} + +impl PodDiscovery +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + pub fn new( + store: St, + self_pod_name: String, + runtime_base: PathBuf, + cwd: PathBuf, + spawned_registry: Arc, + ) -> Self { + let store_dir = store.root_dir(); + Self { + store, + self_pod_name, + runtime_base, + cwd, + store_dir, + spawned_registry, + } + } + + pub async fn list_visible(&self) -> Result, PodDiscoveryError> { + let visibility = self.visibility().await?; + let mut items = Vec::with_capacity(visibility.visible.len()); + for pod_name in visibility.visible.keys() { + items.push( + self.build_item_for_visible_name(pod_name, &visibility) + .await, + ); + } + Ok(items) + } + + pub async fn inspect(&self, pod_name: &str) -> Result { + let visibility = self.visibility().await?; + let known_names = self.store.list_names()?; + let state_exists = known_names.iter().any(|n| n == pod_name); + if !state_exists { + return Err(PodDiscoveryError::StateMissing { + pod_name: pod_name.to_string(), + }); + } + if 
!visibility.visible.contains_key(pod_name) { + return Err(PodDiscoveryError::NotVisible { + pod_name: pod_name.to_string(), + }); + } + + match self.store.read_by_name(pod_name)? { + Some(metadata) => Ok(self.detail_from_metadata(metadata, &visibility).await), + None => Err(PodDiscoveryError::StateMissing { + pod_name: pod_name.to_string(), + }), + } + } + + pub async fn attach_or_restore( + &self, + pod_name: &str, + ) -> Result { + match self.plan_attach_or_restore(pod_name).await? { + AttachRestorePlan::Attach { + pod_name, + socket_path, + status, + } => Ok(AttachRestoreResult::Attached { + pod_name, + socket_path, + status, + }), + AttachRestorePlan::Restore { + pod_name, + socket_path, + } => { + self.spawn_restore_process(&pod_name, &socket_path).await?; + Ok(AttachRestoreResult::Restored { + pod_name, + socket_path, + }) + } + } + } + + pub async fn plan_attach_or_restore( + &self, + pod_name: &str, + ) -> Result { + let detail = self.inspect(pod_name).await?; + if detail.live.reachable { + return Ok(AttachRestorePlan::Attach { + pod_name: pod_name.to_string(), + socket_path: detail.live.socket_path, + status: detail.live.status, + }); + } + + let active = detail + .active + .ok_or_else(|| PodDiscoveryError::NotRestorable { + pod_name: pod_name.to_string(), + reason: "pod state has no active session".into(), + })?; + let segment_id = active + .segment_id + .ok_or_else(|| PodDiscoveryError::NotRestorable { + pod_name: pod_name.to_string(), + reason: "pod state has an active session but no active segment yet".into(), + })?; + + if let Some(lock) = lookup_segment_lock(segment_id)? 
{ + let lock_live = probe_socket(&lock.socket).await; + return if lock_live.reachable { + Ok(AttachRestorePlan::Attach { + pod_name: lock.pod_name, + socket_path: lock.socket, + status: lock_live.status, + }) + } else { + Err(PodDiscoveryError::LockConflict { + pod_name: pod_name.to_string(), + segment_id, + owner_pod: lock.pod_name, + socket_path: lock.socket, + pid: lock.pid, + }) + }; + } + + Ok(AttachRestorePlan::Restore { + pod_name: pod_name.to_string(), + socket_path: self.default_socket_path(pod_name), + }) + } + + async fn visibility(&self) -> Result { + let mut visible = BTreeMap::new(); + let mut child_sockets = BTreeMap::new(); + visible.insert(self.self_pod_name.clone(), VisibilityReason::SelfPod); + + // Durable parent -> child state is the primary visibility source. + if let Some(metadata) = self.store.read_by_name(&self.self_pod_name)? { + for child in metadata.spawned_children { + visible + .entry(child.pod_name.clone()) + .or_insert(VisibilityReason::SpawnedChild); + child_sockets.insert(child.pod_name, child.socket_path); + } + } + + // The live in-memory registry covers just-spawned children even if a + // state write failed after the process became reachable. It is an + // additive visibility hint, not the source of Pod metadata. 
+ for record in self.spawned_registry.list().await { + visible + .entry(record.pod_name.clone()) + .or_insert(VisibilityReason::SpawnedChild); + child_sockets.insert(record.pod_name, record.socket_path); + } + + Ok(VisibilitySet { + visible, + child_sockets, + }) + } + + async fn build_item_for_visible_name( + &self, + pod_name: &str, + visibility: &VisibilitySet, + ) -> VisiblePodItem { + let visibility_reason = visibility.reason_for(pod_name); + match self.store.read_by_name(pod_name) { + Ok(Some(metadata)) => { + let detail = self.detail_from_metadata(metadata, visibility).await; + VisiblePodItem { + pod_name: pod_name.to_string(), + visibility: visibility_reason, + state: PodStateStatus::Readable, + active: detail.active, + live: detail.live, + restore: detail.restore, + spawned_children: detail.spawned_children, + error: None, + } + } + Ok(None) => VisiblePodItem { + pod_name: pod_name.to_string(), + visibility: visibility_reason, + state: PodStateStatus::Missing, + active: None, + live: self.live_for_name(pod_name, None).await, + restore: RestoreInfo::not_possible("pod state missing"), + spawned_children: SpawnedChildrenSummary::default(), + error: None, + }, + Err(error) => VisiblePodItem { + pod_name: pod_name.to_string(), + visibility: visibility_reason, + state: PodStateStatus::Corrupt, + active: None, + live: self.live_for_name(pod_name, None).await, + restore: RestoreInfo::not_possible("pod state is unreadable"), + spawned_children: SpawnedChildrenSummary::default(), + error: Some(error.to_string()), + }, + } + } + + async fn detail_from_metadata( + &self, + metadata: PodMetadata, + visibility: &VisibilitySet, + ) -> PodDetail { + let child_socket = visibility.child_socket_for(&metadata.pod_name); + let live = self + .live_for_name(&metadata.pod_name, child_socket.as_deref()) + .await; + let restore = self + .restore_info(&metadata.pod_name, metadata.active.as_ref()) + .await; + let spawned_children = 
summarize_spawned_children(&metadata.spawned_children).await; + PodDetail { + pod_name: metadata.pod_name.clone(), + visibility: visibility.reason_for(&metadata.pod_name), + active: metadata.active.map(ActivePointer::from), + live, + restore, + spawned_children, + } + } + + async fn restore_info( + &self, + pod_name: &str, + active: Option<&PodActiveSegmentRef>, + ) -> RestoreInfo { + let Some(active) = active else { + return RestoreInfo::not_possible("pod state has no active session"); + }; + let Some(segment_id) = active.segment_id else { + return RestoreInfo::not_possible("active segment is not known yet"); + }; + match lookup_segment_lock(segment_id) { + Ok(Some(lock)) => RestoreInfo { + possible: false, + restore_name: Some(pod_name.to_string()), + reason: Some(format!( + "segment is currently locked by `{}` (pid {})", + lock.pod_name, lock.pid + )), + }, + Ok(None) => RestoreInfo { + possible: true, + restore_name: Some(pod_name.to_string()), + reason: None, + }, + Err(error) => RestoreInfo { + possible: false, + restore_name: Some(pod_name.to_string()), + reason: Some(format!("lock lookup failed: {error}")), + }, + } + } + + async fn live_for_name(&self, pod_name: &str, socket_override: Option<&Path>) -> LiveInfo { + let socket_path = socket_override + .map(Path::to_path_buf) + .unwrap_or_else(|| self.default_socket_path(pod_name)); + probe_socket(&socket_path).await + } + + fn default_socket_path(&self, pod_name: &str) -> PathBuf { + self.runtime_base.join(pod_name).join("sock") + } + + async fn spawn_restore_process( + &self, + pod_name: &str, + socket_path: &Path, + ) -> Result<(), PodDiscoveryError> { + let mut command = Command::new(resolve_pod_command()); + command + .arg("--pod") + .arg(pod_name) + .arg("--require-pod-state") + .current_dir(&self.cwd) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .process_group(0); + if let Some(store_dir) = &self.store_dir { + command.arg("--store").arg(store_dir); + } + + let mut child 
= command.spawn().map_err(PodDiscoveryError::RestoreSpawn)?; + let deadline = tokio::time::Instant::now() + RESTORE_START_TIMEOUT; + loop { + if probe_socket(socket_path).await.reachable { + tokio::spawn(async move { + let _ = child.wait().await; + }); + return Ok(()); + } + if let Some(status) = child.try_wait().map_err(PodDiscoveryError::RestoreSpawn)? { + return Err(PodDiscoveryError::RestoreExited { status }); + } + if tokio::time::Instant::now() >= deadline { + let _ = child.start_kill(); + let _ = child.wait().await; + return Err(PodDiscoveryError::RestoreTimeout); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum VisibilityReason { + SelfPod, + SpawnedChild, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum PodStateStatus { + Readable, + Missing, + Corrupt, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ActivePointer { + pub session_id: SessionId, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub segment_id: Option, +} + +impl From for ActivePointer { + fn from(value: PodActiveSegmentRef) -> Self { + Self { + session_id: value.session_id, + segment_id: value.segment_id, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct LiveInfo { + pub socket_path: PathBuf, + pub reachable: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub status: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RestoreInfo { + pub possible: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub restore_name: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + +impl RestoreInfo { + fn 
not_possible(reason: impl Into) -> Self { + Self { + possible: false, + restore_name: None, + reason: Some(reason.into()), + } + } +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +pub struct SpawnedChildrenSummary { + pub count: usize, + pub reachable: usize, + pub unreachable: usize, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct VisiblePodItem { + pub pod_name: String, + pub visibility: VisibilityReason, + pub state: PodStateStatus, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub active: Option, + pub live: LiveInfo, + pub restore: RestoreInfo, + pub spawned_children: SpawnedChildrenSummary, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct PodDetail { + pub pod_name: String, + pub visibility: VisibilityReason, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub active: Option, + pub live: LiveInfo, + pub restore: RestoreInfo, + pub spawned_children: SpawnedChildrenSummary, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "action", rename_all = "snake_case")] +pub enum AttachRestorePlan { + Attach { + pod_name: String, + socket_path: PathBuf, + #[serde(default, skip_serializing_if = "Option::is_none")] + status: Option, + }, + Restore { + pod_name: String, + socket_path: PathBuf, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "action", rename_all = "snake_case")] +pub enum AttachRestoreResult { + Attached { + pod_name: String, + socket_path: PathBuf, + #[serde(default, skip_serializing_if = "Option::is_none")] + status: Option, + }, + Restored { + pod_name: String, + socket_path: PathBuf, + }, +} + +#[derive(Debug, thiserror::Error)] +pub enum PodDiscoveryError { + #[error("pod state missing for `{pod_name}`")] + StateMissing { pod_name: String }, + #[error("pod `{pod_name}` is 
not visible to this Pod")] + NotVisible { pod_name: String }, + #[error("pod `{pod_name}` is not restorable: {reason}")] + NotRestorable { pod_name: String, reason: String }, + #[error( + "pod `{pod_name}` segment {segment_id} is locked by `{owner_pod}` pid {pid} at {socket_path}" + )] + LockConflict { + pod_name: String, + segment_id: SegmentId, + owner_pod: String, + socket_path: PathBuf, + pid: u32, + }, + #[error("store error: {0}")] + Store(#[from] session_store::StoreError), + #[error("scope lock error: {0}")] + ScopeLock(#[from] pod_registry::ScopeLockError), + #[error("failed to launch restore process: {0}")] + RestoreSpawn(io::Error), + #[error("restore process exited before socket became reachable: {status}")] + RestoreExited { status: std::process::ExitStatus }, + #[error("restore process did not become reachable before timeout")] + RestoreTimeout, +} + +struct VisibilitySet { + visible: BTreeMap, + child_sockets: BTreeMap, +} + +impl VisibilitySet { + fn reason_for(&self, pod_name: &str) -> VisibilityReason { + self.visible + .get(pod_name) + .cloned() + .unwrap_or(VisibilityReason::SpawnedChild) + } + + fn child_socket_for(&self, pod_name: &str) -> Option { + self.child_sockets.get(pod_name).cloned() + } +} + +async fn summarize_spawned_children( + children: &[session_store::PodSpawnedChild], +) -> SpawnedChildrenSummary { + let mut summary = SpawnedChildrenSummary { + count: children.len(), + ..Default::default() + }; + for child in children { + if probe_socket(&child.socket_path).await.reachable { + summary.reachable += 1; + } else { + summary.unreachable += 1; + } + } + summary +} + +async fn probe_socket(socket_path: &Path) -> LiveInfo { + match tokio::time::timeout(PROBE_TIMEOUT, UnixStream::connect(socket_path)).await { + Ok(Ok(stream)) => { + let (r, _w) = stream.into_split(); + let mut reader = JsonLineReader::new(r); + let status = match tokio::time::timeout(PROBE_TIMEOUT, reader.next::()).await { + Ok(Ok(Some(Event::Snapshot { status, .. 
}))) => Some(status), + _ => None, + }; + LiveInfo { + socket_path: socket_path.to_path_buf(), + reachable: true, + status, + error: None, + } + } + Ok(Err(error)) => LiveInfo { + socket_path: socket_path.to_path_buf(), + reachable: false, + status: None, + error: Some(error.to_string()), + }, + Err(_) => LiveInfo { + socket_path: socket_path.to_path_buf(), + reachable: false, + status: None, + error: Some("connect timed out".into()), + }, + } +} + +fn lookup_segment_lock( + segment_id: SegmentId, +) -> Result, pod_registry::ScopeLockError> { + pod_registry::lookup_segment(segment_id) +} + +fn resolve_pod_command() -> PathBuf { + if let Ok(cmd) = std::env::var("INSOMNIA_POD_COMMAND") + && !cmd.is_empty() + { + return PathBuf::from(cmd); + } + PathBuf::from("pod") +} + +#[derive(Debug, Deserialize, JsonSchema)] +struct PodNameInput { + /// Pod name to inspect, attach, or restore. + name: String, +} + +struct ListVisiblePodsTool { + discovery: PodDiscovery, +} + +#[async_trait] +impl Tool for ListVisiblePodsTool +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + async fn execute(&self, _input_json: &str) -> Result { + let items = self + .discovery + .list_visible() + .await + .map_err(discovery_error_to_tool_error)?; + let summary = format!("{} visible pod(s)", items.len()); + Ok(ToolOutput { + summary, + content: Some(json_content(&items)?), + }) + } +} + +struct InspectPodTool { + discovery: PodDiscovery, +} + +#[async_trait] +impl Tool for InspectPodTool +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + async fn execute(&self, input_json: &str) -> Result { + let input: PodNameInput = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid InspectPod input: {e}")))?; + let detail = self + .discovery + .inspect(&input.name) + .await + .map_err(discovery_error_to_tool_error)?; + Ok(ToolOutput { + summary: format!("pod `{}` inspected", detail.pod_name), + content: Some(json_content(&detail)?), + }) 
+ } +} + +struct AttachOrRestorePodTool { + discovery: PodDiscovery, +} + +#[async_trait] +impl Tool for AttachOrRestorePodTool +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + async fn execute(&self, input_json: &str) -> Result { + let input: PodNameInput = serde_json::from_str(input_json).map_err(|e| { + ToolError::InvalidArgument(format!("invalid AttachOrRestorePod input: {e}")) + })?; + let result = self + .discovery + .attach_or_restore(&input.name) + .await + .map_err(discovery_error_to_tool_error)?; + let summary = match &result { + AttachRestoreResult::Attached { pod_name, .. } => { + format!("pod `{pod_name}` is live; attached to existing socket") + } + AttachRestoreResult::Restored { pod_name, .. } => { + format!("pod `{pod_name}` restored from pod state") + } + }; + Ok(ToolOutput { + summary, + content: Some(json_content(&result)?), + }) + } +} + +pub fn list_visible_pods_tool(discovery: PodDiscovery) -> ToolDefinition +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + Arc::new(move || { + let meta = ToolMeta::new("ListVisiblePods") + .description( + "List Pod state entries visible to this Pod. 
This is state-backed and does not expose the host-wide Pod universe.", + ) + .input_schema(serde_json::to_value(schemars::schema_for!(())).unwrap()); + let tool: Arc = Arc::new(ListVisiblePodsTool { + discovery: discovery.clone(), + }); + (meta, tool) + }) +} + +pub fn inspect_pod_tool(discovery: PodDiscovery) -> ToolDefinition +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + Arc::new(move || { + let meta = ToolMeta::new("InspectPod") + .description( + "Inspect one visible Pod by name from durable Pod state, distinguishing missing state from not-visible Pods.", + ) + .input_schema(serde_json::to_value(schemars::schema_for!(PodNameInput)).unwrap()); + let tool: Arc = Arc::new(InspectPodTool { + discovery: discovery.clone(), + }); + (meta, tool) + }) +} + +pub fn attach_or_restore_pod_tool(discovery: PodDiscovery) -> ToolDefinition +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + Arc::new(move || { + let meta = ToolMeta::new("AttachOrRestorePod") + .description( + "Attach to a visible live Pod, or restore it from Pod state when no live socket is reachable. Missing state is an error.", + ) + .input_schema(serde_json::to_value(schemars::schema_for!(PodNameInput)).unwrap()); + let tool: Arc = Arc::new(AttachOrRestorePodTool { + discovery: discovery.clone(), + }); + (meta, tool) + }) +} + +fn json_content(value: &T) -> Result { + serde_json::to_string_pretty(value) + .map_err(|e| ToolError::Internal(format!("serialize pod discovery output: {e}"))) +} + +fn discovery_error_to_tool_error(error: PodDiscoveryError) -> ToolError { + match error { + PodDiscoveryError::StateMissing { .. } + | PodDiscoveryError::NotVisible { .. } + | PodDiscoveryError::NotRestorable { .. } => ToolError::InvalidArgument(error.to_string()), + PodDiscoveryError::LockConflict { .. } + | PodDiscoveryError::Store(_) + | PodDiscoveryError::ScopeLock(_) + | PodDiscoveryError::RestoreSpawn(_) + | PodDiscoveryError::RestoreExited { .. 
} + | PodDiscoveryError::RestoreTimeout => ToolError::ExecutionFailed(error.to_string()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + + use manifest::{Permission, ScopeRule}; + use protocol::Greeting; + use protocol::stream::JsonLineWriter; + use session_store::{ + FsStore, PodSpawnedChild, PodSpawnedScopeRule, new_segment_id, new_session_id, + }; + use tempfile::TempDir; + use tokio::net::UnixListener; + + use crate::runtime::dir::RuntimeDir; + + static ENV_LOCK: Mutex<()> = Mutex::new(()); + + #[tokio::test(flavor = "current_thread")] + async fn state_backed_visibility_and_attach_restore_planning() { + let _env = ENV_LOCK.lock().unwrap(); + let root = TempDir::new().unwrap(); + let store_dir = root.path().join("store"); + let runtime_base = root.path().join("runtime"); + std::fs::create_dir_all(&runtime_base).unwrap(); + unsafe { + std::env::set_var("INSOMNIA_RUNTIME_DIR", &runtime_base); + } + + let store = FsStore::new(&store_dir).unwrap(); + let session_id = new_session_id(); + let active_child_segment = new_segment_id(); + let pending_session_id = new_session_id(); + let live_socket = runtime_base.join("child-live").join("sock"); + std::fs::create_dir_all(live_socket.parent().unwrap()).unwrap(); + let live_listener = spawn_snapshot_socket(&live_socket).await; + + let stale_socket = runtime_base.join("child-stale").join("sock"); + let pending_socket = runtime_base.join("child-pending").join("sock"); + let parent = PodMetadata { + pod_name: "parent".into(), + active: None, + spawned_children: vec![ + child("child-live", &live_socket), + child("child-stale", &stale_socket), + child("child-pending", &pending_socket), + ], + }; + store.write(&parent).unwrap(); + store + .write(&PodMetadata { + pod_name: "child-live".into(), + active: Some(PodActiveSegmentRef::active_segment( + session_id, + active_child_segment, + )), + spawned_children: Vec::new(), + }) + .unwrap(); + store + .write(&PodMetadata { + pod_name: 
"child-stale".into(), + active: Some(PodActiveSegmentRef::active_segment( + session_id, + active_child_segment, + )), + spawned_children: Vec::new(), + }) + .unwrap(); + store + .write(&PodMetadata { + pod_name: "child-pending".into(), + active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)), + spawned_children: Vec::new(), + }) + .unwrap(); + store + .write(&PodMetadata { + pod_name: "hidden".into(), + active: Some(PodActiveSegmentRef::active_segment( + session_id, + new_segment_id(), + )), + spawned_children: Vec::new(), + }) + .unwrap(); + + // RuntimeDir creates parent runtime files; discovery must still use + // Pod state when spawned_pods.json is absent. + let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "parent").await.unwrap()); + let runtime_file = runtime_dir.path().join("spawned_pods.json"); + assert!(!runtime_file.exists()); + let registry = SpawnedPodRegistry::new(runtime_dir); + let discovery = PodDiscovery::new( + store.clone(), + "parent".into(), + runtime_base.clone(), + root.path().to_path_buf(), + registry, + ); + + let list = discovery.list_visible().await.unwrap(); + let names: Vec<_> = list.iter().map(|p| p.pod_name.as_str()).collect(); + assert_eq!( + names, + vec!["child-live", "child-pending", "child-stale", "parent"] + ); + assert!(!names.contains(&"hidden")); + assert!( + list.iter() + .find(|p| p.pod_name == "child-live") + .unwrap() + .live + .reachable + ); + assert!( + !list + .iter() + .find(|p| p.pod_name == "child-stale") + .unwrap() + .live + .reachable + ); + + let pending = list.iter().find(|p| p.pod_name == "child-pending").unwrap(); + assert_eq!(pending.state, PodStateStatus::Readable); + assert_eq!( + pending.active.as_ref().unwrap().session_id, + pending_session_id + ); + assert_eq!(pending.active.as_ref().unwrap().segment_id, None); + assert!(!pending.restore.possible); + + let hidden_err = discovery.inspect("hidden").await.unwrap_err(); + assert!(matches!(hidden_err, 
PodDiscoveryError::NotVisible { .. })); + let missing_err = discovery.inspect("missing").await.unwrap_err(); + assert!(matches!( + missing_err, + PodDiscoveryError::StateMissing { .. } + )); + let hidden_restore_err = discovery + .plan_attach_or_restore("hidden") + .await + .unwrap_err(); + assert!(matches!( + hidden_restore_err, + PodDiscoveryError::NotVisible { .. } + )); + + let attach_plan = discovery + .plan_attach_or_restore("child-live") + .await + .unwrap(); + assert!(matches!(attach_plan, AttachRestorePlan::Attach { .. })); + let restore_plan = discovery + .plan_attach_or_restore("child-stale") + .await + .unwrap(); + assert!(matches!(restore_plan, AttachRestorePlan::Restore { .. })); + + let lock_socket = runtime_base.join("lock-owner.sock"); + let _guard = pod_registry::install_top_level( + "lock-owner".into(), + std::process::id(), + lock_socket.clone(), + vec![ScopeRule { + target: root.path().to_path_buf(), + permission: Permission::Read, + recursive: true, + }], + active_child_segment, + ) + .unwrap(); + let locked_err = discovery + .plan_attach_or_restore("child-stale") + .await + .unwrap_err(); + assert!(matches!(locked_err, PodDiscoveryError::LockConflict { .. 
})); + + live_listener.abort(); + } + + fn child(name: &str, socket_path: &Path) -> PodSpawnedChild { + PodSpawnedChild { + pod_name: name.to_string(), + socket_path: socket_path.to_path_buf(), + scope_delegated: vec![PodSpawnedScopeRule { + target: PathBuf::from("/tmp"), + permission: "read".into(), + recursive: true, + }], + callback_address: PathBuf::from("/tmp/parent.sock"), + } + } + + async fn spawn_snapshot_socket(socket_path: &Path) -> tokio::task::JoinHandle<()> { + let _ = std::fs::remove_file(socket_path); + let listener = UnixListener::bind(socket_path).unwrap(); + tokio::spawn(async move { + loop { + let Ok((stream, _)) = listener.accept().await else { + break; + }; + tokio::spawn(async move { + let mut writer = JsonLineWriter::new(stream); + let _ = writer + .write(&Event::Snapshot { + entries: Vec::new(), + greeting: Greeting { + pod_name: "child-live".into(), + cwd: "/tmp".into(), + provider: "test".into(), + model: "test".into(), + scope_summary: String::new(), + tools: Vec::new(), + context_window: 0, + context_tokens: 0, + }, + status: PodStatus::Idle, + }) + .await; + }); + } + }) + } +} diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 5f6f3df6..ba819360 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -1,5 +1,6 @@ pub mod compact; pub mod controller; +pub mod discovery; pub mod fs_view; pub mod hook; pub mod ipc; diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index a43716cd..ba5ab73f 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -53,6 +53,11 @@ struct Cli { #[arg(long, value_name = "NAME", conflicts_with_all = ["session", "adopt"])] pod: Option, + /// Require `--pod` to restore existing Pod state instead of creating a + /// fresh Pod when no state exists. Used by Pod discovery restore flows. + #[arg(long, requires = "pod")] + require_pod_state: bool, + /// Restore a Pod from an existing session. 
The Pod re-uses the /// given session id and appends new turns to the same jsonl; /// concurrent writers are prevented by the pod-registry. @@ -269,6 +274,10 @@ async fn main() -> ExitCode { } } } + Ok(None) if cli.require_pod_state => { + eprintln!("error: pod state missing for {pod_name}"); + return ExitCode::FAILURE; + } Ok(None) => match Pod::from_manifest(manifest, store, loader).await { Ok(p) => p, Err(e) => { diff --git a/crates/pod/src/spawn/registry.rs b/crates/pod/src/spawn/registry.rs index 79050fe8..d570ede8 100644 --- a/crates/pod/src/spawn/registry.rs +++ b/crates/pod/src/spawn/registry.rs @@ -129,7 +129,11 @@ impl SpawnedPodRegistry { runtime_dir.write_spawned_pods(&records).await?; let state_writer = pod_state_writer(store, pod_name.clone()); - if pruned || metadata.is_some() { + // Runtime spawned-pod records are a live registry for ListPods and + // cursor/scope cleanup; durable Pod state remains the discovery source + // for later attach/restore, so do not delete unreachable children from + // Pod state just because their sockets are gone. 
+ if metadata.is_none() || !pruned { state_writer(&records)?; } diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index 77f5b376..08b8adfe 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -579,7 +579,7 @@ async fn restored_registry_uses_pod_state_without_runtime_file() { } #[tokio::test] -async fn load_from_pod_state_prunes_children_with_missing_sockets() { +async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state() { let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); let store = FsStore::new(store_tmp.path()).unwrap(); @@ -615,12 +615,23 @@ async fn load_from_pod_state_prunes_children_with_missing_sockets() { .read_by_name("spawner") .unwrap() .expect("spawner metadata should be written"); - assert_eq!(metadata.spawned_children.len(), 1); - assert_eq!(metadata.spawned_children[0].pod_name, "alive"); + assert_eq!(metadata.spawned_children.len(), 2); + assert!( + metadata + .spawned_children + .iter() + .any(|c| c.pod_name == "alive") + ); + assert!( + metadata + .spawned_children + .iter() + .any(|c| c.pod_name == "missing") + ); } #[tokio::test] -async fn load_from_pod_state_reclaims_pruned_child_scope_and_registry_deny() { +async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_state() { let _env = EnvGuard::acquire(); let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); @@ -705,7 +716,8 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_and_registry_deny() { .read_by_name("spawner") .unwrap() .expect("spawner metadata should remain"); - assert!(metadata.spawned_children.is_empty()); + assert_eq!(metadata.spawned_children.len(), 1); + assert_eq!(metadata.spawned_children[0].pod_name, "missing"); let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap(); let runtime_records: Vec = 
serde_json::from_str(&runtime_contents).unwrap();
     assert!(runtime_records.is_empty());
diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs
index b29d4a8e..7d46faa4 100644
--- a/crates/protocol/src/lib.rs
+++ b/crates/protocol/src/lib.rs
@@ -43,6 +43,18 @@ pub enum Method {
         kind: CompletionKind,
         prefix: String,
     },
+    /// List Pods visible to this Pod from durable Pod state. This is not a
+    /// host-wide Pod universe query.
+    ListVisiblePods,
+    /// Inspect one Pod by name if its state exists and it is visible to this Pod.
+    InspectPod {
+        name: String,
+    },
+    /// Attach to a visible live Pod, or restore it from durable Pod state when
+    /// it is not live. Missing state and not-visible state are distinct errors.
+    AttachOrRestorePod {
+        name: String,
+    },
 }
 
 /// Typed lifecycle events sent from a child Pod to its parent.
@@ -393,6 +405,20 @@ pub enum Event {
         kind: CompletionKind,
         entries: Vec,
     },
+    /// Reply to `Method::ListVisiblePods`. Payload is a stable JSON value so
+    /// the Pod crate can evolve discovery fields without introducing a protocol
+    /// dependency on session-store.
+    VisiblePods {
+        pods: serde_json::Value,
+    },
+    /// Reply to `Method::InspectPod`.
+    PodInspection {
+        pod: serde_json::Value,
+    },
+    /// Reply to `Method::AttachOrRestorePod`.
+    PodAttachRestore {
+        result: serde_json::Value,
+    },
     Alert(Alert),
     /// Pod has started compacting the current session.
     ///
@@ -1215,4 +1241,66 @@ mod tests {
             other => panic!("expected UserMessage, got {other:?}"),
         }
     }
+
+    #[test]
+    fn pod_discovery_methods_roundtrip() {
+        let methods = [
+            Method::ListVisiblePods,
+            Method::InspectPod {
+                name: "child".into(),
+            },
+            Method::AttachOrRestorePod {
+                name: "child".into(),
+            },
+        ];
+        for method in methods {
+            let json = serde_json::to_string(&method).unwrap();
+            let decoded: Method = serde_json::from_str(&json).unwrap();
+            // Compare the payload as well: matching on the variant alone would
+            // let a decoded `name` that differs from the original go unnoticed.
+            match (decoded, method) {
+                (Method::ListVisiblePods, Method::ListVisiblePods) => {}
+                (Method::InspectPod { name }, Method::InspectPod { name: expected }) => {
+                    assert_eq!(name, expected)
+                }
+                (
+                    Method::AttachOrRestorePod { name },
+                    Method::AttachOrRestorePod { name: expected },
+                ) => assert_eq!(name, expected),
+                (decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
+            }
+        }
+    }
+
+    #[test]
+    fn pod_discovery_events_roundtrip() {
+        let events = [
+            Event::VisiblePods {
+                pods: serde_json::json!([{ "pod_name": "child" }]),
+            },
+            Event::PodInspection {
+                pod: serde_json::json!({ "pod_name": "child" }),
+            },
+            Event::PodAttachRestore {
+                result: serde_json::json!({ "action": "attach" }),
+            },
+        ];
+        for event in events {
+            let json = serde_json::to_string(&event).unwrap();
+            let decoded: Event = serde_json::from_str(&json).unwrap();
+            match (decoded, event) {
+                (Event::VisiblePods { pods }, Event::VisiblePods { pods: expected }) => {
+                    assert_eq!(pods, expected)
+                }
+                (Event::PodInspection { pod }, Event::PodInspection { pod: expected }) => {
+                    assert_eq!(pod, expected)
+                }
+                (
+                    Event::PodAttachRestore { result },
+                    Event::PodAttachRestore { result: expected },
+                ) => assert_eq!(result, expected),
+                (decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
+            }
+        }
+    }
 }
diff --git a/crates/session-store/src/fs_store.rs b/crates/session-store/src/fs_store.rs
index fe0152c0..3925b603 100644
--- a/crates/session-store/src/fs_store.rs
+++ b/crates/session-store/src/fs_store.rs
@@ -123,6 +123,39 @@ impl PodMetadataStore for FsStore {
         Ok(Some(serde_json::from_str(&content)?))
     }
 
+    fn list_names(&self) -> Result<Vec<String>, StoreError> {
+        // A missing pods directory means no Pod was ever persisted. Matching on
+        // the `read_dir` error (instead of probing with `exists()` first) avoids
+        // a race with concurrent deletion and still surfaces other I/O errors.
+        let entries = match fs::read_dir(self.pods_dir()) {
+            Ok(entries) => entries,
+            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
+            Err(err) => return Err(err.into()),
+        };
+        let mut names = Vec::new();
+        for entry in entries
{
+            let entry = entry?;
+            if !entry.file_type()?.is_dir() {
+                continue;
+            }
+            if !entry.path().join("metadata.json").exists() {
+                continue;
+            }
+            let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else {
+                continue;
+            };
+            if validate_pod_name(&name).is_ok() {
+                names.push(name);
+            }
+        }
+        names.sort();
+        Ok(names)
+    }
+
+    fn root_dir(&self) -> Option<PathBuf> {
+        Some(self.root.clone())
+    }
+
     fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError> {
         let path = self.pod_metadata_path(pod_name)?;
         match fs::remove_file(&path) {
diff --git a/crates/session-store/src/pod_metadata.rs b/crates/session-store/src/pod_metadata.rs
index ef15d083..a3438d1f 100644
--- a/crates/session-store/src/pod_metadata.rs
+++ b/crates/session-store/src/pod_metadata.rs
@@ -92,6 +92,16 @@ pub trait PodMetadataStore: Send + Sync {
     /// Read metadata by Pod name. Returns `None` when no metadata exists.
     fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, StoreError>;
 
+    /// List persisted Pod metadata keys. Implementations return names only;
+    /// callers can then read each item independently so a corrupt metadata
+    /// file does not make the whole discovery result fail.
+    fn list_names(&self) -> Result<Vec<String>, StoreError>;
+
+    /// Return the metadata root directory when this backend is path-backed.
+    fn root_dir(&self) -> Option<PathBuf> {
+        None
+    }
+
     /// Delete metadata by Pod name. Missing metadata is a successful no-op.
fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError>; } diff --git a/crates/session-store/tests/fs_store_test.rs b/crates/session-store/tests/fs_store_test.rs index f4b0eb3b..c920b8c6 100644 --- a/crates/session-store/tests/fs_store_test.rs +++ b/crates/session-store/tests/fs_store_test.rs @@ -250,6 +250,7 @@ fn pod_metadata_minimal_crud() { let pending = PodMetadata::new(pod_name, Some(PodActiveSegmentRef::pending_segment(sid))); store.write(&pending).unwrap(); + assert_eq!(store.list_names().unwrap(), vec![pod_name.to_string()]); assert_eq!(store.read_by_name(pod_name).unwrap(), Some(pending.clone())); assert!( dir.path() diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 23d1f98c..4888db37 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -758,6 +758,9 @@ impl App { state.selected = 0; } } + Event::VisiblePods { .. } + | Event::PodInspection { .. } + | Event::PodAttachRestore { .. } => {} Event::Shutdown => { self.mark_orphan_compacts_incomplete(); self.quit = true;