merge: pod store split

This commit is contained in:
Keisuke Hirata 2026-05-30 07:49:07 +09:00
commit e2cf6ed85f
No known key found for this signature in database
35 changed files with 1044 additions and 736 deletions

13
Cargo.lock generated
View File

@ -2166,6 +2166,7 @@ dependencies = [
"memory", "memory",
"minijinja", "minijinja",
"pod-registry", "pod-registry",
"pod-store",
"protocol", "protocol",
"provider", "provider",
"schemars", "schemars",
@ -2197,6 +2198,17 @@ dependencies = [
"thiserror 2.0.18", "thiserror 2.0.18",
] ]
[[package]]
name = "pod-store"
version = "0.1.0"
dependencies = [
"serde",
"serde_json",
"session-store",
"tempfile",
"thiserror 2.0.18",
]
[[package]] [[package]]
name = "portable-atomic" name = "portable-atomic"
version = "1.13.1" version = "1.13.1"
@ -3652,6 +3664,7 @@ dependencies = [
"llm-worker", "llm-worker",
"manifest", "manifest",
"pod-registry", "pod-registry",
"pod-store",
"protocol", "protocol",
"pulldown-cmark", "pulldown-cmark",
"ratatui", "ratatui",

View File

@ -8,6 +8,7 @@ members = [
"crates/session-store", "crates/session-store",
"crates/manifest", "crates/manifest",
"crates/pod", "crates/pod",
"crates/pod-store",
"crates/protocol", "crates/protocol",
"crates/provider", "crates/provider",
"crates/pod-registry", "crates/pod-registry",
@ -32,6 +33,7 @@ manifest = { path = "crates/manifest" }
lint-common = { path = "crates/lint-common" } lint-common = { path = "crates/lint-common" }
memory = { path = "crates/memory" } memory = { path = "crates/memory" }
pod-registry = { path = "crates/pod-registry" } pod-registry = { path = "crates/pod-registry" }
pod-store = { path = "crates/pod-store" }
protocol = { path = "crates/protocol" } protocol = { path = "crates/protocol" }
provider = { path = "crates/provider" } provider = { path = "crates/provider" }
session-metrics = { path = "crates/session-metrics" } session-metrics = { path = "crates/session-metrics" }

View File

@ -31,8 +31,6 @@ pub struct SpawnConfig {
/// `--profile`; the Pod name is supplied through `--profile-pod-name` so /// `--profile`; the Pod name is supplied through `--profile-pod-name` so
/// profile evaluation stays separate from `--pod` restore semantics. /// profile evaluation stays separate from `--pod` restore semantics.
pub profile: Option<String>, pub profile: Option<String>,
/// Optional session-scope snapshot used when restoring by session id.
pub resume_scope: Option<manifest::ScopeConfig>,
/// pod の current_dir。 /// pod の current_dir。
pub cwd: PathBuf, pub cwd: PathBuf,
/// `Some(id)` のとき `--session <id>` を付与し、当該セッションから /// `Some(id)` のとき `--session <id>` を付与し、当該セッションから
@ -132,12 +130,6 @@ where
.arg(id.to_string()) .arg(id.to_string())
.arg("--session-pod-name") .arg("--session-pod-name")
.arg(&config.pod_name); .arg(&config.pod_name);
if let Some(scope) = &config.resume_scope {
let scope_json = serde_json::to_string(scope).map_err(|e| {
SpawnError::PodLaunchFailed(io::Error::new(io::ErrorKind::InvalidInput, e))
})?;
command.arg("--resume-scope-json").arg(scope_json);
}
} }
let mut child = command.spawn().map_err(SpawnError::PodLaunchFailed)?; let mut child = command.spawn().map_err(SpawnError::PodLaunchFailed)?;

View File

@ -45,9 +45,9 @@ pub fn register_pod(
/// and the registration proceeds. The check is structural (deny ⊇ /// and the registration proceeds. The check is structural (deny ⊇
/// competitor.rule), not relational — it does not verify that the /// competitor.rule), not relational — it does not verify that the
/// competitor actually descends from this Pod's prior delegations. /// competitor actually descends from this Pod's prior delegations.
/// In practice this is safe because the canonical caller is `restore`, /// In practice this is safe because the canonical restore caller derives
/// which derives `scope_deny` from the session's own snapshot, so any /// `scope_deny` from outstanding `pod-store` child delegations, so any
/// covered competitor is guaranteed to be a descendant of the original /// covered competitor is expected to be a descendant of the original
/// allocation. Direct callers must uphold the same invariant. /// allocation. Direct callers must uphold the same invariant.
pub fn register_pod_with_deny( pub fn register_pod_with_deny(
guard: &mut LockFileGuard, guard: &mut LockFileGuard,
@ -180,10 +180,11 @@ pub fn release_pod(guard: &mut LockFileGuard, pod_name: &str) -> Result<(), Scop
/// Reclaim a child delegation back into its parent allocation. /// Reclaim a child delegation back into its parent allocation.
/// ///
/// This is idempotent: missing child allocations and missing deny entries are /// This is idempotent for missing deny entries. For each delegated Write rule,
/// ignored. For each delegated Write rule, at most one exact matching deny rule /// at most one exact matching deny rule is removed from the parent's `scope_deny`
/// is removed from the parent's `scope_deny`, preserving any duplicate explicit /// even when the child allocation is already absent; restore reconciliation uses
/// base deny that was not owned by this child delegation. /// that case when durable Pod-state still records an outstanding delegation but
/// the live lock file no longer has a child allocation.
pub fn reclaim_delegated_scope( pub fn reclaim_delegated_scope(
guard: &mut LockFileGuard, guard: &mut LockFileGuard,
parent: &str, parent: &str,
@ -199,17 +200,13 @@ pub fn reclaim_delegated_scope(
.map(|idx| guard.data().allocations[idx].delegated_from.clone()) .map(|idx| guard.data().allocations[idx].delegated_from.clone())
.unwrap_or(None); .unwrap_or(None);
let child_exists = child_idx.is_some(); if let Some(parent_alloc) = guard.data_mut().find_mut(parent) {
for rule in delegated_scope
if child_exists { .iter()
if let Some(parent_alloc) = guard.data_mut().find_mut(parent) { .filter(|rule| rule.permission == Permission::Write)
for rule in delegated_scope {
.iter() if let Some(idx) = parent_alloc.scope_deny.iter().position(|deny| deny == rule) {
.filter(|rule| rule.permission == Permission::Write) parent_alloc.scope_deny.remove(idx);
{
if let Some(idx) = parent_alloc.scope_deny.iter().position(|deny| deny == rule) {
parent_alloc.scope_deny.remove(idx);
}
} }
} }
} }
@ -516,15 +513,43 @@ mod tests {
assert_eq!(a.scope_deny, vec![delegated_rule.clone()]); assert_eq!(a.scope_deny, vec![delegated_rule.clone()]);
assert!(g.data().find("b").is_none()); assert!(g.data().find("b").is_none());
reclaim_delegated_scope(&mut g, "a", "b", &[delegated_rule.clone()]).unwrap(); reclaim_delegated_scope(&mut g, "a", "b", std::slice::from_ref(&delegated_rule)).unwrap();
let a = g.data().find("a").unwrap(); let a = g.data().find("a").unwrap();
assert_eq!( assert!(
a.scope_deny, a.scope_deny.is_empty(),
vec![delegated_rule], "a missing child allocation still reclaims one matching parent deny"
"a repeated reclaim with no child allocation must not broaden an explicit duplicate base deny"
); );
} }
#[test]
fn reclaim_delegated_scope_removes_parent_deny_when_child_allocation_missing() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("pods.json");
let mut g = open_empty(&path);
let delegated_rule = write_rule("/src/core", true);
register_pod_with_deny(
&mut g,
"a".into(),
std::process::id(),
sock("a"),
vec![write_rule("/src", true)],
vec![delegated_rule.clone()],
sid(),
)
.unwrap();
reclaim_delegated_scope(
&mut g,
"a",
"missing",
std::slice::from_ref(&delegated_rule),
)
.unwrap();
let a = g.data().find("a").unwrap();
assert!(a.scope_deny.is_empty());
}
#[test] #[test]
fn reclaim_stale_reparents_and_removes_dead_entries() { fn reclaim_stale_reparents_and_removes_dead_entries() {
let dir = TempDir::new().unwrap(); let dir = TempDir::new().unwrap();

View File

@ -0,0 +1,15 @@
[package]
name = "pod-store"
description = "Durable Pod-name metadata/state persistence"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
session-store = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }

541
crates/pod-store/src/lib.rs Normal file
View File

@ -0,0 +1,541 @@
//! Durable Pod-name metadata/state persistence.
//!
//! This crate owns the name-keyed Pod state surface under a Pod-state root,
//! e.g. `{data_dir}/pods/{pod_name}/metadata.json`. Session JSONL replay stays
//! in `session-store`; Pod metadata may point at a `(SessionId, SegmentId)` but
//! does not own or replay session logs.
//!
//! `resolved_manifest_snapshot` is authority only for Pod-name restore before
//! loading the session log. Existing segment replay still uses `SegmentStart`
//! entries from `session-store`. `spawned_children` is durable current parent
//! Pod state for child registry/reclaim; child lifecycle messages shown to the
//! model remain session JSONL history. Socket and callback paths are last-known
//! runtime hints, not proof of liveness.
use serde::{Deserialize, Serialize};
use session_store::{SegmentId, SessionId};
use std::fs;
use std::path::PathBuf;
/// Errors from Pod metadata persistence.
///
/// The `Io` and `Serde` variants are produced through `#[from]` conversions, so
/// `?` on a `std::io::Error` or `serde_json::Error` yields this type directly.
#[derive(Debug, thiserror::Error)]
pub enum PodStoreError {
/// A filesystem operation failed; wraps the underlying `std::io::Error`.
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
/// JSON encoding or decoding of metadata failed.
#[error("serialization error: {0}")]
Serde(#[from] serde_json::Error),
/// The Pod name was rejected by `validate_pod_name`; carries the offending name.
#[error("invalid pod name: {0}")]
InvalidPodName(String),
}
/// Active Session/Segment pointer for a Pod.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodActiveSegmentRef {
/// Session this pointer refers to.
pub session_id: SessionId,
/// Active Segment within the Session, or `None` while it is not known yet.
/// Left out of the serialized form when `None`, and read back as `None`
/// when the key is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub segment_id: Option<SegmentId>,
}
impl PodActiveSegmentRef {
    /// Build a pointer that names only the Session; the active Segment is
    /// left unset (`None`).
    pub fn pending_segment(session_id: SessionId) -> Self {
        let segment_id = None;
        Self {
            session_id,
            segment_id,
        }
    }

    /// Build a pointer that names both the Session and its active Segment.
    pub fn active_segment(session_id: SessionId, segment_id: SegmentId) -> Self {
        let segment_id = Some(segment_id);
        Self {
            session_id,
            segment_id,
        }
    }
}
/// One delegated scope rule for a spawned child, kept local to avoid depending
/// on manifest scope types in durable Pod state.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodSpawnedScopeRule {
/// Filesystem path the rule applies to.
pub target: PathBuf,
/// Permission carried as a plain string (the tests in this file use `"write"`).
/// NOTE(review): the accepted set of values is not visible here; confirm it
/// against the manifest scope types this struct mirrors.
pub permission: String,
/// Recursion flag of the rule; presumably extends the rule to everything
/// beneath `target` — TODO confirm against the manifest scope semantics.
pub recursive: bool,
}
/// One child Pod spawned by this Pod and persisted with the spawner's
/// name-keyed Pod state. Runtime paths are last-known hints only.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodSpawnedChild {
/// Name of the child Pod; `reclaim_spawned_children` matches children on this field.
pub pod_name: String,
/// Last-known socket path of the child; a hint, not proof that the child is alive.
pub socket_path: PathBuf,
/// Scope rules delegated to the child.
pub scope_delegated: Vec<PodSpawnedScopeRule>,
/// Last-known callback path; a hint only. The tests in this file put a
/// parent socket path here — NOTE(review): confirm that is the general contract.
pub callback_address: PathBuf,
}
/// One child delegation that has been reclaimed. Kept as durable audit state so
/// restore can distinguish outstanding delegated scope from already-reclaimed
/// child state without consulting session logs.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodReclaimedChild {
/// Name of the child Pod whose delegation was reclaimed.
pub pod_name: String,
/// Scope rules that had been delegated to that child.
pub scope_delegated: Vec<PodSpawnedScopeRule>,
}
/// Persistent metadata for a Pod name.
///
/// Every optional or list field is omitted from the serialized JSON when it is
/// `None`/empty and falls back to its default when the key is missing on read.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodMetadata {
/// Pod name; this is the key the metadata is stored under.
pub pod_name: String,
/// Pointer to the Pod's active Session/Segment, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active: Option<PodActiveSegmentRef>,
/// Children spawned by this Pod whose delegations are still outstanding.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub spawned_children: Vec<PodSpawnedChild>,
/// History of child delegations that have been reclaimed.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub reclaimed_children: Vec<PodReclaimedChild>,
/// Resolved manifest snapshot, stored as opaque JSON (this crate does not
/// interpret its contents).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resolved_manifest_snapshot: Option<serde_json::Value>,
}
impl PodMetadata {
    /// Start a fresh metadata record for `pod_name` with the given active
    /// pointer, no spawned or reclaimed children, and no manifest snapshot.
    pub fn new(pod_name: impl Into<String>, active: Option<PodActiveSegmentRef>) -> Self {
        let pod_name: String = pod_name.into();
        let (spawned_children, reclaimed_children) = (Vec::new(), Vec::new());
        Self {
            pod_name,
            active,
            spawned_children,
            reclaimed_children,
            resolved_manifest_snapshot: None,
        }
    }
}
/// Sync persistence backend for Pod metadata.
///
/// Implementors provide the primitive operations (`write`, `read_by_name`,
/// `list_names`, `delete_by_name`, and optionally `root_dir`). The remaining
/// methods are read-modify-write helpers built on `read_by_name` + `write`.
/// The default helper implementations take no lock of their own, so two
/// concurrent updates to the same Pod name can overwrite each other unless the
/// backend or the caller serializes them.
pub trait PodMetadataStore: Send + Sync {
    /// Create or replace metadata for its `pod_name` key.
    fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError>;

    /// Read metadata by Pod name. Returns `None` when no metadata exists.
    fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, PodStoreError>;

    /// List persisted Pod metadata keys.
    fn list_names(&self) -> Result<Vec<String>, PodStoreError>;

    /// Return the metadata root directory when this backend is path-backed.
    /// The default is `None`.
    fn root_dir(&self) -> Option<PathBuf> {
        None
    }

    /// Delete metadata by Pod name. Missing metadata is a successful no-op.
    fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError>;

    /// Merge an update into one Pod's metadata, preserving unrelated fields.
    ///
    /// Loads the stored metadata (or starts from an empty record when none
    /// exists), applies `update`, writes the result back, and returns what was
    /// written.
    ///
    /// # Errors
    ///
    /// Propagates any error from `read_by_name` or `write`.
    fn update_by_name<F>(&self, pod_name: &str, update: F) -> Result<PodMetadata, PodStoreError>
    where
        F: FnOnce(&mut PodMetadata),
    {
        let mut metadata = self
            .read_by_name(pod_name)?
            .unwrap_or_else(|| PodMetadata::new(pod_name, None));
        update(&mut metadata);
        // The closure must not be able to re-key the record: force the name
        // back to the key the caller asked for before persisting.
        metadata.pod_name = pod_name.to_string();
        self.write(&metadata)?;
        Ok(metadata)
    }

    /// Replace the active pointer and the resolved manifest snapshot while
    /// preserving the spawned and reclaimed child lists.
    ///
    /// Both values are overwritten unconditionally: passing `None` for either
    /// argument clears the corresponding stored field.
    fn set_active(
        &self,
        pod_name: &str,
        active: Option<PodActiveSegmentRef>,
        resolved_manifest_snapshot: Option<serde_json::Value>,
    ) -> Result<PodMetadata, PodStoreError> {
        self.update_by_name(pod_name, |metadata| {
            metadata.active = active;
            metadata.resolved_manifest_snapshot = resolved_manifest_snapshot;
        })
    }

    /// Set spawned-child registry state while preserving active pointer and manifest snapshot.
    fn set_spawned_children(
        &self,
        pod_name: &str,
        children: Vec<PodSpawnedChild>,
    ) -> Result<PodMetadata, PodStoreError> {
        self.update_by_name(pod_name, |metadata| {
            metadata.spawned_children = children;
        })
    }

    /// Remove reclaimed child delegations from the outstanding set and record
    /// them in durable reclaim history.
    ///
    /// Children are matched by `pod_name`. Every entry in `reclaimed` is
    /// appended to the history, whether or not a matching outstanding child
    /// was found.
    fn reclaim_spawned_children(
        &self,
        pod_name: &str,
        reclaimed: Vec<PodReclaimedChild>,
    ) -> Result<PodMetadata, PodStoreError> {
        self.update_by_name(pod_name, |metadata| {
            // One pass over the outstanding children instead of one pass per
            // reclaimed entry.
            metadata.spawned_children.retain(|child| {
                !reclaimed
                    .iter()
                    .any(|reclaimed_child| reclaimed_child.pod_name == child.pod_name)
            });
            metadata.reclaimed_children.extend(reclaimed);
        })
    }
}
/// Filesystem-backed Pod metadata store.
///
/// Each Pod's metadata lives at `{root}/{pod_name}/metadata.json`.
#[derive(Debug, Clone)]
pub struct FsPodStore {
    /// Pod-state root directory under which one sub-directory per Pod is kept.
    root: PathBuf,
}
impl FsPodStore {
    /// Open a store rooted at the Pod-state directory (usually
    /// `{data_dir}/pods`), creating the directory and any missing parents.
    ///
    /// # Errors
    ///
    /// Returns `PodStoreError::Io` when the directory cannot be created.
    pub fn new(root: impl Into<PathBuf>) -> Result<Self, PodStoreError> {
        let store = Self { root: root.into() };
        fs::create_dir_all(&store.root)?;
        Ok(store)
    }

    /// Directory holding one Pod's state; fails when the name is not valid.
    fn pod_dir(&self, pod_name: &str) -> Result<PathBuf, PodStoreError> {
        validate_pod_name(pod_name).map(|()| self.root.join(pod_name))
    }

    /// Path of the `metadata.json` file for one Pod.
    fn metadata_path(&self, pod_name: &str) -> Result<PathBuf, PodStoreError> {
        self.pod_dir(pod_name).map(|dir| dir.join("metadata.json"))
    }
}
impl PodMetadataStore for FsPodStore {
    /// Persist `metadata` as pretty-printed JSON at
    /// `{root}/{pod_name}/metadata.json`, creating the Pod directory if needed.
    ///
    /// The JSON is first written and fsynced to a uniquely named staging file
    /// in the same directory and then renamed over `metadata.json`, so readers
    /// and crash recovery never observe a half-written file.
    ///
    /// # Errors
    ///
    /// `InvalidPodName` for a rejected name, `Serde` when encoding fails, and
    /// `Io` for any filesystem failure (the staging file is removed on a
    /// best-effort basis in that case).
    fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError> {
        // Distinguishes staging files of concurrent writers within one process.
        static STAGING_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

        let path = self.metadata_path(&metadata.pod_name)?;
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        let content = serde_json::to_vec_pretty(metadata)?;

        let seq = STAGING_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        // `metadata.json` -> `metadata.json.<pid>.<seq>.tmp`, next to the target
        // so the rename stays on one filesystem.
        let staging = path.with_extension(format!("json.{}.{}.tmp", std::process::id(), seq));
        let staged = fs::File::create(&staging)
            .and_then(|mut file| {
                std::io::Write::write_all(&mut file, &content)?;
                file.sync_all()
            })
            .and_then(|()| fs::rename(&staging, &path));
        if let Err(err) = staged {
            let _ = fs::remove_file(&staging);
            return Err(PodStoreError::Io(err));
        }
        Ok(())
    }

    /// Load one Pod's metadata; a missing file yields `Ok(None)`.
    ///
    /// # Errors
    ///
    /// `InvalidPodName`, `Io` for read failures other than "not found", and
    /// `Serde` when the file does not decode as `PodMetadata`.
    fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, PodStoreError> {
        let path = self.metadata_path(pod_name)?;
        let content = match fs::read_to_string(path) {
            Ok(content) => content,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(err) => return Err(PodStoreError::Io(err)),
        };
        Ok(Some(serde_json::from_str(&content)?))
    }

    /// Return the sorted names of all sub-directories of the root that contain
    /// a `metadata.json` and whose name is valid UTF-8 and a valid Pod name.
    fn list_names(&self) -> Result<Vec<String>, PodStoreError> {
        let mut names = Vec::new();
        if !self.root.exists() {
            return Ok(names);
        }
        for entry in fs::read_dir(&self.root)? {
            let entry = entry?;
            if !entry.file_type()?.is_dir() {
                continue;
            }
            if !entry.path().join("metadata.json").exists() {
                continue;
            }
            // Directory names that are not valid UTF-8 cannot be Pod names.
            let Some(name) = entry.file_name().to_str().map(ToOwned::to_owned) else {
                continue;
            };
            if validate_pod_name(&name).is_ok() {
                names.push(name);
            }
        }
        names.sort();
        Ok(names)
    }

    /// This backend is path-backed, so the root is always available.
    fn root_dir(&self) -> Option<PathBuf> {
        Some(self.root.clone())
    }

    /// Remove one Pod's `metadata.json`; a missing file is a successful no-op.
    /// Afterwards the Pod directory is removed if (and only if) it is empty.
    fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError> {
        let path = self.metadata_path(pod_name)?;
        match fs::remove_file(&path) {
            Ok(()) => {}
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
            Err(err) => return Err(PodStoreError::Io(err)),
        }
        if let Some(parent) = path.parent() {
            // Best effort: `remove_dir` fails when other files remain in the
            // Pod directory, and that is fine.
            let _ = fs::remove_dir(parent);
        }
        Ok(())
    }
}
/// Check that `pod_name` can be used as a single directory name beneath the
/// Pod-state root.
///
/// A name is rejected when it is empty, contains a NUL byte or `/`, or is not
/// exactly one normal path component on the current platform. That covers `.`
/// and `..` everywhere, and on Windows also names containing `\` or a drive
/// prefix such as `C:`, which would otherwise let `Path::join` escape the root.
///
/// # Errors
///
/// Returns `PodStoreError::InvalidPodName` carrying the rejected name.
pub fn validate_pod_name(pod_name: &str) -> Result<(), PodStoreError> {
    let mut components = std::path::Path::new(pod_name).components();
    // Exactly one `Normal` component that round-trips to the full input:
    // `components()` silently drops trailing separators and `.` segments, so
    // the equality check is what rejects inputs like `a/` or `a/.`.
    let is_single_plain_component = matches!(
        (components.next(), components.next()),
        (Some(std::path::Component::Normal(only)), None)
            if only == std::ffi::OsStr::new(pod_name)
    );
    if !is_single_plain_component || pod_name.contains('/') || pod_name.contains('\0') {
        return Err(PodStoreError::InvalidPodName(pod_name.to_string()));
    }
    Ok(())
}
/// Convenience composition for callers that want one handle carrying separate
/// session-log and Pod-state roots.
///
/// `Debug` and `Clone` are available whenever both wrapped stores provide them.
#[derive(Debug, Clone)]
pub struct CombinedStore<S, P> {
    /// Backend that session-log operations are delegated to.
    pub session_store: S,
    /// Backend that Pod-metadata operations are delegated to.
    pub pod_store: P,
}

impl<S, P> CombinedStore<S, P> {
    /// Bundle a session-log backend and a Pod-metadata backend into one handle.
    pub fn new(session_store: S, pod_store: P) -> Self {
        Self {
            session_store,
            pod_store,
        }
    }
}
/// Session-log side of the composition: every method below forwards its
/// arguments unchanged to the wrapped `session_store` and returns its result
/// as-is. The Pod-state half (`P`) is never touched here; it only needs to be
/// `Send + Sync`.
impl<S, P> session_store::Store for CombinedStore<S, P>
where
S: session_store::Store,
P: Send + Sync,
{
// Forwards to `session_store.append`.
fn append(
&self,
session_id: SessionId,
segment_id: SegmentId,
entry: &session_store::LogEntry,
) -> Result<(), session_store::StoreError> {
self.session_store.append(session_id, segment_id, entry)
}
// Forwards to `session_store.read_all`.
fn read_all(
&self,
session_id: SessionId,
segment_id: SegmentId,
) -> Result<Vec<session_store::LogEntry>, session_store::StoreError> {
self.session_store.read_all(session_id, segment_id)
}
// Forwards to `session_store.list_sessions`.
fn list_sessions(&self) -> Result<Vec<SessionId>, session_store::StoreError> {
self.session_store.list_sessions()
}
// Forwards to `session_store.list_segments`.
fn list_segments(
&self,
session_id: SessionId,
) -> Result<Vec<SegmentId>, session_store::StoreError> {
self.session_store.list_segments(session_id)
}
// Forwards to `session_store.lookup_session_of`.
fn lookup_session_of(
&self,
segment_id: SegmentId,
) -> Result<Option<SessionId>, session_store::StoreError> {
self.session_store.lookup_session_of(segment_id)
}
// Forwards to `session_store.create_segment`.
fn create_segment(
&self,
session_id: SessionId,
segment_id: SegmentId,
entries: &[session_store::LogEntry],
) -> Result<(), session_store::StoreError> {
self.session_store
.create_segment(session_id, segment_id, entries)
}
// Forwards to `session_store.exists`.
fn exists(
&self,
session_id: SessionId,
segment_id: SegmentId,
) -> Result<bool, session_store::StoreError> {
self.session_store.exists(session_id, segment_id)
}
// Forwards to `session_store.truncate`.
fn truncate(
&self,
session_id: SessionId,
segment_id: SegmentId,
entries_len: usize,
) -> Result<(), session_store::StoreError> {
self.session_store
.truncate(session_id, segment_id, entries_len)
}
// Forwards to `session_store.read_entry_count`.
fn read_entry_count(
&self,
session_id: SessionId,
segment_id: SegmentId,
) -> Result<usize, session_store::StoreError> {
self.session_store.read_entry_count(session_id, segment_id)
}
// Forwards to `session_store.append_trace`.
fn append_trace(
&self,
session_id: SessionId,
segment_id: SegmentId,
entry: &session_store::TraceEntry,
) -> Result<(), session_store::StoreError> {
self.session_store
.append_trace(session_id, segment_id, entry)
}
}
impl<S, P> PodMetadataStore for CombinedStore<S, P>
where
S: Send + Sync,
P: PodMetadataStore,
{
fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError> {
self.pod_store.write(metadata)
}
fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, PodStoreError> {
self.pod_store.read_by_name(pod_name)
}
fn list_names(&self) -> Result<Vec<String>, PodStoreError> {
self.pod_store.list_names()
}
fn root_dir(&self) -> Option<PathBuf> {
self.pod_store.root_dir()
}
fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError> {
self.pod_store.delete_by_name(pod_name)
}
}
// Unit tests for the metadata model and the filesystem-backed store. Every
// filesystem test works inside its own `tempfile::TempDir`.
#[cfg(test)]
mod tests {
use super::*;
// A metadata record carrying a manifest snapshot survives a JSON
// serialize/deserialize round trip unchanged.
#[test]
fn pod_metadata_manifest_snapshot_roundtrips() {
let mut metadata = PodMetadata::new(
"profile-pod",
Some(PodActiveSegmentRef::pending_segment(
session_store::new_session_id(),
)),
);
metadata.resolved_manifest_snapshot = Some(serde_json::json!({
"pod": { "name": "profile-pod" },
"profile": { "source": { "kind": "path", "path": "/profiles/coder.nix" } }
}));
let json = serde_json::to_string(&metadata).unwrap();
let restored: PodMetadata = serde_json::from_str(&json).unwrap();
assert_eq!(restored, metadata);
}
// Writing through the store creates `{pod_root}/agent/metadata.json` and
// nothing under the sibling session root.
#[test]
fn fs_store_writes_under_pod_state_root_only() {
let tmp = tempfile::TempDir::new().unwrap();
let session_root = tmp.path().join("sessions");
let pod_root = tmp.path().join("pods");
fs::create_dir_all(&session_root).unwrap();
let store = FsPodStore::new(&pod_root).unwrap();
store
.write(&PodMetadata::new(
"agent",
Some(PodActiveSegmentRef::pending_segment(
session_store::new_session_id(),
)),
))
.unwrap();
assert!(pod_root.join("agent/metadata.json").exists());
assert!(!session_root.join("pods/agent/metadata.json").exists());
}
// `set_active` keeps the previously stored spawned children and stores the
// snapshot value that was passed in (replacing the earlier one).
#[test]
fn active_updates_preserve_children_and_manifest_snapshot() {
let tmp = tempfile::TempDir::new().unwrap();
let store = FsPodStore::new(tmp.path()).unwrap();
let mut metadata = PodMetadata::new("agent", None);
metadata.spawned_children.push(PodSpawnedChild {
pod_name: "child".into(),
socket_path: std::path::Path::new("/tmp/child.sock").into(),
scope_delegated: vec![],
callback_address: std::path::Path::new("/tmp/parent.sock").into(),
});
metadata.resolved_manifest_snapshot = Some(serde_json::json!({"pod":{"name":"agent"}}));
store.write(&metadata).unwrap();
let snapshot = serde_json::json!({"pod":{"name":"updated"}});
store
.set_active(
"agent",
Some(PodActiveSegmentRef::active_segment(
session_store::new_session_id(),
session_store::new_segment_id(),
)),
Some(snapshot.clone()),
)
.unwrap();
let restored = store.read_by_name("agent").unwrap().unwrap();
assert_eq!(restored.spawned_children.len(), 1);
assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot));
}
// `set_spawned_children` leaves the active pointer and the manifest
// snapshot written by an earlier `set_active` untouched.
#[test]
fn child_updates_preserve_active_and_manifest_snapshot() {
let tmp = tempfile::TempDir::new().unwrap();
let store = FsPodStore::new(tmp.path()).unwrap();
let active = PodActiveSegmentRef::active_segment(
session_store::new_session_id(),
session_store::new_segment_id(),
);
let snapshot = serde_json::json!({"pod":{"name":"agent"}});
store
.set_active("agent", Some(active.clone()), Some(snapshot.clone()))
.unwrap();
store
.set_spawned_children(
"agent",
vec![PodSpawnedChild {
pod_name: "child".into(),
socket_path: std::path::Path::new("/tmp/child.sock").into(),
scope_delegated: vec![],
callback_address: std::path::Path::new("/tmp/parent.sock").into(),
}],
)
.unwrap();
let restored = store.read_by_name("agent").unwrap().unwrap();
assert_eq!(restored.active, Some(active));
assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot));
}
// Reclaiming a child removes it from the outstanding list and appends it,
// with its delegated scope, to the reclaim history.
#[test]
fn reclaim_children_removes_outstanding_and_records_history() {
let tmp = tempfile::TempDir::new().unwrap();
let store = FsPodStore::new(tmp.path()).unwrap();
let scope = PodSpawnedScopeRule {
target: std::path::Path::new("/tmp/delegated").into(),
permission: "write".into(),
recursive: true,
};
store
.set_spawned_children(
"agent",
vec![PodSpawnedChild {
pod_name: "child".into(),
socket_path: std::path::Path::new("/tmp/child.sock").into(),
scope_delegated: vec![scope.clone()],
callback_address: std::path::Path::new("/tmp/parent.sock").into(),
}],
)
.unwrap();
store
.reclaim_spawned_children(
"agent",
vec![PodReclaimedChild {
pod_name: "child".into(),
scope_delegated: vec![scope.clone()],
}],
)
.unwrap();
let restored = store.read_by_name("agent").unwrap().unwrap();
assert!(restored.spawned_children.is_empty());
assert_eq!(restored.reclaimed_children.len(), 1);
assert_eq!(restored.reclaimed_children[0].scope_delegated, vec![scope]);
}
}

View File

@ -13,6 +13,7 @@ async-trait = { workspace = true }
clap = { version = "4.6.0", features = ["derive"] } clap = { version = "4.6.0", features = ["derive"] }
llm-worker = { workspace = true } llm-worker = { workspace = true }
session-store = { workspace = true } session-store = { workspace = true }
pod-store = { workspace = true }
manifest = { workspace = true } manifest = { workspace = true }
protocol = { workspace = true } protocol = { workspace = true }
provider = { workspace = true } provider = { workspace = true }

View File

@ -12,6 +12,7 @@
//! ``` //! ```
use pod::{Pod, PodManifest, PodRunResult}; use pod::{Pod, PodManifest, PodRunResult};
use pod_store::{CombinedStore, FsPodStore};
use session_store::FsStore; use session_store::FsStore;
fn manifest_toml(pwd: &std::path::Path) -> String { fn manifest_toml(pwd: &std::path::Path) -> String {
@ -48,7 +49,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 2. Create a persistent store (temp dir for demo) // 2. Create a persistent store (temp dir for demo)
let tmp = tempfile::tempdir()?; let tmp = tempfile::tempdir()?;
let store = FsStore::new(tmp.path())?; let store = CombinedStore::new(
FsStore::new(tmp.path().join("sessions"))?,
FsPodStore::new(tmp.path().join("pods"))?,
);
// 3. Build the Pod from the single-layer manifest TOML // 3. Build the Pod from the single-layer manifest TOML
let mut pod = Pod::from_manifest_toml(&toml, store).await?; let mut pod = Pod::from_manifest_toml(&toml, store).await?;

View File

@ -6,6 +6,7 @@
//! ``` //! ```
use pod::{Event, Method, PodController}; use pod::{Event, Method, PodController};
use pod_store::{CombinedStore, FsPodStore};
use session_store::FsStore; use session_store::FsStore;
fn manifest_toml(pwd: &std::path::Path) -> String { fn manifest_toml(pwd: &std::path::Path) -> String {
@ -39,7 +40,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pwd = std::env::current_dir()?; let pwd = std::env::current_dir()?;
let toml = manifest_toml(&pwd); let toml = manifest_toml(&pwd);
let tmp = tempfile::tempdir()?; let tmp = tempfile::tempdir()?;
let store = FsStore::new(tmp.path())?; let store = CombinedStore::new(
FsStore::new(tmp.path().join("sessions"))?,
FsPodStore::new(tmp.path().join("pods"))?,
);
let pod = pod::Pod::from_manifest_toml(&toml, store).await?; let pod = pod::Pod::from_manifest_toml(&toml, store).await?;
let runtime_tmp = tempfile::tempdir()?; let runtime_tmp = tempfile::tempdir()?;

View File

@ -4,7 +4,8 @@ use std::sync::atomic::Ordering;
use llm_worker::WorkerError; use llm_worker::WorkerError;
use llm_worker::llm_client::client::LlmClient; use llm_worker::llm_client::client::LlmClient;
use session_store::{PodMetadataStore, Store}; use pod_store::PodMetadataStore;
use session_store::Store;
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
use crate::discovery::{ use crate::discovery::{
@ -161,14 +162,15 @@ impl PodController {
pod.store().clone(), pod.store().clone(),
spawner_name.clone(), spawner_name.clone(),
Some(pod.scope().clone()), Some(pod.scope().clone()),
Some(pod.scope_change_sink()),
) )
.await?; .await?;
let reclaimed_unreachable = loaded_registry.reclaimed_unreachable; let reclaimed_unreachable = loaded_registry.reclaimed_unreachable;
let spawned_registry = loaded_registry.registry; let spawned_registry = loaded_registry.registry;
if reclaimed_unreachable { if reclaimed_unreachable {
pod.persist_scope_snapshot() pod.push_notify(
.map_err(std::io::Error::other)?; "Restored Pod state contained unreachable delegated child Pods; their delegated write scopes were reclaimed before resume."
.to_string(),
);
} }
// Hand the alerter to the Pod so internal operations (compaction, // Hand the alerter to the Pod so internal operations (compaction,
@ -496,7 +498,6 @@ where
let pwd = pod.pwd().to_path_buf(); let pwd = pod.pwd().to_path_buf();
let task_store = pod.task_store(); let task_store = pod.task_store();
let session_id_for_usage = pod.segment_id().to_string(); let session_id_for_usage = pod.segment_id().to_string();
let scope_change_sink = pod.scope_change_sink();
let memory_config = pod.manifest().memory.clone(); let memory_config = pod.manifest().memory.clone();
let web_config = pod.manifest().web.clone(); let web_config = pod.manifest().web.clone();
let spawner_name = pod.manifest().pod.name.clone(); let spawner_name = pod.manifest().pod.name.clone();
@ -556,7 +557,6 @@ where
self_parent_socket, self_parent_socket,
spawner_model, spawner_model,
scope_handle, scope_handle,
scope_change_sink,
)); ));
worker.register_tool(send_to_pod_tool(spawned_registry.clone())); worker.register_tool(send_to_pod_tool(spawned_registry.clone()));
worker.register_tool(read_pod_output_tool(spawned_registry.clone())); worker.register_tool(read_pod_output_tool(spawned_registry.clone()));

View File

@ -15,11 +15,12 @@ use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore};
use protocol::stream::JsonLineReader; use protocol::stream::JsonLineReader;
use protocol::{Event, PodStatus}; use protocol::{Event, PodStatus};
use schemars::JsonSchema; use schemars::JsonSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use session_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, SegmentId, SessionId}; use session_store::{SegmentId, SessionId};
use tokio::net::UnixStream; use tokio::net::UnixStream;
use tokio::process::Command; use tokio::process::Command;
@ -496,8 +497,10 @@ pub enum PodDiscoveryError {
socket_path: PathBuf, socket_path: PathBuf,
pid: u32, pid: u32,
}, },
#[error("store error: {0}")] #[error("session store error: {0}")]
Store(#[from] session_store::StoreError), Store(#[from] session_store::StoreError),
#[error("pod store error: {0}")]
PodStore(#[from] pod_store::PodStoreError),
#[error("scope lock error: {0}")] #[error("scope lock error: {0}")]
ScopeLock(#[from] pod_registry::ScopeLockError), ScopeLock(#[from] pod_registry::ScopeLockError),
#[error("failed to launch restore process: {0}")] #[error("failed to launch restore process: {0}")]
@ -527,7 +530,7 @@ impl VisibilitySet {
} }
async fn summarize_spawned_children( async fn summarize_spawned_children(
children: &[session_store::PodSpawnedChild], children: &[pod_store::PodSpawnedChild],
) -> SpawnedChildrenSummary { ) -> SpawnedChildrenSummary {
let mut summary = SpawnedChildrenSummary { let mut summary = SpawnedChildrenSummary {
count: children.len(), count: children.len(),
@ -752,6 +755,7 @@ fn discovery_error_to_tool_error(error: PodDiscoveryError) -> ToolError {
| PodDiscoveryError::NotRestorable { .. } => ToolError::InvalidArgument(error.to_string()), | PodDiscoveryError::NotRestorable { .. } => ToolError::InvalidArgument(error.to_string()),
PodDiscoveryError::LockConflict { .. } PodDiscoveryError::LockConflict { .. }
| PodDiscoveryError::Store(_) | PodDiscoveryError::Store(_)
| PodDiscoveryError::PodStore(_)
| PodDiscoveryError::ScopeLock(_) | PodDiscoveryError::ScopeLock(_)
| PodDiscoveryError::RestoreSpawn(_) | PodDiscoveryError::RestoreSpawn(_)
| PodDiscoveryError::RestoreExited { .. } | PodDiscoveryError::RestoreExited { .. }
@ -765,11 +769,10 @@ mod tests {
use std::sync::Mutex; use std::sync::Mutex;
use manifest::{Permission, ScopeRule}; use manifest::{Permission, ScopeRule};
use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule};
use protocol::stream::JsonLineWriter; use protocol::stream::JsonLineWriter;
use protocol::{Alert, AlertLevel, AlertSource, Greeting}; use protocol::{Alert, AlertLevel, AlertSource, Greeting};
use session_store::{ use session_store::{new_segment_id, new_session_id};
FsStore, PodSpawnedChild, PodSpawnedScopeRule, new_segment_id, new_session_id,
};
use tempfile::TempDir; use tempfile::TempDir;
use tokio::net::UnixListener; use tokio::net::UnixListener;
@ -788,7 +791,7 @@ mod tests {
std::env::set_var("INSOMNIA_RUNTIME_DIR", &runtime_base); std::env::set_var("INSOMNIA_RUNTIME_DIR", &runtime_base);
} }
let store = FsStore::new(&store_dir).unwrap(); let store = FsPodStore::new(&store_dir).unwrap();
let session_id = new_session_id(); let session_id = new_session_id();
let active_child_segment = new_segment_id(); let active_child_segment = new_segment_id();
let pending_session_id = new_session_id(); let pending_session_id = new_session_id();
@ -806,6 +809,7 @@ mod tests {
child("child-stale", &stale_socket), child("child-stale", &stale_socket),
child("child-pending", &pending_socket), child("child-pending", &pending_socket),
], ],
reclaimed_children: Vec::new(),
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
}; };
store.write(&parent).unwrap(); store.write(&parent).unwrap();
@ -817,6 +821,7 @@ mod tests {
active_child_segment, active_child_segment,
)), )),
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
}) })
.unwrap(); .unwrap();
@ -828,6 +833,7 @@ mod tests {
active_child_segment, active_child_segment,
)), )),
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
}) })
.unwrap(); .unwrap();
@ -836,6 +842,7 @@ mod tests {
pod_name: "child-pending".into(), pod_name: "child-pending".into(),
active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)), active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)),
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
}) })
.unwrap(); .unwrap();
@ -847,6 +854,7 @@ mod tests {
new_segment_id(), new_segment_id(),
)), )),
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
}) })
.unwrap(); .unwrap();

View File

@ -2,11 +2,10 @@ use std::path::{Path, PathBuf};
use std::process::ExitCode; use std::process::ExitCode;
use clap::Parser; use clap::Parser;
use manifest::{ use manifest::{NixProfileResolver, PodManifest, PodManifestConfig, ProfileSelector, paths};
NixProfileResolver, PodManifest, PodManifestConfig, ProfileSelector, ScopeConfig, paths,
};
use pod::{Pod, PodController, PromptLoader}; use pod::{Pod, PodController, PromptLoader};
use session_store::{FsStore, PodMetadataStore, SegmentId, Store}; use pod_store::{CombinedStore, FsPodStore, PodMetadataStore};
use session_store::{FsStore, SegmentId, Store};
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
#[command( #[command(
@ -45,10 +44,6 @@ struct Cli {
#[arg(long, value_name = "NAME", requires = "session", hide = true)] #[arg(long, value_name = "NAME", requires = "session", hide = true)]
session_pod_name: Option<String>, session_pod_name: Option<String>,
/// Internal typed scope snapshot for session restore launched by the TUI.
#[arg(long, value_name = "JSON", requires = "session", hide = true)]
resume_scope_json: Option<String>,
/// Internal resolved manifest config for delegated child Pod spawning. /// Internal resolved manifest config for delegated child Pod spawning.
#[arg( #[arg(
long, long,
@ -133,10 +128,6 @@ fn apply_session_restore_overrides(manifest: &mut PodManifest, cli: &Cli) -> Res
if let Some(pod_name) = cli.session_pod_name.as_deref() { if let Some(pod_name) = cli.session_pod_name.as_deref() {
manifest.pod.name = pod_name.to_string(); manifest.pod.name = pod_name.to_string();
} }
if let Some(scope_json) = cli.resume_scope_json.as_deref() {
manifest.scope = serde_json::from_str::<ScopeConfig>(scope_json)
.map_err(|e| format!("failed to parse --resume-scope-json: {e}"))?;
}
Ok(()) Ok(())
} }
@ -229,13 +220,28 @@ async fn main() -> ExitCode {
} }
}, },
}; };
let store = match FsStore::new(&store_dir) { let session_store = match FsStore::new(&store_dir) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
eprintln!("error: failed to initialize store at {store_dir:?}: {e}"); eprintln!("error: failed to initialize session store at {store_dir:?}: {e}");
return ExitCode::FAILURE; return ExitCode::FAILURE;
} }
}; };
let pod_store_dir = match paths::data_dir() {
Some(data_dir) => data_dir.join("pods"),
None => store_dir
.parent()
.map(|parent| parent.join("pods"))
.unwrap_or_else(|| PathBuf::from("pods")),
};
let pod_store = match FsPodStore::new(&pod_store_dir) {
Ok(s) => s,
Err(e) => {
eprintln!("error: failed to initialize pod store at {pod_store_dir:?}: {e}");
return ExitCode::FAILURE;
}
};
let store = CombinedStore::new(session_store, pod_store);
let pod = if cli.adopt { let pod = if cli.adopt {
let callback = match cli.callback.clone() { let callback = match cli.callback.clone() {

View File

@ -1,6 +1,7 @@
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use llm_worker::Item; use llm_worker::Item;
@ -9,9 +10,12 @@ use llm_worker::llm_client::client::LlmClient;
use llm_worker::llm_client::types::Role; use llm_worker::llm_client::types::Role;
use llm_worker::state::Mutable; use llm_worker::state::Mutable;
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
use pod_store::{
PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodReclaimedChild, PodSpawnedChild,
PodSpawnedScopeRule, PodStoreError,
};
use session_store::{ use session_store::{
LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodScopeSnapshot, SegmentId, LogEntry, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, to_logged,
SessionId, Store, StoreError, SystemItem, segment_log, to_logged,
}; };
use tracing::{info, warn}; use tracing::{info, warn};
@ -43,9 +47,12 @@ use llm_worker::interceptor::PreRequestAction;
use protocol::{ use protocol::{
AlertLevel, AlertSource, Event, RewindSummary, RewindTarget, RewindTargetId, Segment, AlertLevel, AlertSource, Event, RewindSummary, RewindTarget, RewindTargetId, Segment,
}; };
use tokio::net::UnixStream;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
const RESTORE_RECONCILIATION_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500);
/// `(SessionId, SegmentId)` pair the Pod is currently writing to. /// `(SessionId, SegmentId)` pair the Pod is currently writing to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SegmentLocation { pub struct SegmentLocation {
@ -53,18 +60,21 @@ pub struct SegmentLocation {
pub segment_id: SegmentId, pub segment_id: SegmentId,
} }
type PodMetadataWriter = Arc<dyn Fn(PodMetadata) -> Result<(), StoreError> + Send + Sync>; type PodMetadataWriter = Arc<dyn Fn(PodMetadata) -> Result<(), PodStoreError> + Send + Sync>;
fn pod_metadata_writer_for_store<St>(store: &St) -> PodMetadataWriter fn pod_metadata_writer_for_store<St>(store: &St) -> PodMetadataWriter
where where
St: PodMetadataStore + Clone + Send + Sync + 'static, St: PodMetadataStore + Clone + Send + Sync + 'static,
{ {
let store = store.clone(); let store = store.clone();
Arc::new(move |mut metadata| { Arc::new(move |metadata| {
if let Some(existing) = store.read_by_name(&metadata.pod_name)? { store
metadata.spawned_children = existing.spawned_children; .set_active(
} &metadata.pod_name,
store.write(&metadata) metadata.active,
metadata.resolved_manifest_snapshot,
)
.map(|_| ())
}) })
} }
@ -341,10 +351,6 @@ pub struct Pod<C: LlmClient, St: Store> {
/// Workflow descriptions. This is intentionally independent from /// Workflow descriptions. This is intentionally independent from
/// summary and Knowledge residency: each section has its own gate. /// summary and Knowledge residency: each section has its own gate.
inject_resident_workflows: bool, inject_resident_workflows: bool,
/// Latest runtime scope snapshot queued by dynamic scope changes.
/// Drained into the session log before the next turn result is
/// persisted, so resume never silently reclaims delegated writes.
pending_scope_snapshot: Arc<Mutex<Option<PodScopeSnapshot>>>,
/// extract (memory.extract) reentry guard. `true` while an extract /// extract (memory.extract) reentry guard. `true` while an extract
/// worker is running; subsequent triggers are skipped per spec /// worker is running; subsequent triggers are skipped per spec
/// (`docs/plan/memory.md` §Extract 並走防止). `Arc<AtomicBool>` so /// (`docs/plan/memory.md` §Extract 並走防止). `Arc<AtomicBool>` so
@ -450,7 +456,6 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
inject_resident_summary: self.inject_resident_summary, inject_resident_summary: self.inject_resident_summary,
inject_resident_knowledge: self.inject_resident_knowledge, inject_resident_knowledge: self.inject_resident_knowledge,
inject_resident_workflows: self.inject_resident_workflows, inject_resident_workflows: self.inject_resident_workflows,
pending_scope_snapshot: self.pending_scope_snapshot.clone(),
extract_in_flight: self.extract_in_flight.clone(), extract_in_flight: self.extract_in_flight.clone(),
consolidation_in_flight: self.consolidation_in_flight.clone(), consolidation_in_flight: self.consolidation_in_flight.clone(),
extract_pointer: self.extract_pointer.clone(), extract_pointer: self.extract_pointer.clone(),
@ -630,7 +635,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
inject_resident_summary: true, inject_resident_summary: true,
inject_resident_knowledge: true, inject_resident_knowledge: true,
inject_resident_workflows: true, inject_resident_workflows: true,
pending_scope_snapshot: Arc::new(Mutex::new(None)),
extract_in_flight: Arc::new(AtomicBool::new(false)), extract_in_flight: Arc::new(AtomicBool::new(false)),
consolidation_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)),
extract_pointer: Arc::new(Mutex::new(None)), extract_pointer: Arc::new(Mutex::new(None)),
@ -749,30 +753,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.update(|cur| cur.with_added_deny_rules(revoke.clone())) .update(|cur| cur.with_added_deny_rules(revoke.clone()))
} }
/// Snapshot the current runtime scope in the session log. The entry
/// is intentionally appended as soon as a session log exists: if the
/// process later exits while children keep their allocations, resume
/// can restore the narrowed scope instead of reclaiming delegated
/// writes.
pub fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> {
if self.segment_state.entries_written() == 0 {
return Ok(());
}
let snapshot = {
let scope = self.scope.snapshot();
PodScopeSnapshot {
allow: scope.allow_rules(),
deny: scope.deny_rules(),
}
};
let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize");
self.commit_entry(LogEntry::Extension {
ts: segment_log::now_millis(),
domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(),
payload,
})
}
/// Append `entry` to the session log AND publish it through the /// Append `entry` to the session log AND publish it through the
/// broadcast sink. No user-space serialization is needed across /// broadcast sink. No user-space serialization is needed across
/// concurrent appenders — the kernel orders `O_APPEND` writes for /// concurrent appenders — the kernel orders `O_APPEND` writes for
@ -792,34 +772,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.sink.clone() self.sink.clone()
} }
/// Cloneable callback handed to dynamic-scope tools. It cannot append
/// directly to the async store from a sync tool callback, so it records
/// the latest snapshot and the controller flushes it after the tool
/// turn completes.
pub fn scope_change_sink(&self) -> Arc<dyn Fn(PodScopeSnapshot) + Send + Sync> {
let pending = self.pending_scope_snapshot.clone();
Arc::new(move |snapshot| {
*pending.lock().expect("pending_scope_snapshot poisoned") = Some(snapshot);
})
}
fn flush_pending_scope_snapshot(&mut self) -> Result<(), StoreError> {
let snapshot = self
.pending_scope_snapshot
.lock()
.expect("pending_scope_snapshot poisoned")
.take();
if let Some(snapshot) = snapshot {
let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize");
self.commit_entry(LogEntry::Extension {
ts: segment_log::now_millis(),
domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(),
payload,
})?;
}
Ok(())
}
/// Direct access to the underlying Worker. /// Direct access to the underlying Worker.
pub fn worker(&self) -> &Worker<C, Mutable> { pub fn worker(&self) -> &Worker<C, Mutable> {
self.worker.as_ref().expect("worker taken during run") self.worker.as_ref().expect("worker taken during run")
@ -925,30 +877,32 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
metadata metadata
} }
fn write_pod_metadata_pending(&self) -> Result<(), StoreError> { fn write_pod_metadata_pending(&self) -> Result<(), PodError> {
let Some(writer) = &self.pod_metadata_writer else { let Some(writer) = &self.pod_metadata_writer else {
return Ok(()); return Ok(());
}; };
writer(self.pod_metadata(Some(PodActiveSegmentRef::pending_segment( writer(self.pod_metadata(Some(PodActiveSegmentRef::pending_segment(
self.session_id(), self.session_id(),
)))) ))))?;
Ok(())
} }
fn write_pod_metadata_active(&self, loc: SegmentLocation) -> Result<(), StoreError> { fn write_pod_metadata_active(&self, loc: SegmentLocation) -> Result<(), PodError> {
let Some(writer) = &self.pod_metadata_writer else { let Some(writer) = &self.pod_metadata_writer else {
return Ok(()); return Ok(());
}; };
writer(self.pod_metadata(Some(PodActiveSegmentRef::active_segment( writer(self.pod_metadata(Some(PodActiveSegmentRef::active_segment(
loc.session_id, loc.session_id,
loc.segment_id, loc.segment_id,
)))) ))))?;
Ok(())
} }
/// Enable name-keyed Pod metadata write-through for Pods built through /// Enable name-keyed Pod metadata write-through for Pods built through
/// the low-level constructor. High-level manifest constructors enable it /// the low-level constructor. High-level manifest constructors enable it
/// automatically; this hook lets tests and custom embedders opt into the /// automatically; this hook lets tests and custom embedders opt into the
/// same persistence behavior without changing `Pod::new`'s minimal bounds. /// same persistence behavior without changing `Pod::new`'s minimal bounds.
pub fn enable_pod_metadata_write_through(&mut self) -> Result<(), StoreError> pub fn enable_pod_metadata_write_through(&mut self) -> Result<(), PodError>
where where
St: PodMetadataStore + Clone + Send + Sync + 'static, St: PodMetadataStore + Clone + Send + Sync + 'static,
{ {
@ -2001,7 +1955,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
compacted_from: None, compacted_from: None,
}; };
self.commit_entry(initial)?; self.commit_entry(initial)?;
self.persist_scope_snapshot()?;
self.write_pod_metadata_active(loc)?; self.write_pod_metadata_active(loc)?;
return Ok(()); return Ok(());
} }
@ -2296,8 +2249,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
} }
} }
self.flush_pending_scope_snapshot()?;
let turn_count = self.worker.as_ref().unwrap().turn_count(); let turn_count = self.worker.as_ref().unwrap().turn_count();
self.commit_entry(LogEntry::TurnEnd { self.commit_entry(LogEntry::TurnEnd {
ts: segment_log::now_millis(), ts: segment_log::now_millis(),
@ -2769,7 +2720,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.lock() .lock()
.expect("usage_history poisoned") .expect("usage_history poisoned")
.clear(); .clear();
self.persist_scope_snapshot()?;
// Reset extract pointer alongside usage_history: the compacted // Reset extract pointer alongside usage_history: the compacted
// session has a fresh log with no `LogEntry::Extension` entries // session has a fresh log with no `LogEntry::Extension` entries
// yet, so a cold restore here would set extract_pointer to None // yet, so a cold restore here would set extract_pointer to None
@ -3825,7 +3775,6 @@ where
inject_resident_summary: true, inject_resident_summary: true,
inject_resident_knowledge: true, inject_resident_knowledge: true,
inject_resident_workflows: true, inject_resident_workflows: true,
pending_scope_snapshot: Arc::new(Mutex::new(None)),
extract_in_flight: Arc::new(AtomicBool::new(false)), extract_in_flight: Arc::new(AtomicBool::new(false)),
consolidation_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)),
extract_pointer: Arc::new(Mutex::new(None)), extract_pointer: Arc::new(Mutex::new(None)),
@ -3905,7 +3854,6 @@ where
inject_resident_summary: true, inject_resident_summary: true,
inject_resident_knowledge: true, inject_resident_knowledge: true,
inject_resident_workflows: true, inject_resident_workflows: true,
pending_scope_snapshot: Arc::new(Mutex::new(None)),
extract_in_flight: Arc::new(AtomicBool::new(false)), extract_in_flight: Arc::new(AtomicBool::new(false)),
consolidation_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)),
extract_pointer: Arc::new(Mutex::new(None)), extract_pointer: Arc::new(Mutex::new(None)),
@ -3995,19 +3943,13 @@ where
return Err(PodError::SegmentEmpty { segment_id }); return Err(PodError::SegmentEmpty { segment_id });
} }
let mirror_entries: Vec<LogEntry> = raw_entries.clone(); let mirror_entries: Vec<LogEntry> = raw_entries.clone();
let scope_snapshot = state let scope_config = effective_restore_scope_config(&store, &manifest)?;
.pod_scope
.clone()
.ok_or(PodError::SegmentScopeMissing { segment_id })?;
let mut common = prepare_pod_common_with_scope( let mut common = prepare_pod_common_with_scope(
&manifest, &manifest,
&loader, &loader,
/* parse_template */ false, /* parse_template */ false,
ScopeConfig { scope_config,
allow: scope_snapshot.allow,
deny: scope_snapshot.deny,
},
)?; )?;
let skill_shadows = std::mem::take(&mut common.skill_shadows); let skill_shadows = std::mem::take(&mut common.skill_shadows);
@ -4093,7 +4035,6 @@ where
inject_resident_summary: true, inject_resident_summary: true,
inject_resident_knowledge: true, inject_resident_knowledge: true,
inject_resident_workflows: true, inject_resident_workflows: true,
pending_scope_snapshot: Arc::new(Mutex::new(None)),
extract_in_flight: Arc::new(AtomicBool::new(false)), extract_in_flight: Arc::new(AtomicBool::new(false)),
consolidation_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)),
extract_pointer: Arc::new(Mutex::new(extract_pointer)), extract_pointer: Arc::new(Mutex::new(extract_pointer)),
@ -4112,10 +4053,61 @@ where
session_id, session_id,
segment_id, segment_id,
})?; })?;
pod.reconcile_restored_delegations().await?;
drain_skill_shadows(&pod, skill_shadows); drain_skill_shadows(&pod, skill_shadows);
Ok(pod) Ok(pod)
} }
/// Reconcile persisted child delegations after a restore.
///
/// Loads this Pod's metadata by name and probes every recorded spawned
/// child with `restored_child_reachable`. Children that answer are left
/// untouched. For each child that does not answer, any delegated scope is
/// reclaimed through the pod registry, the delegated write rules are passed
/// to `with_removed_deny_rules` on this Pod's scope, and the child is
/// recorded as reclaimed in the store.
///
/// Returns `Ok(())` without side effects when no metadata exists for this
/// Pod or when every child is reachable.
///
/// # Errors
///
/// Propagates failures from the metadata store, from locating or opening
/// the registry lock file, from the registry reclaim call, and from the
/// scope update. An error part-way through the loop returns before
/// `reclaim_spawned_children` is called for any child handled so far.
async fn reconcile_restored_delegations(&mut self) -> Result<(), PodError> {
let pod_name = self.manifest.pod.name.clone();
// No persisted metadata means there is nothing to reconcile.
let Some(metadata) = self.store.read_by_name(&pod_name)? else {
return Ok(());
};
let mut reclaimed = Vec::new();
for child in metadata.spawned_children {
// A child whose socket accepts a connection keeps its delegation.
if restored_child_reachable(&child).await {
continue;
}
let delegated_scope = spawned_child_scope_rules(&child);
if !delegated_scope.is_empty() {
// The registry lock guard is opened per unreachable child and
// dropped at the end of this `if` block.
let lock_path =
pod_registry::default_registry_path().map_err(ScopeLockError::from)?;
let mut guard =
pod_registry::LockFileGuard::open(&lock_path).map_err(ScopeLockError::from)?;
pod_registry::reclaim_delegated_scope(
&mut guard,
&pod_name,
&child.pod_name,
&delegated_scope,
)?;
// Only write-permission rules are handed to
// `with_removed_deny_rules`; read rules are filtered out here.
let write_rules = delegated_scope
.iter()
.filter(|rule| rule.permission == Permission::Write)
.cloned()
.collect::<Vec<_>>();
self.scope
.update(|current| current.with_removed_deny_rules(write_rules))
.map_err(PodError::Scope)?;
}
// Recorded even when the child had no (valid) delegated scope rules.
reclaimed.push(PodReclaimedChild {
pod_name: child.pod_name,
scope_delegated: child.scope_delegated,
});
}
if reclaimed.is_empty() {
return Ok(());
}
// Persist the reclaimed children in one store call, then queue a notice
// via `push_notify`.
self.store.reclaim_spawned_children(&pod_name, reclaimed)?;
self.push_notify(
"Restored Pod state contained missing or unreachable delegated child Pods; their delegated write scopes were reclaimed before resume."
.to_string(),
);
Ok(())
}
/// Convenience: build a Pod from a single-layer TOML manifest string. /// Convenience: build a Pod from a single-layer TOML manifest string.
/// ///
/// Parses the TOML into a [`PodManifestConfig`], converts to a /// Parses the TOML into a [`PodManifestConfig`], converts to a
@ -4438,6 +4430,7 @@ fn token_budget_bytes(tokens: u64) -> usize {
pub enum RewindError { pub enum RewindError {
#[error(transparent)] #[error(transparent)]
Store(#[from] StoreError), Store(#[from] StoreError),
#[error("{0}")] #[error("{0}")]
Invalid(String), Invalid(String),
} }
@ -4546,6 +4539,9 @@ pub enum PodError {
#[error(transparent)] #[error(transparent)]
Store(#[from] StoreError), Store(#[from] StoreError),
#[error(transparent)]
PodStore(#[from] PodStoreError),
#[error(transparent)] #[error(transparent)]
Scope(ScopeError), Scope(ScopeError),
@ -4613,11 +4609,6 @@ pub enum PodError {
#[error("session {segment_id} has no entries to restore")] #[error("session {segment_id} has no entries to restore")]
SegmentEmpty { segment_id: SegmentId }, SegmentEmpty { segment_id: SegmentId },
#[error(
"session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope"
)]
SegmentScopeMissing { segment_id: SegmentId },
#[error("pod metadata for {pod_name} was not found")] #[error("pod metadata for {pod_name} was not found")]
PodMetadataMissing { pod_name: String }, PodMetadataMissing { pod_name: String },
@ -4659,6 +4650,66 @@ struct PodCommon {
skill_shadows: Vec<workflow_crate::ShadowedSkill>, skill_shadows: Vec<workflow_crate::ShadowedSkill>,
} }
/// Report whether a persisted child Pod still accepts connections.
///
/// Attempts a Unix-socket connect to `child.socket_path`, bounded by
/// `RESTORE_RECONCILIATION_REACHABILITY_TIMEOUT`. Returns `true` only when
/// the connect completes successfully within the timeout; a connect error
/// or an elapsed timeout both yield `false`.
async fn restored_child_reachable(child: &PodSpawnedChild) -> bool {
    let connect_attempt = UnixStream::connect(&child.socket_path);
    let outcome = tokio::time::timeout(
        RESTORE_RECONCILIATION_REACHABILITY_TIMEOUT,
        connect_attempt,
    )
    .await;
    // Outer `Ok` = finished before the deadline; inner `Ok` = connected.
    matches!(outcome, Ok(Ok(_)))
}
fn spawned_child_scope_rules(child: &PodSpawnedChild) -> Vec<ScopeRule> {
child
.scope_delegated
.iter()
.filter_map(|rule| delegated_scope_rule_to_scope_rule(rule.clone()))
.collect()
}
/// Convert one persisted delegated rule into a `ScopeRule`.
///
/// The persisted permission is a string; only the exact values `"read"`
/// and `"write"` are accepted. Any other value is logged at warn level and
/// the rule is discarded by returning `None`.
fn delegated_scope_rule_to_scope_rule(rule: PodSpawnedScopeRule) -> Option<ScopeRule> {
    let parsed = match rule.permission.as_str() {
        "read" => Some(Permission::Read),
        "write" => Some(Permission::Write),
        _ => None,
    };
    let Some(permission) = parsed else {
        warn!(permission = %rule.permission, "ignoring invalid delegated child scope permission");
        return None;
    };
    Some(ScopeRule {
        target: rule.target,
        permission,
        recursive: rule.recursive,
    })
}
fn effective_restore_scope_config<St>(
store: &St,
manifest: &PodManifest,
) -> Result<ScopeConfig, PodStoreError>
where
St: PodMetadataStore,
{
let mut scope = manifest.scope.clone();
let Some(metadata) = store.read_by_name(&manifest.pod.name)? else {
return Ok(scope);
};
for child in metadata.spawned_children {
for rule in child.scope_delegated {
if let Some(deny) = delegated_write_rule_to_deny(rule) {
scope.deny.push(deny);
}
}
}
Ok(scope)
}
/// Convert a persisted delegated rule into a deny rule, keeping only
/// write-permission rules.
///
/// Returns `None` when the rule fails conversion or its permission is not
/// `Permission::Write`.
fn delegated_write_rule_to_deny(rule: PodSpawnedScopeRule) -> Option<ScopeRule> {
    delegated_scope_rule_to_scope_rule(rule)
        .filter(|converted| converted.permission == Permission::Write)
}
/// Resolve pwd / scope / LLM client / prompt catalog from a validated /// Resolve pwd / scope / LLM client / prompt catalog from a validated
/// manifest cascade. Used by `from_manifest`, `from_manifest_spawned`, /// manifest cascade. Used by `from_manifest`, `from_manifest_spawned`,
/// and `restore_from_manifest` so they share one definition of "what /// and `restore_from_manifest` so they share one definition of "what

View File

@ -20,9 +20,8 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use manifest::{Permission, ScopeRule, SharedScope}; use manifest::{Permission, ScopeRule, SharedScope};
use session_store::{ use pod_store::{
PodMetadata, PodMetadataStore, PodScopeSnapshot, PodSpawnedChild, PodSpawnedScopeRule, PodMetadataStore, PodReclaimedChild, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError,
StoreError,
}; };
use tokio::net::UnixStream; use tokio::net::UnixStream;
use tokio::sync::Mutex; use tokio::sync::Mutex;
@ -32,7 +31,7 @@ use crate::runtime::dir::{RuntimeDir, SpawnedPodRecord};
use crate::runtime::pod_registry; use crate::runtime::pod_registry;
type RegistryStateWriter = Arc<dyn Fn(&[SpawnedPodRecord]) -> io::Result<()> + Send + Sync>; type RegistryStateWriter = Arc<dyn Fn(&[SpawnedPodRecord]) -> io::Result<()> + Send + Sync>;
type ScopeChangeSink = Arc<dyn Fn(PodScopeSnapshot) + Send + Sync>; type RegistryReclaimWriter = Arc<dyn Fn(&SpawnedPodRecord) -> io::Result<()> + Send + Sync>;
const RESTORE_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500); const RESTORE_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500);
@ -41,9 +40,9 @@ pub struct SpawnedPodRegistry {
cursors: Mutex<HashMap<String, usize>>, cursors: Mutex<HashMap<String, usize>>,
runtime_dir: Arc<RuntimeDir>, runtime_dir: Arc<RuntimeDir>,
state_writer: Option<RegistryStateWriter>, state_writer: Option<RegistryStateWriter>,
reclaim_writer: Option<RegistryReclaimWriter>,
parent_name: Option<String>, parent_name: Option<String>,
parent_scope: Option<SharedScope>, parent_scope: Option<SharedScope>,
scope_change_sink: Option<ScopeChangeSink>,
} }
pub struct SpawnedPodRegistryLoad { pub struct SpawnedPodRegistryLoad {
@ -58,9 +57,9 @@ impl SpawnedPodRegistry {
cursors: Mutex::new(HashMap::new()), cursors: Mutex::new(HashMap::new()),
runtime_dir, runtime_dir,
state_writer: None, state_writer: None,
reclaim_writer: None,
parent_name: None, parent_name: None,
parent_scope: None, parent_scope: None,
scope_change_sink: None,
}) })
} }
@ -77,8 +76,7 @@ impl SpawnedPodRegistry {
St: PodMetadataStore + Clone + Send + Sync + 'static, St: PodMetadataStore + Clone + Send + Sync + 'static,
{ {
let loaded = let loaded =
Self::load_from_pod_state_with_reclaim(runtime_dir, store, pod_name, None, None) Self::load_from_pod_state_with_reclaim(runtime_dir, store, pod_name, None).await?;
.await?;
Ok(loaded.registry) Ok(loaded.registry)
} }
@ -87,7 +85,6 @@ impl SpawnedPodRegistry {
store: St, store: St,
pod_name: String, pod_name: String,
parent_scope: Option<SharedScope>, parent_scope: Option<SharedScope>,
scope_change_sink: Option<ScopeChangeSink>,
) -> io::Result<SpawnedPodRegistryLoad> ) -> io::Result<SpawnedPodRegistryLoad>
where where
St: PodMetadataStore + Clone + Send + Sync + 'static, St: PodMetadataStore + Clone + Send + Sync + 'static,
@ -99,13 +96,11 @@ impl SpawnedPodRegistry {
.unwrap_or_default(); .unwrap_or_default();
let mut records = Vec::with_capacity(persisted_children.len()); let mut records = Vec::with_capacity(persisted_children.len());
let mut pruned = false;
let mut pruned_records = Vec::new(); let mut pruned_records = Vec::new();
for child in &persisted_children { for child in &persisted_children {
let record = match record_from_pod_state(child) { let record = match record_from_pod_state(child) {
Ok(record) => record, Ok(record) => record,
Err(err) => { Err(err) => {
pruned = true;
warn!( warn!(
error = %err, error = %err,
pod = %child.pod_name, pod = %child.pod_name,
@ -117,7 +112,6 @@ impl SpawnedPodRegistry {
if is_reachable(&record.socket_path).await { if is_reachable(&record.socket_path).await {
records.push(record); records.push(record);
} else { } else {
pruned = true;
warn!( warn!(
pod = %record.pod_name, pod = %record.pod_name,
socket = %record.socket_path.display(), socket = %record.socket_path.display(),
@ -128,20 +122,40 @@ impl SpawnedPodRegistry {
} }
runtime_dir.write_spawned_pods(&records).await?; runtime_dir.write_spawned_pods(&records).await?;
let state_writer = pod_state_writer(store, pod_name.clone()); let state_writer = pod_state_writer(store.clone(), pod_name.clone());
// Runtime spawned-pod records are a live registry for ListPods and let reclaim_writer = pod_state_reclaim_writer(store.clone(), pod_name.clone());
// cursor/scope cleanup; durable Pod state remains the discovery source if metadata.is_none() {
// for later attach/restore, so do not delete unreachable children from
// Pod state just because their sockets are gone.
if metadata.is_none() || !pruned {
state_writer(&records)?; state_writer(&records)?;
} }
let mut reclaimed_unreachable = false; let mut reclaimed_unreachable = false;
if !pruned_records.is_empty() {
let reclaimed = pruned_records
.iter()
.map(|record| PodReclaimedChild {
pod_name: record.pod_name.clone(),
scope_delegated: record
.scope_delegated
.iter()
.map(|rule| PodSpawnedScopeRule {
target: rule.target.clone(),
permission: match rule.permission {
Permission::Read => "read".to_string(),
Permission::Write => "write".to_string(),
},
recursive: rule.recursive,
})
.collect(),
})
.collect();
store
.reclaim_spawned_children(&pod_name, reclaimed)
.map_err(store_error_to_io)?;
reclaimed_unreachable = true;
}
if parent_scope.is_some() { if parent_scope.is_some() {
for record in &pruned_records { for record in &pruned_records {
reclaim_record(&pod_name, parent_scope.as_ref(), None, record)?; reclaim_record(&pod_name, parent_scope.as_ref(), record)?;
reclaimed_unreachable = true;
} }
} }
@ -151,9 +165,9 @@ impl SpawnedPodRegistry {
cursors: Mutex::new(HashMap::new()), cursors: Mutex::new(HashMap::new()),
runtime_dir, runtime_dir,
state_writer: Some(state_writer), state_writer: Some(state_writer),
reclaim_writer: Some(reclaim_writer),
parent_name: Some(pod_name), parent_name: Some(pod_name),
parent_scope, parent_scope,
scope_change_sink,
}), }),
reclaimed_unreachable, reclaimed_unreachable,
}) })
@ -196,6 +210,9 @@ impl SpawnedPodRegistry {
self.cursors.lock().await.remove(pod_name); self.cursors.lock().await.remove(pod_name);
if let Some(record) = &removed { if let Some(record) = &removed {
self.reclaim_record(record)?; self.reclaim_record(record)?;
if let Some(write_reclaim) = &self.reclaim_writer {
write_reclaim(record)?;
}
} }
Ok(removed) Ok(removed)
} }
@ -205,12 +222,7 @@ impl SpawnedPodRegistry {
release_child_allocation(&record.pod_name)?; release_child_allocation(&record.pod_name)?;
return Ok(()); return Ok(());
}; };
reclaim_record( reclaim_record(parent_name, self.parent_scope.as_ref(), record)
parent_name,
self.parent_scope.as_ref(),
self.scope_change_sink.as_ref(),
record,
)
} }
/// Read-only cursor lookup. Returns 0 when no cursor has been set. /// Read-only cursor lookup. Returns 0 when no cursor has been set.
@ -248,10 +260,36 @@ where
}) })
} }
fn pod_state_reclaim_writer<St>(store: St, pod_name: String) -> RegistryReclaimWriter
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
Arc::new(move |record| {
let reclaimed = PodReclaimedChild {
pod_name: record.pod_name.clone(),
scope_delegated: record
.scope_delegated
.iter()
.map(|rule| PodSpawnedScopeRule {
target: rule.target.clone(),
permission: match rule.permission {
Permission::Read => "read".to_string(),
Permission::Write => "write".to_string(),
},
recursive: rule.recursive,
})
.collect(),
};
store
.reclaim_spawned_children(&pod_name, vec![reclaimed])
.map(|_| ())
.map_err(store_error_to_io)
})
}
fn reclaim_record( fn reclaim_record(
parent_name: &str, parent_name: &str,
parent_scope: Option<&SharedScope>, parent_scope: Option<&SharedScope>,
scope_change_sink: Option<&ScopeChangeSink>,
record: &SpawnedPodRecord, record: &SpawnedPodRecord,
) -> io::Result<()> { ) -> io::Result<()> {
let write_rules = record let write_rules = record
@ -277,13 +315,6 @@ fn reclaim_record(
scope scope
.update(|current| current.with_removed_deny_rules(write_rules)) .update(|current| current.with_removed_deny_rules(write_rules))
.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
if let Some(sink) = scope_change_sink {
let snapshot = scope.snapshot();
sink(PodScopeSnapshot {
allow: snapshot.allow_rules(),
deny: snapshot.deny_rules(),
});
}
} }
Ok(()) Ok(())
@ -304,18 +335,16 @@ fn write_records_to_pod_state<St>(
store: &St, store: &St,
pod_name: &str, pod_name: &str,
records: &[SpawnedPodRecord], records: &[SpawnedPodRecord],
) -> Result<(), StoreError> ) -> Result<(), PodStoreError>
where where
St: PodMetadataStore, St: PodMetadataStore,
{ {
let mut metadata = store let children = records
.read_by_name(pod_name)?
.unwrap_or_else(|| PodMetadata::new(pod_name, None));
metadata.spawned_children = records
.iter() .iter()
.map(record_to_pod_state) .map(record_to_pod_state)
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
store.write(&metadata) store.set_spawned_children(pod_name, children)?;
Ok(())
} }
fn record_to_pod_state(record: &SpawnedPodRecord) -> Result<PodSpawnedChild, serde_json::Error> { fn record_to_pod_state(record: &SpawnedPodRecord) -> Result<PodSpawnedChild, serde_json::Error> {
@ -366,7 +395,7 @@ fn record_from_pod_state(child: &PodSpawnedChild) -> Result<SpawnedPodRecord, se
}) })
} }
fn store_error_to_io(error: StoreError) -> io::Error { fn store_error_to_io(error: PodStoreError) -> io::Error {
io::Error::other(error) io::Error::other(error)
} }

View File

@ -18,7 +18,6 @@ use manifest::{
SharedScope, WorkerManifestConfig, SharedScope, WorkerManifestConfig,
}; };
use serde::Deserialize; use serde::Deserialize;
use session_store::PodScopeSnapshot;
use tokio::net::UnixStream; use tokio::net::UnixStream;
use tokio::process::Command; use tokio::process::Command;
use tokio::time::sleep; use tokio::time::sleep;
@ -128,9 +127,6 @@ pub struct SpawnPodTool {
/// `effective_write` semantics: Write is the only permission /// `effective_write` semantics: Write is the only permission
/// tracked across Pods, so revocation only touches Write. /// tracked across Pods, so revocation only touches Write.
spawner_scope: SharedScope, spawner_scope: SharedScope,
/// Called after the spawner scope has been updated so the new
/// effective scope can be persisted to the session log.
scope_changed: Arc<dyn Fn(PodScopeSnapshot) + Send + Sync>,
} }
impl SpawnPodTool { impl SpawnPodTool {
@ -143,7 +139,6 @@ impl SpawnPodTool {
parent_socket: Option<PathBuf>, parent_socket: Option<PathBuf>,
spawner_model: ModelManifest, spawner_model: ModelManifest,
spawner_scope: SharedScope, spawner_scope: SharedScope,
scope_changed: Arc<dyn Fn(PodScopeSnapshot) + Send + Sync>,
) -> Self { ) -> Self {
Self { Self {
spawner_name, spawner_name,
@ -154,7 +149,6 @@ impl SpawnPodTool {
parent_socket, parent_socket,
spawner_model, spawner_model,
spawner_scope, spawner_scope,
scope_changed,
} }
} }
} }
@ -250,11 +244,6 @@ impl Tool for SpawnPodTool {
self.spawner_scope self.spawner_scope
.update(|cur| cur.with_added_deny_rules(revoke_write.clone())) .update(|cur| cur.with_added_deny_rules(revoke_write.clone()))
.map_err(|e| ToolError::ExecutionFailed(format!("revoke spawner scope: {e}")))?; .map_err(|e| ToolError::ExecutionFailed(format!("revoke spawner scope: {e}")))?;
let current = self.spawner_scope.snapshot();
(self.scope_changed)(PodScopeSnapshot {
allow: current.allow_rules(),
deny: current.deny_rules(),
});
} }
let record = SpawnedPodRecord { let record = SpawnedPodRecord {
@ -496,7 +485,6 @@ pub fn spawn_pod_tool(
parent_socket: Option<PathBuf>, parent_socket: Option<PathBuf>,
spawner_model: ModelManifest, spawner_model: ModelManifest,
spawner_scope: SharedScope, spawner_scope: SharedScope,
scope_changed: Arc<dyn Fn(PodScopeSnapshot) + Send + Sync>,
) -> ToolDefinition { ) -> ToolDefinition {
Arc::new(move || { Arc::new(move || {
let schema = schemars::schema_for!(SpawnPodInput); let schema = schemars::schema_for!(SpawnPodInput);
@ -513,7 +501,6 @@ pub fn spawn_pod_tool(
parent_socket.clone(), parent_socket.clone(),
spawner_model.clone(), spawner_model.clone(),
spawner_scope.clone(), spawner_scope.clone(),
scope_changed.clone(),
)); ));
(meta, tool) (meta, tool)
}) })

View File

@ -16,12 +16,15 @@ use llm_worker::Worker;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::types::Item; use llm_worker::llm_client::types::Item;
use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::llm_client::{ClientError, LlmClient, Request};
use pod_store::{CombinedStore, FsPodStore, PodMetadataStore};
use protocol::{Event, Method, RunResult}; use protocol::{Event, Method, RunResult};
use session_store::{FsStore, LogEntry, PodMetadataStore, Store}; use session_store::{FsStore, LogEntry, Store};
use tokio::sync::broadcast; use tokio::sync::broadcast;
use pod::{Pod, PodController}; use pod::{Pod, PodController};
/// Store type behind every `Pod` built in this test file: a `CombinedStore`
/// pairing an `FsStore` with an `FsPodStore`.
type TestStore = CombinedStore<FsStore, FsPodStore>;
#[derive(Clone)] #[derive(Clone)]
struct MockClient { struct MockClient {
responses: Arc<Vec<Vec<LlmEvent>>>, responses: Arc<Vec<Vec<LlmEvent>>>,
@ -145,11 +148,14 @@ permission = "write"
async fn make_pod_with_manifest( async fn make_pod_with_manifest(
manifest_toml: &str, manifest_toml: &str,
client: MockClient, client: MockClient,
) -> Pod<MockClient, FsStore> { ) -> Pod<MockClient, TestStore> {
let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
std::mem::forget(store_tmp); std::mem::forget(store_tmp);
let pwd_tmp = tempfile::tempdir().unwrap(); let pwd_tmp = tempfile::tempdir().unwrap();
@ -163,7 +169,7 @@ async fn make_pod_with_manifest(
pod pod
} }
async fn make_pod(client: MockClient) -> Pod<MockClient, FsStore> { async fn make_pod(client: MockClient) -> Pod<MockClient, TestStore> {
make_pod_with_manifest(POST_RUN_MANIFEST_TOML, client).await make_pod_with_manifest(POST_RUN_MANIFEST_TOML, client).await
} }

View File

@ -26,7 +26,10 @@ use llm_worker::llm_client::{ClientError, LlmClient, Request};
use memory::WorkspaceLayout; use memory::WorkspaceLayout;
use memory::extract::{ExtractedPayload, write_staging}; use memory::extract::{ExtractedPayload, write_staging};
use memory::schema::SourceRef; use memory::schema::SourceRef;
use pod_store::{CombinedStore, FsPodStore};
use session_store::FsStore; use session_store::FsStore;
/// Store type behind every `Pod` built in this test file: a `CombinedStore`
/// pairing an `FsStore` with an `FsPodStore`.
type TestStore = CombinedStore<FsStore, FsPodStore>;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use pod::{Event, Pod}; use pod::{Event, Pod};
@ -155,11 +158,14 @@ async fn make_pod_with(
manifest_toml: &str, manifest_toml: &str,
pwd: std::path::PathBuf, pwd: std::path::PathBuf,
client: MockClient, client: MockClient,
) -> Pod<MockClient, FsStore> { ) -> Pod<MockClient, TestStore> {
let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
std::mem::forget(store_tmp); std::mem::forget(store_tmp);
let scope = pod::Scope::writable(&pwd).unwrap(); let scope = pod::Scope::writable(&pwd).unwrap();
@ -184,7 +190,7 @@ fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec<uuid::Uuid> {
ids ids
} }
fn attach_event_receiver(pod: &mut Pod<MockClient, FsStore>) -> broadcast::Receiver<Event> { fn attach_event_receiver(pod: &mut Pod<MockClient, TestStore>) -> broadcast::Receiver<Event> {
let (tx, rx) = broadcast::channel(16); let (tx, rx) = broadcast::channel(16);
pod.attach_event_tx(tx); pod.attach_event_tx(tx);
rx rx

View File

@ -9,10 +9,13 @@ use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEve
use llm_worker::llm_client::types::Item; use llm_worker::llm_client::types::Item;
use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::llm_client::{ClientError, LlmClient, Request};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use pod_store::{CombinedStore, FsPodStore};
use session_store::{FsStore, LogEntry}; use session_store::{FsStore, LogEntry};
use pod::{Event, Method, Pod, PodController, PodHandle, PodManifest, PodStatus}; use pod::{Event, Method, Pod, PodController, PodHandle, PodManifest, PodStatus};
/// Store type behind every `Pod` built in this test file: a `CombinedStore`
/// pairing an `FsStore` with an `FsPodStore`.
type TestStore = CombinedStore<FsStore, FsPodStore>;
/// Reconstruct a worker-history-like `Vec<Item>` from the live session /// Reconstruct a worker-history-like `Vec<Item>` from the live session
/// log mirror held by the Pod's broadcast sink. Replaces the previous /// log mirror held by the Pod's broadcast sink. Replaces the previous
/// `PodSharedState.history()` test helper now that the mirror lives in /// `PodSharedState.history()` test helper now that the mirror lives in
@ -152,21 +155,24 @@ target = "./"
permission = "write" permission = "write"
"#; "#;
async fn make_pod(client: MockClient) -> Pod<MockClient, FsStore> { async fn make_pod(client: MockClient) -> Pod<MockClient, TestStore> {
make_pod_with_pwd(client).await.0 make_pod_with_pwd(client).await.0
} }
async fn make_pod_with_pwd(client: MockClient) -> (Pod<MockClient, FsStore>, std::path::PathBuf) { async fn make_pod_with_pwd(client: MockClient) -> (Pod<MockClient, TestStore>, std::path::PathBuf) {
make_pod_with_pwd_and_manifest(client, MANIFEST_TOML).await make_pod_with_pwd_and_manifest(client, MANIFEST_TOML).await
} }
async fn make_pod_with_pwd_and_manifest( async fn make_pod_with_pwd_and_manifest(
client: MockClient, client: MockClient,
manifest_toml: &str, manifest_toml: &str,
) -> (Pod<MockClient, FsStore>, std::path::PathBuf) { ) -> (Pod<MockClient, TestStore>, std::path::PathBuf) {
let manifest = PodManifest::from_toml(manifest_toml).unwrap(); let manifest = PodManifest::from_toml(manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
std::mem::forget(store_tmp); std::mem::forget(store_tmp);
// Separate tempdir to serve as the Pod's pwd/scope — these tests // Separate tempdir to serve as the Pod's pwd/scope — these tests
@ -184,7 +190,7 @@ async fn make_pod_with_pwd_and_manifest(
(pod, pwd) (pod, pwd)
} }
async fn spawn_controller(pod: Pod<MockClient, FsStore>) -> PodHandle { async fn spawn_controller(pod: Pod<MockClient, TestStore>) -> PodHandle {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let runtime_base = tmp.path().to_owned(); let runtime_base = tmp.path().to_owned();
std::mem::forget(tmp); std::mem::forget(tmp);

View File

@ -20,10 +20,11 @@ use pod::spawn::comm_tools::{
list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool,
}; };
use pod::spawn::registry::SpawnedPodRegistry; use pod::spawn::registry::SpawnedPodRegistry;
use pod_store::{CombinedStore, FsPodStore, PodMetadataStore};
use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{ErrorCode, Event, Greeting, Method}; use protocol::{ErrorCode, Event, Greeting, Method};
use serde_json::json; use serde_json::json;
use session_store::{FsStore, PodMetadataStore}; use session_store::FsStore;
use tempfile::TempDir; use tempfile::TempDir;
use tokio::net::UnixListener; use tokio::net::UnixListener;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -385,7 +386,10 @@ async fn stop_pod_sends_shutdown_and_releases_scope() {
let _env = EnvGuard::acquire(); let _env = EnvGuard::acquire();
let tmp = TempDir::new().unwrap(); let tmp = TempDir::new().unwrap();
let store_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
let rd = Arc::new(RuntimeDir::create(tmp.path(), "spawner").await.unwrap()); let rd = Arc::new(RuntimeDir::create(tmp.path(), "spawner").await.unwrap());
let parent_scope = SharedScope::new( let parent_scope = SharedScope::new(
Scope::writable(tmp.path()) Scope::writable(tmp.path())
@ -438,7 +442,6 @@ async fn stop_pod_sends_shutdown_and_releases_scope() {
store.clone(), store.clone(),
"spawner".into(), "spawner".into(),
Some(parent_scope.clone()), Some(parent_scope.clone()),
None,
) )
.await .await
.unwrap(); .unwrap();
@ -512,7 +515,10 @@ async fn restored_registry_uses_pod_state_without_runtime_file() {
let _env = EnvGuard::acquire(); let _env = EnvGuard::acquire();
let runtime_tmp = TempDir::new().unwrap(); let runtime_tmp = TempDir::new().unwrap();
let store_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
unsafe { unsafe {
std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path()); std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path());
} }
@ -573,16 +579,21 @@ async fn restored_registry_uses_pod_state_without_runtime_file() {
.unwrap() .unwrap()
.expect("spawner metadata should remain"); .expect("spawner metadata should remain");
assert!(metadata.spawned_children.is_empty()); assert!(metadata.spawned_children.is_empty());
assert_eq!(metadata.reclaimed_children.len(), 1);
assert_eq!(metadata.reclaimed_children[0].pod_name, "child");
let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap(); let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap();
let runtime_records: Vec<SpawnedPodRecord> = serde_json::from_str(&runtime_contents).unwrap(); let runtime_records: Vec<SpawnedPodRecord> = serde_json::from_str(&runtime_contents).unwrap();
assert!(runtime_records.is_empty()); assert!(runtime_records.is_empty());
} }
#[tokio::test] #[tokio::test]
async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state() { async fn load_from_pod_state_prunes_runtime_children_and_reclaims_durable_delegation() {
let runtime_tmp = TempDir::new().unwrap(); let runtime_tmp = TempDir::new().unwrap();
let store_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
let rd = Arc::new( let rd = Arc::new(
RuntimeDir::create(runtime_tmp.path(), "spawner") RuntimeDir::create(runtime_tmp.path(), "spawner")
.await .await
@ -615,27 +626,21 @@ async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state
.read_by_name("spawner") .read_by_name("spawner")
.unwrap() .unwrap()
.expect("spawner metadata should be written"); .expect("spawner metadata should be written");
assert_eq!(metadata.spawned_children.len(), 2); assert_eq!(metadata.spawned_children.len(), 1);
assert!( assert_eq!(metadata.spawned_children[0].pod_name, "alive");
metadata assert_eq!(metadata.reclaimed_children.len(), 1);
.spawned_children assert_eq!(metadata.reclaimed_children[0].pod_name, "missing");
.iter()
.any(|c| c.pod_name == "alive")
);
assert!(
metadata
.spawned_children
.iter()
.any(|c| c.pod_name == "missing")
);
} }
#[tokio::test] #[tokio::test]
async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_state() { async fn load_from_pod_state_reclaims_missing_child_scope_and_records_history() {
let _env = EnvGuard::acquire(); let _env = EnvGuard::acquire();
let runtime_tmp = TempDir::new().unwrap(); let runtime_tmp = TempDir::new().unwrap();
let store_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
unsafe { unsafe {
std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path()); std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path());
} }
@ -662,15 +667,6 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_st
session_store::new_segment_id(), session_store::new_segment_id(),
) )
.unwrap(); .unwrap();
pod_registry::register_pod(
&mut g,
"missing".into(),
std::process::id(),
"/tmp/missing.sock".into(),
vec![missing_rule.clone()],
session_store::new_segment_id(),
)
.unwrap();
} }
let parent_scope = SharedScope::new( let parent_scope = SharedScope::new(
@ -696,7 +692,6 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_st
store.clone(), store.clone(),
"spawner".into(), "spawner".into(),
Some(parent_scope.clone()), Some(parent_scope.clone()),
None,
) )
.await .await
.unwrap(); .unwrap();
@ -716,8 +711,9 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_st
.read_by_name("spawner") .read_by_name("spawner")
.unwrap() .unwrap()
.expect("spawner metadata should remain"); .expect("spawner metadata should remain");
assert_eq!(metadata.spawned_children.len(), 1); assert!(metadata.spawned_children.is_empty());
assert_eq!(metadata.spawned_children[0].pod_name, "missing"); assert_eq!(metadata.reclaimed_children.len(), 1);
assert_eq!(metadata.reclaimed_children[0].pod_name, "missing");
let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap(); let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap();
let runtime_records: Vec<SpawnedPodRecord> = serde_json::from_str(&runtime_contents).unwrap(); let runtime_records: Vec<SpawnedPodRecord> = serde_json::from_str(&runtime_contents).unwrap();
assert!(runtime_records.is_empty()); assert!(runtime_records.is_empty());

View File

@ -8,7 +8,8 @@
use std::sync::{LazyLock, Mutex}; use std::sync::{LazyLock, Mutex};
use pod::{Pod, PodError}; use pod::{Pod, PodError};
use session_store::{FsStore, PodActiveSegmentRef, PodMetadata, PodMetadataStore, StoreError}; use pod_store::{CombinedStore, FsPodStore, PodActiveSegmentRef, PodMetadata, PodMetadataStore};
use session_store::{FsStore, StoreError};
const MINIMAL_MANIFEST_TOML: &str = r#" const MINIMAL_MANIFEST_TOML: &str = r#"
[pod] [pod]
@ -36,7 +37,10 @@ async fn restore_from_pod_metadata_rejects_missing_metadata() {
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
let result = Pod::restore_from_pod_metadata( let result = Pod::restore_from_pod_metadata(
@ -59,7 +63,10 @@ async fn restore_from_pod_metadata_rejects_pending_segment() {
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
let session_id = session_store::new_session_id(); let session_id = session_store::new_session_id();
store store
@ -95,7 +102,10 @@ async fn restore_from_pod_metadata_resolves_active_pointer_through_session_log()
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
let session_id = session_store::new_session_id(); let session_id = session_store::new_session_id();
let segment_id = session_store::new_segment_id(); let segment_id = session_store::new_segment_id();
@ -126,7 +136,10 @@ async fn restore_from_manifest_rejects_unknown_segment() {
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
// A freshly-minted id with no jsonl file at all → store returns // A freshly-minted id with no jsonl file at all → store returns
@ -155,7 +168,10 @@ async fn restore_from_manifest_rejects_empty_segment_log() {
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
// Pre-create an empty `<sid>/<segid>.jsonl` so `read_all` succeeds // Pre-create an empty `<sid>/<segid>.jsonl` so `read_all` succeeds
@ -183,36 +199,3 @@ async fn restore_from_manifest_rejects_empty_segment_log() {
Ok(_) => panic!("expected empty segment log to fail"), Ok(_) => panic!("expected empty segment log to fail"),
} }
} }
/// A segment created without a scope snapshot must not be restorable:
/// `Pod::restore_from_manifest` is expected to fail with
/// `PodError::SegmentScopeMissing` carrying the id of that segment.
#[tokio::test]
async fn restore_from_manifest_rejects_segment_without_scope_snapshot() {
    // Hold the shared lock for the whole test; a poisoned lock is still usable.
    let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());

    let store_tmp = tempfile::tempdir().unwrap();
    let store = FsStore::new(store_tmp.path()).unwrap();
    let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();

    // Seed a segment from a bare start state (no prompt, default config,
    // empty history) under freshly minted ids.
    let (sid, segid) = (
        session_store::new_session_id(),
        session_store::new_segment_id(),
    );
    session_store::create_segment_with_ids(
        &store,
        sid,
        segid,
        session_store::SegmentStartState {
            system_prompt: None,
            config: &Default::default(),
            history: &[],
        },
    )
    .unwrap();

    let outcome = Pod::restore_from_manifest(
        sid,
        segid,
        manifest,
        store,
        pod::PromptLoader::builtins_only(),
    )
    .await;

    let failure = match outcome {
        Ok(_) => panic!("expected missing scope snapshot to fail"),
        Err(err) => err,
    };
    match failure {
        PodError::SegmentScopeMissing { segment_id } => assert_eq!(segment_id, segid),
        other => panic!("expected SegmentScopeMissing, got {other:?}"),
    }
}

View File

@ -25,11 +25,14 @@ use llm_worker::Worker;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent}; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent};
use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::llm_client::{ClientError, LlmClient, Request};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use pod_store::{CombinedStore, FsPodStore};
use session_metrics::{DOMAIN, Metric, metrics_from_extensions}; use session_metrics::{DOMAIN, Metric, metrics_from_extensions};
use session_store::{FsStore, LogEntry, SegmentId, SessionId, Store, StoreError, TraceEntry}; use session_store::{FsStore, LogEntry, SegmentId, SessionId, Store, StoreError, TraceEntry};
use pod::{Pod, PodManifest}; use pod::{Pod, PodManifest};
/// Store type behind every `Pod` built in this test file: a `CombinedStore`
/// pairing an `FsStore` with an `FsPodStore`.
type TestStore = CombinedStore<FsStore, FsPodStore>;
#[derive(Clone)] #[derive(Clone)]
struct MockClient { struct MockClient {
responses: Arc<Vec<Vec<LlmEvent>>>, responses: Arc<Vec<Vec<LlmEvent>>>,
@ -166,13 +169,16 @@ async fn make_pod(
client: MockClient, client: MockClient,
tool_name: &'static str, tool_name: &'static str,
) -> ( ) -> (
Pod<MockClient, FsStore>, Pod<MockClient, TestStore>,
tempfile::TempDir, tempfile::TempDir,
tempfile::TempDir, tempfile::TempDir,
) { ) {
let manifest = PodManifest::from_toml(&manifest_toml).unwrap(); let manifest = PodManifest::from_toml(&manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
let pwd_tmp = tempfile::tempdir().unwrap(); let pwd_tmp = tempfile::tempdir().unwrap();
let pwd = pwd_tmp.path().to_path_buf(); let pwd = pwd_tmp.path().to_path_buf();
let scope = pod::Scope::writable(&pwd).unwrap(); let scope = pod::Scope::writable(&pwd).unwrap();
@ -500,7 +506,10 @@ permission = "write"
let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]); let client = MockClient::new(vec![text_response_with_cache("hi", 0, 0)]);
let manifest = PodManifest::from_toml(manifest_toml).unwrap(); let manifest = PodManifest::from_toml(manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
let pwd_tmp = tempfile::tempdir().unwrap(); let pwd_tmp = tempfile::tempdir().unwrap();
let pwd = pwd_tmp.path().to_path_buf(); let pwd = pwd_tmp.path().to_path_buf();
let scope = pod::Scope::writable(&pwd).unwrap(); let scope = pod::Scope::writable(&pwd).unwrap();

View File

@ -193,7 +193,6 @@ async fn spawn_pod_delegates_scope_and_sends_run() {
None, None,
dummy_model(), dummy_model(),
spawner_scope.clone(), spawner_scope.clone(),
std::sync::Arc::new(|_| {}),
); );
let (_meta, tool) = def(); let (_meta, tool) = def();
@ -282,7 +281,6 @@ async fn spawn_pod_rejects_scope_outside_spawner() {
None, None,
dummy_model(), dummy_model(),
spawner_scope.clone(), spawner_scope.clone(),
std::sync::Arc::new(|_| {}),
); );
let (_meta, tool) = def(); let (_meta, tool) = def();
@ -354,7 +352,6 @@ async fn spawn_pod_rolls_back_reservation_when_socket_never_appears() {
None, None,
dummy_model(), dummy_model(),
spawner_scope.clone(), spawner_scope.clone(),
std::sync::Arc::new(|_| {}),
); );
let (_meta, tool) = def(); let (_meta, tool) = def();

View File

@ -8,10 +8,13 @@ use futures::Stream;
use llm_worker::Worker; use llm_worker::Worker;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::llm_client::{ClientError, LlmClient, Request};
use pod_store::{CombinedStore, FsPodStore};
use session_store::{FsStore, LogEntry, Store}; use session_store::{FsStore, LogEntry, Store};
use pod::{Pod, PodError, PromptLoader, SystemPromptTemplate}; use pod::{Pod, PodError, PromptLoader, SystemPromptTemplate};
/// Store type behind every `Pod` built in this test file: a `CombinedStore`
/// pairing an `FsStore` with an `FsPodStore`.
type TestStore = CombinedStore<FsStore, FsPodStore>;
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Mock LLM Client // Mock LLM Client
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@ -99,11 +102,14 @@ permission = "write"
async fn make_pod_with_body( async fn make_pod_with_body(
body: &str, body: &str,
client: MockClient, client: MockClient,
) -> Result<(Pod<MockClient, FsStore>, PathBuf), PodError> { ) -> Result<(Pod<MockClient, TestStore>, PathBuf), PodError> {
let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap();
let store_tmp = tempfile::tempdir().unwrap(); let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).unwrap(); let store = CombinedStore::new(
FsStore::new(store_tmp.path()).unwrap(),
FsPodStore::new(store_tmp.path().join("pods")).unwrap(),
);
std::mem::forget(store_tmp); std::mem::forget(store_tmp);
let pwd_tmp = tempfile::tempdir().unwrap(); let pwd_tmp = tempfile::tempdir().unwrap();

View File

@ -3,7 +3,6 @@
//! Layout: //! Layout:
//! - Segment log: `{root}/{session_id}/{segment_id}.jsonl` //! - Segment log: `{root}/{session_id}/{segment_id}.jsonl`
//! - Event trace: `{root}/{session_id}/{segment_id}.trace.jsonl` //! - Event trace: `{root}/{session_id}/{segment_id}.trace.jsonl`
//! - Pod metadata: `{root}/pods/{pod_name}/metadata.json`
//! //!
//! The per-Session directory makes `list_segments(session_id)` an O(dir) //! The per-Session directory makes `list_segments(session_id)` an O(dir)
//! scan and gives the fork tree a visible grouping in the filesystem. //! scan and gives the fork tree a visible grouping in the filesystem.
@ -17,7 +16,6 @@
//! enumerable by the picker. //! enumerable by the picker.
use crate::event_trace::TraceEntry; use crate::event_trace::TraceEntry;
use crate::pod_metadata::{PodMetadata, PodMetadataStore, validate_pod_name};
use crate::segment_log::LogEntry; use crate::segment_log::LogEntry;
use crate::store::{Store, StoreError}; use crate::store::{Store, StoreError};
use crate::{SegmentId, SessionId}; use crate::{SegmentId, SessionId};
@ -57,19 +55,6 @@ impl FsStore {
.join(format!("{segment_id}.trace.jsonl")) .join(format!("{segment_id}.trace.jsonl"))
} }
/// Directory that holds one sub-directory per Pod: `{root}/pods`.
fn pods_dir(&self) -> PathBuf {
    let mut dir = self.root.clone();
    dir.push("pods");
    dir
}
/// Directory of a single Pod: `{root}/pods/{pod_name}`.
///
/// # Errors
///
/// Fails when `validate_pod_name` rejects `pod_name`; the name is checked
/// before it is appended to the path.
fn pod_dir(&self, pod_name: &str) -> Result<PathBuf, StoreError> {
    validate_pod_name(pod_name)?;
    let mut dir = self.pods_dir();
    dir.push(pod_name);
    Ok(dir)
}
/// Path of a Pod's metadata file: `metadata.json` inside its Pod directory.
///
/// # Errors
///
/// Propagates the error from `pod_dir` (an invalid `pod_name`).
fn pod_metadata_path(&self, pod_name: &str) -> Result<PathBuf, StoreError> {
    self.pod_dir(pod_name)
        .map(|dir| dir.join("metadata.json"))
}
fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> { fn append_line(&self, path: &Path, line: &str) -> Result<(), StoreError> {
if let Some(parent) = path.parent() { if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?; fs::create_dir_all(parent)?;
@ -102,70 +87,6 @@ impl FsStore {
} }
} }
/// Filesystem-backed Pod metadata: each Pod owns a directory under
/// `{root}/pods/` containing a single `metadata.json`.
impl PodMetadataStore for FsStore {
    /// Writes `metadata` as pretty-printed JSON to the Pod's `metadata.json`,
    /// creating the Pod directory first when it does not exist.
    fn write(&self, metadata: &PodMetadata) -> Result<(), StoreError> {
        let target = self.pod_metadata_path(&metadata.pod_name)?;
        if let Some(dir) = target.parent() {
            fs::create_dir_all(dir)?;
        }
        let bytes = serde_json::to_vec_pretty(metadata)?;
        fs::write(target, bytes)?;
        Ok(())
    }

    /// Loads the metadata stored for `pod_name`.
    ///
    /// Returns `Ok(None)` when the metadata file does not exist; any other
    /// read failure is reported as `StoreError::Io`.
    fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, StoreError> {
        let source = self.pod_metadata_path(pod_name)?;
        match fs::read_to_string(source) {
            Ok(raw) => Ok(Some(serde_json::from_str(&raw)?)),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
            Err(err) => Err(StoreError::Io(err)),
        }
    }

    /// Lists, in sorted order, the names of all Pods that have metadata.
    ///
    /// An entry counts only if it is a directory, contains `metadata.json`,
    /// has a UTF-8 name, and that name passes `validate_pod_name`. A missing
    /// pods directory yields an empty list.
    fn list_names(&self) -> Result<Vec<String>, StoreError> {
        let pods_root = self.pods_dir();
        if !pods_root.exists() {
            return Ok(Vec::new());
        }

        let mut found = Vec::new();
        for entry in fs::read_dir(pods_root)? {
            let entry = entry?;
            let holds_metadata =
                entry.file_type()?.is_dir() && entry.path().join("metadata.json").exists();
            if !holds_metadata {
                continue;
            }
            // Names that are not valid UTF-8 or not valid Pod names are skipped.
            if let Some(candidate) = entry.file_name().to_str() {
                if validate_pod_name(candidate).is_ok() {
                    found.push(candidate.to_owned());
                }
            }
        }
        found.sort();
        Ok(found)
    }

    /// Root directory of this store.
    fn root_dir(&self) -> Option<PathBuf> {
        Some(self.root.to_path_buf())
    }

    /// Deletes the metadata file of `pod_name`.
    ///
    /// A file that is already absent is not an error. After a successful
    /// delete the Pod directory is removed on a best-effort basis; the result
    /// of that removal is deliberately ignored.
    fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError> {
        let target = self.pod_metadata_path(pod_name)?;
        if let Err(err) = fs::remove_file(&target) {
            return if err.kind() == std::io::ErrorKind::NotFound {
                Ok(())
            } else {
                Err(StoreError::Io(err))
            };
        }
        if let Some(dir) = target.parent() {
            let _ = fs::remove_dir(dir);
        }
        Ok(())
    }
}
impl Store for FsStore { impl Store for FsStore {
fn append( fn append(
&self, &self,

View File

@ -33,7 +33,6 @@
pub mod event_trace; pub mod event_trace;
pub mod fs_store; pub mod fs_store;
pub mod logged_item; pub mod logged_item;
pub mod pod_metadata;
pub mod segment; pub mod segment;
pub mod segment_log; pub mod segment_log;
pub mod store; pub mod store;
@ -44,20 +43,13 @@ pub use fs_store::FsStore;
pub use llm_worker::UsageRecord; pub use llm_worker::UsageRecord;
pub use llm_worker::llm_client::types::{ContentPart, Item, Role}; pub use llm_worker::llm_client::types::{ContentPart, Item, Role};
pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged}; pub use logged_item::{LoggedContentPart, LoggedItem, LoggedRole, from_logged, to_logged};
pub use pod_metadata::{
PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule,
};
pub use segment::{ pub use segment::{
SegmentStartState, append_entry, append_system_item, classify_history_item, SegmentStartState, append_entry, append_system_item, classify_history_item,
create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork, create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork,
fork_at, restore, restore_by_segment, save_config_changed, save_delta, save_extension, fork_at, restore, restore_by_segment, save_config_changed, save_delta, save_extension,
save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage, save_run_completed, save_run_errored, save_turn_end, save_usage, save_user_input,
save_user_input,
};
pub use segment_log::{
LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SegmentOrigin,
collect_state,
}; };
pub use segment_log::{LogEntry, RestoredState, SegmentOrigin, collect_state};
pub use store::{Store, StoreError}; pub use store::{Store, StoreError};
pub use system_item::{SystemItem, SystemReminder, SystemReminderSource, render_pod_event}; pub use system_item::{SystemItem, SystemReminder, SystemReminderSource, render_pod_event};

View File

@ -1,150 +0,0 @@
//! Pod metadata persistence API.
//!
//! Pod metadata is a lightweight name-keyed pointer to the Session/Segment
//! currently active for a Pod. Conversation content remains in the segment log;
//! this metadata only records references needed by Pod-name resume/attach flows.
use crate::store::StoreError;
use crate::{SegmentId, SessionId};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
/// Active Session/Segment pointer for a Pod.
///
/// `segment_id` is optional so callers can persist a reserved Session before
/// the first Segment ID is known. Once a segment exists, callers should rewrite
/// the metadata with `Some(segment_id)`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodActiveSegmentRef {
/// Session this Pod is currently bound to.
pub session_id: SessionId,
/// Active Segment within the Session, if already known. Omitted from the
/// serialized form when `None`; a missing field deserializes to `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub segment_id: Option<SegmentId>,
}
impl PodActiveSegmentRef {
/// Create a reference whose active Segment is not known yet.
pub fn pending_segment(session_id: SessionId) -> Self {
Self {
session_id,
segment_id: None,
}
}
/// Create a fully resolved active Session/Segment reference.
pub fn active_segment(session_id: SessionId, segment_id: SegmentId) -> Self {
Self {
session_id,
segment_id: Some(segment_id),
}
}
}
/// One delegated scope rule for a spawned child, kept local to
/// `session-store` so the persistence crate does not depend on manifest
/// scope types.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodSpawnedScopeRule {
/// Filesystem path the rule applies to.
pub target: PathBuf,
/// Permission name, stored as a plain string rather than a typed enum.
/// NOTE(review): presumably mirrors the manifest permission names — confirm
/// against the code that converts to and from the manifest scope types.
pub permission: String,
/// Whether the rule is marked recursive.
/// NOTE(review): presumably "applies to everything below `target`" — confirm.
pub recursive: bool,
}
/// One child Pod spawned by this Pod and persisted with the spawner's
/// name-keyed Pod state.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodSpawnedChild {
/// Name of the child Pod.
pub pod_name: String,
/// Path of the child's socket.
pub socket_path: PathBuf,
/// Scope rules delegated to the child.
pub scope_delegated: Vec<PodSpawnedScopeRule>,
/// Callback address recorded for the child, stored as a path.
/// NOTE(review): presumably the socket the child reports back on — confirm.
pub callback_address: PathBuf,
}
/// Persistent metadata for a Pod name.
///
/// All fields except `pod_name` are optional on disk: they are skipped when
/// empty/`None` on serialization and default to empty/`None` when missing on
/// deserialization.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodMetadata {
/// Pod name; also the key under which the metadata is stored.
pub pod_name: String,
/// Pointer to the Session/Segment currently active for this Pod, if any.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub active: Option<PodActiveSegmentRef>,
/// Child Pods spawned by this Pod.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub spawned_children: Vec<PodSpawnedChild>,
/// Manifest snapshot kept as an opaque JSON value; this crate does not
/// interpret its contents.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resolved_manifest_snapshot: Option<serde_json::Value>,
}
impl PodMetadata {
/// Create Pod metadata for `pod_name`.
pub fn new(pod_name: impl Into<String>, active: Option<PodActiveSegmentRef>) -> Self {
Self {
pod_name: pod_name.into(),
active,
spawned_children: Vec::new(),
resolved_manifest_snapshot: None,
}
}
}
/// Sync persistence backend for Pod metadata.
///
/// The key is the Pod name. Missing state is not an error: `read_by_name`
/// returns `Ok(None)` for Pods that have never persisted metadata or whose
/// metadata was deleted.
///
/// Implementations must be `Send + Sync`; every method takes `&self`.
pub trait PodMetadataStore: Send + Sync {
/// Create or replace metadata for its `pod_name` key.
fn write(&self, metadata: &PodMetadata) -> Result<(), StoreError>;
/// Read metadata by Pod name. Returns `None` when no metadata exists.
fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, StoreError>;
/// List persisted Pod metadata keys. Implementations return names only;
/// callers can then read each item independently so a corrupt metadata
/// file does not make the whole discovery result fail.
fn list_names(&self) -> Result<Vec<String>, StoreError>;
/// Return the metadata root directory when this backend is path-backed.
///
/// The default implementation returns `None`; path-backed backends
/// override it.
fn root_dir(&self) -> Option<PathBuf> {
None
}
/// Delete metadata by Pod name. Missing metadata is a successful no-op.
fn delete_by_name(&self, pod_name: &str) -> Result<(), StoreError>;
}
/// Check that `pod_name` is usable as a single directory name.
///
/// Rejects the empty string, the `.` and `..` components, and any name that
/// contains a NUL character or a path separator. `/` is rejected on every
/// platform; characters that `std::path::is_separator` reports for the current
/// platform are rejected as well, so on Windows a name such as `..\..\x`
/// cannot escape the directory it is joined onto. On Unix the accepted set of
/// names is unchanged.
///
/// # Errors
/// Returns `StoreError::InvalidPodName` carrying the offending name.
pub(crate) fn validate_pod_name(pod_name: &str) -> Result<(), StoreError> {
    let is_reserved = matches!(pod_name, "" | "." | "..");
    let has_forbidden_char = pod_name
        .chars()
        .any(|c| c == '/' || c == '\0' || std::path::is_separator(c));
    if is_reserved || has_forbidden_char {
        return Err(StoreError::InvalidPodName(pod_name.to_string()));
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A populated `resolved_manifest_snapshot` survives a JSON round trip and
    /// stays addressable as nested JSON afterwards.
    #[test]
    fn pod_metadata_manifest_snapshot_roundtrips() {
        let active = PodActiveSegmentRef::pending_segment(crate::new_session_id());
        let mut original = PodMetadata::new("profile-pod", Some(active));
        let snapshot = serde_json::json!({
            "pod": { "name": "profile-pod" },
            "profile": {
                "source": { "kind": "path", "path": "/profiles/coder.nix" }
            }
        });
        original.resolved_manifest_snapshot = Some(snapshot);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: PodMetadata = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded, original);
        assert_eq!(
            decoded.resolved_manifest_snapshot.as_ref().unwrap()["profile"]["source"]["kind"],
            "path"
        );
    }
}

View File

@ -5,7 +5,7 @@
//! functions after state-mutating operations. //! functions after state-mutating operations.
use crate::logged_item::{LoggedItem, to_logged}; use crate::logged_item::{LoggedItem, to_logged};
use crate::segment_log::{self, LogEntry, PodScopeSnapshot, SegmentOrigin}; use crate::segment_log::{self, LogEntry, SegmentOrigin};
use crate::store::{Store, StoreError}; use crate::store::{Store, StoreError};
use crate::system_item::SystemItem; use crate::system_item::SystemItem;
use crate::{SegmentId, SessionId}; use crate::{SegmentId, SessionId};
@ -385,23 +385,6 @@ pub fn save_extension(
) )
} }
/// Persist the Pod's current runtime scope snapshot in the segment log.
///
/// The snapshot is serialized to JSON and appended through `save_extension`
/// under the `segment_log::POD_SCOPE_EXTENSION_DOMAIN` domain. A serialization
/// failure is returned before anything is appended.
pub fn save_pod_scope(
    store: &impl Store,
    session_id: SessionId,
    segment_id: SegmentId,
    snapshot: &PodScopeSnapshot,
) -> Result<(), StoreError> {
    let domain = segment_log::POD_SCOPE_EXTENSION_DOMAIN;
    let json = serde_json::to_value(snapshot)?;
    save_extension(store, session_id, segment_id, domain, json)
}
/// Log a `ConfigChanged` entry. /// Log a `ConfigChanged` entry.
pub fn save_config_changed( pub fn save_config_changed(
store: &impl Store, store: &impl Store,

View File

@ -11,7 +11,7 @@
use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::llm_client::types::{Item, RequestConfig};
use llm_worker::{UsageRecord, WorkerResult}; use llm_worker::{UsageRecord, WorkerResult};
use protocol::{InvokeKind, ScopeRule, Segment}; use protocol::{InvokeKind, Segment};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::logged_item::LoggedItem; use crate::logged_item::LoggedItem;
@ -166,16 +166,6 @@ pub struct SegmentOrigin {
pub at_turn_index: usize, pub at_turn_index: usize,
} }
/// Domain used by Pod to persist its latest effective runtime scope.
///
/// This is the `domain` string of the `LogEntry::Extension` entries that carry
/// a `PodScopeSnapshot` payload.
pub const POD_SCOPE_EXTENSION_DOMAIN: &str = "pod.scope";
/// Payload stored in `LogEntry::Extension { domain: "pod.scope", .. }`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PodScopeSnapshot {
/// Scope rules on the allow side of the snapshot.
pub allow: Vec<ScopeRule>,
/// Scope rules on the deny side of the snapshot.
/// NOTE(review): precedence between `allow` and `deny` is not visible
/// here — confirm against the scope evaluation code.
pub deny: Vec<ScopeRule>,
}
/// State collected from log entries. /// State collected from log entries.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RestoredState { pub struct RestoredState {
@ -199,9 +189,6 @@ pub struct RestoredState {
/// `LogEntry::Extension` を replay 順に積んだもの。`(domain, payload)`。 /// `LogEntry::Extension` を replay 順に積んだもの。`(domain, payload)`。
/// session-store は domain を不透明扱いし、各ドメインが自前で fold する。 /// session-store は domain を不透明扱いし、各ドメインが自前で fold する。
pub extensions: Vec<(String, serde_json::Value)>, pub extensions: Vec<(String, serde_json::Value)>,
/// Latest runtime scope snapshot persisted by the Pod. `None` means
/// the segment predates scope persistence or the payload was corrupt.
pub pod_scope: Option<PodScopeSnapshot>,
/// User submissions in original typed form, in submit order. /// User submissions in original typed form, in submit order.
/// One entry per `LogEntry::UserInput`; the K-th entry corresponds to /// One entry per `LogEntry::UserInput`; the K-th entry corresponds to
/// the K-th `Item::user_message` derived during replay (modulo /// the K-th `Item::user_message` derived during replay (modulo
@ -223,7 +210,6 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState {
entries_count: 0, entries_count: 0,
usage_history: Vec::new(), usage_history: Vec::new(),
extensions: Vec::new(), extensions: Vec::new(),
pod_scope: None,
user_segments: Vec::new(), user_segments: Vec::new(),
}; };
@ -293,17 +279,6 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState {
LogEntry::Extension { LogEntry::Extension {
domain, payload, .. domain, payload, ..
} => { } => {
if domain == POD_SCOPE_EXTENSION_DOMAIN {
match serde_json::from_value::<PodScopeSnapshot>(payload.clone()) {
Ok(snapshot) => state.pod_scope = Some(snapshot),
Err(err) => {
tracing::warn!(
error = %err,
"discarding malformed pod.scope snapshot from segment log"
);
}
}
}
state.extensions.push((domain.clone(), payload.clone())); state.extensions.push((domain.clone(), payload.clone()));
} }
} }

View File

@ -29,9 +29,6 @@ pub enum StoreError {
#[error("log corrupted at line {line}: {message}")] #[error("log corrupted at line {line}: {message}")]
Corrupt { line: usize, message: String }, Corrupt { line: usize, message: String },
#[error("invalid pod name: {0}")]
InvalidPodName(String),
} }
/// Sync persistence backend for segment logs. /// Sync persistence backend for segment logs.

View File

@ -1,8 +1,7 @@
use llm_worker::WorkerResult; use llm_worker::WorkerResult;
use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::llm_client::types::{Item, RequestConfig};
use session_store::{ use session_store::{
FsStore, LogEntry, PodActiveSegmentRef, PodMetadata, PodMetadataStore, Store, TraceEntry, FsStore, LogEntry, Store, TraceEntry, collect_state, new_segment_id, new_session_id,
collect_state, new_segment_id, new_session_id,
}; };
fn nil_session_start(ts: u64, session_id: uuid::Uuid) -> LogEntry { fn nil_session_start(ts: u64, session_id: uuid::Uuid) -> LogEntry {
@ -240,40 +239,3 @@ fn lookup_session_of_finds_owning_session() {
assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid)); assert_eq!(store.lookup_session_of(segid).unwrap(), Some(sid));
} }
/// Exercises the full create / read / list / replace / delete cycle of Pod
/// metadata on an `FsStore` rooted in a temporary directory.
#[test]
fn pod_metadata_minimal_crud() {
let dir = tempfile::tempdir().unwrap();
let store = FsStore::new(dir.path()).unwrap();
let pod_name = "worker-a";
let sid = new_session_id();
let segid = new_segment_id();
// Nothing persisted yet: reading is `None`, not an error.
assert_eq!(store.read_by_name(pod_name).unwrap(), None);
// Create: metadata with a Session but no Segment yet.
let pending = PodMetadata::new(pod_name, Some(PodActiveSegmentRef::pending_segment(sid)));
store.write(&pending).unwrap();
assert_eq!(store.list_names().unwrap(), vec![pod_name.to_string()]);
assert_eq!(store.read_by_name(pod_name).unwrap(), Some(pending.clone()));
// Pin the on-disk layout, not just the API behaviour.
assert!(
dir.path()
.join("pods")
.join(pod_name)
.join("metadata.json")
.exists(),
"Pod metadata must live under <data_dir>/pods/<pod_name>/"
);
// Replace: a second write for the same name overwrites the first.
let resolved = PodMetadata::new(
pod_name,
Some(PodActiveSegmentRef::active_segment(sid, segid)),
);
store.write(&resolved).unwrap();
assert_eq!(store.read_by_name(pod_name).unwrap(), Some(resolved));
// Delete: afterwards the Pod reads as missing again.
store.delete_by_name(pod_name).unwrap();
assert_eq!(store.read_by_name(pod_name).unwrap(), None);
// Delete is idempotent for missing metadata.
store.delete_by_name(pod_name).unwrap();
}

View File

@ -20,6 +20,7 @@ uuid = { workspace = true }
toml = { workspace = true } toml = { workspace = true }
manifest = { workspace = true } manifest = { workspace = true }
session-store = { workspace = true } session-store = { workspace = true }
pod-store = { workspace = true }
pod-registry = { workspace = true } pod-registry = { workspace = true }
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
pulldown-cmark = { version = "0.13.3", default-features = false } pulldown-cmark = { version = "0.13.3", default-features = false }

View File

@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use crossterm::event::{Event as TermEvent, KeyCode, KeyEvent, KeyModifiers, poll, read}; use crossterm::event::{Event as TermEvent, KeyCode, KeyEvent, KeyModifiers, poll, read};
use pod_store::FsPodStore;
use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{ErrorCode, Event, InvokeKind, Method, PodStatus, Segment}; use protocol::{ErrorCode, Event, InvokeKind, Method, PodStatus, Segment};
use ratatui::Frame; use ratatui::Frame;
@ -199,12 +200,22 @@ fn default_store_dir() -> Result<PathBuf, MultiPodError> {
manifest::paths::sessions_dir().ok_or_else(|| { manifest::paths::sessions_dir().ok_or_else(|| {
MultiPodError::Io(io::Error::new( MultiPodError::Io(io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
"could not resolve sessions directory \ "could not resolve sessions directory",
(set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)",
)) ))
}) })
} }
fn default_pod_store_dir() -> Result<PathBuf, MultiPodError> {
manifest::paths::data_dir()
.map(|dir| dir.join("pods"))
.ok_or_else(|| {
MultiPodError::Io(io::Error::new(
io::ErrorKind::NotFound,
"could not resolve pod state directory",
))
})
}
#[cfg(test)] #[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum SendEligibility { pub(crate) enum SendEligibility {
@ -483,7 +494,8 @@ enum MultiPodAction {
async fn load_pod_list(selected_name: Option<String>) -> Result<PodList, MultiPodError> { async fn load_pod_list(selected_name: Option<String>) -> Result<PodList, MultiPodError> {
let store_dir = default_store_dir()?; let store_dir = default_store_dir()?;
let store = FsStore::new(&store_dir)?; let store = FsStore::new(&store_dir)?;
let stored = read_stored_pod_infos(&store_dir, &store)?; let pod_store = FsPodStore::new(default_pod_store_dir()?).map_err(io::Error::other)?;
let stored = read_stored_pod_infos(&store, &pod_store)?;
let live = read_reachable_live_pod_infos(&store) let live = read_reachable_live_pod_infos(&store)
.await .await
.unwrap_or_default(); .unwrap_or_default();

View File

@ -1,7 +1,7 @@
//! Inline-viewport "pick a Pod to attach or restore" UX. //! Inline-viewport "pick a Pod to attach or restore" UX.
//! //!
//! Reads live Pod allocations from the runtime registry and stopped Pod state //! Reads live Pod allocations from the runtime registry and stopped Pod state
//! from the session store's name-keyed metadata. Picking a live row attaches to //! from the pod-store name-keyed metadata. Picking a live row attaches to
//! its socket; picking a stopped row restores via `insomnia-pod --pod <name>`. //! its socket; picking a stopped row restores via `insomnia-pod --pod <name>`.
use std::io; use std::io;
@ -9,6 +9,7 @@ use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers}; use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers};
use pod_store::FsPodStore;
use ratatui::Terminal; use ratatui::Terminal;
use ratatui::backend::CrosstermBackend; use ratatui::backend::CrosstermBackend;
use ratatui::layout::{Constraint, Layout}; use ratatui::layout::{Constraint, Layout};
@ -102,7 +103,8 @@ impl PodRowState {
pub async fn run() -> Result<PickerOutcome, PickerError> { pub async fn run() -> Result<PickerOutcome, PickerError> {
let store_dir = default_store_dir()?; let store_dir = default_store_dir()?;
let store = FsStore::new(&store_dir)?; let store = FsStore::new(&store_dir)?;
let stored_pods = read_stored_pod_infos(&store_dir, &store)?; let pod_store = FsPodStore::new(default_pod_store_dir()?).map_err(io::Error::other)?;
let stored_pods = read_stored_pod_infos(&store, &pod_store)?;
let live_pods = read_reachable_live_pod_infos(&store) let live_pods = read_reachable_live_pod_infos(&store)
.await .await
.unwrap_or_default(); .unwrap_or_default();
@ -172,6 +174,18 @@ fn default_store_dir() -> Result<PathBuf, PickerError> {
}) })
} }
fn default_pod_store_dir() -> Result<PathBuf, PickerError> {
manifest::paths::data_dir()
.map(|dir| dir.join("pods"))
.ok_or_else(|| {
PickerError::Io(io::Error::new(
io::ErrorKind::NotFound,
"could not resolve pod state directory \
(set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)",
))
})
}
pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option<PathBuf> { pub(crate) fn live_socket_for_pod(pod_name: &str) -> Option<PathBuf> {
pod_list_live_socket_for_pod(pod_name) pod_list_live_socket_for_pod(pod_name)
} }

View File

@ -1,14 +1,14 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::fs;
use std::io; use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::Duration; use std::time::Duration;
use client::PodClient; use client::PodClient;
use pod_registry::{LockFileGuard, default_registry_path}; use pod_registry::{LockFileGuard, default_registry_path};
use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore};
use protocol::{Event, PodStatus}; use protocol::{Event, PodStatus};
use session_store::{ use session_store::{
FsStore, LogEntry, LoggedContentPart, LoggedItem, PodMetadata, SegmentId, SessionId, Store, FsStore, LogEntry, LoggedContentPart, LoggedItem, SegmentId, SessionId, Store,
}; };
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -234,27 +234,17 @@ pub(crate) enum PodEntryDiagnosticKind {
} }
pub(crate) fn read_stored_pod_infos( pub(crate) fn read_stored_pod_infos(
store_dir: &Path,
store: &FsStore, store: &FsStore,
pod_store: &impl PodMetadataStore,
) -> Result<Vec<StoredPodInfo>, io::Error> { ) -> Result<Vec<StoredPodInfo>, io::Error> {
let pods_dir = store_dir.join("pods");
let mut records = Vec::new(); let mut records = Vec::new();
if !pods_dir.exists() { for pod_name in pod_store.list_names().map_err(io::Error::other)? {
return Ok(records); let info = match pod_store.read_by_name(&pod_name) {
} Ok(Some(metadata)) => stored_info_from_metadata(store, pod_name, metadata),
Ok(None) => corrupt_stored_info(
for entry in fs::read_dir(pods_dir)? { pod_name,
let entry = entry?; "metadata disappeared during discovery".to_string(),
if !entry.file_type()?.is_dir() { ),
continue;
}
let pod_name = entry.file_name().to_string_lossy().to_string();
let path = entry.path().join("metadata.json");
let info = match fs::read_to_string(&path) {
Ok(content) => match serde_json::from_str::<PodMetadata>(&content) {
Ok(metadata) => stored_info_from_metadata(store, pod_name, metadata),
Err(e) => corrupt_stored_info(pod_name, e.to_string()),
},
Err(e) => corrupt_stored_info(pod_name, e.to_string()), Err(e) => corrupt_stored_info(pod_name, e.to_string()),
}; };
records.push(info); records.push(info);
@ -392,10 +382,7 @@ fn summarize_live_pod(store: &FsStore, live: &LivePodInfo) -> PodEntrySummary {
} }
} }
fn summarize_metadata( fn summarize_metadata(store: &FsStore, active: Option<&PodActiveSegmentRef>) -> SegmentSummary {
store: &FsStore,
active: Option<&session_store::PodActiveSegmentRef>,
) -> SegmentSummary {
let Some(active) = active else { let Some(active) = active else {
return SegmentSummary { return SegmentSummary {
updated_at: 0, updated_at: 0,
@ -558,7 +545,9 @@ fn trim_one_line(s: &str, max_chars: usize) -> String {
mod tests { mod tests {
use super::*; use super::*;
use llm_worker::llm_client::types::RequestConfig; use llm_worker::llm_client::types::RequestConfig;
use session_store::{PodActiveSegmentRef, PodMetadataStore, new_segment_id, new_session_id}; use pod_store::FsPodStore;
use pod_store::{PodActiveSegmentRef, PodMetadataStore};
use session_store::{new_segment_id, new_session_id};
use tempfile::tempdir; use tempfile::tempdir;
const SOURCE: PodVisibilitySource = PodVisibilitySource::ResumePicker; const SOURCE: PodVisibilitySource = PodVisibilitySource::ResumePicker;
@ -776,11 +765,12 @@ mod tests {
fn read_stored_pod_infos_reports_corrupt_metadata() { fn read_stored_pod_infos_reports_corrupt_metadata() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let store = FsStore::new(dir.path()).unwrap(); let store = FsStore::new(dir.path()).unwrap();
let pod_store = FsPodStore::new(dir.path().join("pods")).unwrap();
let pod_dir = dir.path().join("pods").join("broken"); let pod_dir = dir.path().join("pods").join("broken");
fs::create_dir_all(&pod_dir).unwrap(); std::fs::create_dir_all(&pod_dir).unwrap();
fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap(); std::fs::write(pod_dir.join("metadata.json"), "{not-json").unwrap();
let records = read_stored_pod_infos(dir.path(), &store).unwrap(); let records = read_stored_pod_infos(&store, &pod_store).unwrap();
assert_eq!(records.len(), 1); assert_eq!(records.len(), 1);
assert_eq!(records[0].pod_name, "broken"); assert_eq!(records[0].pod_name, "broken");
assert!(matches!( assert!(matches!(
@ -793,16 +783,17 @@ mod tests {
fn read_stored_pod_infos_reads_metadata() { fn read_stored_pod_infos_reads_metadata() {
let dir = tempdir().unwrap(); let dir = tempdir().unwrap();
let store = FsStore::new(dir.path()).unwrap(); let store = FsStore::new(dir.path()).unwrap();
let pod_store = FsPodStore::new(dir.path().join("pods")).unwrap();
let session_id = new_session_id(); let session_id = new_session_id();
let segment_id = new_segment_id(); let segment_id = new_segment_id();
store pod_store
.write(&PodMetadata::new( .write(&PodMetadata::new(
"agent", "agent",
Some(PodActiveSegmentRef::active_segment(session_id, segment_id)), Some(PodActiveSegmentRef::active_segment(session_id, segment_id)),
)) ))
.unwrap(); .unwrap();
let records = read_stored_pod_infos(dir.path(), &store).unwrap(); let records = read_stored_pod_infos(&store, &pod_store).unwrap();
assert_eq!(records.len(), 1); assert_eq!(records.len(), 1);
assert_eq!(records[0].pod_name, "agent"); assert_eq!(records[0].pod_name, "agent");
assert_eq!(records[0].metadata_state, StoredMetadataState::Present); assert_eq!(records[0].metadata_state, StoredMetadataState::Present);

View File

@ -17,7 +17,7 @@ use std::time::Duration;
use client::{SpawnConfig, spawn_pod}; use client::{SpawnConfig, spawn_pod};
use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers}; use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers};
use manifest::{ProfileDiscovery, ScopeConfig}; use manifest::ProfileDiscovery;
use ratatui::Terminal; use ratatui::Terminal;
use ratatui::backend::CrosstermBackend; use ratatui::backend::CrosstermBackend;
use ratatui::layout::{Constraint, Layout}; use ratatui::layout::{Constraint, Layout};
@ -42,8 +42,6 @@ pub enum SpawnOutcome {
#[derive(Debug)] #[derive(Debug)]
pub enum SpawnError { pub enum SpawnError {
Io(io::Error), Io(io::Error),
Store(session_store::StoreError),
MissingResumeScope { segment_id: SegmentId },
Spawn(client::SpawnError), Spawn(client::SpawnError),
} }
@ -51,11 +49,6 @@ impl std::fmt::Display for SpawnError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self { match self {
Self::Io(e) => write!(f, "io error: {e}"), Self::Io(e) => write!(f, "io error: {e}"),
Self::Store(e) => write!(f, "failed to read session log: {e}"),
Self::MissingResumeScope { segment_id } => write!(
f,
"session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope"
),
Self::Spawn(e) => write!(f, "{e}"), Self::Spawn(e) => write!(f, "{e}"),
} }
} }
@ -69,12 +62,6 @@ impl From<io::Error> for SpawnError {
} }
} }
impl From<session_store::StoreError> for SpawnError {
fn from(e: session_store::StoreError) -> Self {
Self::Store(e)
}
}
impl From<client::SpawnError> for SpawnError { impl From<client::SpawnError> for SpawnError {
fn from(e: client::SpawnError) -> Self { fn from(e: client::SpawnError) -> Self {
Self::Spawn(e) Self::Spawn(e)
@ -111,7 +98,6 @@ pub async fn run(
editing: true, editing: true,
resume_from, resume_from,
resume_by_pod_name: false, resume_by_pod_name: false,
resume_scope: None,
profile_choices, profile_choices,
profile_index, profile_index,
}; };
@ -149,10 +135,6 @@ pub async fn run(
} }
} }
if let Some(id) = form.resume_from {
form.resume_scope = Some(load_resume_scope(id).await?);
}
// Phase 2: launch pod and wait for ready line. Drop the cursor // Phase 2: launch pod and wait for ready line. Drop the cursor
// out of the name field — subsequent frames are passive status // out of the name field — subsequent frames are passive status
// updates, not input — so the cursor doesn't end up parked there // updates, not input — so the cursor doesn't end up parked there
@ -305,7 +287,6 @@ fn form_for_pod_name(pod_name: String, defaults: SpawnDefaults) -> Form {
editing: false, editing: false,
resume_from: None, resume_from: None,
resume_by_pod_name: true, resume_by_pod_name: true,
resume_scope: None,
profile_choices: Vec::new(), profile_choices: Vec::new(),
profile_index: 0, profile_index: 0,
} }
@ -383,7 +364,6 @@ async fn wait_for_ready(
let config = SpawnConfig { let config = SpawnConfig {
pod_name: form.name.clone(), pod_name: form.name.clone(),
profile: form.selected_profile_selector(), profile: form.selected_profile_selector(),
resume_scope: form.resume_scope.clone(),
cwd: form.cwd.clone(), cwd: form.cwd.clone(),
resume_from: form.resume_from, resume_from: form.resume_from,
resume_by_pod_name: form.resume_by_pod_name, resume_by_pod_name: form.resume_by_pod_name,
@ -399,24 +379,6 @@ async fn wait_for_ready(
}) })
} }
async fn load_resume_scope(segment_id: SegmentId) -> Result<ScopeConfig, SpawnError> {
let store_dir = manifest::paths::sessions_dir().ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
"could not resolve sessions directory (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)",
)
})?;
let store = session_store::FsStore::new(&store_dir)?;
let state = session_store::restore_by_segment(&store, segment_id)?;
let snapshot = state
.pod_scope
.ok_or(SpawnError::MissingResumeScope { segment_id })?;
Ok(ScopeConfig {
allow: snapshot.allow,
deny: snapshot.deny,
})
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageKind { enum MessageKind {
Info, Info,
@ -453,10 +415,6 @@ struct Form {
/// When true, launch the child with `--pod <name>` so the pod process /// When true, launch the child with `--pod <name>` so the pod process
/// resolves name-keyed state before falling back to fresh creation. /// resolves name-keyed state before falling back to fresh creation.
resume_by_pod_name: bool, resume_by_pod_name: bool,
/// Scope snapshot recovered from the source session log. Set only for
/// resume runs and passed through a typed internal restore flag so resume
/// does not silently broaden access.
resume_scope: Option<ScopeConfig>,
/// Optional Nix profile choices passed to `insomnia-pod --profile` for /// Optional Nix profile choices passed to `insomnia-pod --profile` for
/// fresh spawns. This is not used for resume/attach flows because those must /// fresh spawns. This is not used for resume/attach flows because those must
/// restore Pod state rather than re-evaluate a profile source. /// restore Pod state rather than re-evaluate a profile source.
@ -616,17 +574,6 @@ fn context_line(form: &Form) -> Line<'_> {
]); ]);
} }
if form.resume_scope.is_some() {
return Line::from(vec![
Span::raw(" "),
Span::styled("scope: ", Style::default().fg(Color::DarkGray)),
Span::styled(
"from restored session snapshot",
Style::default().fg(Color::Green),
),
]);
}
match form.scope_origin { match form.scope_origin {
ScopeOrigin::FromProfile => Line::from(vec![ ScopeOrigin::FromProfile => Line::from(vec![
Span::raw(" "), Span::raw(" "),
@ -670,7 +617,6 @@ mod tests {
editing: true, editing: true,
resume_from: None, resume_from: None,
resume_by_pod_name: false, resume_by_pod_name: false,
resume_scope: None,
profile_choices: Vec::new(), profile_choices: Vec::new(),
profile_index: 0, profile_index: 0,
} }
@ -691,7 +637,6 @@ mod tests {
assert_eq!(f.name_cursor, "agent".chars().count()); assert_eq!(f.name_cursor, "agent".chars().count());
assert_eq!(f.resume_from, None); assert_eq!(f.resume_from, None);
assert!(f.resume_by_pod_name); assert!(f.resume_by_pod_name);
assert!(f.resume_scope.is_none());
assert!(!f.editing); assert!(!f.editing);
assert_eq!( assert_eq!(
f.message, f.message,
@ -699,28 +644,6 @@ mod tests {
); );
} }
/// A scope snapshot assigned to a resume form is kept as-is: both the allow
/// and the deny rule can be read back from `resume_scope`.
#[test]
fn resume_scope_snapshot_stays_on_form_for_typed_restore_flag() {
    // Both rules are recursive write rules; only the target differs.
    let write_rule = |target: &str| manifest::ScopeRule {
        target: PathBuf::from(target),
        permission: manifest::Permission::Write,
        recursive: true,
    };

    let mut f = form("agent-r");
    f.resume_from = Some(session_store::new_segment_id());
    f.resume_scope = Some(ScopeConfig {
        allow: vec![write_rule("/work/example")],
        deny: vec![write_rule("/work/example/child")],
    });

    let kept = f.resume_scope.as_ref().unwrap();
    assert_eq!(kept.allow[0].target, PathBuf::from("/work/example"));
    assert_eq!(kept.deny[0].target, PathBuf::from("/work/example/child"));
}
#[test] #[test]
fn profile_choices_use_project_registry_default() { fn profile_choices_use_project_registry_default() {
let temp = tempfile::tempdir().unwrap(); let temp = tempfile::tempdir().unwrap();