From 211738132ca330e320a6b18921c28122aa8deaf7 Mon Sep 17 00:00:00 2001 From: Hare Date: Sat, 30 May 2026 07:16:50 +0900 Subject: [PATCH] refactor: split pod metadata store --- Cargo.lock | 13 + Cargo.toml | 2 + crates/pod-store/Cargo.toml | 15 + crates/pod-store/src/lib.rs | 476 ++++++++++++++++++ crates/pod/Cargo.toml | 1 + crates/pod/examples/pod_cli.rs | 6 +- crates/pod/examples/pod_protocol.rs | 6 +- crates/pod/src/controller.rs | 3 +- crates/pod/src/discovery.rs | 17 +- crates/pod/src/main.rs | 22 +- crates/pod/src/pod.rs | 36 +- crates/pod/src/spawn/registry.rs | 18 +- crates/pod/tests/compact_events_test.rs | 14 +- crates/pod/tests/consolidation_test.rs | 12 +- crates/pod/tests/controller_test.rs | 16 +- crates/pod/tests/pod_comm_tools_test.rs | 23 +- crates/pod/tests/restore_test.rs | 33 +- crates/pod/tests/session_metrics_test.rs | 15 +- .../pod/tests/system_prompt_template_test.rs | 10 +- crates/session-store/src/fs_store.rs | 79 --- crates/session-store/src/lib.rs | 4 - crates/session-store/src/pod_metadata.rs | 150 ------ crates/session-store/src/store.rs | 3 - crates/session-store/tests/fs_store_test.rs | 40 +- crates/tui/Cargo.toml | 1 + crates/tui/src/multi_pod.rs | 18 +- crates/tui/src/picker.rs | 18 +- crates/tui/src/pod_list.rs | 51 +- 28 files changed, 726 insertions(+), 376 deletions(-) create mode 100644 crates/pod-store/Cargo.toml create mode 100644 crates/pod-store/src/lib.rs delete mode 100644 crates/session-store/src/pod_metadata.rs diff --git a/Cargo.lock b/Cargo.lock index 6d339000..23e533d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2166,6 +2166,7 @@ dependencies = [ "memory", "minijinja", "pod-registry", + "pod-store", "protocol", "provider", "schemars", @@ -2197,6 +2198,17 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "pod-store" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", + "session-store", + "tempfile", + "thiserror 2.0.18", +] + [[package]] name = "portable-atomic" version = "1.13.1" @@ -3652,6 +3664,7 @@ dependencies = [ "llm-worker", "manifest", "pod-registry", + "pod-store", "protocol", "pulldown-cmark", "ratatui", diff --git a/Cargo.toml b/Cargo.toml index 0294b309..9179d56b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "crates/session-store", "crates/manifest", "crates/pod", + "crates/pod-store", "crates/protocol", "crates/provider", "crates/pod-registry", @@ -32,6 +33,7 @@ manifest = { path = "crates/manifest" } lint-common = { path = "crates/lint-common" } memory = { path = "crates/memory" } pod-registry = { path = "crates/pod-registry" } +pod-store = { path = "crates/pod-store" } protocol = { path = "crates/protocol" } provider = { path = "crates/provider" } session-metrics = { path = "crates/session-metrics" } diff --git a/crates/pod-store/Cargo.toml b/crates/pod-store/Cargo.toml new file mode 100644 index 00000000..cf6ea475 --- /dev/null +++ b/crates/pod-store/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "pod-store" +description = "Durable Pod-name metadata/state persistence" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +session-store = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +thiserror = { workspace = true } + +[dev-dependencies] +tempfile = { workspace = true } diff --git a/crates/pod-store/src/lib.rs b/crates/pod-store/src/lib.rs new file mode 100644 index 00000000..c39ea5c4 --- /dev/null +++ b/crates/pod-store/src/lib.rs @@ -0,0 +1,476 @@ +//! Durable Pod-name metadata/state persistence. +//! +//! This crate owns the name-keyed Pod state surface under a Pod-state root, +//! e.g. `{data_dir}/pods/{pod_name}/metadata.json`. Session JSONL replay stays +//! in `session-store`; Pod metadata may point at a `(SessionId, SegmentId)` but +//! does not own or replay session logs. +//! +//! `resolved_manifest_snapshot` is authority only for Pod-name restore before +//! loading the session log. Existing segment replay still uses `SegmentStart` +//! entries from `session-store`. `spawned_children` is durable current parent +//! Pod state for child registry/reclaim; child lifecycle messages shown to the +//! model remain session JSONL history. Socket and callback paths are last-known +//! runtime hints, not proof of liveness. + +use serde::{Deserialize, Serialize}; +use session_store::{SegmentId, SessionId}; +use std::fs; +use std::path::PathBuf; + +/// Errors from Pod metadata persistence. +#[derive(Debug, thiserror::Error)] +pub enum PodStoreError { + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + + #[error("serialization error: {0}")] + Serde(#[from] serde_json::Error), + + #[error("invalid pod name: {0}")] + InvalidPodName(String), +} + +/// Active Session/Segment pointer for a Pod. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodActiveSegmentRef { + pub session_id: SessionId, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub segment_id: Option, +} + +impl PodActiveSegmentRef { + /// Create a reference whose active Segment is not known yet. + pub fn pending_segment(session_id: SessionId) -> Self { + Self { + session_id, + segment_id: None, + } + } + + /// Create a fully resolved active Session/Segment reference. + pub fn active_segment(session_id: SessionId, segment_id: SegmentId) -> Self { + Self { + session_id, + segment_id: Some(segment_id), + } + } +} + +/// One delegated scope rule for a spawned child, kept local to avoid depending +/// on manifest scope types in durable Pod state. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodSpawnedScopeRule { + pub target: PathBuf, + pub permission: String, + pub recursive: bool, +} + +/// One child Pod spawned by this Pod and persisted with the spawner's +/// name-keyed Pod state. Runtime paths are last-known hints only. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodSpawnedChild { + pub pod_name: String, + pub socket_path: PathBuf, + pub scope_delegated: Vec, + pub callback_address: PathBuf, +} + +/// Persistent metadata for a Pod name. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodMetadata { + pub pod_name: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub active: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub spawned_children: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub resolved_manifest_snapshot: Option, +} + +impl PodMetadata { + /// Create Pod metadata for `pod_name`. + pub fn new(pod_name: impl Into, active: Option) -> Self { + Self { + pod_name: pod_name.into(), + active, + spawned_children: Vec::new(), + resolved_manifest_snapshot: None, + } + } +} + +/// Sync persistence backend for Pod metadata. +pub trait PodMetadataStore: Send + Sync { + /// Create or replace metadata for its `pod_name` key. + fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError>; + + /// Read metadata by Pod name. Returns `None` when no metadata exists. + fn read_by_name(&self, pod_name: &str) -> Result, PodStoreError>; + + /// List persisted Pod metadata keys. + fn list_names(&self) -> Result, PodStoreError>; + + /// Return the metadata root directory when this backend is path-backed. + fn root_dir(&self) -> Option { + None + } + + /// Delete metadata by Pod name. Missing metadata is a successful no-op. + fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError>; + + /// Merge an update into one Pod's metadata, preserving unrelated fields. + fn update_by_name(&self, pod_name: &str, update: F) -> Result + where + F: FnOnce(&mut PodMetadata), + { + let mut metadata = self + .read_by_name(pod_name)? + .unwrap_or_else(|| PodMetadata::new(pod_name, None)); + update(&mut metadata); + metadata.pod_name = pod_name.to_string(); + self.write(&metadata)?; + Ok(metadata) + } + + /// Set the active pointer while preserving spawned children and manifest snapshot. + fn set_active( + &self, + pod_name: &str, + active: Option, + resolved_manifest_snapshot: Option, + ) -> Result { + self.update_by_name(pod_name, |metadata| { + metadata.active = active; + metadata.resolved_manifest_snapshot = resolved_manifest_snapshot; + }) + } + + /// Set spawned-child registry state while preserving active pointer and manifest snapshot. + fn set_spawned_children( + &self, + pod_name: &str, + children: Vec, + ) -> Result { + self.update_by_name(pod_name, |metadata| { + metadata.spawned_children = children; + }) + } +} + +/// Filesystem-backed Pod metadata store. +#[derive(Clone)] +pub struct FsPodStore { + root: PathBuf, +} + +impl FsPodStore { + /// Create a store rooted at the Pod-state directory, usually `{data_dir}/pods`. + pub fn new(root: impl Into) -> Result { + let root = root.into(); + fs::create_dir_all(&root)?; + Ok(Self { root }) + } + + fn pod_dir(&self, pod_name: &str) -> Result { + validate_pod_name(pod_name)?; + Ok(self.root.join(pod_name)) + } + + fn metadata_path(&self, pod_name: &str) -> Result { + Ok(self.pod_dir(pod_name)?.join("metadata.json")) + } +} + +impl PodMetadataStore for FsPodStore { + fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError> { + let path = self.metadata_path(&metadata.pod_name)?; + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + let content = serde_json::to_vec_pretty(metadata)?; + fs::write(path, content)?; + Ok(()) + } + + fn read_by_name(&self, pod_name: &str) -> Result, PodStoreError> { + let path = self.metadata_path(pod_name)?; + let content = match fs::read_to_string(path) { + Ok(content) => content, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), + Err(err) => return Err(PodStoreError::Io(err)), + }; + Ok(Some(serde_json::from_str(&content)?)) + } + + fn list_names(&self) -> Result, PodStoreError> { + let mut names = Vec::new(); + if !self.root.exists() { + return Ok(names); + } + for entry in fs::read_dir(&self.root)? { + let entry = entry?; + if !entry.file_type()?.is_dir() { + continue; + } + if !entry.path().join("metadata.json").exists() { + continue; + } + let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else { + continue; + }; + if validate_pod_name(&name).is_ok() { + names.push(name); + } + } + names.sort(); + Ok(names) + } + + fn root_dir(&self) -> Option { + Some(self.root.clone()) + } + + fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError> { + let path = self.metadata_path(pod_name)?; + match fs::remove_file(&path) { + Ok(()) => {} + Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()), + Err(err) => return Err(PodStoreError::Io(err)), + } + if let Some(parent) = path.parent() { + let _ = fs::remove_dir(parent); + } + Ok(()) + } +} + +pub fn validate_pod_name(pod_name: &str) -> Result<(), PodStoreError> { + if pod_name.is_empty() + || pod_name == "." + || pod_name == ".." + || pod_name.contains('/') + || pod_name.contains('\0') + { + return Err(PodStoreError::InvalidPodName(pod_name.to_string())); + } + Ok(()) +} + +/// Convenience composition for callers that want one handle carrying separate +/// session-log and Pod-state roots. +#[derive(Clone)] +pub struct CombinedStore { + pub session_store: S, + pub pod_store: P, +} + +impl CombinedStore { + pub fn new(session_store: S, pod_store: P) -> Self { + Self { + session_store, + pod_store, + } + } +} + +impl session_store::Store for CombinedStore +where + S: session_store::Store, + P: Send + Sync, +{ + fn append( + &self, + session_id: SessionId, + segment_id: SegmentId, + entry: &session_store::LogEntry, + ) -> Result<(), session_store::StoreError> { + self.session_store.append(session_id, segment_id, entry) + } + fn read_all( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result, session_store::StoreError> { + self.session_store.read_all(session_id, segment_id) + } + fn list_sessions(&self) -> Result, session_store::StoreError> { + self.session_store.list_sessions() + } + fn list_segments( + &self, + session_id: SessionId, + ) -> Result, session_store::StoreError> { + self.session_store.list_segments(session_id) + } + fn lookup_session_of( + &self, + segment_id: SegmentId, + ) -> Result, session_store::StoreError> { + self.session_store.lookup_session_of(segment_id) + } + fn create_segment( + &self, + session_id: SessionId, + segment_id: SegmentId, + entries: &[session_store::LogEntry], + ) -> Result<(), session_store::StoreError> { + self.session_store + .create_segment(session_id, segment_id, entries) + } + fn exists( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result { + self.session_store.exists(session_id, segment_id) + } + fn truncate( + &self, + session_id: SessionId, + segment_id: SegmentId, + entries_len: usize, + ) -> Result<(), session_store::StoreError> { + self.session_store + .truncate(session_id, segment_id, entries_len) + } + fn read_entry_count( + &self, + session_id: SessionId, + segment_id: SegmentId, + ) -> Result { + self.session_store.read_entry_count(session_id, segment_id) + } + fn append_trace( + &self, + session_id: SessionId, + segment_id: SegmentId, + entry: &session_store::TraceEntry, + ) -> Result<(), session_store::StoreError> { + self.session_store + .append_trace(session_id, segment_id, entry) + } +} + +impl PodMetadataStore for CombinedStore +where + S: Send + Sync, + P: PodMetadataStore, +{ + fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError> { + self.pod_store.write(metadata) + } + fn read_by_name(&self, pod_name: &str) -> Result, PodStoreError> { + self.pod_store.read_by_name(pod_name) + } + fn list_names(&self) -> Result, PodStoreError> { + self.pod_store.list_names() + } + fn root_dir(&self) -> Option { + self.pod_store.root_dir() + } + fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError> { + self.pod_store.delete_by_name(pod_name) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn pod_metadata_manifest_snapshot_roundtrips() { + let mut metadata = PodMetadata::new( + "profile-pod", + Some(PodActiveSegmentRef::pending_segment( + session_store::new_session_id(), + )), + ); + metadata.resolved_manifest_snapshot = Some(serde_json::json!({ + "pod": { "name": "profile-pod" }, + "profile": { "source": { "kind": "path", "path": "/profiles/coder.nix" } } + })); + + let json = serde_json::to_string(&metadata).unwrap(); + let restored: PodMetadata = serde_json::from_str(&json).unwrap(); + + assert_eq!(restored, metadata); + } + + #[test] + fn fs_store_writes_under_pod_state_root_only() { + let tmp = tempfile::TempDir::new().unwrap(); + let session_root = tmp.path().join("sessions"); + let pod_root = tmp.path().join("pods"); + fs::create_dir_all(&session_root).unwrap(); + let store = FsPodStore::new(&pod_root).unwrap(); + store + .write(&PodMetadata::new( + "agent", + Some(PodActiveSegmentRef::pending_segment( + session_store::new_session_id(), + )), + )) + .unwrap(); + + assert!(pod_root.join("agent/metadata.json").exists()); + assert!(!session_root.join("pods/agent/metadata.json").exists()); + } + + #[test] + fn active_updates_preserve_children_and_manifest_snapshot() { + let tmp = tempfile::TempDir::new().unwrap(); + let store = FsPodStore::new(tmp.path()).unwrap(); + let mut metadata = PodMetadata::new("agent", None); + metadata.spawned_children.push(PodSpawnedChild { + pod_name: "child".into(), + socket_path: std::path::Path::new("/tmp/child.sock").into(), + scope_delegated: vec![], + callback_address: std::path::Path::new("/tmp/parent.sock").into(), + }); + metadata.resolved_manifest_snapshot = Some(serde_json::json!({"pod":{"name":"agent"}})); + store.write(&metadata).unwrap(); + + let snapshot = serde_json::json!({"pod":{"name":"updated"}}); + store + .set_active( + "agent", + Some(PodActiveSegmentRef::active_segment( + session_store::new_session_id(), + session_store::new_segment_id(), + )), + Some(snapshot.clone()), + ) + .unwrap(); + let restored = store.read_by_name("agent").unwrap().unwrap(); + assert_eq!(restored.spawned_children.len(), 1); + assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot)); + } + + #[test] + fn child_updates_preserve_active_and_manifest_snapshot() { + let tmp = tempfile::TempDir::new().unwrap(); + let store = FsPodStore::new(tmp.path()).unwrap(); + let active = PodActiveSegmentRef::active_segment( + session_store::new_session_id(), + session_store::new_segment_id(), + ); + let snapshot = serde_json::json!({"pod":{"name":"agent"}}); + store + .set_active("agent", Some(active.clone()), Some(snapshot.clone())) + .unwrap(); + store + .set_spawned_children( + "agent", + vec![PodSpawnedChild { + pod_name: "child".into(), + socket_path: std::path::Path::new("/tmp/child.sock").into(), + scope_delegated: vec![], + callback_address: std::path::Path::new("/tmp/parent.sock").into(), + }], + ) + .unwrap(); + let restored = store.read_by_name("agent").unwrap().unwrap(); + assert_eq!(restored.active, Some(active)); + assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot)); + } +} diff --git a/crates/pod/Cargo.toml b/crates/pod/Cargo.toml index 1ac1fa50..875c7077 100644 --- a/crates/pod/Cargo.toml +++ b/crates/pod/Cargo.toml @@ -13,6 +13,7 @@ async-trait = { workspace = true } clap = { version = "4.6.0", features = ["derive"] } llm-worker = { workspace = true } session-store = { workspace = true } +pod-store = { workspace = true } manifest = { workspace = true } protocol = { workspace = true } provider = { workspace = true } diff --git a/crates/pod/examples/pod_cli.rs b/crates/pod/examples/pod_cli.rs index c5768170..5cdcf355 100644 --- a/crates/pod/examples/pod_cli.rs +++ b/crates/pod/examples/pod_cli.rs @@ -12,6 +12,7 @@ //! ``` use pod::{Pod, PodManifest, PodRunResult}; +use pod_store::{CombinedStore, FsPodStore}; use session_store::FsStore; fn manifest_toml(pwd: &std::path::Path) -> String { @@ -48,7 +49,10 @@ async fn main() -> Result<(), Box> { // 2. Create a persistent store (temp dir for demo) let tmp = tempfile::tempdir()?; - let store = FsStore::new(tmp.path())?; + let store = CombinedStore::new( + FsStore::new(tmp.path().join("sessions"))?, + FsPodStore::new(tmp.path().join("pods"))?, + ); // 3. Build the Pod from the single-layer manifest TOML let mut pod = Pod::from_manifest_toml(&toml, store).await?; diff --git a/crates/pod/examples/pod_protocol.rs b/crates/pod/examples/pod_protocol.rs index 7d6d6347..11811794 100644 --- a/crates/pod/examples/pod_protocol.rs +++ b/crates/pod/examples/pod_protocol.rs @@ -6,6 +6,7 @@ //! ``` use pod::{Event, Method, PodController}; +use pod_store::{CombinedStore, FsPodStore}; use session_store::FsStore; fn manifest_toml(pwd: &std::path::Path) -> String { @@ -39,7 +40,10 @@ async fn main() -> Result<(), Box> { let pwd = std::env::current_dir()?; let toml = manifest_toml(&pwd); let tmp = tempfile::tempdir()?; - let store = FsStore::new(tmp.path())?; + let store = CombinedStore::new( + FsStore::new(tmp.path().join("sessions"))?, + FsPodStore::new(tmp.path().join("pods"))?, + ); let pod = pod::Pod::from_manifest_toml(&toml, store).await?; let runtime_tmp = tempfile::tempdir()?; diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 9449e265..51cf9a98 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -4,7 +4,8 @@ use std::sync::atomic::Ordering; use llm_worker::WorkerError; use llm_worker::llm_client::client::LlmClient; -use session_store::{PodMetadataStore, Store}; +use pod_store::PodMetadataStore; +use session_store::Store; use tokio::sync::{broadcast, mpsc, oneshot}; use crate::discovery::{ diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs index b5581bac..bfbd79af 100644 --- a/crates/pod/src/discovery.rs +++ b/crates/pod/src/discovery.rs @@ -15,11 +15,12 @@ use std::time::Duration; use async_trait::async_trait; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore}; use protocol::stream::JsonLineReader; use protocol::{Event, PodStatus}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use session_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, SegmentId, SessionId}; +use session_store::{SegmentId, SessionId}; use tokio::net::UnixStream; use tokio::process::Command; @@ -496,8 +497,10 @@ pub enum PodDiscoveryError { socket_path: PathBuf, pid: u32, }, - #[error("store error: {0}")] + #[error("session store error: {0}")] Store(#[from] session_store::StoreError), + #[error("pod store error: {0}")] + PodStore(#[from] pod_store::PodStoreError), #[error("scope lock error: {0}")] ScopeLock(#[from] pod_registry::ScopeLockError), #[error("failed to launch restore process: {0}")] @@ -527,7 +530,7 @@ impl VisibilitySet { } async fn summarize_spawned_children( - children: &[session_store::PodSpawnedChild], + children: &[pod_store::PodSpawnedChild], ) -> SpawnedChildrenSummary { let mut summary = SpawnedChildrenSummary { count: children.len(), @@ -752,6 +755,7 @@ fn discovery_error_to_tool_error(error: PodDiscoveryError) -> ToolError { | PodDiscoveryError::NotRestorable { .. } => ToolError::InvalidArgument(error.to_string()), PodDiscoveryError::LockConflict { .. } | PodDiscoveryError::Store(_) + | PodDiscoveryError::PodStore(_) | PodDiscoveryError::ScopeLock(_) | PodDiscoveryError::RestoreSpawn(_) | PodDiscoveryError::RestoreExited { .. } @@ -765,11 +769,10 @@ mod tests { use std::sync::Mutex; use manifest::{Permission, ScopeRule}; + use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule}; use protocol::stream::JsonLineWriter; use protocol::{Alert, AlertLevel, AlertSource, Greeting}; - use session_store::{ - FsStore, PodSpawnedChild, PodSpawnedScopeRule, new_segment_id, new_session_id, - }; + use session_store::{new_segment_id, new_session_id}; use tempfile::TempDir; use tokio::net::UnixListener; @@ -788,7 +791,7 @@ mod tests { std::env::set_var("INSOMNIA_RUNTIME_DIR", &runtime_base); } - let store = FsStore::new(&store_dir).unwrap(); + let store = FsPodStore::new(&store_dir).unwrap(); let session_id = new_session_id(); let active_child_segment = new_segment_id(); let pending_session_id = new_session_id(); diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index f2c38db7..caf58305 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -6,7 +6,8 @@ use manifest::{ NixProfileResolver, PodManifest, PodManifestConfig, ProfileSelector, ScopeConfig, paths, }; use pod::{Pod, PodController, PromptLoader}; -use session_store::{FsStore, PodMetadataStore, SegmentId, Store}; +use pod_store::{CombinedStore, FsPodStore, PodMetadataStore}; +use session_store::{FsStore, SegmentId, Store}; #[derive(Debug, Parser)] #[command( @@ -229,13 +230,28 @@ async fn main() -> ExitCode { } }, }; - let store = match FsStore::new(&store_dir) { + let session_store = match FsStore::new(&store_dir) { Ok(s) => s, Err(e) => { - eprintln!("error: failed to initialize store at {store_dir:?}: {e}"); + eprintln!("error: failed to initialize session store at {store_dir:?}: {e}"); return ExitCode::FAILURE; } }; + let pod_store_dir = match paths::data_dir() { + Some(data_dir) => data_dir.join("pods"), + None => store_dir + .parent() + .map(|parent| parent.join("pods")) + .unwrap_or_else(|| PathBuf::from("pods")), + }; + let pod_store = match FsPodStore::new(&pod_store_dir) { + Ok(s) => s, + Err(e) => { + eprintln!("error: failed to initialize pod store at {pod_store_dir:?}: {e}"); + return ExitCode::FAILURE; + } + }; + let store = CombinedStore::new(session_store, pod_store); let pod = if cli.adopt { let callback = match cli.callback.clone() { diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 7402cc12..91768b82 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -9,9 +9,10 @@ use llm_worker::llm_client::client::LlmClient; use llm_worker::llm_client::types::Role; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; +use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodStoreError}; use session_store::{ - LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodScopeSnapshot, SegmentId, - SessionId, Store, StoreError, SystemItem, segment_log, to_logged, + LogEntry, PodScopeSnapshot, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, + to_logged, }; use tracing::{info, warn}; @@ -53,18 +54,21 @@ pub struct SegmentLocation { pub segment_id: SegmentId, } -type PodMetadataWriter = Arc Result<(), StoreError> + Send + Sync>; +type PodMetadataWriter = Arc Result<(), PodStoreError> + Send + Sync>; fn pod_metadata_writer_for_store(store: &St) -> PodMetadataWriter where St: PodMetadataStore + Clone + Send + Sync + 'static, { let store = store.clone(); - Arc::new(move |mut metadata| { - if let Some(existing) = store.read_by_name(&metadata.pod_name)? { - metadata.spawned_children = existing.spawned_children; - } - store.write(&metadata) + Arc::new(move |metadata| { + store + .set_active( + &metadata.pod_name, + metadata.active, + metadata.resolved_manifest_snapshot, + ) + .map(|_| ()) }) } @@ -925,30 +929,32 @@ impl Pod { metadata } - fn write_pod_metadata_pending(&self) -> Result<(), StoreError> { + fn write_pod_metadata_pending(&self) -> Result<(), PodError> { let Some(writer) = &self.pod_metadata_writer else { return Ok(()); }; writer(self.pod_metadata(Some(PodActiveSegmentRef::pending_segment( self.session_id(), - )))) + ))))?; + Ok(()) } - fn write_pod_metadata_active(&self, loc: SegmentLocation) -> Result<(), StoreError> { + fn write_pod_metadata_active(&self, loc: SegmentLocation) -> Result<(), PodError> { let Some(writer) = &self.pod_metadata_writer else { return Ok(()); }; writer(self.pod_metadata(Some(PodActiveSegmentRef::active_segment( loc.session_id, loc.segment_id, - )))) + ))))?; + Ok(()) } /// Enable name-keyed Pod metadata write-through for Pods built through /// the low-level constructor. High-level manifest constructors enable it /// automatically; this hook lets tests and custom embedders opt into the /// same persistence behavior without changing `Pod::new`'s minimal bounds. - pub fn enable_pod_metadata_write_through(&mut self) -> Result<(), StoreError> + pub fn enable_pod_metadata_write_through(&mut self) -> Result<(), PodError> where St: PodMetadataStore + Clone + Send + Sync + 'static, { @@ -4438,6 +4444,7 @@ fn token_budget_bytes(tokens: u64) -> usize { pub enum RewindError { #[error(transparent)] Store(#[from] StoreError), + #[error("{0}")] Invalid(String), } @@ -4546,6 +4553,9 @@ pub enum PodError { #[error(transparent)] Store(#[from] StoreError), + #[error(transparent)] + PodStore(#[from] PodStoreError), + #[error(transparent)] Scope(ScopeError), diff --git a/crates/pod/src/spawn/registry.rs b/crates/pod/src/spawn/registry.rs index d570ede8..0a220d24 100644 --- a/crates/pod/src/spawn/registry.rs +++ b/crates/pod/src/spawn/registry.rs @@ -20,10 +20,8 @@ use std::sync::Arc; use std::time::Duration; use manifest::{Permission, ScopeRule, SharedScope}; -use session_store::{ - PodMetadata, PodMetadataStore, PodScopeSnapshot, PodSpawnedChild, PodSpawnedScopeRule, - StoreError, -}; +use pod_store::{PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError}; +use session_store::PodScopeSnapshot; use tokio::net::UnixStream; use tokio::sync::Mutex; use tracing::warn; @@ -304,18 +302,16 @@ fn write_records_to_pod_state( store: &St, pod_name: &str, records: &[SpawnedPodRecord], -) -> Result<(), StoreError> +) -> Result<(), PodStoreError> where St: PodMetadataStore, { - let mut metadata = store - .read_by_name(pod_name)? - .unwrap_or_else(|| PodMetadata::new(pod_name, None)); - metadata.spawned_children = records + let children = records .iter() .map(record_to_pod_state) .collect::, _>>()?; - store.write(&metadata) + store.set_spawned_children(pod_name, children)?; + Ok(()) } fn record_to_pod_state(record: &SpawnedPodRecord) -> Result { @@ -366,7 +362,7 @@ fn record_from_pod_state(child: &PodSpawnedChild) -> Result io::Error { +fn store_error_to_io(error: PodStoreError) -> io::Error { io::Error::other(error) } diff --git a/crates/pod/tests/compact_events_test.rs b/crates/pod/tests/compact_events_test.rs index 5c6406ef..de3d54a9 100644 --- a/crates/pod/tests/compact_events_test.rs +++ b/crates/pod/tests/compact_events_test.rs @@ -16,12 +16,15 @@ use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::types::Item; use llm_worker::llm_client::{ClientError, LlmClient, Request}; +use pod_store::{CombinedStore, FsPodStore, PodMetadataStore}; use protocol::{Event, Method, RunResult}; -use session_store::{FsStore, LogEntry, PodMetadataStore, Store}; +use session_store::{FsStore, LogEntry, Store}; use tokio::sync::broadcast; use pod::{Pod, PodController}; +type TestStore = CombinedStore; + #[derive(Clone)] struct MockClient { responses: Arc>>, @@ -145,11 +148,14 @@ permission = "write" async fn make_pod_with_manifest( manifest_toml: &str, client: MockClient, -) -> Pod { +) -> Pod { let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); std::mem::forget(store_tmp); let pwd_tmp = tempfile::tempdir().unwrap(); @@ -163,7 +169,7 @@ async fn make_pod_with_manifest( pod } -async fn make_pod(client: MockClient) -> Pod { +async fn make_pod(client: MockClient) -> Pod { make_pod_with_manifest(POST_RUN_MANIFEST_TOML, client).await } diff --git a/crates/pod/tests/consolidation_test.rs b/crates/pod/tests/consolidation_test.rs index 76a83ffb..4b81d5cf 100644 --- a/crates/pod/tests/consolidation_test.rs +++ b/crates/pod/tests/consolidation_test.rs @@ -26,7 +26,10 @@ use llm_worker::llm_client::{ClientError, LlmClient, Request}; use memory::WorkspaceLayout; use memory::extract::{ExtractedPayload, write_staging}; use memory::schema::SourceRef; +use pod_store::{CombinedStore, FsPodStore}; use session_store::FsStore; + +type TestStore = CombinedStore; use tokio::sync::broadcast; use pod::{Event, Pod}; @@ -155,11 +158,14 @@ async fn make_pod_with( manifest_toml: &str, pwd: std::path::PathBuf, client: MockClient, -) -> Pod { +) -> Pod { let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); std::mem::forget(store_tmp); let scope = pod::Scope::writable(&pwd).unwrap(); @@ -184,7 +190,7 @@ fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec { ids } -fn attach_event_receiver(pod: &mut Pod) -> broadcast::Receiver { +fn attach_event_receiver(pod: &mut Pod) -> broadcast::Receiver { let (tx, rx) = broadcast::channel(16); pod.attach_event_tx(tx); rx diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 0252cb31..41340376 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -9,10 +9,13 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve use llm_worker::llm_client::types::Item; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use pod_store::{CombinedStore, FsPodStore}; use session_store::{FsStore, LogEntry}; use pod::{Event, Method, Pod, PodController, PodHandle, PodManifest, PodStatus}; +type TestStore = CombinedStore; + /// Reconstruct a worker-history-like `Vec` from the live session /// log mirror held by the Pod's broadcast sink. Replaces the previous /// `PodSharedState.history()` test helper now that the mirror lives in @@ -152,21 +155,24 @@ target = "./" permission = "write" "#; -async fn make_pod(client: MockClient) -> Pod { +async fn make_pod(client: MockClient) -> Pod { make_pod_with_pwd(client).await.0 } -async fn make_pod_with_pwd(client: MockClient) -> (Pod, std::path::PathBuf) { +async fn make_pod_with_pwd(client: MockClient) -> (Pod, std::path::PathBuf) { make_pod_with_pwd_and_manifest(client, MANIFEST_TOML).await } async fn make_pod_with_pwd_and_manifest( client: MockClient, manifest_toml: &str, -) -> (Pod, std::path::PathBuf) { +) -> (Pod, std::path::PathBuf) { let manifest = PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); std::mem::forget(store_tmp); // Separate tempdir to serve as the Pod's pwd/scope — these tests @@ -184,7 +190,7 @@ async fn make_pod_with_pwd_and_manifest( (pod, pwd) } -async fn spawn_controller(pod: Pod) -> PodHandle { +async fn spawn_controller(pod: Pod) -> PodHandle { let tmp = tempfile::tempdir().unwrap(); let runtime_base = tmp.path().to_owned(); std::mem::forget(tmp); diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index 08b8adfe..6ea601c1 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -20,10 +20,11 @@ use pod::spawn::comm_tools::{ list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, }; use pod::spawn::registry::SpawnedPodRegistry; +use pod_store::{CombinedStore, FsPodStore, PodMetadataStore}; use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::{ErrorCode, Event, Greeting, Method}; use serde_json::json; -use session_store::{FsStore, PodMetadataStore}; +use session_store::FsStore; use tempfile::TempDir; use tokio::net::UnixListener; use tokio::sync::mpsc; @@ -385,7 +386,10 @@ async fn stop_pod_sends_shutdown_and_releases_scope() { let _env = EnvGuard::acquire(); let tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let rd = Arc::new(RuntimeDir::create(tmp.path(), "spawner").await.unwrap()); let parent_scope = SharedScope::new( Scope::writable(tmp.path()) @@ -512,7 +516,10 @@ async fn restored_registry_uses_pod_state_without_runtime_file() { let _env = EnvGuard::acquire(); let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); unsafe { std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path()); } @@ -582,7 +589,10 @@ async fn restored_registry_uses_pod_state_without_runtime_file() { async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state() { let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let rd = Arc::new( RuntimeDir::create(runtime_tmp.path(), "spawner") .await @@ -635,7 +645,10 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_st let _env = EnvGuard::acquire(); let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); unsafe { std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path()); } diff --git a/crates/pod/tests/restore_test.rs b/crates/pod/tests/restore_test.rs index 1dc97f37..6f0d4ad7 100644 --- a/crates/pod/tests/restore_test.rs +++ b/crates/pod/tests/restore_test.rs @@ -8,7 +8,8 @@ use std::sync::{LazyLock, Mutex}; use pod::{Pod, PodError}; -use session_store::{FsStore, PodActiveSegmentRef, PodMetadata, PodMetadataStore, StoreError}; +use pod_store::{CombinedStore, FsPodStore, PodActiveSegmentRef, PodMetadata, PodMetadataStore}; +use session_store::{FsStore, StoreError}; const MINIMAL_MANIFEST_TOML: &str = r#" [pod] @@ -36,7 +37,10 @@ async fn restore_from_pod_metadata_rejects_missing_metadata() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let result = Pod::restore_from_pod_metadata( @@ -59,7 +63,10 @@ async fn restore_from_pod_metadata_rejects_pending_segment() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let session_id = session_store::new_session_id(); store @@ -95,7 +102,10 @@ async fn restore_from_pod_metadata_resolves_active_pointer_through_session_log() let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let session_id = session_store::new_session_id(); let segment_id = session_store::new_segment_id(); @@ -126,7 +136,10 @@ async fn restore_from_manifest_rejects_unknown_segment() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); // A freshly-minted id with no jsonl file at all → store returns @@ -155,7 +168,10 @@ async fn restore_from_manifest_rejects_empty_segment_log() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); // Pre-create an empty `/.jsonl` so `read_all` succeeds @@ -189,7 +205,10 @@ async fn restore_from_manifest_rejects_segment_without_scope_snapshot() { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let sid = session_store::new_session_id(); diff --git a/crates/pod/tests/session_metrics_test.rs b/crates/pod/tests/session_metrics_test.rs index 75add356..08bb2eb6 100644 --- a/crates/pod/tests/session_metrics_test.rs +++ b/crates/pod/tests/session_metrics_test.rs @@ -25,11 +25,14 @@ use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent}; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use pod_store::{CombinedStore, FsPodStore}; use session_metrics::{DOMAIN, Metric, metrics_from_extensions}; use session_store::{FsStore, LogEntry, SegmentId, SessionId, Store, StoreError, TraceEntry}; use pod::{Pod, PodManifest}; +type TestStore = CombinedStore; + #[derive(Clone)] struct MockClient { responses: Arc>>, @@ -166,13 +169,16 @@ async fn make_pod( client: MockClient, tool_name: &'static str, ) -> ( - Pod, + Pod, tempfile::TempDir, tempfile::TempDir, ) { let manifest = PodManifest::from_toml(&manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let pwd_tmp = tempfile::tempdir().unwrap(); let pwd = pwd_tmp.path().to_path_buf(); let scope = pod::Scope::writable(&pwd).unwrap(); @@ -500,7 +506,10 @@ permission = "write" let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]); let manifest = PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); let pwd_tmp = tempfile::tempdir().unwrap(); let pwd = pwd_tmp.path().to_path_buf(); let scope = pod::Scope::writable(&pwd).unwrap(); diff --git a/crates/pod/tests/system_prompt_template_test.rs b/crates/pod/tests/system_prompt_template_test.rs index 8433968a..c9b32ca7 100644 --- a/crates/pod/tests/system_prompt_template_test.rs +++ b/crates/pod/tests/system_prompt_template_test.rs @@ -8,10 +8,13 @@ use futures::Stream; use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::{ClientError, LlmClient, Request}; +use pod_store::{CombinedStore, FsPodStore}; use session_store::{FsStore, LogEntry, Store}; use pod::{Pod, PodError, PromptLoader, SystemPromptTemplate}; +type TestStore = CombinedStore; + // --------------------------------------------------------------------------- // Mock LLM Client // --------------------------------------------------------------------------- @@ -99,11 +102,14 @@ permission = "write" async fn make_pod_with_body( body: &str, client: MockClient, -) -> Result<(Pod, PathBuf), PodError> { +) -> Result<(Pod, PathBuf), PodError> { let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); - let store = FsStore::new(store_tmp.path()).unwrap(); + let store = CombinedStore::new( + FsStore::new(store_tmp.path()).unwrap(), + FsPodStore::new(store_tmp.path().join("pods")).unwrap(), + ); std::mem::forget(store_tmp); let pwd_tmp = tempfile::tempdir().unwrap(); diff --git a/crates/session-store/src/fs_store.rs b/crates/session-store/src/fs_store.rs index 3925b603..8a871c6b 100644 --- a/crates/session-store/src/fs_store.rs +++ b/crates/session-store/src/fs_store.rs @@ -3,7 +3,6 @@ //! Layout: //! - Segment log: `{root}/{session_id}/{segment_id}.jsonl` //! - Event trace: `{root}/{session_id}/{segment_id}.trace.jsonl` -//! - Pod metadata: `{root}/pods/{pod_name}/metadata.json` //! //! The per-Session directory makes `list_segments(session_id)` an O(dir) //! scan and gives the fork tree a visible grouping in the filesystem. @@ -17,7 +16,6 @@ //! enumerable by the picker. use crate::event_trace::TraceEntry; -use crate::pod_metadata::{PodMetadata, PodMetadataStore, validate_pod_name}; use crate::segment_log::LogEntry; use crate::store::{Store, StoreError}; use crate::{SegmentId, SessionId}; @@ -57,19 +55,6 @@ impl FsStore { .join(format!("{segment_id}.trace.jsonl")) } - fn pods_dir(&self) -> PathBuf { - self.root.join("pods") - } - - fn pod_dir(&self, pod_name: &str) -> Result { - validate_pod_name(pod_name)?; - Ok(self.pods_dir().join(pod_name)) - } - - fn pod_metadata_path(&self, pod_name: &str) -> Result { - Ok(self.pod_dir(pod_name)?.join("metadata.json")) - } - fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> { if let Some(parent) = path.parent() { fs::create_dir_all(parent)?; @@ -102,70 +87,6 @@ impl FsStore { } } -impl PodMetadataStore for FsStore { - fn write(&self, metadata: &PodMetadata) -> Result<(), StoreError> { - let path = self.pod_metadata_path(&metadata.pod_name)?; - if let Some(parent) = path.parent() { - fs::create_dir_all(parent)?; - } - let content = serde_json::to_vec_pretty(metadata)?; - fs::write(path, content)?; - Ok(()) - } - - fn read_by_name(&self, pod_name: &str) -> Result, StoreError> { - let path = self.pod_metadata_path(pod_name)?; - let content = match fs::read_to_string(path) { - Ok(content) => content, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), - Err(err) => return Err(StoreError::Io(err)), - }; - Ok(Some(serde_json::from_str(&content)?)) - } - - fn list_names(&self) -> Result, StoreError> { - let dir = self.pods_dir(); - let mut names = Vec::new(); - if !dir.exists() { - return Ok(names); - } - for entry in fs::read_dir(dir)? { - let entry = entry?; - if !entry.file_type()?.is_dir() { - continue; - } - if !entry.path().join("metadata.json").exists() { - continue; - } - let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else { - continue; - }; - if validate_pod_name(&name).is_ok() { - names.push(name); - } - } - names.sort(); - Ok(names) - } - - fn root_dir(&self) -> Option { - Some(self.root.clone()) - } - - fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError> { - let path = self.pod_metadata_path(pod_name)?; - match fs::remove_file(&path) { - Ok(()) => {} - Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()), - Err(err) => return Err(StoreError::Io(err)), - } - if let Some(parent) = path.parent() { - let _ = fs::remove_dir(parent); - } - Ok(()) - } -} - impl Store for FsStore { fn append( &self, diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index 892c98b8..dc71b4d0 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -33,7 +33,6 @@ pub mod event_trace; pub mod fs_store; pub mod logged_item; -pub mod pod_metadata; pub mod segment; pub mod segment_log; pub mod store; @@ -44,9 +43,6 @@ pub use fs_store::FsStore; pub use llm_worker::UsageRecord; pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; -pub use pod_metadata::{ - PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule, -}; pub use segment::{ SegmentStartState, append_entry, append_system_item, classify_history_item, create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork, diff --git a/crates/session-store/src/pod_metadata.rs b/crates/session-store/src/pod_metadata.rs deleted file mode 100644 index d6fd8e07..00000000 --- a/crates/session-store/src/pod_metadata.rs +++ /dev/null @@ -1,150 +0,0 @@ -//! Pod metadata persistence API. -//! -//! Pod metadata is a lightweight name-keyed pointer to the Session/Segment -//! currently active for a Pod. Conversation content remains in the segment log; -//! this metadata only records references needed by Pod-name resume/attach flows. - -use crate::store::StoreError; -use crate::{SegmentId, SessionId}; -use serde::{Deserialize, Serialize}; -use std::path::PathBuf; - -/// Active Session/Segment pointer for a Pod. -/// -/// `segment_id` is optional so callers can persist a reserved Session before -/// the first Segment ID is known. Once a segment exists, callers should rewrite -/// the metadata with `Some(segment_id)`. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PodActiveSegmentRef { - pub session_id: SessionId, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub segment_id: Option, -} - -impl PodActiveSegmentRef { - /// Create a reference whose active Segment is not known yet. - pub fn pending_segment(session_id: SessionId) -> Self { - Self { - session_id, - segment_id: None, - } - } - - /// Create a fully resolved active Session/Segment reference. - pub fn active_segment(session_id: SessionId, segment_id: SegmentId) -> Self { - Self { - session_id, - segment_id: Some(segment_id), - } - } -} - -/// One delegated scope rule for a spawned child, kept local to -/// `session-store` so the persistence crate does not depend on manifest -/// scope types. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PodSpawnedScopeRule { - pub target: PathBuf, - pub permission: String, - pub recursive: bool, -} - -/// One child Pod spawned by this Pod and persisted with the spawner's -/// name-keyed Pod state. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PodSpawnedChild { - pub pod_name: String, - pub socket_path: PathBuf, - pub scope_delegated: Vec, - pub callback_address: PathBuf, -} - -/// Persistent metadata for a Pod name. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct PodMetadata { - pub pod_name: String, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub active: Option, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub spawned_children: Vec, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub resolved_manifest_snapshot: Option, -} - -impl PodMetadata { - /// Create Pod metadata for `pod_name`. - pub fn new(pod_name: impl Into, active: Option) -> Self { - Self { - pod_name: pod_name.into(), - active, - spawned_children: Vec::new(), - resolved_manifest_snapshot: None, - } - } -} - -/// Sync persistence backend for Pod metadata. -/// -/// The key is the Pod name. Missing state is not an error: `read_by_name` -/// returns `Ok(None)` for Pods that have never persisted metadata or whose -/// metadata was deleted. -pub trait PodMetadataStore: Send + Sync { - /// Create or replace metadata for its `pod_name` key. - fn write(&self, metadata: &PodMetadata) -> Result<(), StoreError>; - - /// Read metadata by Pod name. Returns `None` when no metadata exists. - fn read_by_name(&self, pod_name: &str) -> Result, StoreError>; - - /// List persisted Pod metadata keys. Implementations return names only; - /// callers can then read each item independently so a corrupt metadata - /// file does not make the whole discovery result fail. - fn list_names(&self) -> Result, StoreError>; - - /// Return the metadata root directory when this backend is path-backed. - fn root_dir(&self) -> Option { - None - } - - /// Delete metadata by Pod name. Missing metadata is a successful no-op. - fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError>; -} - -pub(crate) fn validate_pod_name(pod_name: &str) -> Result<(), StoreError> { - if pod_name.is_empty() - || pod_name == "." - || pod_name == ".." - || pod_name.contains('/') - || pod_name.contains('\0') - { - return Err(StoreError::InvalidPodName(pod_name.to_string())); - } - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn pod_metadata_manifest_snapshot_roundtrips() { - let mut metadata = PodMetadata::new( - "profile-pod", - Some(PodActiveSegmentRef::pending_segment(crate::new_session_id())), - ); - metadata.resolved_manifest_snapshot = Some(serde_json::json!({ - "pod": { "name": "profile-pod" }, - "profile": { - "source": { "kind": "path", "path": "/profiles/coder.nix" } - } - })); - - let json = serde_json::to_string(&metadata).unwrap(); - let restored: PodMetadata = serde_json::from_str(&json).unwrap(); - - assert_eq!(restored, metadata); - assert_eq!( - restored.resolved_manifest_snapshot.as_ref().unwrap()["profile"]["source"]["kind"], - "path" - ); - } -} diff --git a/crates/session-store/src/store.rs b/crates/session-store/src/store.rs index e2c5b6b8..ff5489fc 100644 --- a/crates/session-store/src/store.rs +++ b/crates/session-store/src/store.rs @@ -29,9 +29,6 @@ pub enum StoreError { #[error("log corrupted at line {line}: {message}")] Corrupt { line: usize, message: String }, - - #[error("invalid pod name: {0}")] - InvalidPodName(String), } /// Sync persistence backend for segment logs. diff --git a/crates/session-store/tests/fs_store_test.rs b/crates/session-store/tests/fs_store_test.rs index 5c6d7327..c5d0b9b0 100644 --- a/crates/session-store/tests/fs_store_test.rs +++ b/crates/session-store/tests/fs_store_test.rs @@ -1,8 +1,7 @@ use llm_worker::WorkerResult; use llm_worker::llm_client::types::{Item, RequestConfig}; use session_store::{ - FsStore, LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, Store, TraceEntry, - collect_state, new_segment_id, new_session_id, + FsStore, LogEntry, Store, TraceEntry, collect_state, new_segment_id, new_session_id, }; fn nil_session_start(ts: u64, session_id: uuid::Uuid) -> LogEntry { @@ -240,40 +239,3 @@ fn lookup_session_of_finds_owning_session() { assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid)); } - -#[test] -fn pod_metadata_minimal_crud() { - let dir = tempfile::tempdir().unwrap(); - let store = FsStore::new(dir.path()).unwrap(); - let pod_name = "worker-a"; - let sid = new_session_id(); - let segid = new_segment_id(); - - assert_eq!(store.read_by_name(pod_name).unwrap(), None); - - let pending = PodMetadata::new(pod_name, Some(PodActiveSegmentRef::pending_segment(sid))); - store.write(&pending).unwrap(); - assert_eq!(store.list_names().unwrap(), vec![pod_name.to_string()]); - assert_eq!(store.read_by_name(pod_name).unwrap(), Some(pending.clone())); - assert!( - dir.path() - .join("pods") - .join(pod_name) - .join("metadata.json") - .exists(), - "Pod metadata must live under /pods//" - ); - - let resolved = PodMetadata::new( - pod_name, - Some(PodActiveSegmentRef::active_segment(sid, segid)), - ); - store.write(&resolved).unwrap(); - assert_eq!(store.read_by_name(pod_name).unwrap(), Some(resolved)); - - store.delete_by_name(pod_name).unwrap(); - assert_eq!(store.read_by_name(pod_name).unwrap(), None); - - // Delete is idempotent for missing metadata. - store.delete_by_name(pod_name).unwrap(); -} diff --git a/crates/tui/Cargo.toml b/crates/tui/Cargo.toml index 997e3e00..09799309 100644 --- a/crates/tui/Cargo.toml +++ b/crates/tui/Cargo.toml @@ -20,6 +20,7 @@ uuid = { workspace = true } toml = { workspace = true } manifest = { workspace = true } session-store = { workspace = true } +pod-store = { workspace = true } pod-registry = { workspace = true } serde = { workspace = true, features = ["derive"] } pulldown-cmark = { version = "0.13.3", default-features = false } diff --git a/crates/tui/src/multi_pod.rs b/crates/tui/src/multi_pod.rs index 2388c5b8..ac0dfccc 100644 --- a/crates/tui/src/multi_pod.rs +++ b/crates/tui/src/multi_pod.rs @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf}; use std::time::{Duration, Instant}; use crossterm::event::{Event as TermEvent, KeyCode, KeyEvent, KeyModifiers, poll, read}; +use pod_store::FsPodStore; use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::{ErrorCode, Event, InvokeKind, Method, PodStatus, Segment}; use ratatui::Frame; @@ -199,12 +200,22 @@ fn default_store_dir() -> Result { manifest::paths::sessions_dir().ok_or_else(|| { MultiPodError::Io(io::Error::new( io::ErrorKind::NotFound, - "could not resolve sessions directory \ - (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)", + "could not resolve sessions directory", )) }) } +fn default_pod_store_dir() -> Result { + manifest::paths::data_dir() + .map(|dir| dir.join("pods")) + .ok_or_else(|| { + MultiPodError::Io(io::Error::new( + io::ErrorKind::NotFound, + "could not resolve pod state directory", + )) + }) +} + #[cfg(test)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) enum SendEligibility { @@ -483,7 +494,8 @@ enum MultiPodAction { async fn load_pod_list(selected_name: Option) -> Result { let store_dir = default_store_dir()?; let store = FsStore::new(&store_dir)?; - let stored = read_stored_pod_infos(&store_dir, &store)?; + let pod_store = FsPodStore::new(default_pod_store_dir()?).map_err(io::Error::other)?; + let stored = read_stored_pod_infos(&store, &pod_store)?; let live = read_reachable_live_pod_infos(&store) .await .unwrap_or_default(); diff --git a/crates/tui/src/picker.rs b/crates/tui/src/picker.rs index 51acc21d..633405c1 100644 --- a/crates/tui/src/picker.rs +++ b/crates/tui/src/picker.rs @@ -1,7 +1,7 @@ //! Inline-viewport "pick a Pod to attach or restore" UX. //! //! Reads live Pod allocations from the runtime registry and stopped Pod state -//! from the session store's name-keyed metadata. Picking a live row attaches to +//! from the pod-store name-keyed metadata. Picking a live row attaches to //! its socket; picking a stopped row restores via `insomnia-pod --pod `. use std::io; @@ -9,6 +9,7 @@ use std::path::PathBuf; use std::time::Duration; use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers}; +use pod_store::FsPodStore; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use ratatui::layout::{Constraint, Layout}; @@ -102,7 +103,8 @@ impl PodRowState { pub async fn run() -> Result { let store_dir = default_store_dir()?; let store = FsStore::new(&store_dir)?; - let stored_pods = read_stored_pod_infos(&store_dir, &store)?; + let pod_store = FsPodStore::new(default_pod_store_dir()?).map_err(io::Error::other)?; + let stored_pods = read_stored_pod_infos(&store, &pod_store)?; let live_pods = read_reachable_live_pod_infos(&store) .await .unwrap_or_default(); @@ -172,6 +174,18 @@ fn default_store_dir() -> Result { }) } +fn default_pod_store_dir() -> Result { + manifest::paths::data_dir() + .map(|dir| dir.join("pods")) + .ok_or_else(|| { + PickerError::Io(io::Error::new( + io::ErrorKind::NotFound, + "could not resolve pod state directory \ + (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)", + )) + }) +} + pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option { pod_list_live_socket_for_pod(pod_name) } diff --git a/crates/tui/src/pod_list.rs b/crates/tui/src/pod_list.rs index 81e84f7e..6f3d160b 100644 --- a/crates/tui/src/pod_list.rs +++ b/crates/tui/src/pod_list.rs @@ -1,14 +1,14 @@ use std::collections::BTreeMap; -use std::fs; use std::io; use std::path::{Path, PathBuf}; use std::time::Duration; use client::PodClient; use pod_registry::{LockFileGuard, default_registry_path}; +use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore}; use protocol::{Event, PodStatus}; use session_store::{ - FsStore, LogEntry, LoggedContentPart, LoggedItem, PodMetadata, SegmentId, SessionId, Store, + FsStore, LogEntry, LoggedContentPart, LoggedItem, SegmentId, SessionId, Store, }; #[derive(Debug, Clone)] @@ -234,27 +234,17 @@ pub(crate) enum PodEntryDiagnosticKind { } pub(crate) fn read_stored_pod_infos( - store_dir: &Path, store: &FsStore, + pod_store: &impl PodMetadataStore, ) -> Result, io::Error> { - let pods_dir = store_dir.join("pods"); let mut records = Vec::new(); - if !pods_dir.exists() { - return Ok(records); - } - - for entry in fs::read_dir(pods_dir)? { - let entry = entry?; - if !entry.file_type()?.is_dir() { - continue; - } - let pod_name = entry.file_name().to_string_lossy().to_string(); - let path = entry.path().join("metadata.json"); - let info = match fs::read_to_string(&path) { - Ok(content) => match serde_json::from_str::(&content) { - Ok(metadata) => stored_info_from_metadata(store, pod_name, metadata), - Err(e) => corrupt_stored_info(pod_name, e.to_string()), - }, + for pod_name in pod_store.list_names().map_err(io::Error::other)? { + let info = match pod_store.read_by_name(&pod_name) { + Ok(Some(metadata)) => stored_info_from_metadata(store, pod_name, metadata), + Ok(None) => corrupt_stored_info( + pod_name, + "metadata disappeared during discovery".to_string(), + ), Err(e) => corrupt_stored_info(pod_name, e.to_string()), }; records.push(info); @@ -392,10 +382,7 @@ fn summarize_live_pod(store: &FsStore, live: &LivePodInfo) -> PodEntrySummary { } } -fn summarize_metadata( - store: &FsStore, - active: Option<&session_store::PodActiveSegmentRef>, -) -> SegmentSummary { +fn summarize_metadata(store: &FsStore, active: Option<&PodActiveSegmentRef>) -> SegmentSummary { let Some(active) = active else { return SegmentSummary { updated_at: 0, @@ -558,7 +545,9 @@ fn trim_one_line(s: &str, max_chars: usize) -> String { mod tests { use super::*; use llm_worker::llm_client::types::RequestConfig; - use session_store::{PodActiveSegmentRef, PodMetadataStore, new_segment_id, new_session_id}; + use pod_store::FsPodStore; + use pod_store::{PodActiveSegmentRef, PodMetadataStore}; + use session_store::{new_segment_id, new_session_id}; use tempfile::tempdir; const SOURCE: PodVisibilitySource = PodVisibilitySource::ResumePicker; @@ -776,11 +765,12 @@ mod tests { fn read_stored_pod_infos_reports_corrupt_metadata() { let dir = tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); + let pod_store = FsPodStore::new(dir.path().join("pods")).unwrap(); let pod_dir = dir.path().join("pods").join("broken"); - fs::create_dir_all(&pod_dir).unwrap(); - fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap(); + std::fs::create_dir_all(&pod_dir).unwrap(); + std::fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap(); - let records = read_stored_pod_infos(dir.path(), &store).unwrap(); + let records = read_stored_pod_infos(&store, &pod_store).unwrap(); assert_eq!(records.len(), 1); assert_eq!(records[0].pod_name, "broken"); assert!(matches!( @@ -793,16 +783,17 @@ mod tests { fn read_stored_pod_infos_reads_metadata() { let dir = tempdir().unwrap(); let store = FsStore::new(dir.path()).unwrap(); + let pod_store = FsPodStore::new(dir.path().join("pods")).unwrap(); let session_id = new_session_id(); let segment_id = new_segment_id(); - store + pod_store .write(&PodMetadata::new( "agent", Some(PodActiveSegmentRef::active_segment(session_id, segment_id)), )) .unwrap(); - let records = read_stored_pod_infos(dir.path(), &store).unwrap(); + let records = read_stored_pod_infos(&store, &pod_store).unwrap(); assert_eq!(records.len(), 1); assert_eq!(records[0].pod_name, "agent"); assert_eq!(records[0].metadata_state, StoredMetadataState::Present);