feat: persist spawned pod registry

This commit is contained in:
Keisuke Hirata 2026-05-22 23:30:02 +09:00
parent 8e7126d177
commit 530027c62b
9 changed files with 366 additions and 25 deletions

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use llm_worker::WorkerError;
use llm_worker::llm_client::client::LlmClient;
use session_store::Store;
use session_store::{PodMetadataStore, Store};
use tokio::sync::{broadcast, mpsc, oneshot};
use crate::ipc::alerter::Alerter;
@ -132,7 +132,7 @@ impl PodController {
) -> Result<(PodHandle, ShutdownReceiver), std::io::Error>
where
C: LlmClient + Clone + 'static,
St: Store + Clone + 'static,
St: Store + PodMetadataStore + Clone + Send + Sync + 'static,
{
// === 1. Initialization (channels / RuntimeDir / pod-immutable
// snapshots / SpawnedPodRegistry / alerter attach /
@ -151,7 +151,12 @@ impl PodController {
let spawner_name = pod.manifest().pod.name.clone();
let self_parent_socket = pod.callback_socket().cloned();
let spawned_registry = SpawnedPodRegistry::new(runtime_dir.clone());
let spawned_registry = SpawnedPodRegistry::load_from_pod_state(
runtime_dir.clone(),
pod.store().clone(),
spawner_name.clone(),
)
.await?;
// Hand the alerter to the Pod so internal operations (compaction,
// AGENTS.md ingestion during the first turn) can emit user-facing

View File

@ -57,7 +57,12 @@ where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
let store = store.clone();
Arc::new(move |metadata| store.write(&metadata))
Arc::new(move |mut metadata| {
if let Some(existing) = store.read_by_name(&metadata.pod_name)? {
metadata.spawned_children = existing.spawned_children;
}
store.write(&metadata)
})
}
/// Lock-free shared session/segment pointer.

View File

@ -7,11 +7,11 @@ use tokio::fs;
use crate::shared_state::PodSharedState;
/// One spawned-child record persisted to `spawned_pods.json`.
/// One spawned-child record mirrored to `spawned_pods.json`.
///
/// Written by the spawner after a successful `SpawnPod` tool call so
/// `ListPods` (future ticket) and a restored spawner can enumerate
/// their live children without re-querying `pods.json`.
/// Written by the spawner after registry changes so runtime-local tools
/// have a materialised snapshot. Durable restore uses Pod state metadata;
/// this file is not the authoritative source.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnedPodRecord {
/// Spawned Pod's identity.

View File

@ -6,7 +6,7 @@
//! target's Unix socket, perform one method exchange, disconnect.
//!
//! These tools only touch Pods listed in the spawner's
//! `spawned_pods.json`; there is no machine-wide directory lookup, so
//! `SpawnedPodRegistry`; there is no machine-wide directory lookup, so
//! the spawner can only reach its own descendants.
use std::path::Path;
@ -217,7 +217,7 @@ impl Tool for StopPodTool {
self.registry
.remove(&record.pod_name)
.await
.map_err(|e| ToolError::ExecutionFailed(format!("update spawned_pods.json: {e}")))?;
.map_err(|e| ToolError::ExecutionFailed(format!("update spawned pod registry: {e}")))?;
Ok(ToolOutput {
summary: format!(

View File

@ -2,30 +2,41 @@
//!
//! `SpawnPod` writes here; the pod-comm tools (`SendToPod`,
//! `ReadPodOutput`, `StopPod`, `ListPods`) read and mutate the same
//! instance. Persisted to `spawned_pods.json` in the spawner's runtime
//! dir so a restarted spawner rebuilds its view from disk (future work
//! — today only write-through is implemented).
//! instance. Runtime write-through still materialises `spawned_pods.json`,
//! but durable state lives in the spawner's Pod metadata.
//!
//! `ReadPodOutput` additionally owns a per-spawned-pod cursor here so
//! two consecutive reads yield only new assistant text. The cursor is
//! an item-index into the child's history; push-only history makes
//! index stable across reads.
//!
//! The registry stays in-memory only for this Pod's lifetime — cursors
//! intentionally do not persist.
//! Cursors intentionally do not persist; a restored registry starts with
//! fresh read positions.
use std::collections::HashMap;
use std::io;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use manifest::{Permission, ScopeRule};
use session_store::{
PodMetadata, PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule, StoreError,
};
use tokio::net::UnixStream;
use tokio::sync::Mutex;
use tracing::warn;
use crate::runtime::dir::{RuntimeDir, SpawnedPodRecord};
/// Callback that writes the complete record list through to the spawner's
/// durable Pod state. Shared behind an `Arc` and required to be
/// `Send + Sync` so the registry can hold it across tasks.
type RegistryStateWriter = Arc<dyn Fn(&[SpawnedPodRecord]) -> io::Result<()> + Send + Sync>;
/// Upper bound on each per-child socket connect probe made while restoring
/// the registry from Pod state.
const RESTORE_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500);
/// In-memory registry of the Pods this spawner has created, with
/// write-through to the runtime dir and (optionally) durable Pod state.
pub struct SpawnedPodRegistry {
/// Spawned-child records; new entries are appended by `add`.
records: Mutex<Vec<SpawnedPodRecord>>,
/// Per-child `ReadPodOutput` cursor keyed by pod name. Never persisted.
cursors: Mutex<HashMap<String, usize>>,
/// Runtime directory whose `spawned_pods.json` mirrors `records`.
runtime_dir: Arc<RuntimeDir>,
/// Writes the record list to durable Pod state after each change.
/// `None` when the registry was built without a metadata store.
state_writer: Option<RegistryStateWriter>,
}
impl SpawnedPodRegistry {
@ -34,18 +45,76 @@ impl SpawnedPodRegistry {
records: Mutex::new(Vec::new()),
cursors: Mutex::new(HashMap::new()),
runtime_dir,
state_writer: None,
})
}
/// Restore the registry from the spawner's durable Pod state.
///
/// Every persisted child is converted back into a `SpawnedPodRecord` and its
/// socket is probed; corrupt or unreachable children are dropped with a
/// warning. The survivors are written to `spawned_pods.json` and, when Pod
/// metadata already existed, back to Pod state so both views start aligned.
///
/// # Errors
///
/// Returns an I/O error if reading Pod state, writing `spawned_pods.json`,
/// or writing Pod state fails.
pub async fn load_from_pod_state<St>(
    runtime_dir: Arc<RuntimeDir>,
    store: St,
    pod_name: String,
) -> io::Result<Arc<Self>>
where
    St: PodMetadataStore + Clone + Send + Sync + 'static,
{
    let stored = store.read_by_name(&pod_name).map_err(store_error_to_io)?;
    // Children can only be pruned when metadata existed in the first place,
    // so this flag alone decides whether Pod state is rewritten below.
    let had_metadata = stored.is_some();
    let children = stored.map(|m| m.spawned_children).unwrap_or_default();

    let mut survivors = Vec::with_capacity(children.len());
    for child in &children {
        match record_from_pod_state(child) {
            Err(err) => {
                warn!(
                    error = %err,
                    pod = %child.pod_name,
                    "dropping corrupt persisted spawned-pod record"
                );
            }
            Ok(record) => {
                if is_reachable(&record.socket_path).await {
                    survivors.push(record);
                } else {
                    warn!(
                        pod = %record.pod_name,
                        socket = %record.socket_path.display(),
                        "dropping unreachable persisted spawned-pod record"
                    );
                }
            }
        }
    }

    // Runtime mirror first, then durable state, matching later write-through.
    runtime_dir.write_spawned_pods(&survivors).await?;

    let state_writer = pod_state_writer(store, pod_name);
    if had_metadata {
        state_writer(&survivors)?;
    }

    Ok(Arc::new(Self {
        records: Mutex::new(survivors),
        cursors: Mutex::new(HashMap::new()),
        runtime_dir,
        state_writer: Some(state_writer),
    }))
}
/// Append a new record and persist the full list. Returns an I/O
/// error if the persisted write fails; the in-memory state is still
/// error if either persisted write fails; the in-memory state is still
/// updated in that case — the next successful write will reconcile.
pub async fn add(&self, record: SpawnedPodRecord) -> io::Result<()> {
let mut records = self.records.lock().await;
records.push(record);
self.runtime_dir
.write_spawned_pods(records.as_slice())
.await
self.persist_records(records.as_slice()).await
}
/// Look up a record by pod name. Cloned so callers can drop the lock.
@ -69,9 +138,7 @@ impl SpawnedPodRegistry {
let mut records = self.records.lock().await;
let idx = records.iter().position(|r| r.pod_name == pod_name);
let removed = idx.map(|i| records.remove(i));
self.runtime_dir
.write_spawned_pods(records.as_slice())
.await?;
self.persist_records(records.as_slice()).await?;
removed
};
self.cursors.lock().await.remove(pod_name);
@ -94,4 +161,98 @@ impl SpawnedPodRegistry {
.await
.insert(pod_name.to_string(), cursor);
}
async fn persist_records(&self, records: &[SpawnedPodRecord]) -> io::Result<()> {
self.runtime_dir.write_spawned_pods(records).await?;
if let Some(write_state) = &self.state_writer {
write_state(records)?;
}
Ok(())
}
}
fn pod_state_writer<St>(store: St, pod_name: String) -> RegistryStateWriter
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
Arc::new(move |records| {
write_records_to_pod_state(&store, &pod_name, records).map_err(store_error_to_io)
})
}
/// Replace the `spawned_children` list in the Pod metadata of `pod_name`
/// with `records`, creating fresh metadata (no active segment) when none is
/// stored yet. All other metadata fields are written back as read.
fn write_records_to_pod_state<St>(
    store: &St,
    pod_name: &str,
    records: &[SpawnedPodRecord],
) -> Result<(), StoreError>
where
    St: PodMetadataStore,
{
    let mut metadata = match store.read_by_name(pod_name)? {
        Some(existing) => existing,
        None => PodMetadata::new(pod_name, None),
    };
    let children = records
        .iter()
        .map(record_to_pod_state)
        .collect::<Result<Vec<_>, _>>()?;
    metadata.spawned_children = children;
    store.write(&metadata)
}
/// Convert a runtime `SpawnedPodRecord` into its persisted form, encoding
/// each permission as the lowercase string `"read"` or `"write"`.
fn record_to_pod_state(record: &SpawnedPodRecord) -> Result<PodSpawnedChild, serde_json::Error> {
    let mut scope_delegated = Vec::with_capacity(record.scope_delegated.len());
    for rule in &record.scope_delegated {
        let permission = match rule.permission {
            Permission::Read => "read",
            Permission::Write => "write",
        };
        scope_delegated.push(PodSpawnedScopeRule {
            target: rule.target.clone(),
            permission: permission.to_string(),
            recursive: rule.recursive,
        });
    }

    Ok(PodSpawnedChild {
        pod_name: record.pod_name.clone(),
        socket_path: record.socket_path.clone(),
        scope_delegated,
        callback_address: record.callback_address.clone(),
    })
}
fn record_from_pod_state(child: &PodSpawnedChild) -> Result<SpawnedPodRecord, serde_json::Error> {
Ok(SpawnedPodRecord {
pod_name: child.pod_name.clone(),
socket_path: child.socket_path.clone(),
scope_delegated: child
.scope_delegated
.iter()
.map(|rule| {
Ok(ScopeRule {
target: rule.target.clone(),
permission: match rule.permission.as_str() {
"read" => Permission::Read,
"write" => Permission::Write,
other => {
return Err(serde_json::Error::io(io::Error::new(
io::ErrorKind::InvalidData,
format!("invalid permission `{other}`"),
)));
}
},
recursive: rule.recursive,
})
})
.collect::<Result<Vec<_>, _>>()?,
callback_address: child.callback_address.clone(),
})
}
/// Wrap a `StoreError` in an `io::Error` (kind `Other`) so store failures
/// can be returned from the registry's `io::Result` APIs.
fn store_error_to_io(error: StoreError) -> io::Error {
io::Error::other(error)
}
/// Report whether a Unix-socket connection to `socket` succeeds within
/// `RESTORE_REACHABILITY_TIMEOUT`. Connect errors and timeouts both count
/// as unreachable; the probe connection is dropped immediately.
async fn is_reachable(socket: &Path) -> bool {
    let attempt = UnixStream::connect(socket);
    matches!(
        tokio::time::timeout(RESTORE_REACHABILITY_TIMEOUT, attempt).await,
        Ok(Ok(_))
    )
}

View File

@ -107,7 +107,8 @@ pub struct SpawnPodTool {
spawner_pwd: PathBuf,
/// Shared registry of spawned children, also used by the
/// pod-comm tools (`SendToPod` / `ReadPodOutput` / `StopPod` /
/// `ListPods`). Writes the list to `spawned_pods.json` on each add.
/// `ListPods`). Writes the list to runtime and durable Pod state on
/// each add.
registry: Arc<SpawnedPodRegistry>,
/// THIS Pod's own parent-callback socket, if any. After a
/// successful spawn we fire `PodEvent::ScopeSubDelegated` upward
@ -268,7 +269,7 @@ impl Tool for SpawnPodTool {
self.registry
.add(record)
.await
.map_err(|e| ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}")))?;
.map_err(|e| ToolError::ExecutionFailed(format!("write spawned pod registry: {e}")))?;
// Notify this Pod's own parent so the grandparent can register
// the new grandchild directly. Fire-and-forget; top-level Pods

View File

@ -23,8 +23,10 @@ use pod::spawn::registry::SpawnedPodRegistry;
use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{ErrorCode, Event, Greeting, Method};
use serde_json::json;
use session_store::{FsStore, PodMetadataStore};
use tempfile::TempDir;
use tokio::net::UnixListener;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
/// Serialises env-mutating tests. The test harness runs tasks across
@ -183,6 +185,31 @@ fn serve_history(listener: UnixListener, items: Vec<Item>) -> JoinHandle<()> {
})
}
/// Spawn a mock Pod server on `listener` and return a channel of the
/// methods it receives.
///
/// Each accepted connection is read for a single `Method`. A `Run` is
/// answered with one `TurnStart` event before the method is forwarded.
/// The task ends when accepting fails, the receiver is dropped, or a
/// `Shutdown` has been forwarded. Connections that yield no method are
/// skipped.
fn serve_pod_methods(listener: UnixListener) -> mpsc::Receiver<Method> {
    let (method_tx, method_rx) = mpsc::channel(8);
    tokio::spawn(async move {
        while let Ok((conn, _addr)) = listener.accept().await {
            let (read_half, write_half) = conn.into_split();
            let mut reader = JsonLineReader::new(read_half);
            let mut writer = JsonLineWriter::new(write_half);
            let method = match reader.next::<Method>().await {
                Ok(Some(method)) => method,
                _ => continue,
            };
            let stop_after_send = matches!(method, Method::Shutdown);
            if matches!(method, Method::Run { .. }) {
                // Best-effort acknowledgement; a failed write is ignored.
                let _ = writer.write(&Event::TurnStart { turn: 1 }).await;
            }
            if method_tx.send(method).await.is_err() || stop_after_send {
                break;
            }
        }
    });
    method_rx
}
fn assistant(text: &str) -> Item {
Item::Message {
id: None,
@ -418,6 +445,122 @@ async fn stop_pod_succeeds_even_when_child_unreachable() {
assert!(registry.get("child").await.is_none());
}
// ---------------------------------------------------------------------------
// Persistence / restore
// ---------------------------------------------------------------------------
// End-to-end restore check: a registry rebuilt from Pod state alone (with
// the runtime `spawned_pods.json` deleted) must still drive `ListPods`,
// `SendToPod` and `StopPod` against the surviving child, and `StopPod` must
// clear both the durable and the runtime view.
#[tokio::test]
async fn restored_registry_uses_pod_state_without_runtime_file() {
// Held for the whole test; EnvGuard serialises env-mutating tests.
let _env = EnvGuard::acquire();
let runtime_tmp = TempDir::new().unwrap();
let store_tmp = TempDir::new().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap();
// SAFETY: NOTE(review) — presumably sound because `_env` keeps other
// env-mutating tests out while this runs; confirm nothing else reads the
// process environment concurrently.
unsafe {
std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path());
}
let rd = Arc::new(
RuntimeDir::create(runtime_tmp.path(), "spawner")
.await
.unwrap(),
);
// First load: no metadata exists yet, so the registry starts empty.
let registry =
SpawnedPodRegistry::load_from_pod_state(rd.clone(), store.clone(), "spawner".to_string())
.await
.unwrap();
// Register a child backed by a live mock socket.
let (socket, listener) = bind_mock_socket(runtime_tmp.path(), "child").await;
let mut received = serve_pod_methods(listener);
register_child(&registry, "child", &socket, runtime_tmp.path()).await;
// Remove the runtime mirror so the reload below can only use Pod state.
std::fs::remove_file(rd.path().join("spawned_pods.json")).unwrap();
let restored =
SpawnedPodRegistry::load_from_pod_state(rd.clone(), store.clone(), "spawner".to_string())
.await
.unwrap();
// ListPods sees the restored child and reports it alive.
let def = list_pods_tool(restored.clone());
let (_meta, tool) = def();
let output: ToolOutput = tool.execute("{}").await.unwrap();
assert!(output.summary.contains("1 pod"), "{}", output.summary);
let body = output.content.expect("restored ListPods should list child");
assert!(body.contains("child [alive]"), "body: {body}");
// SendToPod reaches the child: the mock server receives a `Run` carrying
// the message as a single text segment.
let def = send_to_pod_tool(restored.clone());
let (_meta, tool) = def();
let input = json!({ "name": "child", "message": "after restart" }).to_string();
tool.execute(&input).await.unwrap();
match received.recv().await.expect("expected Run") {
Method::Run { input } => match input.as_slice() {
[protocol::Segment::Text { content }] => assert_eq!(content, "after restart"),
other => panic!("expected single Text segment, got {other:?}"),
},
other => panic!("expected Run, got {other:?}"),
}
// StopPod sends `Shutdown` and removes the child from the registry.
let def = stop_pod_tool(restored.clone());
let (_meta, tool) = def();
tool.execute(&json!({ "name": "child" }).to_string())
.await
.unwrap();
assert!(matches!(
received.recv().await.expect("expected Shutdown"),
Method::Shutdown
));
assert!(restored.get("child").await.is_none());
// The removal is written through to durable Pod state...
let metadata = store
.read_by_name("spawner")
.unwrap()
.expect("spawner metadata should remain");
assert!(metadata.spawned_children.is_empty());
// ...and to the runtime mirror, which was re-created with an empty list.
let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap();
let runtime_records: Vec<SpawnedPodRecord> = serde_json::from_str(&runtime_contents).unwrap();
assert!(runtime_records.is_empty());
}
// Restore must drop children whose socket cannot be connected to and write
// the pruned list back to Pod state.
//
// NOTE(review): unlike `restored_registry_uses_pod_state_without_runtime_file`
// this test neither takes `EnvGuard` nor sets `INSOMNIA_RUNTIME_DIR` —
// confirm `register_child` does not depend on that variable.
#[tokio::test]
async fn load_from_pod_state_prunes_children_with_missing_sockets() {
let runtime_tmp = TempDir::new().unwrap();
let store_tmp = TempDir::new().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap();
let rd = Arc::new(
RuntimeDir::create(runtime_tmp.path(), "spawner")
.await
.unwrap(),
);
let registry =
SpawnedPodRegistry::load_from_pod_state(rd.clone(), store.clone(), "spawner".to_string())
.await
.unwrap();
// "alive" is backed by a bound socket with a mock server accepting on it.
let (live_socket, listener) = bind_mock_socket(runtime_tmp.path(), "alive").await;
let _server = serve_pod_methods(listener);
register_child(&registry, "alive", &live_socket, runtime_tmp.path()).await;
// "missing" points at a socket path that is never bound in this test.
register_child(
&registry,
"missing",
&runtime_tmp.path().join("missing.sock"),
runtime_tmp.path(),
)
.await;
// Reload from Pod state: only the reachable child should survive.
let restored =
SpawnedPodRegistry::load_from_pod_state(rd.clone(), store.clone(), "spawner".to_string())
.await
.unwrap();
assert!(restored.get("alive").await.is_some());
assert!(restored.get("missing").await.is_none());
// The pruned list must also have been written back to durable state.
let metadata = store
.read_by_name("spawner")
.unwrap()
.expect("spawner metadata should be written");
assert_eq!(metadata.spawned_children.len(), 1);
assert_eq!(metadata.spawned_children[0].pod_name, "alive");
}
// ---------------------------------------------------------------------------
// ListPods
// ---------------------------------------------------------------------------

View File

@ -44,7 +44,9 @@ pub use fs_store::FsStore;
pub use llm_worker::UsageRecord;
pub use llm_worker::llm_client::types::{ContentPart, Item, Role};
pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged};
pub use pod_metadata::{PodActiveSegmentRef, PodMetadata, PodMetadataStore};
pub use pod_metadata::{
PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule,
};
pub use segment::{
SegmentStartState, append_entry, append_system_item, classify_history_item,
create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork,

View File

@ -7,6 +7,7 @@
use crate::store::StoreError;
use crate::{SegmentId, SessionId};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
/// Active Session/Segment pointer for a Pod.
///
@ -38,12 +39,34 @@ impl PodActiveSegmentRef {
}
}
/// One delegated scope rule for a spawned child, kept local to
/// `session-store` so the persistence crate does not depend on manifest
/// scope types.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodSpawnedScopeRule {
/// Target path of the rule, stored verbatim.
pub target: PathBuf,
/// Permission as a plain string. The spawned-pod registry writes
/// `"read"` or `"write"` and rejects any other value when loading.
pub permission: String,
/// Recursion flag, stored verbatim from the originating scope rule.
pub recursive: bool,
}
/// One child Pod spawned by this Pod and persisted with the spawner's
/// name-keyed Pod state.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodSpawnedChild {
/// Name of the spawned child Pod.
pub pod_name: String,
/// The child's Unix socket path; probed for reachability when the
/// spawner's registry is restored.
pub socket_path: PathBuf,
/// Scope rules delegated to the child at spawn time.
pub scope_delegated: Vec<PodSpawnedScopeRule>,
/// Callback address recorded for the child, stored verbatim.
pub callback_address: PathBuf,
}
/// Persistent metadata for a Pod name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodMetadata {
/// Pod name this metadata belongs to; also the key used by
/// `read_by_name` lookups.
pub pod_name: String,
/// Active Session/Segment pointer. Omitted from the serialised form
/// when `None` and defaulted to `None` when absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active: Option<PodActiveSegmentRef>,
/// Children spawned by this Pod. Omitted from the serialised form when
/// empty and defaulted to empty when absent, so metadata written
/// without this field still deserialises.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub spawned_children: Vec<PodSpawnedChild>,
}
impl PodMetadata {
@ -52,6 +75,7 @@ impl PodMetadata {
Self {
pod_name: pod_name.into(),
active,
spawned_children: Vec::new(),
}
}
}