From 530027c62b7cb2782a8cc9a3526a686106f305e9 Mon Sep 17 00:00:00 2001 From: Hare Date: Fri, 22 May 2026 23:30:02 +0900 Subject: [PATCH] feat: persist spawned pod registry --- crates/pod/src/controller.rs | 11 +- crates/pod/src/pod.rs | 7 +- crates/pod/src/runtime/dir.rs | 8 +- crates/pod/src/spawn/comm_tools.rs | 4 +- crates/pod/src/spawn/registry.rs | 185 +++++++++++++++++++++-- crates/pod/src/spawn/tool.rs | 5 +- crates/pod/tests/pod_comm_tools_test.rs | 143 ++++++++++++++++++ crates/session-store/src/lib.rs | 4 +- crates/session-store/src/pod_metadata.rs | 24 +++ 9 files changed, 366 insertions(+), 25 deletions(-) diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index f2935650..c6864274 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use llm_worker::WorkerError; use llm_worker::llm_client::client::LlmClient; -use session_store::Store; +use session_store::{PodMetadataStore, Store}; use tokio::sync::{broadcast, mpsc, oneshot}; use crate::ipc::alerter::Alerter; @@ -132,7 +132,7 @@ impl PodController { ) -> Result<(PodHandle, ShutdownReceiver), std::io::Error> where C: LlmClient + Clone + 'static, - St: Store + Clone + 'static, + St: Store + PodMetadataStore + Clone + Send + Sync + 'static, { // === 1. 
Initialization (channels / RuntimeDir / pod-immutable // snapshots / SpawnedPodRegistry / alerter attach / @@ -151,7 +151,12 @@ impl PodController { let spawner_name = pod.manifest().pod.name.clone(); let self_parent_socket = pod.callback_socket().cloned(); - let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone()); + let spawned_registry = SpawnedPodRegistry::load_from_pod_state( + runtime_dir.clone(), + pod.store().clone(), + spawner_name.clone(), + ) + .await?; // Hand the alerter to the Pod so internal operations (compaction, // AGENTS.md ingestion during the first turn) can emit user-facing diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index b2c67097..acf20833 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -57,7 +57,12 @@ where St: PodMetadataStore + Clone + Send + Sync + 'static, { let store = store.clone(); - Arc::new(move |metadata| store.write(&metadata)) + Arc::new(move |mut metadata| { + if let Some(existing) = store.read_by_name(&metadata.pod_name)? { + metadata.spawned_children = existing.spawned_children; + } + store.write(&metadata) + }) } /// Lock-free shared session/segment pointer. diff --git a/crates/pod/src/runtime/dir.rs b/crates/pod/src/runtime/dir.rs index 26e9ec51..21259981 100644 --- a/crates/pod/src/runtime/dir.rs +++ b/crates/pod/src/runtime/dir.rs @@ -7,11 +7,11 @@ use tokio::fs; use crate::shared_state::PodSharedState; -/// One spawned-child record persisted to `spawned_pods.json`. +/// One spawned-child record mirrored to `spawned_pods.json`. /// -/// Written by the spawner after a successful `SpawnPod` tool call so -/// `ListPods` (future ticket) and a restored spawner can enumerate -/// their live children without re-querying `pods.json`. +/// Written by the spawner after registry changes so runtime-local tools +/// have a materialised snapshot. Durable restore uses Pod state metadata; +/// this file is not the authoritative source. 
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct SpawnedPodRecord { /// Spawned Pod's identity. diff --git a/crates/pod/src/spawn/comm_tools.rs b/crates/pod/src/spawn/comm_tools.rs index 3ef09a1b..9575df8c 100644 --- a/crates/pod/src/spawn/comm_tools.rs +++ b/crates/pod/src/spawn/comm_tools.rs @@ -6,7 +6,7 @@ //! target's Unix socket, perform one method exchange, disconnect. //! //! These tools only touch Pods listed in the spawner's -//! `spawned_pods.json`; there is no machine-wide directory lookup, so +//! `SpawnedPodRegistry`; there is no machine-wide directory lookup, so //! the spawner can only reach its own descendants. use std::path::Path; @@ -217,7 +217,7 @@ impl Tool for StopPodTool { self.registry .remove(&record.pod_name) .await - .map_err(|e| ToolError::ExecutionFailed(format!("update spawned_pods.json: {e}")))?; + .map_err(|e| ToolError::ExecutionFailed(format!("update spawned pod registry: {e}")))?; Ok(ToolOutput { summary: format!( diff --git a/crates/pod/src/spawn/registry.rs b/crates/pod/src/spawn/registry.rs index 1849ef69..2b0b3691 100644 --- a/crates/pod/src/spawn/registry.rs +++ b/crates/pod/src/spawn/registry.rs @@ -2,30 +2,41 @@ //! //! `SpawnPod` writes here; the pod-comm tools (`SendToPod`, //! `ReadPodOutput`, `StopPod`, `ListPods`) read and mutate the same -//! instance. Persisted to `spawned_pods.json` in the spawner's runtime -//! dir so a restarted spawner rebuilds its view from disk (future work -//! — today only write-through is implemented). +//! instance. Runtime write-through still materialises `spawned_pods.json`, +//! but durable state lives in the spawner's Pod metadata. //! //! `ReadPodOutput` additionally owns a per-spawned-pod cursor here so //! two consecutive reads yield only new assistant text. The cursor is //! an item-index into the child's history; push-only history makes //! index stable across reads. //! -//! The registry stays in-memory only for this Pod's lifetime — cursors -//! 
intentionally do not persist. +//! Cursors intentionally do not persist; a restored registry starts with +//! fresh read positions. use std::collections::HashMap; use std::io; +use std::path::Path; use std::sync::Arc; +use std::time::Duration; +use manifest::{Permission, ScopeRule}; +use session_store::{ + PodMetadata, PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule, StoreError, +}; +use tokio::net::UnixStream; use tokio::sync::Mutex; +use tracing::warn; use crate::runtime::dir::{RuntimeDir, SpawnedPodRecord}; +type RegistryStateWriter = Arc io::Result<()> + Send + Sync>; +const RESTORE_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500); + pub struct SpawnedPodRegistry { records: Mutex>, cursors: Mutex>, runtime_dir: Arc, + state_writer: Option, } impl SpawnedPodRegistry { @@ -34,18 +45,76 @@ impl SpawnedPodRegistry { records: Mutex::new(Vec::new()), cursors: Mutex::new(HashMap::new()), runtime_dir, + state_writer: None, }) } + /// Build a registry from the spawner's durable Pod state, pruning child + /// records whose socket path is already gone. The surviving list is + /// written through to both `spawned_pods.json` and Pod state so runtime + /// and durable views start aligned. 
+ pub async fn load_from_pod_state( + runtime_dir: Arc, + store: St, + pod_name: String, + ) -> io::Result> + where + St: PodMetadataStore + Clone + Send + Sync + 'static, + { + let metadata = store.read_by_name(&pod_name).map_err(store_error_to_io)?; + let persisted_children = metadata + .as_ref() + .map(|m| m.spawned_children.clone()) + .unwrap_or_default(); + + let mut records = Vec::with_capacity(persisted_children.len()); + let mut pruned = false; + for child in &persisted_children { + let record = match record_from_pod_state(child) { + Ok(record) => record, + Err(err) => { + pruned = true; + warn!( + error = %err, + pod = %child.pod_name, + "dropping corrupt persisted spawned-pod record" + ); + continue; + } + }; + if is_reachable(&record.socket_path).await { + records.push(record); + } else { + pruned = true; + warn!( + pod = %record.pod_name, + socket = %record.socket_path.display(), + "dropping unreachable persisted spawned-pod record" + ); + } + } + + runtime_dir.write_spawned_pods(&records).await?; + let state_writer = pod_state_writer(store, pod_name); + if pruned || metadata.is_some() { + state_writer(&records)?; + } + + Ok(Arc::new(Self { + records: Mutex::new(records), + cursors: Mutex::new(HashMap::new()), + runtime_dir, + state_writer: Some(state_writer), + })) + } + /// Append a new record and persist the full list. Returns an I/O - /// error if the persisted write fails; the in-memory state is still + /// error if either persisted write fails; the in-memory state is still /// updated in that case — the next successful write will reconcile. pub async fn add(&self, record: SpawnedPodRecord) -> io::Result<()> { let mut records = self.records.lock().await; records.push(record); - self.runtime_dir - .write_spawned_pods(records.as_slice()) - .await + self.persist_records(records.as_slice()).await } /// Look up a record by pod name. Cloned so callers can drop the lock. 
@@ -69,9 +138,7 @@ impl SpawnedPodRegistry { let mut records = self.records.lock().await; let idx = records.iter().position(|r| r.pod_name == pod_name); let removed = idx.map(|i| records.remove(i)); - self.runtime_dir - .write_spawned_pods(records.as_slice()) - .await?; + self.persist_records(records.as_slice()).await?; removed }; self.cursors.lock().await.remove(pod_name); @@ -94,4 +161,98 @@ impl SpawnedPodRegistry { .await .insert(pod_name.to_string(), cursor); } + + async fn persist_records(&self, records: &[SpawnedPodRecord]) -> io::Result<()> { + self.runtime_dir.write_spawned_pods(records).await?; + if let Some(write_state) = &self.state_writer { + write_state(records)?; + } + Ok(()) + } +} + +fn pod_state_writer(store: St, pod_name: String) -> RegistryStateWriter +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + Arc::new(move |records| { + write_records_to_pod_state(&store, &pod_name, records).map_err(store_error_to_io) + }) +} + +fn write_records_to_pod_state( + store: &St, + pod_name: &str, + records: &[SpawnedPodRecord], +) -> Result<(), StoreError> +where + St: PodMetadataStore, +{ + let mut metadata = store + .read_by_name(pod_name)? 
+ .unwrap_or_else(|| PodMetadata::new(pod_name, None)); + metadata.spawned_children = records + .iter() + .map(record_to_pod_state) + .collect::, _>>()?; + store.write(&metadata) +} + +fn record_to_pod_state(record: &SpawnedPodRecord) -> Result { + Ok(PodSpawnedChild { + pod_name: record.pod_name.clone(), + socket_path: record.socket_path.clone(), + scope_delegated: record + .scope_delegated + .iter() + .map(|rule| PodSpawnedScopeRule { + target: rule.target.clone(), + permission: match rule.permission { + Permission::Read => "read".to_string(), + Permission::Write => "write".to_string(), + }, + recursive: rule.recursive, + }) + .collect(), + callback_address: record.callback_address.clone(), + }) +} + +fn record_from_pod_state(child: &PodSpawnedChild) -> Result { + Ok(SpawnedPodRecord { + pod_name: child.pod_name.clone(), + socket_path: child.socket_path.clone(), + scope_delegated: child + .scope_delegated + .iter() + .map(|rule| { + Ok(ScopeRule { + target: rule.target.clone(), + permission: match rule.permission.as_str() { + "read" => Permission::Read, + "write" => Permission::Write, + other => { + return Err(serde_json::Error::io(io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid permission `{other}`"), + ))); + } + }, + recursive: rule.recursive, + }) + }) + .collect::, _>>()?, + callback_address: child.callback_address.clone(), + }) +} + +fn store_error_to_io(error: StoreError) -> io::Error { + io::Error::other(error) +} + +async fn is_reachable(socket: &Path) -> bool { + tokio::time::timeout(RESTORE_REACHABILITY_TIMEOUT, UnixStream::connect(socket)) + .await + .map(|result| result.is_ok()) + .unwrap_or(false) } diff --git a/crates/pod/src/spawn/tool.rs b/crates/pod/src/spawn/tool.rs index 02ffc1ad..bf29f2b2 100644 --- a/crates/pod/src/spawn/tool.rs +++ b/crates/pod/src/spawn/tool.rs @@ -107,7 +107,8 @@ pub struct SpawnPodTool { spawner_pwd: PathBuf, /// Shared registry of spawned children, also used by the /// pod-comm tools (`SendToPod` / 
`ReadPodOutput` / `StopPod` / - /// `ListPods`). Writes the list to `spawned_pods.json` on each add. + /// `ListPods`). Writes the list to runtime and durable Pod state on + /// each add. registry: Arc, /// THIS Pod's own parent-callback socket, if any. After a /// successful spawn we fire `PodEvent::ScopeSubDelegated` upward @@ -268,7 +269,7 @@ impl Tool for SpawnPodTool { self.registry .add(record) .await - .map_err(|e| ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}")))?; + .map_err(|e| ToolError::ExecutionFailed(format!("write spawned pod registry: {e}")))?; // Notify this Pod's own parent so the grandparent can register // the new grandchild directly. Fire-and-forget; top-level Pods diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index 94f00891..e419e42d 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -23,8 +23,10 @@ use pod::spawn::registry::SpawnedPodRegistry; use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::{ErrorCode, Event, Greeting, Method}; use serde_json::json; +use session_store::{FsStore, PodMetadataStore}; use tempfile::TempDir; use tokio::net::UnixListener; +use tokio::sync::mpsc; use tokio::task::JoinHandle; /// Serialises env-mutating tests. The test harness runs tasks across @@ -183,6 +185,31 @@ fn serve_history(listener: UnixListener, items: Vec) -> JoinHandle<()> { }) } +fn serve_pod_methods(listener: UnixListener) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(8); + tokio::spawn(async move { + loop { + let Ok((stream, _)) = listener.accept().await else { + return; + }; + let (r, w) = stream.into_split(); + let mut reader = JsonLineReader::new(r); + let mut writer = JsonLineWriter::new(w); + let Some(method) = reader.next::().await.ok().flatten() else { + continue; + }; + let is_shutdown = matches!(method, Method::Shutdown); + if matches!(method, Method::Run { .. 
}) { + let _ = writer.write(&Event::TurnStart { turn: 1 }).await; + } + if tx.send(method).await.is_err() || is_shutdown { + return; + } + } + }); + rx +} + fn assistant(text: &str) -> Item { Item::Message { id: None, @@ -418,6 +445,122 @@ async fn stop_pod_succeeds_even_when_child_unreachable() { assert!(registry.get("child").await.is_none()); } +// --------------------------------------------------------------------------- +// Persistence / restore +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn restored_registry_uses_pod_state_without_runtime_file() { + let _env = EnvGuard::acquire(); + let runtime_tmp = TempDir::new().unwrap(); + let store_tmp = TempDir::new().unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); + unsafe { + std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path()); + } + + let rd = Arc::new( + RuntimeDir::create(runtime_tmp.path(), "spawner") + .await + .unwrap(), + ); + let registry = + SpawnedPodRegistry::load_from_pod_state(rd.clone(), store.clone(), "spawner".to_string()) + .await + .unwrap(); + + let (socket, listener) = bind_mock_socket(runtime_tmp.path(), "child").await; + let mut received = serve_pod_methods(listener); + register_child(&registry, "child", &socket, runtime_tmp.path()).await; + + std::fs::remove_file(rd.path().join("spawned_pods.json")).unwrap(); + + let restored = + SpawnedPodRegistry::load_from_pod_state(rd.clone(), store.clone(), "spawner".to_string()) + .await + .unwrap(); + + let def = list_pods_tool(restored.clone()); + let (_meta, tool) = def(); + let output: ToolOutput = tool.execute("{}").await.unwrap(); + assert!(output.summary.contains("1 pod"), "{}", output.summary); + let body = output.content.expect("restored ListPods should list child"); + assert!(body.contains("child [alive]"), "body: {body}"); + + let def = send_to_pod_tool(restored.clone()); + let (_meta, tool) = def(); + let input = json!({ "name": "child", "message": "after
restart" }).to_string(); + tool.execute(&input).await.unwrap(); + match received.recv().await.expect("expected Run") { + Method::Run { input } => match input.as_slice() { + [protocol::Segment::Text { content }] => assert_eq!(content, "after restart"), + other => panic!("expected single Text segment, got {other:?}"), + }, + other => panic!("expected Run, got {other:?}"), + } + + let def = stop_pod_tool(restored.clone()); + let (_meta, tool) = def(); + tool.execute(&json!({ "name": "child" }).to_string()) + .await + .unwrap(); + assert!(matches!( + received.recv().await.expect("expected Shutdown"), + Method::Shutdown + )); + assert!(restored.get("child").await.is_none()); + + let metadata = store + .read_by_name("spawner") + .unwrap() + .expect("spawner metadata should remain"); + assert!(metadata.spawned_children.is_empty()); + let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap(); + let runtime_records: Vec = serde_json::from_str(&runtime_contents).unwrap(); + assert!(runtime_records.is_empty()); +} + +#[tokio::test] +async fn load_from_pod_state_prunes_children_with_missing_sockets() { + let runtime_tmp = TempDir::new().unwrap(); + let store_tmp = TempDir::new().unwrap(); + let store = FsStore::new(store_tmp.path()).unwrap(); + let rd = Arc::new( + RuntimeDir::create(runtime_tmp.path(), "spawner") + .await + .unwrap(), + ); + let registry = + SpawnedPodRegistry::load_from_pod_state(rd.clone(), store.clone(), "spawner".to_string()) + .await + .unwrap(); + + let (live_socket, listener) = bind_mock_socket(runtime_tmp.path(), "alive").await; + let _server = serve_pod_methods(listener); + register_child(&registry, "alive", &live_socket, runtime_tmp.path()).await; + register_child( + &registry, + "missing", + &runtime_tmp.path().join("missing.sock"), + runtime_tmp.path(), + ) + .await; + + let restored = + SpawnedPodRegistry::load_from_pod_state(rd.clone(), store.clone(), "spawner".to_string()) + .await + .unwrap(); + +
assert!(restored.get("alive").await.is_some()); + assert!(restored.get("missing").await.is_none()); + let metadata = store + .read_by_name("spawner") + .unwrap() + .expect("spawner metadata should be written"); + assert_eq!(metadata.spawned_children.len(), 1); + assert_eq!(metadata.spawned_children[0].pod_name, "alive"); +} + // --------------------------------------------------------------------------- // ListPods // --------------------------------------------------------------------------- diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index 9c326ffa..3dd80450 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -44,7 +44,9 @@ pub use fs_store::FsStore; pub use llm_worker::UsageRecord; pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; -pub use pod_metadata::{PodActiveSegmentRef, PodMetadata, PodMetadataStore}; +pub use pod_metadata::{ + PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule, +}; pub use segment::{ SegmentStartState, append_entry, append_system_item, classify_history_item, create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork, diff --git a/crates/session-store/src/pod_metadata.rs b/crates/session-store/src/pod_metadata.rs index b031b06d..ef15d083 100644 --- a/crates/session-store/src/pod_metadata.rs +++ b/crates/session-store/src/pod_metadata.rs @@ -7,6 +7,7 @@ use crate::store::StoreError; use crate::{SegmentId, SessionId}; use serde::{Deserialize, Serialize}; +use std::path::PathBuf; /// Active Session/Segment pointer for a Pod. /// @@ -38,12 +39,34 @@ impl PodActiveSegmentRef { } } +/// One delegated scope rule for a spawned child, kept local to +/// `session-store` so the persistence crate does not depend on manifest +/// scope types. 
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodSpawnedScopeRule { + pub target: PathBuf, + pub permission: String, + pub recursive: bool, +} + +/// One child Pod spawned by this Pod and persisted with the spawner's +/// name-keyed Pod state. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodSpawnedChild { + pub pod_name: String, + pub socket_path: PathBuf, + pub scope_delegated: Vec, + pub callback_address: PathBuf, +} + /// Persistent metadata for a Pod name. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PodMetadata { pub pod_name: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub active: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub spawned_children: Vec, } impl PodMetadata { @@ -52,6 +75,7 @@ impl PodMetadata { Self { pod_name: pod_name.into(), active, + spawned_children: Vec::new(), } } }