From 211738132ca330e320a6b18921c28122aa8deaf7 Mon Sep 17 00:00:00 2001 From: Hare Date: Sat, 30 May 2026 07:16:50 +0900 Subject: [PATCH 1/3] refactor: split pod metadata store --- Cargo.lock | 13 + Cargo.toml | 2 + crates/pod-store/Cargo.toml | 15 + crates/pod-store/src/lib.rs | 476 ++++++++++++++++++ crates/pod/Cargo.toml | 1 + crates/pod/examples/pod_cli.rs | 6 +- crates/pod/examples/pod_protocol.rs | 6 +- crates/pod/src/controller.rs | 3 +- crates/pod/src/discovery.rs | 17 +- crates/pod/src/main.rs | 22 +- crates/pod/src/pod.rs | 36 +- crates/pod/src/spawn/registry.rs | 18 +- crates/pod/tests/compact_events_test.rs | 14 +- crates/pod/tests/consolidation_test.rs | 12 +- crates/pod/tests/controller_test.rs | 16 +- crates/pod/tests/pod_comm_tools_test.rs | 23 +- crates/pod/tests/restore_test.rs | 33 +- crates/pod/tests/session_metrics_test.rs | 15 +- .../pod/tests/system_prompt_template_test.rs | 10 +- crates/session-store/src/fs_store.rs | 79 --- crates/session-store/src/lib.rs | 4 - crates/session-store/src/pod_metadata.rs | 150 ------ crates/session-store/src/store.rs | 3 - crates/session-store/tests/fs_store_test.rs | 40 +- crates/tui/Cargo.toml | 1 + crates/tui/src/multi_pod.rs | 18 +- crates/tui/src/picker.rs | 18 +- crates/tui/src/pod_list.rs | 51 +- 28 files changed, 726 insertions(+), 376 deletions(-) create mode 100644 crates/pod-store/Cargo.toml create mode 100644 crates/pod-store/src/lib.rs delete mode 100644 crates/session-store/src/pod_metadata.rs diff --git a/Cargo.lock b/Cargo.lock index 6d339000..23e533d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2166,6 +2166,7 @@ dependencies = [ "memory", "minijinja", "pod-registry", + "pod-store", "protocol", "provider", "schemars", @@ -2197,6 +2198,17 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "pod-store" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", + "session-store", + "tempfile", + "thiserror 2.0.18", +] + [[package]] name = "portable-atomic" version = "1.13.1" @@ -3652,6 +3664,7 @@ dependencies = [ "llm-worker", "manifest", "pod-registry", + "pod-store", "protocol", "pulldown-cmark", "ratatui", diff --git a/Cargo.toml b/Cargo.toml index 0294b309..9179d56b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/session-store", "crates/manifest", "crates/pod", + "crates/pod-store", "crates/protocol", "crates/provider", "crates/pod-registry", @@ -32,6 +33,7 @@ manifest = { path = "crates/manifest" } lint-common = { path = "crates/lint-common" } memory = { path = "crates/memory" } pod-registry = { path = "crates/pod-registry" } +pod-store = { path = "crates/pod-store" } protocol = { path = "crates/protocol" } provider = { path = "crates/provider" } session-metrics = { path = "crates/session-metrics" } diff --git a/crates/pod-store/Cargo.toml b/crates/pod-store/Cargo.toml new file mode 100644 index 00000000..cf6ea475 --- /dev/null +++ b/crates/pod-store/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "pod-store" +description = "Durable Pod-name metadata/state persistence" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +session-store = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +thiserror = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/crates/pod-store/src/lib.rs b/crates/pod-store/src/lib.rs new file mode 100644 index 00000000..c39ea5c4 --- /dev/null +++ b/crates/pod-store/src/lib.rs @@ -0,0 +1,476 @@ +//! Durable Pod-name metadata/state persistence. +//! +//! This crate owns the name-keyed Pod state surface under a Pod-state root, +//! e.g. `{data_dir}/pods/{pod_name}/metadata.json`. Session JSONL replay stays +//! in `session-store`; Pod metadata may point at a `(SessionId, SegmentId)` but +//! does not own or replay session logs. +//! +//! `resolved_manifest_snapshot` is authority only for Pod-name restore before +//! loading the session log. Existing segment replay still uses `SegmentStart` +//! entries from `session-store`. `spawned_children` is durable current parent +//! Pod state for child registry/reclaim; child lifecycle messages shown to the +//! model remain session JSONL history. Socket and callback paths are last-known +//! runtime hints, not proof of liveness. + +use serde::{Deserialize, Serialize}; +use session_store::{SegmentId, SessionId}; +use std::fs; +use std::path::PathBuf; + +/// Errors from Pod metadata persistence. +#[derive(Debug, thiserror::Error)] +pub enum PodStoreError { + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + + #[error("serialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("invalid pod name: {0}")] + InvalidPodName(String), +} + +/// Active Session/Segment pointer for a Pod. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodActiveSegmentRef { + pub session_id: SessionId, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub segment_id: Option, +} + +impl PodActiveSegmentRef { + /// Create a reference whose active Segment is not known yet. + pub fn pending_segment(session_id: SessionId) -> Self { + Self { + session_id, + segment_id: None, + } + } + + /// Create a fully resolved active Session/Segment reference. + pub fn active_segment(session_id: SessionId, segment_id: SegmentId) -> Self { + Self { + session_id, + segment_id: Some(segment_id), + } + } +} + +/// One delegated scope rule for a spawned child, kept local to avoid depending +/// on manifest scope types in durable Pod state. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodSpawnedScopeRule { + pub target: PathBuf, + pub permission: String, + pub recursive: bool, +} + +/// One child Pod spawned by this Pod and persisted with the spawner's +/// name-keyed Pod state. Runtime paths are last-known hints only. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodSpawnedChild { + pub pod_name: String, + pub socket_path: PathBuf, + pub scope_delegated: Vec, + pub callback_address: PathBuf, +} + +/// Persistent metadata for a Pod name. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodMetadata { + pub pod_name: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub active: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub spawned_children: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub resolved_manifest_snapshot: Option, +} + +impl PodMetadata { + /// Create Pod metadata for `pod_name`. + pub fn new(pod_name: impl Into, active: Option) -> Self { + Self { + pod_name: pod_name.into(), + active, + spawned_children: Vec::new(), + resolved_manifest_snapshot: None, + } + } +} + +/// Sync persistence backend for Pod metadata. +pub trait PodMetadataStore: Send + Sync { + /// Create or replace metadata for its `pod_name` key. + fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError>; + + /// Read metadata by Pod name. Returns `None` when no metadata exists. + fn read_by_name(&self, pod_name: &str) -> Result, PodStoreError>; + + /// List persisted Pod metadata keys. + fn list_names(&self) -> Result, PodStoreError>; + + /// Return the metadata root directory when this backend is path-backed. + fn root_dir(&self) -> Option { + None + } + + /// Delete metadata by Pod name. Missing metadata is a successful no-op. + fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError>; + + /// Merge an update into one Pod's metadata, preserving unrelated fields. + fn update_by_name(&self, pod_name: &str, update: F) -> Result + where + F: FnOnce(&mut PodMetadata), + { + let mut metadata = self + .read_by_name(pod_name)? + .unwrap_or_else(|| PodMetadata::new(pod_name, None)); + update(&mut metadata); + metadata.pod_name = pod_name.to_string(); + self.write(&metadata)?; + Ok(metadata) + } + + /// Set the active pointer while preserving spawned children and manifest snapshot. + fn set_active( + &self, + pod_name: &str, + active: Option, + resolved_manifest_snapshot: Option, + ) -> Result { + self.update_by_name(pod_name, |metadata| { + metadata.active = active; + metadata.resolved_manifest_snapshot = resolved_manifest_snapshot; + }) + } + + /// Set spawned-child registry state while preserving active pointer and manifest snapshot. + fn set_spawned_children( + &self, + pod_name: &str, + children: Vec, + ) -> Result { + self.update_by_name(pod_name, |metadata| { + metadata.spawned_children = children; + }) + } +} + +/// Filesystem-backed Pod metadata store. +#[derive(Clone)] +pub struct FsPodStore { + root: PathBuf, +} + +impl FsPodStore { + /// Create a store rooted at the Pod-state directory, usually `{data_dir}/pods`. + pub fn new(root: impl Into) -> Result { + let root = root.into(); + fs::create_dir_all(&root)?; + Ok(Self { root }) + } + + fn pod_dir(&self, pod_name: &str) -> Result { + validate_pod_name(pod_name)?; + Ok(self.root.join(pod_name)) + } + + fn metadata_path(&self, pod_name: &str) -> Result { + Ok(self.pod_dir(pod_name)?.join("metadata.json")) + } +} + +impl PodMetadataStore for FsPodStore { + fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError> { + let path = self.metadata_path(&metadata.pod_name)?; + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + let content = serde_json::to_vec_pretty(metadata)?; + fs::write(path, content)?; + Ok(()) + } + + fn read_by_name(&self, pod_name: &str) -> Result, PodStoreError> { + let path = self.metadata_path(pod_name)?; + let content = match fs::read_to_string(path) { + Ok(content) => content, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(PodStoreError::Io(err)), + }; + Ok(Some(serde_json::from_str(&content)?)) + } + + fn list_names(&self) -> Result, PodStoreError> { + let mut names = Vec::new(); + if !self.root.exists() { + return Ok(names); + } + for entry in fs::read_dir(&self.root)? { + let entry = entry?; + if !entry.file_type()?.is_dir() { + continue; + } + if !entry.path().join("metadata.json").exists() { + continue; + } + let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else { + continue; + }; + if validate_pod_name(&name).is_ok() { + names.push(name); + } + } + names.sort(); + Ok(names) + } + + fn root_dir(&self) -> Option { + Some(self.root.clone()) + } + + fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError> { + let path = self.metadata_path(pod_name)?; + match fs::remove_file(&path) { + Ok(()) => {} + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()), + Err(err) => return Err(PodStoreError::Io(err)), + } + if let Some(parent) = path.parent() { + let _ = fs::remove_dir(parent); + } + Ok(()) + } +} + +pub fn validate_pod_name(pod_name: &str) -> Result<(), PodStoreError> { + if pod_name.is_empty() + || pod_name == "." + || pod_name == ".." + || pod_name.contains('/') + || pod_name.contains('\0') + { + return Err(PodStoreError::InvalidPodName(pod_name.to_string())); + } + Ok(()) +} + +/// Convenience composition for callers that want one handle carrying separate +/// session-log and Pod-state roots. +#[derive(Clone)] +pub struct CombinedStore { + pub session_store: S, + pub pod_store: P, +} + +impl CombinedStore { + pub fn new(session_store: S, pod_store: P) -> Self { + Self { + session_store, + pod_store, + } + } +} + +impl session_store::Store for CombinedStore +where + S: session_store::Store, + P: Send + Sync, +{ + fn append( + &self, + session_id: SessionId, + segment_id: SegmentId, + entry: &session_store::LogEntry, + ) -> Result<(), session_store::StoreError> { + self.session_store.append(session_id, segment_id, entry) + } + fn read_all( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result, session_store::StoreError> { + self.session_store.read_all(session_id, segment_id) + } + fn list_sessions(&self) -> Result, session_store::StoreError> { + self.session_store.list_sessions() + } + fn list_segments( + &self, + session_id: SessionId, + ) -> Result, session_store::StoreError> { + self.session_store.list_segments(session_id) + } + fn lookup_session_of( + &self, + segment_id: SegmentId, + ) -> Result, session_store::StoreError> { + self.session_store.lookup_session_of(segment_id) + } + fn create_segment( + &self, + session_id: SessionId, + segment_id: SegmentId, + entries: &[session_store::LogEntry], + ) -> Result<(), session_store::StoreError> { + self.session_store + .create_segment(session_id, segment_id, entries) + } + fn exists( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result { + self.session_store.exists(session_id, segment_id) + } + fn truncate( + &self, + session_id: SessionId, + segment_id: SegmentId, + entries_len: usize, + ) -> Result<(), session_store::StoreError> { + self.session_store + .truncate(session_id, segment_id, entries_len) + } + fn read_entry_count( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result { + self.session_store.read_entry_count(session_id, segment_id) + } + fn append_trace( + &self, + session_id: SessionId, + segment_id: SegmentId, + entry: &session_store::TraceEntry, + ) -> Result<(), session_store::StoreError> { + self.session_store + .append_trace(session_id, segment_id, entry) + } +} + +impl PodMetadataStore for CombinedStore +where + S: Send + Sync, + P: PodMetadataStore, +{ + fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError> { + self.pod_store.write(metadata) + } + fn read_by_name(&self, pod_name: &str) -> Result, PodStoreError> { + self.pod_store.read_by_name(pod_name) + } + fn list_names(&self) -> Result, PodStoreError> { + self.pod_store.list_names() + } + fn root_dir(&self) -> Option { + self.pod_store.root_dir() + } + fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError> { + self.pod_store.delete_by_name(pod_name) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn pod_metadata_manifest_snapshot_roundtrips() { + let mut metadata = PodMetadata::new( + "profile-pod", + Some(PodActiveSegmentRef::pending_segment( + session_store::new_session_id(), + )), + ); + metadata.resolved_manifest_snapshot = Some(serde_json::json!({ + "pod": { "name": "profile-pod" }, + "profile": { "source": { "kind": "path", "path": "/profiles/coder.nix" } } + })); + + let json = serde_json::to_string(&metadata).unwrap(); + let restored: PodMetadata = serde_json::from_str(&json).unwrap(); + + assert_eq!(restored, metadata); + } + + #[test] + fn fs_store_writes_under_pod_state_root_only() { + let tmp = tempfile::TempDir::new().unwrap(); + let session_root = tmp.path().join("sessions"); + let pod_root = tmp.path().join("pods"); + fs::create_dir_all(&session_root).unwrap(); + let store = FsPodStore::new(&pod_root).unwrap(); + store + .write(&PodMetadata::new( + "agent", + Some(PodActiveSegmentRef::pending_segment( + session_store::new_session_id(), + )), + )) + .unwrap(); + + assert!(pod_root.join("agent/metadata.json").exists()); + assert!(!session_root.join("pods/agent/metadata.json").exists()); + } + + #[test] + fn active_updates_preserve_children_and_manifest_snapshot() { + let tmp = tempfile::TempDir::new().unwrap(); + let store = FsPodStore::new(tmp.path()).unwrap(); + let mut metadata = PodMetadata::new("agent", None); + metadata.spawned_children.push(PodSpawnedChild { + pod_name: "child".into(), + socket_path: std::path::Path::new("/tmp/child.sock").into(), + scope_delegated: vec![], + callback_address: std::path::Path::new("/tmp/parent.sock").into(), + }); + metadata.resolved_manifest_snapshot = Some(serde_json::json!({"pod":{"name":"agent"}})); + store.write(&metadata).unwrap(); + + let snapshot = serde_json::json!({"pod":{"name":"updated"}}); + store + .set_active( + "agent", + Some(PodActiveSegmentRef::active_segment( + session_store::new_session_id(), + session_store::new_segment_id(), + )), + Some(snapshot.clone()), + ) + .unwrap(); + let restored = store.read_by_name("agent").unwrap().unwrap(); + assert_eq!(restored.spawned_children.len(), 1); + assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot)); + } + + #[test] + fn child_updates_preserve_active_and_manifest_snapshot() { + let tmp = tempfile::TempDir::new().unwrap(); + let store = FsPodStore::new(tmp.path()).unwrap(); + let active = PodActiveSegmentRef::active_segment( + session_store::new_session_id(), + session_store::new_segment_id(), + ); + let snapshot = serde_json::json!({"pod":{"name":"agent"}}); + store + .set_active("agent", Some(active.clone()), Some(snapshot.clone())) + .unwrap(); + store + .set_spawned_children( + "agent", + vec![PodSpawnedChild { + pod_name: "child".into(), + socket_path: std::path::Path::new("/tmp/child.sock").into(), + scope_delegated: vec![], + callback_address: std::path::Path::new("/tmp/parent.sock").into(), + }], + ) + .unwrap(); + let restored = store.read_by_name("agent").unwrap().unwrap(); + assert_eq!(restored.active, Some(active)); + assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot)); + } +} diff --git a/crates/pod/Cargo.toml b/crates/pod/Cargo.toml index 1ac1fa50..875c7077 100644 --- a/crates/pod/Cargo.toml +++ b/crates/pod/Cargo.toml @@ -13,6 +13,7 @@ async-trait = { workspace = true } clap = { version = "4.6.0", features = ["derive"] } llm-worker = { workspace = true } session-store = { workspace = true } +pod-store = { workspace = true } manifest = { workspace = true } protocol = { workspace = true } provider = { workspace = true } diff --git a/crates/pod/examples/pod_cli.rs b/crates/pod/examples/pod_cli.rs index c5768170..5cdcf355 100644 --- a/crates/pod/examples/pod_cli.rs +++ b/crates/pod/examples/pod_cli.rs @@ -12,6 +12,7 @@ //! ``` use pod::{Pod, PodManifest, PodRunResult}; +use pod_store::{CombinedStore, FsPodStore}; use session_store::FsStore; fn manifest_toml(pwd: &std::path::Path) -> String { @@ -48,7 +49,10 @@ async fn main() -> Result<(), Box> { // 2. Create a persistent store (temp dir for demo) let tmp = tempfile::tempdir()?; - let store = FsStore::new(tmp.path())?; + let store = CombinedStore::new( + FsStore::new(tmp.path().join("sessions"))?, + FsPodStore::new(tmp.path().join("pods"))?, + ); // 3. Build the Pod from the single-layer manifest TOML let mut pod = Pod::from_manifest_toml(&toml, store).await?; diff --git a/crates/pod/examples/pod_protocol.rs b/crates/pod/examples/pod_protocol.rs index 7d6d6347..11811794 100644 --- a/crates/pod/examples/pod_protocol.rs +++ b/crates/pod/examples/pod_protocol.rs @@ -6,6 +6,7 @@ //! ``` use pod::{Event, Method, PodController}; +use pod_store::{CombinedStore, FsPodStore}; use session_store::FsStore; fn manifest_toml(pwd: &std::path::Path) -> String { @@ -39,7 +40,10 @@ async fn main() -> Result<(), Box> { let pwd = std::env::current_dir()?; let toml = manifest_toml(&pwd); let tmp = tempfile::tempdir()?; - let store = FsStore::new(tmp.path())?; + let store = CombinedStore::new( + FsStore::new(tmp.path().join("sessions"))?, + FsPodStore::new(tmp.path().join("pods"))?, + ); let pod = pod::Pod::from_manifest_toml(&toml, store).await?; let runtime_tmp = tempfile::tempdir()?; diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 9449e265..51cf9a98 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -4,7 +4,8 @@ use std::sync::atomic::Ordering; use llm_worker::WorkerError; use llm_worker::llm_client::client::LlmClient; -use session_store::{PodMetadataStore, Store}; +use pod_store::PodMetadataStore; +use session_store::Store; use tokio::sync::{broadcast, mpsc, oneshot}; use crate::discovery::{ diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs index b5581bac..bfbd79af 100644 --- a/crates/pod/src/discovery.rs +++ b/crates/pod/src/discovery.rs @@ -15,11 +15,12 @@ use std::time::Duration; use async_trait::async_trait; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore}; use protocol::stream::JsonLineReader; use protocol::{Event, PodStatus}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use session_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, SegmentId, SessionId}; +use session_store::{SegmentId, SessionId}; use tokio::net::UnixStream; use tokio::process::Command; @@ -496,8 +497,10 @@ pub enum PodDiscoveryError { socket_path: PathBuf, pid: u32, }, - #[error("store error: {0}")] + #[error("session store error: {0}")] Store(#[from] session_store::StoreError), + #[error("pod store error: {0}")] + PodStore(#[from] pod_store::PodStoreError), #[error("scope lock error: {0}")] ScopeLock(#[from] pod_registry::ScopeLockError), #[error("failed to launch restore process: {0}")] @@ -527,7 +530,7 @@ impl VisibilitySet { } async fn summarize_spawned_children( - children: &[session_store::PodSpawnedChild], + children: &[pod_store::PodSpawnedChild], ) -> SpawnedChildrenSummary { let mut summary = SpawnedChildrenSummary { count: children.len(), @@ -752,6 +755,7 @@ fn discovery_error_to_tool_error(error: PodDiscoveryError) -> ToolError { | PodDiscoveryError::NotRestorable { .. } => ToolError::InvalidArgument(error.to_string()), PodDiscoveryError::LockConflict { .. } | PodDiscoveryError::Store(_) + | PodDiscoveryError::PodStore(_) | PodDiscoveryError::ScopeLock(_) | PodDiscoveryError::RestoreSpawn(_) | PodDiscoveryError::RestoreExited { .. } @@ -765,11 +769,10 @@ mod tests { use std::sync::Mutex; use manifest::{Permission, ScopeRule}; + use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule}; use protocol::stream::JsonLineWriter; use protocol::{Alert, AlertLevel, AlertSource, Greeting}; - use session_store::{ - FsStore, PodSpawnedChild, PodSpawnedScopeRule, new_segment_id, new_session_id, - }; + use session_store::{new_segment_id, new_session_id}; use tempfile::TempDir; use tokio::net::UnixListener; @@ -788,7 +791,7 @@ mod tests { std::env::set_var("INSOMNIA_RUNTIME_DIR", &runtime_base); } - let store = FsStore::new(&store_dir).unwrap(); + let store = FsPodStore::new(&store_dir).unwrap(); let session_id = new_session_id(); let active_child_segment = new_segment_id(); let pending_session_id = new_session_id(); diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index f2c38db7..caf58305 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -6,7 +6,8 @@ use manifest::{ NixProfileResolver, PodManifest, PodManifestConfig, ProfileSelector, ScopeConfig, paths, }; use pod::{Pod, PodController, PromptLoader}; -use session_store::{FsStore, PodMetadataStore, SegmentId, Store}; +use pod_store::{CombinedStore, FsPodStore, PodMetadataStore}; +use session_store::{FsStore, SegmentId, Store}; #[derive(Debug, Parser)] #[command( @@ -229,13 +230,28 @@ async fn main() -> ExitCode { } }, }; - let store = match FsStore::new(&store_dir) { + let session_store = match FsStore::new(&store_dir) { Ok(s) => s, Err(e) => { - eprintln!("error: failed to initialize store at {store_dir:?}: {e}"); + eprintln!("error: failed to initialize session store at {store_dir:?}: {e}"); return ExitCode::FAILURE; } }; + let pod_store_dir = match paths::data_dir() { + Some(data_dir) => data_dir.join("pods"), + None => store_dir + .parent() + .map(|parent| parent.join("pods")) + .unwrap_or_else(|| PathBuf::from("pods")), + }; + let pod_store = match FsPodStore::new(&pod_store_dir) { + Ok(s) => s, + Err(e) => { + eprintln!("error: failed to initialize pod store at {pod_store_dir:?}: {e}"); + return ExitCode::FAILURE; + } + }; + let store = CombinedStore::new(session_store, pod_store); let pod = if cli.adopt { let callback = match cli.callback.clone() { diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 7402cc12..91768b82 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -9,9 +9,10 @@ use llm_worker::llm_client::client::LlmClient; use llm_worker::llm_client::types::Role; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; +use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodStoreError}; use session_store::{ - LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodScopeSnapshot, SegmentId, - SessionId, Store, StoreError, SystemItem, segment_log, to_logged, + LogEntry, PodScopeSnapshot, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, + to_logged, }; use tracing::{info, warn}; @@ -53,18 +54,21 @@ pub struct SegmentLocation { pub segment_id: SegmentId, } -type PodMetadataWriter = Arc Result<(), StoreError> + Send + Sync>; +type PodMetadataWriter = Arc Result<(), PodStoreError> + Send + Sync>; fn pod_metadata_writer_for_store(store: &St) -> PodMetadataWriter where St: PodMetadataStore + Clone + Send + Sync + 'static, { let store = store.clone(); - Arc::new(move |mut metadata| { - if let Some(existing) = store.read_by_name(&metadata.pod_name)? { - metadata.spawned_children = existing.spawned_children; - } - store.write(&metadata) + Arc::new(move |metadata| { + store + .set_active( + &metadata.pod_name, + metadata.active, + metadata.resolved_manifest_snapshot, + ) + .map(|_| ()) }) } @@ -925,30 +929,32 @@ impl Pod { metadata } - fn write_pod_metadata_pending(&self) -> Result<(), StoreError> { + fn write_pod_metadata_pending(&self) -> Result<(), PodError> { let Some(writer) = &self.pod_metadata_writer else { return Ok(()); }; writer(self.pod_metadata(Some(PodActiveSegmentRef::pending_segment( self.session_id(), - )))) + ))))?; + Ok(()) } - fn write_pod_metadata_active(&self, loc: SegmentLocation) -> Result<(), StoreError> { + fn write_pod_metadata_active(&self, loc: SegmentLocation) -> Result<(), PodError> { let Some(writer) = &self.pod_metadata_writer else { return Ok(()); }; writer(self.pod_metadata(Some(PodActiveSegmentRef::active_segment( loc.session_id, loc.segment_id, - )))) + ))))?; + Ok(()) } /// Enable name-keyed Pod metadata write-through for Pods built through /// the low-level constructor. High-level manifest constructors enable it /// automatically; this hook lets tests and custom embedders opt into the /// same persistence behavior without changing `Pod::new`'s minimal bounds. - pub fn enable_pod_metadata_write_through(&mut self) -> Result<(), StoreError> + pub fn enable_pod_metadata_write_through(&mut self) -> Result<(), PodError> where St: PodMetadataStore + Clone + Send + Sync + 'static, { @@ -4438,6 +4444,7 @@ fn token_budget_bytes(tokens: u64) -> usize { pub enum RewindError { #[error(transparent)] Store(#[from] StoreError), + #[error("{0}")] Invalid(String), } @@ -4546,6 +4553,9 @@ pub enum PodError { #[error(transparent)] Store(#[from] StoreError), + #[error(transparent)] + PodStore(#[from] PodStoreError), + #[error(transparent)] Scope(ScopeError), diff --git a/crates/pod/src/spawn/registry.rs b/crates/pod/src/spawn/registry.rs index d570ede8..0a220d24 100644 --- a/crates/pod/src/spawn/registry.rs +++ b/crates/pod/src/spawn/registry.rs @@ -20,10 +20,8 @@ use std::sync::Arc; use std::time::Duration; use manifest::{Permission, ScopeRule, SharedScope}; -use session_store::{ - PodMetadata, PodMetadataStore, PodScopeSnapshot, PodSpawnedChild, PodSpawnedScopeRule, - StoreError, -}; +use pod_store::{PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError}; +use session_store::PodScopeSnapshot; use tokio::net::UnixStream; use tokio::sync::Mutex; use tracing::warn; @@ -304,18 +302,16 @@ fn write_records_to_pod_state( store: &St, pod_name: &str, records: &[SpawnedPodRecord], -) -> Result<(), StoreError> +) -> Result<(), PodStoreError> where St: PodMetadataStore, { - let mut metadata = store - .read_by_name(pod_name)? - .unwrap_or_else(|| PodMetadata::new(pod_name, None)); - metadata.spawned_children = records + let children = records .iter() .map(record_to_pod_state) .collect::, _>>()?; - store.write(&metadata) + store.set_spawned_children(pod_name, children)?; + Ok(()) } fn record_to_pod_state(record: &SpawnedPodRecord) -> Result { @@ -366,7 +362,7 @@ fn record_from_pod_state(child: &PodSpawnedChild) -> Result io::Error { +fn store_error_to_io(error: PodStoreError) -> io::Error { io::Error::other(error) } diff --git a/crates/pod/tests/compact_events_test.rs b/crates/pod/tests/compact_events_test.rs index 5c6406ef..de3d54a9 100644 --- a/crates/pod/tests/compact_events_test.rs +++ b/crates/pod/tests/compact_events_test.rs @@ -16,12 +16,15 @@ use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::types::Item; use llm_worker::llm_client::{ClientError, LlmClient, Request}; +use pod_store::{CombinedStore, FsPodStore, PodMetadataStore}; use protocol::{Event, Method, RunResult}; -use session_store::{FsStore, LogEntry, PodMetadataStore, Store}; +use session_store::{FsStore, LogEntry, Store}; use tokio::sync::broadcast; use pod::{Pod, PodController}; +type TestStore = CombinedStore; + #[derive(Clone)] struct MockClient { responses: Arc>>, @@ -145,11 +148,14 @@ permission = "write" async fn make_pod_with_manifest( manifest_toml: &str, client: MockClient, -) -> Pod { +) -> Pod { let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); std::mem::forget(store_tmp); let pwd_tmp = tempfile::tempdir().unwrap(); @@ -163,7 +169,7 @@ async fn make_pod_with_manifest( pod } -async fn make_pod(client: MockClient) -> Pod { +async fn make_pod(client: MockClient) -> Pod { make_pod_with_manifest(POST_RUN_MANIFEST_TOML, client).await } diff --git a/crates/pod/tests/consolidation_test.rs b/crates/pod/tests/consolidation_test.rs index 76a83ffb..4b81d5cf 100644 --- a/crates/pod/tests/consolidation_test.rs +++ b/crates/pod/tests/consolidation_test.rs @@ -26,7 +26,10 @@ use llm_worker::llm_client::{ClientError, LlmClient, Request}; use memory::WorkspaceLayout; use memory::extract::{ExtractedPayload, write_staging}; use memory::schema::SourceRef; +use pod_store::{CombinedStore, FsPodStore}; use session_store::FsStore; + +type TestStore = CombinedStore; use tokio::sync::broadcast; use pod::{Event, Pod}; @@ -155,11 +158,14 @@ async fn make_pod_with( manifest_toml: &str, pwd: std::path::PathBuf, client: MockClient, -) -> Pod { +) -> Pod { let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); std::mem::forget(store_tmp); let scope = pod::Scope::writable(&pwd).unwrap(); @@ -184,7 +190,7 @@ fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec { ids } -fn attach_event_receiver(pod: &mut Pod) -> broadcast::Receiver { +fn attach_event_receiver(pod: &mut Pod) -> broadcast::Receiver { let (tx, rx) = broadcast::channel(16); pod.attach_event_tx(tx); rx diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 0252cb31..41340376 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -9,10 +9,13 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve use llm_worker::llm_client::types::Item; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use pod_store::{CombinedStore, FsPodStore}; use session_store::{FsStore, LogEntry}; use pod::{Event, Method, Pod, PodController, PodHandle, PodManifest, PodStatus}; +type TestStore = CombinedStore; + /// Reconstruct a worker-history-like `Vec` from the live session /// log mirror held by the Pod's broadcast sink. Replaces the previous /// `PodSharedState.history()` test helper now that the mirror lives in @@ -152,21 +155,24 @@ target = "./" permission = "write" "#; -async fn make_pod(client: MockClient) -> Pod { +async fn make_pod(client: MockClient) -> Pod { make_pod_with_pwd(client).await.0 } -async fn make_pod_with_pwd(client: MockClient) -> (Pod, std::path::PathBuf) { +async fn make_pod_with_pwd(client: MockClient) -> (Pod, std::path::PathBuf) { make_pod_with_pwd_and_manifest(client, MANIFEST_TOML).await } async fn make_pod_with_pwd_and_manifest( client: MockClient, manifest_toml: &str, -) -> (Pod, std::path::PathBuf) { +) -> (Pod, std::path::PathBuf) { let manifest = PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); std::mem::forget(store_tmp); // Separate tempdir to serve as the Pod's pwd/scope — these tests @@ -184,7 +190,7 @@ async fn make_pod_with_pwd_and_manifest( (pod, pwd) } -async fn spawn_controller(pod: Pod) -> PodHandle { +async fn spawn_controller(pod: Pod) -> PodHandle { let tmp = tempfile::tempdir().unwrap(); let runtime_base = tmp.path().to_owned(); std::mem::forget(tmp); diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index 08b8adfe..6ea601c1 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -20,10 +20,11 @@ use pod::spawn::comm_tools::{ list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, }; use pod::spawn::registry::SpawnedPodRegistry; +use pod_store::{CombinedStore, FsPodStore, PodMetadataStore}; use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::{ErrorCode, Event, Greeting, Method}; use serde_json::json; -use session_store::{FsStore, PodMetadataStore}; +use session_store::FsStore; use tempfile::TempDir; use tokio::net::UnixListener; use tokio::sync::mpsc; @@ -385,7 +386,10 @@ async fn stop_pod_sends_shutdown_and_releases_scope() { let _env = EnvGuard::acquire(); let tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let rd = Arc::new(RuntimeDir::create(tmp.path(), "spawner").await.unwrap()); let parent_scope = SharedScope::new( Scope::writable(tmp.path()) @@ -512,7 +516,10 @@ async fn restored_registry_uses_pod_state_without_runtime_file() { let _env = EnvGuard::acquire(); let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); unsafe { std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path()); } @@ -582,7 +589,10 @@ async fn restored_registry_uses_pod_state_without_runtime_file() { async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state() { let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let rd = Arc::new( RuntimeDir::create(runtime_tmp.path(), "spawner") .await @@ -635,7 +645,10 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_st let _env = EnvGuard::acquire(); let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); unsafe { std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path()); } diff --git a/crates/pod/tests/restore_test.rs b/crates/pod/tests/restore_test.rs index 1dc97f37..6f0d4ad7 100644 --- a/crates/pod/tests/restore_test.rs +++ b/crates/pod/tests/restore_test.rs @@ -8,7 +8,8 @@ use std::sync::{LazyLock, Mutex}; use pod::{Pod, PodError}; -use session_store::{FsStore, PodActiveSegmentRef, PodMetadata, PodMetadataStore, StoreError}; +use pod_store::{CombinedStore, FsPodStore, PodActiveSegmentRef, PodMetadata, PodMetadataStore}; +use session_store::{FsStore, StoreError}; const MINIMAL_MANIFEST_TOML: &str = r#" [pod] @@ -36,7 +37,10 @@ async fn restore_from_pod_metadata_rejects_missing_metadata() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let result = Pod::restore_from_pod_metadata( @@ -59,7 +63,10 @@ async fn restore_from_pod_metadata_rejects_pending_segment() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let session_id = session_store::new_session_id(); store @@ -95,7 +102,10 @@ async fn restore_from_pod_metadata_resolves_active_pointer_through_session_log() let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let session_id = session_store::new_session_id(); let segment_id = session_store::new_segment_id(); @@ -126,7 +136,10 @@ async fn restore_from_manifest_rejects_unknown_segment() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); // A freshly-minted id with no jsonl file at all → store returns @@ -155,7 +168,10 @@ async fn restore_from_manifest_rejects_empty_segment_log() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); // Pre-create an empty `/.jsonl` so `read_all` succeeds @@ -189,7 +205,10 @@ async fn restore_from_manifest_rejects_segment_without_scope_snapshot() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let sid = session_store::new_session_id(); diff --git a/crates/pod/tests/session_metrics_test.rs b/crates/pod/tests/session_metrics_test.rs index 75add356..08bb2eb6 100644 --- a/crates/pod/tests/session_metrics_test.rs +++ b/crates/pod/tests/session_metrics_test.rs @@ -25,11 +25,14 @@ use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent}; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use pod_store::{CombinedStore, FsPodStore}; use session_metrics::{DOMAIN, Metric, metrics_from_extensions}; use session_store::{FsStore, LogEntry, SegmentId, SessionId, Store, StoreError, TraceEntry}; use pod::{Pod, PodManifest}; +type TestStore = CombinedStore; + #[derive(Clone)] struct MockClient { responses: Arc>>, @@ -166,13 +169,16 @@ async fn make_pod( client: MockClient, tool_name: &'static str, ) -> ( - Pod, + Pod, tempfile::TempDir, tempfile::TempDir, ) { let manifest = PodManifest::from_toml(&manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let pwd_tmp = tempfile::tempdir().unwrap(); let pwd = pwd_tmp.path().to_path_buf(); let scope = pod::Scope::writable(&pwd).unwrap(); @@ -500,7 +506,10 @@ permission = "write" let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]); let manifest = PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let pwd_tmp = tempfile::tempdir().unwrap(); let pwd = pwd_tmp.path().to_path_buf(); let scope = pod::Scope::writable(&pwd).unwrap(); diff --git a/crates/pod/tests/system_prompt_template_test.rs b/crates/pod/tests/system_prompt_template_test.rs index 8433968a..c9b32ca7 100644 --- a/crates/pod/tests/system_prompt_template_test.rs +++ b/crates/pod/tests/system_prompt_template_test.rs @@ -8,10 +8,13 @@ use futures::Stream; use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::{ClientError, LlmClient, Request}; +use pod_store::{CombinedStore, FsPodStore}; use session_store::{FsStore, LogEntry, Store}; use pod::{Pod, PodError, PromptLoader, SystemPromptTemplate}; +type TestStore = CombinedStore; + // --------------------------------------------------------------------------- // Mock LLM Client // --------------------------------------------------------------------------- @@ -99,11 +102,14 @@ permission = "write" async fn make_pod_with_body( body: &str, client: MockClient, -) -> Result<(Pod, PathBuf), PodError> { +) -> Result<(Pod, PathBuf), PodError> { let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); std::mem::forget(store_tmp); let pwd_tmp = tempfile::tempdir().unwrap(); diff --git a/crates/session-store/src/fs_store.rs b/crates/session-store/src/fs_store.rs index 3925b603..8a871c6b 100644 --- a/crates/session-store/src/fs_store.rs +++ b/crates/session-store/src/fs_store.rs @@ -3,7 +3,6 @@ //! Layout: //! - Segment log: `{root}/{session_id}/{segment_id}.jsonl` //! - Event trace: `{root}/{session_id}/{segment_id}.trace.jsonl` -//! - Pod metadata: `{root}/pods/{pod_name}/metadata.json` //! //! The per-Session directory makes `list_segments(session_id)` an O(dir) //! scan and gives the fork tree a visible grouping in the filesystem. @@ -17,7 +16,6 @@ //! enumerable by the picker. use crate::event_trace::TraceEntry; -use crate::pod_metadata::{PodMetadata, PodMetadataStore, validate_pod_name}; use crate::segment_log::LogEntry; use crate::store::{Store, StoreError}; use crate::{SegmentId, SessionId}; @@ -57,19 +55,6 @@ impl FsStore { .join(format!("{segment_id}.trace.jsonl")) } - fn pods_dir(&self) -> PathBuf { - self.root.join("pods") - } - - fn pod_dir(&self, pod_name: &str) -> Result { - validate_pod_name(pod_name)?; - Ok(self.pods_dir().join(pod_name)) - } - - fn pod_metadata_path(&self, pod_name: &str) -> Result { - Ok(self.pod_dir(pod_name)?.join("metadata.json")) - } - fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> { if let Some(parent) = path.parent() { fs::create_dir_all(parent)?; @@ -102,70 +87,6 @@ impl FsStore { } } -impl PodMetadataStore for FsStore { - fn write(&self, metadata: &PodMetadata) -> Result<(), StoreError> { - let path = self.pod_metadata_path(&metadata.pod_name)?; - if let Some(parent) = path.parent() { - fs::create_dir_all(parent)?; - } - let content = serde_json::to_vec_pretty(metadata)?; - fs::write(path, content)?; - Ok(()) - } - - fn read_by_name(&self, pod_name: &str) -> Result, StoreError> { - let path = self.pod_metadata_path(pod_name)?; - let content = match fs::read_to_string(path) { - Ok(content) => content, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), - Err(err) => return Err(StoreError::Io(err)), - }; - Ok(Some(serde_json::from_str(&content)?)) - } - - fn list_names(&self) -> Result, StoreError> { - let dir = self.pods_dir(); - let mut names = Vec::new(); - if !dir.exists() { - return Ok(names); - } - for entry in fs::read_dir(dir)? { - let entry = entry?; - if !entry.file_type()?.is_dir() { - continue; - } - if !entry.path().join("metadata.json").exists() { - continue; - } - let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else { - continue; - }; - if validate_pod_name(&name).is_ok() { - names.push(name); - } - } - names.sort(); - Ok(names) - } - - fn root_dir(&self) -> Option { - Some(self.root.clone()) - } - - fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError> { - let path = self.pod_metadata_path(pod_name)?; - match fs::remove_file(&path) { - Ok(()) => {} - Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()), - Err(err) => return Err(StoreError::Io(err)), - } - if let Some(parent) = path.parent() { - let _ = fs::remove_dir(parent); - } - Ok(()) - } -} - impl Store for FsStore { fn append( &self, diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index 892c98b8..dc71b4d0 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -33,7 +33,6 @@ pub mod event_trace; pub mod fs_store; pub mod logged_item; -pub mod pod_metadata; pub mod segment; pub mod segment_log; pub mod store; @@ -44,9 +43,6 @@ pub use fs_store::FsStore; pub use llm_worker::UsageRecord; pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; -pub use pod_metadata::{ - PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule, -}; pub use segment::{ SegmentStartState, append_entry, append_system_item, classify_history_item, create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork, diff --git a/crates/session-store/src/pod_metadata.rs b/crates/session-store/src/pod_metadata.rs deleted file mode 100644 index d6fd8e07..00000000 --- a/crates/session-store/src/pod_metadata.rs +++ /dev/null @@ -1,150 +0,0 @@ -//! Pod metadata persistence API. -//! -//! Pod metadata is a lightweight name-keyed pointer to the Session/Segment -//! currently active for a Pod. Conversation content remains in the segment log; -//! this metadata only records references needed by Pod-name resume/attach flows. - -use crate::store::StoreError; -use crate::{SegmentId, SessionId}; -use serde::{Deserialize, Serialize}; -use std::path::PathBuf; - -/// Active Session/Segment pointer for a Pod. -/// -/// `segment_id` is optional so callers can persist a reserved Session before -/// the first Segment ID is known. Once a segment exists, callers should rewrite -/// the metadata with `Some(segment_id)`. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PodActiveSegmentRef { - pub session_id: SessionId, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub segment_id: Option, -} - -impl PodActiveSegmentRef { - /// Create a reference whose active Segment is not known yet. - pub fn pending_segment(session_id: SessionId) -> Self { - Self { - session_id, - segment_id: None, - } - } - - /// Create a fully resolved active Session/Segment reference. - pub fn active_segment(session_id: SessionId, segment_id: SegmentId) -> Self { - Self { - session_id, - segment_id: Some(segment_id), - } - } -} - -/// One delegated scope rule for a spawned child, kept local to -/// `session-store` so the persistence crate does not depend on manifest -/// scope types. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PodSpawnedScopeRule { - pub target: PathBuf, - pub permission: String, - pub recursive: bool, -} - -/// One child Pod spawned by this Pod and persisted with the spawner's -/// name-keyed Pod state. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PodSpawnedChild { - pub pod_name: String, - pub socket_path: PathBuf, - pub scope_delegated: Vec, - pub callback_address: PathBuf, -} - -/// Persistent metadata for a Pod name. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PodMetadata { - pub pod_name: String, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub active: Option, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub spawned_children: Vec, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub resolved_manifest_snapshot: Option, -} - -impl PodMetadata { - /// Create Pod metadata for `pod_name`. - pub fn new(pod_name: impl Into, active: Option) -> Self { - Self { - pod_name: pod_name.into(), - active, - spawned_children: Vec::new(), - resolved_manifest_snapshot: None, - } - } -} - -/// Sync persistence backend for Pod metadata. -/// -/// The key is the Pod name. Missing state is not an error: `read_by_name` -/// returns `Ok(None)` for Pods that have never persisted metadata or whose -/// metadata was deleted. -pub trait PodMetadataStore: Send + Sync { - /// Create or replace metadata for its `pod_name` key. - fn write(&self, metadata: &PodMetadata) -> Result<(), StoreError>; - - /// Read metadata by Pod name. Returns `None` when no metadata exists. - fn read_by_name(&self, pod_name: &str) -> Result, StoreError>; - - /// List persisted Pod metadata keys. Implementations return names only; - /// callers can then read each item independently so a corrupt metadata - /// file does not make the whole discovery result fail. - fn list_names(&self) -> Result, StoreError>; - - /// Return the metadata root directory when this backend is path-backed. - fn root_dir(&self) -> Option { - None - } - - /// Delete metadata by Pod name. Missing metadata is a successful no-op. - fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError>; -} - -pub(crate) fn validate_pod_name(pod_name: &str) -> Result<(), StoreError> { - if pod_name.is_empty() - || pod_name == "." - || pod_name == ".." - || pod_name.contains('/') - || pod_name.contains('\0') - { - return Err(StoreError::InvalidPodName(pod_name.to_string())); - } - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn pod_metadata_manifest_snapshot_roundtrips() { - let mut metadata = PodMetadata::new( - "profile-pod", - Some(PodActiveSegmentRef::pending_segment(crate::new_session_id())), - ); - metadata.resolved_manifest_snapshot = Some(serde_json::json!({ - "pod": { "name": "profile-pod" }, - "profile": { - "source": { "kind": "path", "path": "/profiles/coder.nix" } - } - })); - - let json = serde_json::to_string(&metadata).unwrap(); - let restored: PodMetadata = serde_json::from_str(&json).unwrap(); - - assert_eq!(restored, metadata); - assert_eq!( - restored.resolved_manifest_snapshot.as_ref().unwrap()["profile"]["source"]["kind"], - "path" - ); - } -} diff --git a/crates/session-store/src/store.rs b/crates/session-store/src/store.rs index e2c5b6b8..ff5489fc 100644 --- a/crates/session-store/src/store.rs +++ b/crates/session-store/src/store.rs @@ -29,9 +29,6 @@ pub enum StoreError { #[error("log corrupted at line {line}: {message}")] Corrupt { line: usize, message: String }, - - #[error("invalid pod name: {0}")] - InvalidPodName(String), } /// Sync persistence backend for segment logs. diff --git a/crates/session-store/tests/fs_store_test.rs b/crates/session-store/tests/fs_store_test.rs index 5c6d7327..c5d0b9b0 100644 --- a/crates/session-store/tests/fs_store_test.rs +++ b/crates/session-store/tests/fs_store_test.rs @@ -1,8 +1,7 @@ use llm_worker::WorkerResult; use llm_worker::llm_client::types::{Item, RequestConfig}; use session_store::{ - FsStore, LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, Store, TraceEntry, - collect_state, new_segment_id, new_session_id, + FsStore, LogEntry, Store, TraceEntry, collect_state, new_segment_id, new_session_id, }; fn nil_session_start(ts: u64, session_id: uuid::Uuid) -> LogEntry { @@ -240,40 +239,3 @@ fn lookup_session_of_finds_owning_session() { assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid)); } - -#[test] -fn pod_metadata_minimal_crud() { - let dir = tempfile::tempdir().unwrap(); - let store = FsStore::new(dir.path()).unwrap(); - let pod_name = "worker-a"; - let sid = new_session_id(); - let segid = new_segment_id(); - - assert_eq!(store.read_by_name(pod_name).unwrap(), None); - - let pending = PodMetadata::new(pod_name, Some(PodActiveSegmentRef::pending_segment(sid))); - store.write(&pending).unwrap(); - assert_eq!(store.list_names().unwrap(), vec![pod_name.to_string()]); - assert_eq!(store.read_by_name(pod_name).unwrap(), Some(pending.clone())); - assert!( - dir.path() - .join("pods") - .join(pod_name) - .join("metadata.json") - .exists(), - "Pod metadata must live under /pods//" - ); - - let resolved = PodMetadata::new( - pod_name, - Some(PodActiveSegmentRef::active_segment(sid, segid)), - ); - store.write(&resolved).unwrap(); - assert_eq!(store.read_by_name(pod_name).unwrap(), Some(resolved)); - - store.delete_by_name(pod_name).unwrap(); - assert_eq!(store.read_by_name(pod_name).unwrap(), None); - - // Delete is idempotent for missing metadata. - store.delete_by_name(pod_name).unwrap(); -} diff --git a/crates/tui/Cargo.toml b/crates/tui/Cargo.toml index 997e3e00..09799309 100644 --- a/crates/tui/Cargo.toml +++ b/crates/tui/Cargo.toml @@ -20,6 +20,7 @@ uuid = { workspace = true } toml = { workspace = true } manifest = { workspace = true } session-store = { workspace = true } +pod-store = { workspace = true } pod-registry = { workspace = true } serde = { workspace = true, features = ["derive"] } pulldown-cmark = { version = "0.13.3", default-features = false } diff --git a/crates/tui/src/multi_pod.rs b/crates/tui/src/multi_pod.rs index 2388c5b8..ac0dfccc 100644 --- a/crates/tui/src/multi_pod.rs +++ b/crates/tui/src/multi_pod.rs @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; use crossterm::event::{Event as TermEvent, KeyCode, KeyEvent, KeyModifiers, poll, read}; +use pod_store::FsPodStore; use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::{ErrorCode, Event, InvokeKind, Method, PodStatus, Segment}; use ratatui::Frame; @@ -199,12 +200,22 @@ fn default_store_dir() -> Result { manifest::paths::sessions_dir().ok_or_else(|| { MultiPodError::Io(io::Error::new( io::ErrorKind::NotFound, - "could not resolve sessions directory \ - (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)", + "could not resolve sessions directory", )) }) } +fn default_pod_store_dir() -> Result { + manifest::paths::data_dir() + .map(|dir| dir.join("pods")) + .ok_or_else(|| { + MultiPodError::Io(io::Error::new( + io::ErrorKind::NotFound, + "could not resolve pod state directory", + )) + }) +} + #[cfg(test)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum SendEligibility { @@ -483,7 +494,8 @@ enum MultiPodAction { async fn load_pod_list(selected_name: Option) -> Result { let store_dir = default_store_dir()?; let store = FsStore::new(&store_dir)?; - let stored = read_stored_pod_infos(&store_dir, &store)?; + let pod_store = FsPodStore::new(default_pod_store_dir()?).map_err(io::Error::other)?; + let stored = read_stored_pod_infos(&store, &pod_store)?; let live = read_reachable_live_pod_infos(&store) .await .unwrap_or_default(); diff --git a/crates/tui/src/picker.rs b/crates/tui/src/picker.rs index 51acc21d..633405c1 100644 --- a/crates/tui/src/picker.rs +++ b/crates/tui/src/picker.rs @@ -1,7 +1,7 @@ //! Inline-viewport "pick a Pod to attach or restore" UX. //! //! Reads live Pod allocations from the runtime registry and stopped Pod state -//! from the session store's name-keyed metadata. Picking a live row attaches to +//! from the pod-store name-keyed metadata. Picking a live row attaches to //! its socket; picking a stopped row restores via `insomnia-pod --pod `. use std::io; @@ -9,6 +9,7 @@ use std::path::PathBuf; use std::time::Duration; use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers}; +use pod_store::FsPodStore; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use ratatui::layout::{Constraint, Layout}; @@ -102,7 +103,8 @@ impl PodRowState { pub async fn run() -> Result { let store_dir = default_store_dir()?; let store = FsStore::new(&store_dir)?; - let stored_pods = read_stored_pod_infos(&store_dir, &store)?; + let pod_store = FsPodStore::new(default_pod_store_dir()?).map_err(io::Error::other)?; + let stored_pods = read_stored_pod_infos(&store, &pod_store)?; let live_pods = read_reachable_live_pod_infos(&store) .await .unwrap_or_default(); @@ -172,6 +174,18 @@ fn default_store_dir() -> Result { }) } +fn default_pod_store_dir() -> Result { + manifest::paths::data_dir() + .map(|dir| dir.join("pods")) + .ok_or_else(|| { + PickerError::Io(io::Error::new( + io::ErrorKind::NotFound, + "could not resolve pod state directory \ + (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)", + )) + }) +} + pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option { pod_list_live_socket_for_pod(pod_name) } diff --git a/crates/tui/src/pod_list.rs b/crates/tui/src/pod_list.rs index 81e84f7e..6f3d160b 100644 --- a/crates/tui/src/pod_list.rs +++ b/crates/tui/src/pod_list.rs @@ -1,14 +1,14 @@ use std::collections::BTreeMap; -use std::fs; use std::io; use std::path::{Path, PathBuf}; use std::time::Duration; use client::PodClient; use pod_registry::{LockFileGuard, default_registry_path}; +use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore}; use protocol::{Event, PodStatus}; use session_store::{ - FsStore, LogEntry, LoggedContentPart, LoggedItem, PodMetadata, SegmentId, SessionId, Store, + FsStore, LogEntry, LoggedContentPart, LoggedItem, SegmentId, SessionId, Store, }; #[derive(Debug, Clone)] @@ -234,27 +234,17 @@ pub(crate) enum PodEntryDiagnosticKind { } pub(crate) fn read_stored_pod_infos( - store_dir: &Path, store: &FsStore, + pod_store: &impl PodMetadataStore, ) -> Result, io::Error> { - let pods_dir = store_dir.join("pods"); let mut records = Vec::new(); - if !pods_dir.exists() { - return Ok(records); - } - - for entry in fs::read_dir(pods_dir)? { - let entry = entry?; - if !entry.file_type()?.is_dir() { - continue; - } - let pod_name = entry.file_name().to_string_lossy().to_string(); - let path = entry.path().join("metadata.json"); - let info = match fs::read_to_string(&path) { - Ok(content) => match serde_json::from_str::(&content) { - Ok(metadata) => stored_info_from_metadata(store, pod_name, metadata), - Err(e) => corrupt_stored_info(pod_name, e.to_string()), - }, + for pod_name in pod_store.list_names().map_err(io::Error::other)? { + let info = match pod_store.read_by_name(&pod_name) { + Ok(Some(metadata)) => stored_info_from_metadata(store, pod_name, metadata), + Ok(None) => corrupt_stored_info( + pod_name, + "metadata disappeared during discovery".to_string(), + ), Err(e) => corrupt_stored_info(pod_name, e.to_string()), }; records.push(info); @@ -392,10 +382,7 @@ fn summarize_live_pod(store: &FsStore, live: &LivePodInfo) -> PodEntrySummary { } } -fn summarize_metadata( - store: &FsStore, - active: Option<&session_store::PodActiveSegmentRef>, -) -> SegmentSummary { +fn summarize_metadata(store: &FsStore, active: Option<&PodActiveSegmentRef>) -> SegmentSummary { let Some(active) = active else { return SegmentSummary { updated_at: 0, @@ -558,7 +545,9 @@ fn trim_one_line(s: &str, max_chars: usize) -> String { mod tests { use super::*; use llm_worker::llm_client::types::RequestConfig; - use session_store::{PodActiveSegmentRef, PodMetadataStore, new_segment_id, new_session_id}; + use pod_store::FsPodStore; + use pod_store::{PodActiveSegmentRef, PodMetadataStore}; + use session_store::{new_segment_id, new_session_id}; use tempfile::tempdir; const SOURCE: PodVisibilitySource = PodVisibilitySource::ResumePicker; @@ -776,11 +765,12 @@ mod tests { fn read_stored_pod_infos_reports_corrupt_metadata() { let dir = tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); + let pod_store = FsPodStore::new(dir.path().join("pods")).unwrap(); let pod_dir = dir.path().join("pods").join("broken"); - fs::create_dir_all(&pod_dir).unwrap(); - fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap(); + std::fs::create_dir_all(&pod_dir).unwrap(); + std::fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap(); - let records = read_stored_pod_infos(dir.path(), &store).unwrap(); + let records = read_stored_pod_infos(&store, &pod_store).unwrap(); assert_eq!(records.len(), 1); assert_eq!(records[0].pod_name, "broken"); assert!(matches!( @@ -793,16 +783,17 @@ mod tests { fn read_stored_pod_infos_reads_metadata() { let dir = tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); + let pod_store = FsPodStore::new(dir.path().join("pods")).unwrap(); let session_id = new_session_id(); let segment_id = new_segment_id(); - store + pod_store .write(&PodMetadata::new( "agent", Some(PodActiveSegmentRef::active_segment(session_id, segment_id)), )) .unwrap(); - let records = read_stored_pod_infos(dir.path(), &store).unwrap(); + let records = read_stored_pod_infos(&store, &pod_store).unwrap(); assert_eq!(records.len(), 1); assert_eq!(records[0].pod_name, "agent"); assert_eq!(records[0].metadata_state, StoredMetadataState::Present); From e10b4ad4f022b8a47c7ea2e5496e801479d34c00 Mon Sep 17 00:00:00 2001 From: Hare Date: Sat, 30 May 2026 07:36:17 +0900 Subject: [PATCH 2/3] refactor: move scope authority to pod store --- crates/client/src/spawn.rs | 8 -- crates/pod-store/src/lib.rs | 65 +++++++++++++ crates/pod/src/controller.rs | 9 +- crates/pod/src/discovery.rs | 5 + crates/pod/src/main.rs | 12 +-- crates/pod/src/pod.rs | 123 ++++++++---------------- crates/pod/src/spawn/registry.rs | 101 ++++++++++++------- crates/pod/src/spawn/tool.rs | 13 --- crates/pod/tests/pod_comm_tools_test.rs | 30 +++--- crates/pod/tests/restore_test.rs | 36 ------- crates/pod/tests/spawn_pod_test.rs | 3 - crates/session-store/src/lib.rs | 8 +- crates/session-store/src/segment.rs | 19 +--- crates/session-store/src/segment_log.rs | 27 +----- crates/tui/src/spawn.rs | 79 +-------------- 15 files changed, 200 insertions(+), 338 deletions(-) diff --git a/crates/client/src/spawn.rs b/crates/client/src/spawn.rs index c603c7fd..401cff03 100644 --- a/crates/client/src/spawn.rs +++ b/crates/client/src/spawn.rs @@ -31,8 +31,6 @@ pub struct SpawnConfig { /// `--profile`; the Pod name is supplied through `--profile-pod-name` so /// profile evaluation stays separate from `--pod` restore semantics. pub profile: Option, - /// Optional session-scope snapshot used when restoring by session id. - pub resume_scope: Option, /// pod の current_dir。 pub cwd: PathBuf, /// `Some(id)` のとき `--session ` を付与し、当該セッションから @@ -132,12 +130,6 @@ where .arg(id.to_string()) .arg("--session-pod-name") .arg(&config.pod_name); - if let Some(scope) = &config.resume_scope { - let scope_json = serde_json::to_string(scope).map_err(|e| { - SpawnError::PodLaunchFailed(io::Error::new(io::ErrorKind::InvalidInput, e)) - })?; - command.arg("--resume-scope-json").arg(scope_json); - } } let mut child = command.spawn().map_err(SpawnError::PodLaunchFailed)?; diff --git a/crates/pod-store/src/lib.rs b/crates/pod-store/src/lib.rs index c39ea5c4..07517b4d 100644 --- a/crates/pod-store/src/lib.rs +++ b/crates/pod-store/src/lib.rs @@ -75,6 +75,15 @@ pub struct PodSpawnedChild { pub callback_address: PathBuf, } +/// One child delegation that has been reclaimed. Kept as durable audit state so +/// restore can distinguish outstanding delegated scope from already-reclaimed +/// child state without consulting session logs. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodReclaimedChild { + pub pod_name: String, + pub scope_delegated: Vec, +} + /// Persistent metadata for a Pod name. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PodMetadata { @@ -83,6 +92,8 @@ pub struct PodMetadata { pub active: Option, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub spawned_children: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub reclaimed_children: Vec, #[serde(default, skip_serializing_if = "Option::is_none")] pub resolved_manifest_snapshot: Option, } @@ -94,6 +105,7 @@ impl PodMetadata { pod_name: pod_name.into(), active, spawned_children: Vec::new(), + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, } } @@ -155,6 +167,23 @@ pub trait PodMetadataStore: Send + Sync { metadata.spawned_children = children; }) } + + /// Remove reclaimed child delegations from the outstanding set and record + /// them in durable reclaim history. + fn reclaim_spawned_children( + &self, + pod_name: &str, + reclaimed: Vec, + ) -> Result { + self.update_by_name(pod_name, |metadata| { + for reclaimed_child in &reclaimed { + metadata + .spawned_children + .retain(|child| child.pod_name != reclaimed_child.pod_name); + } + metadata.reclaimed_children.extend(reclaimed); + }) + } } /// Filesystem-backed Pod metadata store. @@ -473,4 +502,40 @@ mod tests { assert_eq!(restored.active, Some(active)); assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot)); } + + #[test] + fn reclaim_children_removes_outstanding_and_records_history() { + let tmp = tempfile::TempDir::new().unwrap(); + let store = FsPodStore::new(tmp.path()).unwrap(); + let scope = PodSpawnedScopeRule { + target: std::path::Path::new("/tmp/delegated").into(), + permission: "write".into(), + recursive: true, + }; + store + .set_spawned_children( + "agent", + vec![PodSpawnedChild { + pod_name: "child".into(), + socket_path: std::path::Path::new("/tmp/child.sock").into(), + scope_delegated: vec![scope.clone()], + callback_address: std::path::Path::new("/tmp/parent.sock").into(), + }], + ) + .unwrap(); + + store + .reclaim_spawned_children( + "agent", + vec![PodReclaimedChild { + pod_name: "child".into(), + scope_delegated: vec![scope.clone()], + }], + ) + .unwrap(); + let restored = store.read_by_name("agent").unwrap().unwrap(); + assert!(restored.spawned_children.is_empty()); + assert_eq!(restored.reclaimed_children.len(), 1); + assert_eq!(restored.reclaimed_children[0].scope_delegated, vec![scope]); + } } diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 51cf9a98..b61c35e1 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -162,14 +162,15 @@ impl PodController { pod.store().clone(), spawner_name.clone(), Some(pod.scope().clone()), - Some(pod.scope_change_sink()), ) .await?; let reclaimed_unreachable = loaded_registry.reclaimed_unreachable; let spawned_registry = loaded_registry.registry; if reclaimed_unreachable { - pod.persist_scope_snapshot() - .map_err(std::io::Error::other)?; + pod.push_notify( + "Restored Pod state contained unreachable delegated child Pods; their delegated write scopes were reclaimed before resume." + .to_string(), + ); } // Hand the alerter to the Pod so internal operations (compaction, @@ -497,7 +498,6 @@ where let pwd = pod.pwd().to_path_buf(); let task_store = pod.task_store(); let session_id_for_usage = pod.segment_id().to_string(); - let scope_change_sink = pod.scope_change_sink(); let memory_config = pod.manifest().memory.clone(); let web_config = pod.manifest().web.clone(); let spawner_name = pod.manifest().pod.name.clone(); @@ -557,7 +557,6 @@ where self_parent_socket, spawner_model, scope_handle, - scope_change_sink, )); worker.register_tool(send_to_pod_tool(spawned_registry.clone())); worker.register_tool(read_pod_output_tool(spawned_registry.clone())); diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs index bfbd79af..acceaa33 100644 --- a/crates/pod/src/discovery.rs +++ b/crates/pod/src/discovery.rs @@ -809,6 +809,7 @@ mod tests { child("child-stale", &stale_socket), child("child-pending", &pending_socket), ], + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, }; store.write(&parent).unwrap(); @@ -820,6 +821,7 @@ mod tests { active_child_segment, )), spawned_children: Vec::new(), + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); @@ -831,6 +833,7 @@ mod tests { active_child_segment, )), spawned_children: Vec::new(), + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); @@ -839,6 +842,7 @@ mod tests { pod_name: "child-pending".into(), active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)), spawned_children: Vec::new(), + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); @@ -850,6 +854,7 @@ mod tests { new_segment_id(), )), spawned_children: Vec::new(), + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index caf58305..9b9e6a06 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -2,9 +2,7 @@ use std::path::{Path, PathBuf}; use std::process::ExitCode; use clap::Parser; -use manifest::{ - NixProfileResolver, PodManifest, PodManifestConfig, ProfileSelector, ScopeConfig, paths, -}; +use manifest::{NixProfileResolver, PodManifest, PodManifestConfig, ProfileSelector, paths}; use pod::{Pod, PodController, PromptLoader}; use pod_store::{CombinedStore, FsPodStore, PodMetadataStore}; use session_store::{FsStore, SegmentId, Store}; @@ -46,10 +44,6 @@ struct Cli { #[arg(long, value_name = "NAME", requires = "session", hide = true)] session_pod_name: Option, - /// Internal typed scope snapshot for session restore launched by the TUI. - #[arg(long, value_name = "JSON", requires = "session", hide = true)] - resume_scope_json: Option, - /// Internal resolved manifest config for delegated child Pod spawning. #[arg( long, @@ -134,10 +128,6 @@ fn apply_session_restore_overrides(manifest: &mut PodManifest, cli: &Cli) -> Res if let Some(pod_name) = cli.session_pod_name.as_deref() { manifest.pod.name = pod_name.to_string(); } - if let Some(scope_json) = cli.resume_scope_json.as_deref() { - manifest.scope = serde_json::from_str::(scope_json) - .map_err(|e| format!("failed to parse --resume-scope-json: {e}"))?; - } Ok(()) } diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 91768b82..5e79fc15 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -9,10 +9,11 @@ use llm_worker::llm_client::client::LlmClient; use llm_worker::llm_client::types::Role; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; -use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodStoreError}; +use pod_store::{ + PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodSpawnedScopeRule, PodStoreError, +}; use session_store::{ - LogEntry, PodScopeSnapshot, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, - to_logged, + LogEntry, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, to_logged, }; use tracing::{info, warn}; @@ -345,10 +346,6 @@ pub struct Pod { /// Workflow descriptions. This is intentionally independent from /// summary and Knowledge residency: each section has its own gate. inject_resident_workflows: bool, - /// Latest runtime scope snapshot queued by dynamic scope changes. - /// Drained into the session log before the next turn result is - /// persisted, so resume never silently reclaims delegated writes. - pending_scope_snapshot: Arc>>, /// extract (memory.extract) reentry guard. `true` while an extract /// worker is running; subsequent triggers are skipped per spec /// (`docs/plan/memory.md` §Extract 並走防止). `Arc` so @@ -454,7 +451,6 @@ impl Pod { inject_resident_summary: self.inject_resident_summary, inject_resident_knowledge: self.inject_resident_knowledge, inject_resident_workflows: self.inject_resident_workflows, - pending_scope_snapshot: self.pending_scope_snapshot.clone(), extract_in_flight: self.extract_in_flight.clone(), consolidation_in_flight: self.consolidation_in_flight.clone(), extract_pointer: self.extract_pointer.clone(), @@ -634,7 +630,6 @@ impl Pod { inject_resident_summary: true, inject_resident_knowledge: true, inject_resident_workflows: true, - pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Arc::new(Mutex::new(None)), @@ -753,30 +748,6 @@ impl Pod { .update(|cur| cur.with_added_deny_rules(revoke.clone())) } - /// Snapshot the current runtime scope in the session log. The entry - /// is intentionally appended as soon as a session log exists: if the - /// process later exits while children keep their allocations, resume - /// can restore the narrowed scope instead of reclaiming delegated - /// writes. - pub fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> { - if self.segment_state.entries_written() == 0 { - return Ok(()); - } - let snapshot = { - let scope = self.scope.snapshot(); - PodScopeSnapshot { - allow: scope.allow_rules(), - deny: scope.deny_rules(), - } - }; - let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize"); - self.commit_entry(LogEntry::Extension { - ts: segment_log::now_millis(), - domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), - payload, - }) - } - /// Append `entry` to the session log AND publish it through the /// broadcast sink. No user-space serialization is needed across /// concurrent appenders — the kernel orders `O_APPEND` writes for @@ -796,34 +767,6 @@ impl Pod { self.sink.clone() } - /// Cloneable callback handed to dynamic-scope tools. It cannot append - /// directly to the async store from a sync tool callback, so it records - /// the latest snapshot and the controller flushes it after the tool - /// turn completes. - pub fn scope_change_sink(&self) -> Arc { - let pending = self.pending_scope_snapshot.clone(); - Arc::new(move |snapshot| { - *pending.lock().expect("pending_scope_snapshot poisoned") = Some(snapshot); - }) - } - - fn flush_pending_scope_snapshot(&mut self) -> Result<(), StoreError> { - let snapshot = self - .pending_scope_snapshot - .lock() - .expect("pending_scope_snapshot poisoned") - .take(); - if let Some(snapshot) = snapshot { - let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize"); - self.commit_entry(LogEntry::Extension { - ts: segment_log::now_millis(), - domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), - payload, - })?; - } - Ok(()) - } - /// Direct access to the underlying Worker. pub fn worker(&self) -> &Worker { self.worker.as_ref().expect("worker taken during run") @@ -2007,7 +1950,6 @@ impl Pod { compacted_from: None, }; self.commit_entry(initial)?; - self.persist_scope_snapshot()?; self.write_pod_metadata_active(loc)?; return Ok(()); } @@ -2302,8 +2244,6 @@ impl Pod { } } - self.flush_pending_scope_snapshot()?; - let turn_count = self.worker.as_ref().unwrap().turn_count(); self.commit_entry(LogEntry::TurnEnd { ts: segment_log::now_millis(), @@ -2775,7 +2715,6 @@ impl Pod { .lock() .expect("usage_history poisoned") .clear(); - self.persist_scope_snapshot()?; // Reset extract pointer alongside usage_history: the compacted // session has a fresh log with no `LogEntry::Extension` entries // yet, so a cold restore here would set extract_pointer to None @@ -3831,7 +3770,6 @@ where inject_resident_summary: true, inject_resident_knowledge: true, inject_resident_workflows: true, - pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Arc::new(Mutex::new(None)), @@ -3911,7 +3849,6 @@ where inject_resident_summary: true, inject_resident_knowledge: true, inject_resident_workflows: true, - pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Arc::new(Mutex::new(None)), @@ -4001,19 +3938,13 @@ where return Err(PodError::SegmentEmpty { segment_id }); } let mirror_entries: Vec = raw_entries.clone(); - let scope_snapshot = state - .pod_scope - .clone() - .ok_or(PodError::SegmentScopeMissing { segment_id })?; + let scope_config = effective_restore_scope_config(&store, &manifest)?; let mut common = prepare_pod_common_with_scope( &manifest, &loader, /* parse_template */ false, - ScopeConfig { - allow: scope_snapshot.allow, - deny: scope_snapshot.deny, - }, + scope_config, )?; let skill_shadows = std::mem::take(&mut common.skill_shadows); @@ -4099,7 +4030,6 @@ where inject_resident_summary: true, inject_resident_knowledge: true, inject_resident_workflows: true, - pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Arc::new(Mutex::new(extract_pointer)), @@ -4623,11 +4553,6 @@ pub enum PodError { #[error("session {segment_id} has no entries to restore")] SegmentEmpty { segment_id: SegmentId }, - #[error( - "session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope" - )] - SegmentScopeMissing { segment_id: SegmentId }, - #[error("pod metadata for {pod_name} was not found")] PodMetadataMissing { pod_name: String }, @@ -4669,6 +4594,42 @@ struct PodCommon { skill_shadows: Vec, } +fn effective_restore_scope_config( + store: &St, + manifest: &PodManifest, +) -> Result +where + St: PodMetadataStore, +{ + let mut scope = manifest.scope.clone(); + let Some(metadata) = store.read_by_name(&manifest.pod.name)? else { + return Ok(scope); + }; + for child in metadata.spawned_children { + for rule in child.scope_delegated { + if let Some(deny) = delegated_write_rule_to_deny(rule) { + scope.deny.push(deny); + } + } + } + Ok(scope) +} + +fn delegated_write_rule_to_deny(rule: PodSpawnedScopeRule) -> Option { + match rule.permission.as_str() { + "write" => Some(ScopeRule { + target: rule.target, + permission: Permission::Write, + recursive: rule.recursive, + }), + "read" => None, + other => { + warn!(permission = %other, "ignoring invalid delegated child scope permission"); + None + } + } +} + /// Resolve pwd / scope / LLM client / prompt catalog from a validated /// manifest cascade. Used by `from_manifest`, `from_manifest_spawned`, /// and `restore_from_manifest` so they share one definition of "what diff --git a/crates/pod/src/spawn/registry.rs b/crates/pod/src/spawn/registry.rs index 0a220d24..e66a2602 100644 --- a/crates/pod/src/spawn/registry.rs +++ b/crates/pod/src/spawn/registry.rs @@ -20,8 +20,9 @@ use std::sync::Arc; use std::time::Duration; use manifest::{Permission, ScopeRule, SharedScope}; -use pod_store::{PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError}; -use session_store::PodScopeSnapshot; +use pod_store::{ + PodMetadataStore, PodReclaimedChild, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError, +}; use tokio::net::UnixStream; use tokio::sync::Mutex; use tracing::warn; @@ -30,7 +31,7 @@ use crate::runtime::dir::{RuntimeDir, SpawnedPodRecord}; use crate::runtime::pod_registry; type RegistryStateWriter = Arc io::Result<()> + Send + Sync>; -type ScopeChangeSink = Arc; +type RegistryReclaimWriter = Arc io::Result<()> + Send + Sync>; const RESTORE_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500); @@ -39,9 +40,9 @@ pub struct SpawnedPodRegistry { cursors: Mutex>, runtime_dir: Arc, state_writer: Option, + reclaim_writer: Option, parent_name: Option, parent_scope: Option, - scope_change_sink: Option, } pub struct SpawnedPodRegistryLoad { @@ -56,9 +57,9 @@ impl SpawnedPodRegistry { cursors: Mutex::new(HashMap::new()), runtime_dir, state_writer: None, + reclaim_writer: None, parent_name: None, parent_scope: None, - scope_change_sink: None, }) } @@ -75,8 +76,7 @@ impl SpawnedPodRegistry { St: PodMetadataStore + Clone + Send + Sync + 'static, { let loaded = - Self::load_from_pod_state_with_reclaim(runtime_dir, store, pod_name, None, None) - .await?; + Self::load_from_pod_state_with_reclaim(runtime_dir, store, pod_name, None).await?; Ok(loaded.registry) } @@ -85,7 +85,6 @@ impl SpawnedPodRegistry { store: St, pod_name: String, parent_scope: Option, - scope_change_sink: Option, ) -> io::Result where St: PodMetadataStore + Clone + Send + Sync + 'static, @@ -97,13 +96,11 @@ impl SpawnedPodRegistry { .unwrap_or_default(); let mut records = Vec::with_capacity(persisted_children.len()); - let mut pruned = false; let mut pruned_records = Vec::new(); for child in &persisted_children { let record = match record_from_pod_state(child) { Ok(record) => record, Err(err) => { - pruned = true; warn!( error = %err, pod = %child.pod_name, @@ -115,7 +112,6 @@ impl SpawnedPodRegistry { if is_reachable(&record.socket_path).await { records.push(record); } else { - pruned = true; warn!( pod = %record.pod_name, socket = %record.socket_path.display(), @@ -126,20 +122,40 @@ impl SpawnedPodRegistry { } runtime_dir.write_spawned_pods(&records).await?; - let state_writer = pod_state_writer(store, pod_name.clone()); - // Runtime spawned-pod records are a live registry for ListPods and - // cursor/scope cleanup; durable Pod state remains the discovery source - // for later attach/restore, so do not delete unreachable children from - // Pod state just because their sockets are gone. - if metadata.is_none() || !pruned { + let state_writer = pod_state_writer(store.clone(), pod_name.clone()); + let reclaim_writer = pod_state_reclaim_writer(store.clone(), pod_name.clone()); + if metadata.is_none() { state_writer(&records)?; } let mut reclaimed_unreachable = false; + if !pruned_records.is_empty() { + let reclaimed = pruned_records + .iter() + .map(|record| PodReclaimedChild { + pod_name: record.pod_name.clone(), + scope_delegated: record + .scope_delegated + .iter() + .map(|rule| PodSpawnedScopeRule { + target: rule.target.clone(), + permission: match rule.permission { + Permission::Read => "read".to_string(), + Permission::Write => "write".to_string(), + }, + recursive: rule.recursive, + }) + .collect(), + }) + .collect(); + store + .reclaim_spawned_children(&pod_name, reclaimed) + .map_err(store_error_to_io)?; + reclaimed_unreachable = true; + } if parent_scope.is_some() { for record in &pruned_records { - reclaim_record(&pod_name, parent_scope.as_ref(), None, record)?; - reclaimed_unreachable = true; + reclaim_record(&pod_name, parent_scope.as_ref(), record)?; } } @@ -149,9 +165,9 @@ impl SpawnedPodRegistry { cursors: Mutex::new(HashMap::new()), runtime_dir, state_writer: Some(state_writer), + reclaim_writer: Some(reclaim_writer), parent_name: Some(pod_name), parent_scope, - scope_change_sink, }), reclaimed_unreachable, }) @@ -194,6 +210,9 @@ impl SpawnedPodRegistry { self.cursors.lock().await.remove(pod_name); if let Some(record) = &removed { self.reclaim_record(record)?; + if let Some(write_reclaim) = &self.reclaim_writer { + write_reclaim(record)?; + } } Ok(removed) } @@ -203,12 +222,7 @@ impl SpawnedPodRegistry { release_child_allocation(&record.pod_name)?; return Ok(()); }; - reclaim_record( - parent_name, - self.parent_scope.as_ref(), - self.scope_change_sink.as_ref(), - record, - ) + reclaim_record(parent_name, self.parent_scope.as_ref(), record) } /// Read-only cursor lookup. Returns 0 when no cursor has been set. @@ -246,10 +260,36 @@ where }) } +fn pod_state_reclaim_writer(store: St, pod_name: String) -> RegistryReclaimWriter +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + Arc::new(move |record| { + let reclaimed = PodReclaimedChild { + pod_name: record.pod_name.clone(), + scope_delegated: record + .scope_delegated + .iter() + .map(|rule| PodSpawnedScopeRule { + target: rule.target.clone(), + permission: match rule.permission { + Permission::Read => "read".to_string(), + Permission::Write => "write".to_string(), + }, + recursive: rule.recursive, + }) + .collect(), + }; + store + .reclaim_spawned_children(&pod_name, vec![reclaimed]) + .map(|_| ()) + .map_err(store_error_to_io) + }) +} + fn reclaim_record( parent_name: &str, parent_scope: Option<&SharedScope>, - scope_change_sink: Option<&ScopeChangeSink>, record: &SpawnedPodRecord, ) -> io::Result<()> { let write_rules = record @@ -275,13 +315,6 @@ fn reclaim_record( scope .update(|current| current.with_removed_deny_rules(write_rules)) .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; - if let Some(sink) = scope_change_sink { - let snapshot = scope.snapshot(); - sink(PodScopeSnapshot { - allow: snapshot.allow_rules(), - deny: snapshot.deny_rules(), - }); - } } Ok(()) diff --git a/crates/pod/src/spawn/tool.rs b/crates/pod/src/spawn/tool.rs index 0c21c5d7..89c327e1 100644 --- a/crates/pod/src/spawn/tool.rs +++ b/crates/pod/src/spawn/tool.rs @@ -18,7 +18,6 @@ use manifest::{ SharedScope, WorkerManifestConfig, }; use serde::Deserialize; -use session_store::PodScopeSnapshot; use tokio::net::UnixStream; use tokio::process::Command; use tokio::time::sleep; @@ -128,9 +127,6 @@ pub struct SpawnPodTool { /// `effective_write` semantics: Write is the only permission /// tracked across Pods, so revocation only touches Write. spawner_scope: SharedScope, - /// Called after the spawner scope has been updated so the new - /// effective scope can be persisted to the session log. - scope_changed: Arc, } impl SpawnPodTool { @@ -143,7 +139,6 @@ impl SpawnPodTool { parent_socket: Option, spawner_model: ModelManifest, spawner_scope: SharedScope, - scope_changed: Arc, ) -> Self { Self { spawner_name, @@ -154,7 +149,6 @@ impl SpawnPodTool { parent_socket, spawner_model, spawner_scope, - scope_changed, } } } @@ -250,11 +244,6 @@ impl Tool for SpawnPodTool { self.spawner_scope .update(|cur| cur.with_added_deny_rules(revoke_write.clone())) .map_err(|e| ToolError::ExecutionFailed(format!("revoke spawner scope: {e}")))?; - let current = self.spawner_scope.snapshot(); - (self.scope_changed)(PodScopeSnapshot { - allow: current.allow_rules(), - deny: current.deny_rules(), - }); } let record = SpawnedPodRecord { @@ -496,7 +485,6 @@ pub fn spawn_pod_tool( parent_socket: Option, spawner_model: ModelManifest, spawner_scope: SharedScope, - scope_changed: Arc, ) -> ToolDefinition { Arc::new(move || { let schema = schemars::schema_for!(SpawnPodInput); @@ -513,7 +501,6 @@ pub fn spawn_pod_tool( parent_socket.clone(), spawner_model.clone(), spawner_scope.clone(), - scope_changed.clone(), )); (meta, tool) }) diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index 6ea601c1..83678396 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -442,7 +442,6 @@ async fn stop_pod_sends_shutdown_and_releases_scope() { store.clone(), "spawner".into(), Some(parent_scope.clone()), - None, ) .await .unwrap(); @@ -580,13 +579,15 @@ async fn restored_registry_uses_pod_state_without_runtime_file() { .unwrap() .expect("spawner metadata should remain"); assert!(metadata.spawned_children.is_empty()); + assert_eq!(metadata.reclaimed_children.len(), 1); + assert_eq!(metadata.reclaimed_children[0].pod_name, "child"); let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap(); let runtime_records: Vec = serde_json::from_str(&runtime_contents).unwrap(); assert!(runtime_records.is_empty()); } #[tokio::test] -async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state() { +async fn load_from_pod_state_prunes_runtime_children_and_reclaims_durable_delegation() { let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); let store = CombinedStore::new( @@ -625,23 +626,14 @@ async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state .read_by_name("spawner") .unwrap() .expect("spawner metadata should be written"); - assert_eq!(metadata.spawned_children.len(), 2); - assert!( - metadata - .spawned_children - .iter() - .any(|c| c.pod_name == "alive") - ); - assert!( - metadata - .spawned_children - .iter() - .any(|c| c.pod_name == "missing") - ); + assert_eq!(metadata.spawned_children.len(), 1); + assert_eq!(metadata.spawned_children[0].pod_name, "alive"); + assert_eq!(metadata.reclaimed_children.len(), 1); + assert_eq!(metadata.reclaimed_children[0].pod_name, "missing"); } #[tokio::test] -async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_state() { +async fn load_from_pod_state_reclaims_pruned_child_scope_and_records_history() { let _env = EnvGuard::acquire(); let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); @@ -709,7 +701,6 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_st store.clone(), "spawner".into(), Some(parent_scope.clone()), - None, ) .await .unwrap(); @@ -729,8 +720,9 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_st .read_by_name("spawner") .unwrap() .expect("spawner metadata should remain"); - assert_eq!(metadata.spawned_children.len(), 1); - assert_eq!(metadata.spawned_children[0].pod_name, "missing"); + assert!(metadata.spawned_children.is_empty()); + assert_eq!(metadata.reclaimed_children.len(), 1); + assert_eq!(metadata.reclaimed_children[0].pod_name, "missing"); let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap(); let runtime_records: Vec = serde_json::from_str(&runtime_contents).unwrap(); assert!(runtime_records.is_empty()); diff --git a/crates/pod/tests/restore_test.rs b/crates/pod/tests/restore_test.rs index 6f0d4ad7..42968967 100644 --- a/crates/pod/tests/restore_test.rs +++ b/crates/pod/tests/restore_test.rs @@ -199,39 +199,3 @@ async fn restore_from_manifest_rejects_empty_segment_log() { Ok(_) => panic!("expected empty segment log to fail"), } } - -#[tokio::test] -async fn restore_from_manifest_rejects_segment_without_scope_snapshot() { - let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); - - let store_tmp = tempfile::tempdir().unwrap(); - let store = CombinedStore::new( - FsStore::new(store_tmp.path()).unwrap(), - FsPodStore::new(store_tmp.path().join("pods")).unwrap(), - ); - let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); - - let sid = session_store::new_session_id(); - let segid = session_store::new_segment_id(); - let state = session_store::SegmentStartState { - system_prompt: None, - config: &Default::default(), - history: &[], - }; - session_store::create_segment_with_ids(&store, sid, segid, state).unwrap(); - - let result = Pod::restore_from_manifest( - sid, - segid, - manifest, - store, - pod::PromptLoader::builtins_only(), - ) - .await; - - match result { - Err(PodError::SegmentScopeMissing { segment_id }) => assert_eq!(segment_id, segid), - Err(other) => panic!("expected SegmentScopeMissing, got {other:?}"), - Ok(_) => panic!("expected missing scope snapshot to fail"), - } -} diff --git a/crates/pod/tests/spawn_pod_test.rs b/crates/pod/tests/spawn_pod_test.rs index d60cf71f..7e99e2a3 100644 --- a/crates/pod/tests/spawn_pod_test.rs +++ b/crates/pod/tests/spawn_pod_test.rs @@ -193,7 +193,6 @@ async fn spawn_pod_delegates_scope_and_sends_run() { None, dummy_model(), spawner_scope.clone(), - std::sync::Arc::new(|_| {}), ); let (_meta, tool) = def(); @@ -282,7 +281,6 @@ async fn spawn_pod_rejects_scope_outside_spawner() { None, dummy_model(), spawner_scope.clone(), - std::sync::Arc::new(|_| {}), ); let (_meta, tool) = def(); @@ -354,7 +352,6 @@ async fn spawn_pod_rolls_back_reservation_when_socket_never_appears() { None, dummy_model(), spawner_scope.clone(), - std::sync::Arc::new(|_| {}), ); let (_meta, tool) = def(); diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index dc71b4d0..de383648 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -47,13 +47,9 @@ pub use segment::{ SegmentStartState, append_entry, append_system_item, classify_history_item, create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork, fork_at, restore, restore_by_segment, save_config_changed, save_delta, save_extension, - save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage, - save_user_input, -}; -pub use segment_log::{ - LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SegmentOrigin, - collect_state, + save_run_completed, save_run_errored, save_turn_end, save_usage, save_user_input, }; +pub use segment_log::{LogEntry, RestoredState, SegmentOrigin, collect_state}; pub use store::{Store, StoreError}; pub use system_item::{SystemItem, SystemReminder, SystemReminderSource, render_pod_event}; diff --git a/crates/session-store/src/segment.rs b/crates/session-store/src/segment.rs index 0e3edb51..d776a5ac 100644 --- a/crates/session-store/src/segment.rs +++ b/crates/session-store/src/segment.rs @@ -5,7 +5,7 @@ //! functions after state-mutating operations. use crate::logged_item::{LoggedItem, to_logged}; -use crate::segment_log::{self, LogEntry, PodScopeSnapshot, SegmentOrigin}; +use crate::segment_log::{self, LogEntry, SegmentOrigin}; use crate::store::{Store, StoreError}; use crate::system_item::SystemItem; use crate::{SegmentId, SessionId}; @@ -385,23 +385,6 @@ pub fn save_extension( ) } -/// Log the Pod's latest runtime scope snapshot. -pub fn save_pod_scope( - store: &impl Store, - session_id: SessionId, - segment_id: SegmentId, - snapshot: &PodScopeSnapshot, -) -> Result<(), StoreError> { - let payload = serde_json::to_value(snapshot)?; - save_extension( - store, - session_id, - segment_id, - segment_log::POD_SCOPE_EXTENSION_DOMAIN, - payload, - ) -} - /// Log a `ConfigChanged` entry. pub fn save_config_changed( store: &impl Store, diff --git a/crates/session-store/src/segment_log.rs b/crates/session-store/src/segment_log.rs index 28696a92..ded51de8 100644 --- a/crates/session-store/src/segment_log.rs +++ b/crates/session-store/src/segment_log.rs @@ -11,7 +11,7 @@ use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::{UsageRecord, WorkerResult}; -use protocol::{InvokeKind, ScopeRule, Segment}; +use protocol::{InvokeKind, Segment}; use serde::{Deserialize, Serialize}; use crate::logged_item::LoggedItem; @@ -166,16 +166,6 @@ pub struct SegmentOrigin { pub at_turn_index: usize, } -/// Domain used by Pod to persist its latest effective runtime scope. -pub const POD_SCOPE_EXTENSION_DOMAIN: &str = "pod.scope"; - -/// Payload stored in `LogEntry::Extension { domain: "pod.scope", .. }`. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct PodScopeSnapshot { - pub allow: Vec, - pub deny: Vec, -} - /// State collected from log entries. #[derive(Debug, Clone)] pub struct RestoredState { @@ -199,9 +189,6 @@ pub struct RestoredState { /// `LogEntry::Extension` を replay 順に積んだもの。`(domain, payload)`。 /// session-store は domain を不透明扱いし、各ドメインが自前で fold する。 pub extensions: Vec<(String, serde_json::Value)>, - /// Latest runtime scope snapshot persisted by the Pod. `None` means - /// the segment predates scope persistence or the payload was corrupt. - pub pod_scope: Option, /// User submissions in original typed form, in submit order. /// One entry per `LogEntry::UserInput`; the K-th entry corresponds to /// the K-th `Item::user_message` derived during replay (modulo @@ -223,7 +210,6 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState { entries_count: 0, usage_history: Vec::new(), extensions: Vec::new(), - pod_scope: None, user_segments: Vec::new(), }; @@ -293,17 +279,6 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState { LogEntry::Extension { domain, payload, .. } => { - if domain == POD_SCOPE_EXTENSION_DOMAIN { - match serde_json::from_value::(payload.clone()) { - Ok(snapshot) => state.pod_scope = Some(snapshot), - Err(err) => { - tracing::warn!( - error = %err, - "discarding malformed pod.scope snapshot from segment log" - ); - } - } - } state.extensions.push((domain.clone(), payload.clone())); } } diff --git a/crates/tui/src/spawn.rs b/crates/tui/src/spawn.rs index 3265660e..3988fdcb 100644 --- a/crates/tui/src/spawn.rs +++ b/crates/tui/src/spawn.rs @@ -17,7 +17,7 @@ use std::time::Duration; use client::{SpawnConfig, spawn_pod}; use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers}; -use manifest::{ProfileDiscovery, ScopeConfig}; +use manifest::ProfileDiscovery; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use ratatui::layout::{Constraint, Layout}; @@ -42,8 +42,6 @@ pub enum SpawnOutcome { #[derive(Debug)] pub enum SpawnError { Io(io::Error), - Store(session_store::StoreError), - MissingResumeScope { segment_id: SegmentId }, Spawn(client::SpawnError), } @@ -51,11 +49,6 @@ impl std::fmt::Display for SpawnError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Io(e) => write!(f, "io error: {e}"), - Self::Store(e) => write!(f, "failed to read session log: {e}"), - Self::MissingResumeScope { segment_id } => write!( - f, - "session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope" - ), Self::Spawn(e) => write!(f, "{e}"), } } @@ -69,12 +62,6 @@ impl From for SpawnError { } } -impl From for SpawnError { - fn from(e: session_store::StoreError) -> Self { - Self::Store(e) - } -} - impl From for SpawnError { fn from(e: client::SpawnError) -> Self { Self::Spawn(e) @@ -111,7 +98,6 @@ pub async fn run( editing: true, resume_from, resume_by_pod_name: false, - resume_scope: None, profile_choices, profile_index, }; @@ -149,10 +135,6 @@ pub async fn run( } } - if let Some(id) = form.resume_from { - form.resume_scope = Some(load_resume_scope(id).await?); - } - // Phase 2: launch pod and wait for ready line. Drop the cursor // out of the name field — subsequent frames are passive status // updates, not input — so the cursor doesn't end up parked there @@ -305,7 +287,6 @@ fn form_for_pod_name(pod_name: String, defaults: SpawnDefaults) -> Form { editing: false, resume_from: None, resume_by_pod_name: true, - resume_scope: None, profile_choices: Vec::new(), profile_index: 0, } @@ -383,7 +364,6 @@ async fn wait_for_ready( let config = SpawnConfig { pod_name: form.name.clone(), profile: form.selected_profile_selector(), - resume_scope: form.resume_scope.clone(), cwd: form.cwd.clone(), resume_from: form.resume_from, resume_by_pod_name: form.resume_by_pod_name, @@ -399,24 +379,6 @@ async fn wait_for_ready( }) } -async fn load_resume_scope(segment_id: SegmentId) -> Result { - let store_dir = manifest::paths::sessions_dir().ok_or_else(|| { - io::Error::new( - io::ErrorKind::NotFound, - "could not resolve sessions directory (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)", - ) - })?; - let store = session_store::FsStore::new(&store_dir)?; - let state = session_store::restore_by_segment(&store, segment_id)?; - let snapshot = state - .pod_scope - .ok_or(SpawnError::MissingResumeScope { segment_id })?; - Ok(ScopeConfig { - allow: snapshot.allow, - deny: snapshot.deny, - }) -} - #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum MessageKind { Info, @@ -453,10 +415,6 @@ struct Form { /// When true, launch the child with `--pod ` so the pod process /// resolves name-keyed state before falling back to fresh creation. resume_by_pod_name: bool, - /// Scope snapshot recovered from the source session log. Set only for - /// resume runs and passed through a typed internal restore flag so resume - /// does not silently broaden access. - resume_scope: Option, /// Optional Nix profile choices passed to `insomnia-pod --profile` for /// fresh spawns. This is not used for resume/attach flows because those must /// restore Pod state rather than re-evaluate a profile source. @@ -616,17 +574,6 @@ fn context_line(form: &Form) -> Line<'_> { ]); } - if form.resume_scope.is_some() { - return Line::from(vec![ - Span::raw(" "), - Span::styled("scope: ", Style::default().fg(Color::DarkGray)), - Span::styled( - "from restored session snapshot", - Style::default().fg(Color::Green), - ), - ]); - } - match form.scope_origin { ScopeOrigin::FromProfile => Line::from(vec![ Span::raw(" "), @@ -670,7 +617,6 @@ mod tests { editing: true, resume_from: None, resume_by_pod_name: false, - resume_scope: None, profile_choices: Vec::new(), profile_index: 0, } @@ -691,7 +637,6 @@ mod tests { assert_eq!(f.name_cursor, "agent".chars().count()); assert_eq!(f.resume_from, None); assert!(f.resume_by_pod_name); - assert!(f.resume_scope.is_none()); assert!(!f.editing); assert_eq!( f.message, @@ -699,28 +644,6 @@ mod tests { ); } - #[test] - fn resume_scope_snapshot_stays_on_form_for_typed_restore_flag() { - let mut f = form("agent-r"); - f.resume_from = Some(session_store::new_segment_id()); - f.resume_scope = Some(ScopeConfig { - allow: vec![manifest::ScopeRule { - target: PathBuf::from("/work/example"), - permission: manifest::Permission::Write, - recursive: true, - }], - deny: vec![manifest::ScopeRule { - target: PathBuf::from("/work/example/child"), - permission: manifest::Permission::Write, - recursive: true, - }], - }); - - let scope = f.resume_scope.as_ref().unwrap(); - assert_eq!(scope.allow[0].target, PathBuf::from("/work/example")); - assert_eq!(scope.deny[0].target, PathBuf::from("/work/example/child")); - } - #[test] fn profile_choices_use_project_registry_default() { let temp = tempfile::tempdir().unwrap(); From d2e80871cec3bf48298bf9d69294db9f130a47c7 Mon Sep 17 00:00:00 2001 From: Hare Date: Sat, 30 May 2026 07:46:09 +0900 Subject: [PATCH 3/3] fix: reconcile missing delegated children --- crates/pod-registry/src/mutate.rs | 71 +++++++++++----- crates/pod/src/pod.rs | 106 +++++++++++++++++++++--- crates/pod/tests/pod_comm_tools_test.rs | 11 +-- 3 files changed, 142 insertions(+), 46 deletions(-) diff --git a/crates/pod-registry/src/mutate.rs b/crates/pod-registry/src/mutate.rs index 649eabaf..8ca49ce5 100644 --- a/crates/pod-registry/src/mutate.rs +++ b/crates/pod-registry/src/mutate.rs @@ -45,9 +45,9 @@ pub fn register_pod( /// and the registration proceeds. The check is structural (deny ⊇ /// competitor.rule), not relational — it does not verify that the /// competitor actually descends from this Pod's prior delegations. -/// In practice this is safe because the canonical caller is `restore`, -/// which derives `scope_deny` from the session's own snapshot, so any -/// covered competitor is guaranteed to be a descendant of the original +/// In practice this is safe because the canonical restore caller derives +/// `scope_deny` from outstanding `pod-store` child delegations, so any +/// covered competitor is expected to be a descendant of the original /// allocation. Direct callers must uphold the same invariant. pub fn register_pod_with_deny( guard: &mut LockFileGuard, @@ -180,10 +180,11 @@ pub fn release_pod(guard: &mut LockFileGuard, pod_name: &str) -> Result<(), Scop /// Reclaim a child delegation back into its parent allocation. /// -/// This is idempotent: missing child allocations and missing deny entries are -/// ignored. For each delegated Write rule, at most one exact matching deny rule -/// is removed from the parent's `scope_deny`, preserving any duplicate explicit -/// base deny that was not owned by this child delegation. +/// This is idempotent for missing deny entries. For each delegated Write rule, +/// at most one exact matching deny rule is removed from the parent's `scope_deny` +/// even when the child allocation is already absent; restore reconciliation uses +/// that case when durable Pod-state still records an outstanding delegation but +/// the live lock file no longer has a child allocation. pub fn reclaim_delegated_scope( guard: &mut LockFileGuard, parent: &str, @@ -199,17 +200,13 @@ pub fn reclaim_delegated_scope( .map(|idx| guard.data().allocations[idx].delegated_from.clone()) .unwrap_or(None); - let child_exists = child_idx.is_some(); - - if child_exists { - if let Some(parent_alloc) = guard.data_mut().find_mut(parent) { - for rule in delegated_scope - .iter() - .filter(|rule| rule.permission == Permission::Write) - { - if let Some(idx) = parent_alloc.scope_deny.iter().position(|deny| deny == rule) { - parent_alloc.scope_deny.remove(idx); - } + if let Some(parent_alloc) = guard.data_mut().find_mut(parent) { + for rule in delegated_scope + .iter() + .filter(|rule| rule.permission == Permission::Write) + { + if let Some(idx) = parent_alloc.scope_deny.iter().position(|deny| deny == rule) { + parent_alloc.scope_deny.remove(idx); } } } @@ -516,15 +513,43 @@ mod tests { assert_eq!(a.scope_deny, vec![delegated_rule.clone()]); assert!(g.data().find("b").is_none()); - reclaim_delegated_scope(&mut g, "a", "b", &[delegated_rule.clone()]).unwrap(); + reclaim_delegated_scope(&mut g, "a", "b", std::slice::from_ref(&delegated_rule)).unwrap(); let a = g.data().find("a").unwrap(); - assert_eq!( - a.scope_deny, - vec![delegated_rule], - "a repeated reclaim with no child allocation must not broaden an explicit duplicate base deny" + assert!( + a.scope_deny.is_empty(), + "a missing child allocation still reclaims one matching parent deny" ); } + #[test] + fn reclaim_delegated_scope_removes_parent_deny_when_child_allocation_missing() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + let delegated_rule = write_rule("/src/core", true); + register_pod_with_deny( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + vec![delegated_rule.clone()], + sid(), + ) + .unwrap(); + + reclaim_delegated_scope( + &mut g, + "a", + "missing", + std::slice::from_ref(&delegated_rule), + ) + .unwrap(); + + let a = g.data().find("a").unwrap(); + assert!(a.scope_deny.is_empty()); + } + #[test] fn reclaim_stale_reparents_and_removes_dead_entries() { let dir = TempDir::new().unwrap(); diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 5e79fc15..e4f039ac 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1,6 +1,7 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; +use std::time::Duration; use arc_swap::ArcSwap; use llm_worker::Item; @@ -10,7 +11,8 @@ use llm_worker::llm_client::types::Role; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; use pod_store::{ - PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodSpawnedScopeRule, PodStoreError, + PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodReclaimedChild, PodSpawnedChild, + PodSpawnedScopeRule, PodStoreError, }; use session_store::{ LogEntry, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, to_logged, @@ -45,9 +47,12 @@ use llm_worker::interceptor::PreRequestAction; use protocol::{ AlertLevel, AlertSource, Event, RewindSummary, RewindTarget, RewindTargetId, Segment, }; +use tokio::net::UnixStream; use tokio::sync::broadcast; use tokio::task::JoinHandle; +const RESTORE_RECONCILIATION_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500); + /// `(SessionId, SegmentId)` pair the Pod is currently writing to. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct SegmentLocation { @@ -4048,10 +4053,61 @@ where session_id, segment_id, })?; + pod.reconcile_restored_delegations().await?; drain_skill_shadows(&pod, skill_shadows); Ok(pod) } + async fn reconcile_restored_delegations(&mut self) -> Result<(), PodError> { + let pod_name = self.manifest.pod.name.clone(); + let Some(metadata) = self.store.read_by_name(&pod_name)? else { + return Ok(()); + }; + + let mut reclaimed = Vec::new(); + for child in metadata.spawned_children { + if restored_child_reachable(&child).await { + continue; + } + let delegated_scope = spawned_child_scope_rules(&child); + if !delegated_scope.is_empty() { + let lock_path = + pod_registry::default_registry_path().map_err(ScopeLockError::from)?; + let mut guard = + pod_registry::LockFileGuard::open(&lock_path).map_err(ScopeLockError::from)?; + pod_registry::reclaim_delegated_scope( + &mut guard, + &pod_name, + &child.pod_name, + &delegated_scope, + )?; + let write_rules = delegated_scope + .iter() + .filter(|rule| rule.permission == Permission::Write) + .cloned() + .collect::>(); + self.scope + .update(|current| current.with_removed_deny_rules(write_rules)) + .map_err(PodError::Scope)?; + } + reclaimed.push(PodReclaimedChild { + pod_name: child.pod_name, + scope_delegated: child.scope_delegated, + }); + } + + if reclaimed.is_empty() { + return Ok(()); + } + + self.store.reclaim_spawned_children(&pod_name, reclaimed)?; + self.push_notify( + "Restored Pod state contained missing or unreachable delegated child Pods; their delegated write scopes were reclaimed before resume." + .to_string(), + ); + Ok(()) + } + /// Convenience: build a Pod from a single-layer TOML manifest string. /// /// Parses the TOML into a [`PodManifestConfig`], converts to a @@ -4594,6 +4650,40 @@ struct PodCommon { skill_shadows: Vec, } +async fn restored_child_reachable(child: &PodSpawnedChild) -> bool { + tokio::time::timeout( + RESTORE_RECONCILIATION_REACHABILITY_TIMEOUT, + UnixStream::connect(&child.socket_path), + ) + .await + .map(|result| result.is_ok()) + .unwrap_or(false) +} + +fn spawned_child_scope_rules(child: &PodSpawnedChild) -> Vec { + child + .scope_delegated + .iter() + .filter_map(|rule| delegated_scope_rule_to_scope_rule(rule.clone())) + .collect() +} + +fn delegated_scope_rule_to_scope_rule(rule: PodSpawnedScopeRule) -> Option { + let permission = match rule.permission.as_str() { + "read" => Permission::Read, + "write" => Permission::Write, + other => { + warn!(permission = %other, "ignoring invalid delegated child scope permission"); + return None; + } + }; + Some(ScopeRule { + target: rule.target, + permission, + recursive: rule.recursive, + }) +} + fn effective_restore_scope_config( store: &St, manifest: &PodManifest, @@ -4616,18 +4706,8 @@ where } fn delegated_write_rule_to_deny(rule: PodSpawnedScopeRule) -> Option { - match rule.permission.as_str() { - "write" => Some(ScopeRule { - target: rule.target, - permission: Permission::Write, - recursive: rule.recursive, - }), - "read" => None, - other => { - warn!(permission = %other, "ignoring invalid delegated child scope permission"); - None - } - } + let rule = delegated_scope_rule_to_scope_rule(rule)?; + (rule.permission == Permission::Write).then_some(rule) } /// Resolve pwd / scope / LLM client / prompt catalog from a validated diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index 83678396..95984124 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -633,7 +633,7 @@ async fn load_from_pod_state_prunes_runtime_children_and_reclaims_durable_delega } #[tokio::test] -async fn load_from_pod_state_reclaims_pruned_child_scope_and_records_history() { +async fn load_from_pod_state_reclaims_missing_child_scope_and_records_history() { let _env = EnvGuard::acquire(); let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); @@ -667,15 +667,6 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_and_records_history() { session_store::new_segment_id(), ) .unwrap(); - pod_registry::register_pod( - &mut g, - "missing".into(), - std::process::id(), - "/tmp/missing.sock".into(), - vec![missing_rule.clone()], - session_store::new_segment_id(), - ) - .unwrap(); } let parent_scope = SharedScope::new(