fix: reclaim delegated scope from stopped children
This commit is contained in:
parent
5ade50dec5
commit
d62cd09c4d
|
|
@ -24,7 +24,7 @@ pub struct Scope {
|
||||||
deny: Vec<ResolvedRule>,
|
deny: Vec<ResolvedRule>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
struct ResolvedRule {
|
struct ResolvedRule {
|
||||||
/// Absolute, canonicalized-or-normalized target directory/file.
|
/// Absolute, canonicalized-or-normalized target directory/file.
|
||||||
target: PathBuf,
|
target: PathBuf,
|
||||||
|
|
@ -217,6 +217,32 @@ impl Scope {
|
||||||
Self::from_config(&config)
|
Self::from_config(&config)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Build a new [`Scope`] with one matching deny rule removed for each
|
||||||
|
/// rule in `remove_deny`.
|
||||||
|
///
|
||||||
|
/// This is intentionally exact (after the same target resolution used
|
||||||
|
/// by [`Scope::from_config`]) rather than geometric: reclaiming a
|
||||||
|
/// delegated child must remove the deny layer that was added for that
|
||||||
|
/// child without broadening any explicit base deny that merely overlaps
|
||||||
|
/// the delegated path. Missing rules are ignored, making repeated
|
||||||
|
/// reclaim calls harmless.
|
||||||
|
pub fn with_removed_deny_rules(
|
||||||
|
&self,
|
||||||
|
remove_deny: impl IntoIterator<Item = ScopeRule>,
|
||||||
|
) -> Result<Self, ScopeError> {
|
||||||
|
let mut deny = self.deny.clone();
|
||||||
|
for rule in remove_deny {
|
||||||
|
let resolved = resolve_rule(&rule)?;
|
||||||
|
if let Some(idx) = deny.iter().position(|existing| existing == &resolved) {
|
||||||
|
deny.remove(idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(Self {
|
||||||
|
allow: self.allow.clone(),
|
||||||
|
deny,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Human-readable grouping of allow rules, suitable for embedding in
|
/// Human-readable grouping of allow rules, suitable for embedding in
|
||||||
/// LLM system prompts. Deny rules are intentionally omitted — they
|
/// LLM system prompts. Deny rules are intentionally omitted — they
|
||||||
/// only cap effective permission and surfacing them would mislead the
|
/// only cap effective permission and surfacing them would mislead the
|
||||||
|
|
@ -684,6 +710,44 @@ mod tests {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
fn with_removed_deny_rules_reclaims_one_matching_layer() {
    let tmp = TempDir::new().unwrap();
    let delegated_dir = tmp.path().join("sub");
    std::fs::create_dir(&delegated_dir).unwrap();
    // File inside the delegated directory used to probe effective permission.
    let probe = delegated_dir.join("a.txt");
    let deny_layer = ScopeRule {
        target: delegated_dir,
        permission: Permission::Write,
        recursive: true,
    };

    // Stack the same deny rule twice on top of a writable root.
    let doubled = Scope::writable(tmp.path())
        .unwrap()
        .with_added_deny_rules([deny_layer.clone(), deny_layer.clone()])
        .unwrap();

    // First reclaim peels exactly one layer; the second copy still caps Write.
    let after_first = doubled
        .with_removed_deny_rules([deny_layer.clone()])
        .unwrap();
    assert_eq!(
        after_first.permission_at(&probe),
        Some(Permission::Read),
        "one duplicate deny layer must remain"
    );

    // Second reclaim removes the remaining layer and Write comes back.
    let after_second = after_first
        .with_removed_deny_rules([deny_layer.clone()])
        .unwrap();
    assert_eq!(after_second.permission_at(&probe), Some(Permission::Write));

    // A third reclaim has nothing left to match and must change nothing.
    let after_third = after_second.with_removed_deny_rules([deny_layer]).unwrap();
    assert_eq!(
        after_third.permission_at(&probe),
        Some(Permission::Write),
        "missing rules are ignored for idempotent reclaim"
    );
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn shared_scope_load_returns_current_value() {
|
fn shared_scope_load_returns_current_value() {
|
||||||
let dir = TempDir::new().unwrap();
|
let dir = TempDir::new().unwrap();
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,7 @@ pub use lifecycle::{
|
||||||
install_top_level_with_deny, lookup_segment, update_segment,
|
install_top_level_with_deny, lookup_segment, update_segment,
|
||||||
};
|
};
|
||||||
pub use mutate::{
|
pub use mutate::{
|
||||||
delegate_scope, reclaim_stale, reclaim_stale_with, register_pod, register_pod_with_deny,
|
delegate_scope, reclaim_delegated_scope, reclaim_stale, reclaim_stale_with, register_pod,
|
||||||
release_pod,
|
register_pod_with_deny, release_pod,
|
||||||
};
|
};
|
||||||
pub use table::{Allocation, LockFile, LockFileGuard, default_registry_path};
|
pub use table::{Allocation, LockFile, LockFileGuard, default_registry_path};
|
||||||
|
|
|
||||||
|
|
@ -178,6 +178,55 @@ pub fn release_pod(guard: &mut LockFileGuard, pod_name: &str) -> Result<(), Scop
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reclaim a child delegation back into its parent allocation.
|
||||||
|
///
|
||||||
|
/// This is idempotent: missing child allocations and missing deny entries are
|
||||||
|
/// ignored. For each delegated Write rule, at most one exact matching deny rule
|
||||||
|
/// is removed from the parent's `scope_deny`, preserving any duplicate explicit
|
||||||
|
/// base deny that was not owned by this child delegation.
|
||||||
|
pub fn reclaim_delegated_scope(
|
||||||
|
guard: &mut LockFileGuard,
|
||||||
|
parent: &str,
|
||||||
|
child: &str,
|
||||||
|
delegated_scope: &[ScopeRule],
|
||||||
|
) -> Result<(), ScopeLockError> {
|
||||||
|
let child_idx = guard
|
||||||
|
.data()
|
||||||
|
.allocations
|
||||||
|
.iter()
|
||||||
|
.position(|a| a.pod_name == child);
|
||||||
|
let removed_child_parent = child_idx
|
||||||
|
.map(|idx| guard.data().allocations[idx].delegated_from.clone())
|
||||||
|
.unwrap_or(None);
|
||||||
|
|
||||||
|
let child_exists = child_idx.is_some();
|
||||||
|
|
||||||
|
if child_exists {
|
||||||
|
if let Some(parent_alloc) = guard.data_mut().find_mut(parent) {
|
||||||
|
for rule in delegated_scope
|
||||||
|
.iter()
|
||||||
|
.filter(|rule| rule.permission == Permission::Write)
|
||||||
|
{
|
||||||
|
if let Some(idx) = parent_alloc.scope_deny.iter().position(|deny| deny == rule) {
|
||||||
|
parent_alloc.scope_deny.remove(idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(idx) = child_idx {
|
||||||
|
for alloc in guard.data_mut().allocations.iter_mut() {
|
||||||
|
if alloc.delegated_from.as_deref() == Some(child) {
|
||||||
|
alloc.delegated_from.clone_from(&removed_child_parent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
guard.data_mut().allocations.remove(idx);
|
||||||
|
}
|
||||||
|
|
||||||
|
guard.save()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Remove allocations whose PID is dead, reparenting children to the
|
/// Remove allocations whose PID is dead, reparenting children to the
|
||||||
/// dead Pod's `delegated_from`. Idempotent and best-effort — I/O
|
/// dead Pod's `delegated_from`. Idempotent and best-effort — I/O
|
||||||
/// errors on save are swallowed so a crashed Pod's entry never blocks
|
/// errors on save are swallowed so a crashed Pod's entry never blocks
|
||||||
|
|
@ -436,6 +485,46 @@ mod tests {
|
||||||
assert!(g.data().find("b").is_none());
|
assert!(g.data().find("b").is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
fn reclaim_delegated_scope_removes_child_and_one_parent_deny_layer() {
    let tmp = TempDir::new().unwrap();
    let registry_file = tmp.path().join("pods.json");
    let mut guard = open_empty(&registry_file);
    let core_write = write_rule("/src/core", true);

    // Parent "a" carries the delegated rule twice in its deny list: one copy
    // stands for the delegation to "b", the other for an explicit base deny.
    let parent_deny = vec![core_write.clone(), core_write.clone()];
    register_pod_with_deny(
        &mut guard,
        "a".into(),
        std::process::id(),
        sock("a"),
        vec![write_rule("/src", true)],
        parent_deny,
        sid(),
    )
    .unwrap();

    // Child "b" holds the delegated rule as its allowed scope.
    register_pod(
        &mut guard,
        "b".into(),
        std::process::id(),
        sock("b"),
        vec![core_write.clone()],
        sid(),
    )
    .unwrap();

    // First reclaim: the child disappears and exactly one deny copy is removed.
    reclaim_delegated_scope(&mut guard, "a", "b", std::slice::from_ref(&core_write)).unwrap();
    assert_eq!(
        guard.data().find("a").unwrap().scope_deny,
        vec![core_write.clone()]
    );
    assert!(guard.data().find("b").is_none());

    // Second reclaim: with the child gone, the remaining deny must be untouched.
    reclaim_delegated_scope(&mut guard, "a", "b", &[core_write.clone()]).unwrap();
    assert_eq!(
        guard.data().find("a").unwrap().scope_deny,
        vec![core_write],
        "a repeated reclaim with no child allocation must not broaden an explicit duplicate base deny"
    );
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn reclaim_stale_reparents_and_removes_dead_entries() {
|
fn reclaim_stale_reparents_and_removes_dead_entries() {
|
||||||
let dir = TempDir::new().unwrap();
|
let dir = TempDir::new().unwrap();
|
||||||
|
|
|
||||||
|
|
@ -151,12 +151,20 @@ impl PodController {
|
||||||
|
|
||||||
let spawner_name = pod.manifest().pod.name.clone();
|
let spawner_name = pod.manifest().pod.name.clone();
|
||||||
let self_parent_socket = pod.callback_socket().cloned();
|
let self_parent_socket = pod.callback_socket().cloned();
|
||||||
let spawned_registry = SpawnedPodRegistry::load_from_pod_state(
|
let loaded_registry = SpawnedPodRegistry::load_from_pod_state_with_reclaim(
|
||||||
runtime_dir.clone(),
|
runtime_dir.clone(),
|
||||||
pod.store().clone(),
|
pod.store().clone(),
|
||||||
spawner_name.clone(),
|
spawner_name.clone(),
|
||||||
|
Some(pod.scope().clone()),
|
||||||
|
Some(pod.scope_change_sink()),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
let reclaimed_unreachable = loaded_registry.reclaimed_unreachable;
|
||||||
|
let spawned_registry = loaded_registry.registry;
|
||||||
|
if reclaimed_unreachable {
|
||||||
|
pod.persist_scope_snapshot()
|
||||||
|
.map_err(std::io::Error::other)?;
|
||||||
|
}
|
||||||
|
|
||||||
// Hand the alerter to the Pod so internal operations (compaction,
|
// Hand the alerter to the Pod so internal operations (compaction,
|
||||||
// AGENTS.md ingestion during the first turn) can emit user-facing
|
// AGENTS.md ingestion during the first turn) can emit user-facing
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,6 @@ use std::sync::Arc;
|
||||||
use protocol::{Method, PodEvent, ScopeRule};
|
use protocol::{Method, PodEvent, ScopeRule};
|
||||||
|
|
||||||
use crate::runtime::dir::SpawnedPodRecord;
|
use crate::runtime::dir::SpawnedPodRecord;
|
||||||
use crate::runtime::pod_registry::{self, ScopeLockError};
|
|
||||||
use crate::spawn::comm_tools::connect_and_send;
|
use crate::spawn::comm_tools::connect_and_send;
|
||||||
use crate::spawn::registry::SpawnedPodRegistry;
|
use crate::spawn::registry::SpawnedPodRegistry;
|
||||||
|
|
||||||
|
|
@ -86,8 +85,8 @@ pub fn render_event(event: &PodEvent) -> String {
|
||||||
///
|
///
|
||||||
/// - `TurnEnded` / `Errored`: no system work; the LLM handles the
|
/// - `TurnEnded` / `Errored`: no system work; the LLM handles the
|
||||||
/// semantic response.
|
/// semantic response.
|
||||||
/// - `ShutDown`: remove the child from `spawned_pods.json` and release
|
/// - `ShutDown`: remove the child from `spawned_pods.json`, Pod state,
|
||||||
/// its scope allocation. Missing entries are swallowed.
|
/// and reclaim its delegated scope/allocation. Missing entries are swallowed.
|
||||||
/// - `ScopeSubDelegated`: register the grandchild locally and re-emit
|
/// - `ScopeSubDelegated`: register the grandchild locally and re-emit
|
||||||
/// upward to our own parent if we have one. Duplicate grandchild
|
/// upward to our own parent if we have one. Duplicate grandchild
|
||||||
/// entries (re-delivery) are swallowed.
|
/// entries (re-delivery) are swallowed.
|
||||||
|
|
@ -104,7 +103,6 @@ pub async fn apply_event_side_effects(
|
||||||
if let Err(e) = registry.remove(pod_name).await {
|
if let Err(e) = registry.remove(pod_name).await {
|
||||||
tracing::warn!(error = %e, pod = %pod_name, "registry remove on ShutDown failed");
|
tracing::warn!(error = %e, pod = %pod_name, "registry remove on ShutDown failed");
|
||||||
}
|
}
|
||||||
release_scope_silently(pod_name);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
PodEvent::ScopeSubDelegated {
|
PodEvent::ScopeSubDelegated {
|
||||||
|
|
@ -145,28 +143,6 @@ pub async fn apply_event_side_effects(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn release_scope_silently(pod_name: &str) {
|
|
||||||
let lock_path = match pod_registry::default_registry_path() {
|
|
||||||
Ok(p) => p,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(error = %e, "default_registry_path failed");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
let mut guard = match pod_registry::LockFileGuard::open(&lock_path) {
|
|
||||||
Ok(g) => g,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(error = %e, "LockFileGuard open failed");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
match pod_registry::release_pod(&mut guard, pod_name) {
|
|
||||||
Ok(()) => {}
|
|
||||||
Err(ScopeLockError::UnknownPod(_)) => {}
|
|
||||||
Err(e) => tracing::warn!(error = ?e, pod = %pod_name, "release_pod failed"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn reemit_scope_sub_delegated(
|
fn reemit_scope_sub_delegated(
|
||||||
self_parent_socket: &Option<PathBuf>,
|
self_parent_socket: &Option<PathBuf>,
|
||||||
self_name: &str,
|
self_name: &str,
|
||||||
|
|
|
||||||
|
|
@ -204,15 +204,12 @@ impl Tool for StopPodTool {
|
||||||
.ok_or_else(|| unknown_pod_err(&input.name))?;
|
.ok_or_else(|| unknown_pod_err(&input.name))?;
|
||||||
|
|
||||||
// Best-effort Shutdown. The child's own `ScopeAllocationGuard`
|
// Best-effort Shutdown. The child's own `ScopeAllocationGuard`
|
||||||
// releases the entry on clean exit; we also release explicitly
|
// releases its entry on clean exit; the parent reclaim below is the
|
||||||
// below so callers can't observe a window where the scope is
|
// authoritative operation for removing the child record and returning
|
||||||
// still registered but StopPod has returned. Duplicate release
|
// delegated Write scope to the spawner.
|
||||||
// is harmless — `ScopeAllocationGuard`'s drop path swallows
|
|
||||||
// `UnknownPod` errors.
|
|
||||||
let _ = connect_and_send(&record.socket_path, &Method::Shutdown).await;
|
let _ = connect_and_send(&record.socket_path, &Method::Shutdown).await;
|
||||||
|
|
||||||
let scope_summary = summarize_scope(&record);
|
let scope_summary = summarize_scope(&record);
|
||||||
release_scope(&record.pod_name);
|
|
||||||
|
|
||||||
self.registry
|
self.registry
|
||||||
.remove(&record.pod_name)
|
.remove(&record.pod_name)
|
||||||
|
|
@ -516,20 +513,6 @@ fn summarize_scope(record: &SpawnedPodRecord) -> String {
|
||||||
parts.join(", ")
|
parts.join(", ")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Best-effort release of the pod's scope allocation. Swallows every
|
|
||||||
/// error: the caller has already completed its user-visible side
|
|
||||||
/// effects (Method::Shutdown was sent), and stale-reclaim will clean
|
|
||||||
/// up whatever we couldn't.
|
|
||||||
fn release_scope(pod_name: &str) {
|
|
||||||
let Ok(lock_path) = pod_registry::default_registry_path() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let Ok(mut guard) = LockFileGuard::open(&lock_path) else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
let _ = pod_registry::release_pod(&mut guard, pod_name);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
|
||||||
|
|
@ -19,17 +19,21 @@ use std::path::Path;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use manifest::{Permission, ScopeRule};
|
use manifest::{Permission, ScopeRule, SharedScope};
|
||||||
use session_store::{
|
use session_store::{
|
||||||
PodMetadata, PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule, StoreError,
|
PodMetadata, PodMetadataStore, PodScopeSnapshot, PodSpawnedChild, PodSpawnedScopeRule,
|
||||||
|
StoreError,
|
||||||
};
|
};
|
||||||
use tokio::net::UnixStream;
|
use tokio::net::UnixStream;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
|
|
||||||
use crate::runtime::dir::{RuntimeDir, SpawnedPodRecord};
|
use crate::runtime::dir::{RuntimeDir, SpawnedPodRecord};
|
||||||
|
use crate::runtime::pod_registry;
|
||||||
|
|
||||||
type RegistryStateWriter = Arc<dyn Fn(&[SpawnedPodRecord]) -> io::Result<()> + Send + Sync>;
|
type RegistryStateWriter = Arc<dyn Fn(&[SpawnedPodRecord]) -> io::Result<()> + Send + Sync>;
|
||||||
|
type ScopeChangeSink = Arc<dyn Fn(PodScopeSnapshot) + Send + Sync>;
|
||||||
|
|
||||||
const RESTORE_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500);
|
const RESTORE_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500);
|
||||||
|
|
||||||
pub struct SpawnedPodRegistry {
|
pub struct SpawnedPodRegistry {
|
||||||
|
|
@ -37,6 +41,14 @@ pub struct SpawnedPodRegistry {
|
||||||
cursors: Mutex<HashMap<String, usize>>,
|
cursors: Mutex<HashMap<String, usize>>,
|
||||||
runtime_dir: Arc<RuntimeDir>,
|
runtime_dir: Arc<RuntimeDir>,
|
||||||
state_writer: Option<RegistryStateWriter>,
|
state_writer: Option<RegistryStateWriter>,
|
||||||
|
parent_name: Option<String>,
|
||||||
|
parent_scope: Option<SharedScope>,
|
||||||
|
scope_change_sink: Option<ScopeChangeSink>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SpawnedPodRegistryLoad {
|
||||||
|
pub registry: Arc<SpawnedPodRegistry>,
|
||||||
|
pub reclaimed_unreachable: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SpawnedPodRegistry {
|
impl SpawnedPodRegistry {
|
||||||
|
|
@ -46,6 +58,9 @@ impl SpawnedPodRegistry {
|
||||||
cursors: Mutex::new(HashMap::new()),
|
cursors: Mutex::new(HashMap::new()),
|
||||||
runtime_dir,
|
runtime_dir,
|
||||||
state_writer: None,
|
state_writer: None,
|
||||||
|
parent_name: None,
|
||||||
|
parent_scope: None,
|
||||||
|
scope_change_sink: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -58,6 +73,22 @@ impl SpawnedPodRegistry {
|
||||||
store: St,
|
store: St,
|
||||||
pod_name: String,
|
pod_name: String,
|
||||||
) -> io::Result<Arc<Self>>
|
) -> io::Result<Arc<Self>>
|
||||||
|
where
|
||||||
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
let loaded =
|
||||||
|
Self::load_from_pod_state_with_reclaim(runtime_dir, store, pod_name, None, None)
|
||||||
|
.await?;
|
||||||
|
Ok(loaded.registry)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn load_from_pod_state_with_reclaim<St>(
|
||||||
|
runtime_dir: Arc<RuntimeDir>,
|
||||||
|
store: St,
|
||||||
|
pod_name: String,
|
||||||
|
parent_scope: Option<SharedScope>,
|
||||||
|
scope_change_sink: Option<ScopeChangeSink>,
|
||||||
|
) -> io::Result<SpawnedPodRegistryLoad>
|
||||||
where
|
where
|
||||||
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
St: PodMetadataStore + Clone + Send + Sync + 'static,
|
||||||
{
|
{
|
||||||
|
|
@ -69,6 +100,7 @@ impl SpawnedPodRegistry {
|
||||||
|
|
||||||
let mut records = Vec::with_capacity(persisted_children.len());
|
let mut records = Vec::with_capacity(persisted_children.len());
|
||||||
let mut pruned = false;
|
let mut pruned = false;
|
||||||
|
let mut pruned_records = Vec::new();
|
||||||
for child in &persisted_children {
|
for child in &persisted_children {
|
||||||
let record = match record_from_pod_state(child) {
|
let record = match record_from_pod_state(child) {
|
||||||
Ok(record) => record,
|
Ok(record) => record,
|
||||||
|
|
@ -91,21 +123,36 @@ impl SpawnedPodRegistry {
|
||||||
socket = %record.socket_path.display(),
|
socket = %record.socket_path.display(),
|
||||||
"dropping unreachable persisted spawned-pod record"
|
"dropping unreachable persisted spawned-pod record"
|
||||||
);
|
);
|
||||||
|
pruned_records.push(record);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
runtime_dir.write_spawned_pods(&records).await?;
|
runtime_dir.write_spawned_pods(&records).await?;
|
||||||
let state_writer = pod_state_writer(store, pod_name);
|
let state_writer = pod_state_writer(store, pod_name.clone());
|
||||||
if pruned || metadata.is_some() {
|
if pruned || metadata.is_some() {
|
||||||
state_writer(&records)?;
|
state_writer(&records)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Arc::new(Self {
|
let mut reclaimed_unreachable = false;
|
||||||
records: Mutex::new(records),
|
if parent_scope.is_some() {
|
||||||
cursors: Mutex::new(HashMap::new()),
|
for record in &pruned_records {
|
||||||
runtime_dir,
|
reclaim_record(&pod_name, parent_scope.as_ref(), None, record)?;
|
||||||
state_writer: Some(state_writer),
|
reclaimed_unreachable = true;
|
||||||
}))
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(SpawnedPodRegistryLoad {
|
||||||
|
registry: Arc::new(Self {
|
||||||
|
records: Mutex::new(records),
|
||||||
|
cursors: Mutex::new(HashMap::new()),
|
||||||
|
runtime_dir,
|
||||||
|
state_writer: Some(state_writer),
|
||||||
|
parent_name: Some(pod_name),
|
||||||
|
parent_scope,
|
||||||
|
scope_change_sink,
|
||||||
|
}),
|
||||||
|
reclaimed_unreachable,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append a new record and persist the full list. Returns an I/O
|
/// Append a new record and persist the full list. Returns an I/O
|
||||||
|
|
@ -131,8 +178,9 @@ impl SpawnedPodRegistry {
|
||||||
self.records.lock().await.clone()
|
self.records.lock().await.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove the record for `pod_name`, persist, and clear its cursor.
|
/// Remove the record for `pod_name`, persist, clear its cursor, and
|
||||||
/// Returns the removed record (if any).
|
/// reclaim any delegated Write scope owned by that child. Returns the
|
||||||
|
/// removed record (if any).
|
||||||
pub async fn remove(&self, pod_name: &str) -> io::Result<Option<SpawnedPodRecord>> {
|
pub async fn remove(&self, pod_name: &str) -> io::Result<Option<SpawnedPodRecord>> {
|
||||||
let removed = {
|
let removed = {
|
||||||
let mut records = self.records.lock().await;
|
let mut records = self.records.lock().await;
|
||||||
|
|
@ -142,9 +190,25 @@ impl SpawnedPodRegistry {
|
||||||
removed
|
removed
|
||||||
};
|
};
|
||||||
self.cursors.lock().await.remove(pod_name);
|
self.cursors.lock().await.remove(pod_name);
|
||||||
|
if let Some(record) = &removed {
|
||||||
|
self.reclaim_record(record)?;
|
||||||
|
}
|
||||||
Ok(removed)
|
Ok(removed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn reclaim_record(&self, record: &SpawnedPodRecord) -> io::Result<()> {
|
||||||
|
let Some(parent_name) = &self.parent_name else {
|
||||||
|
release_child_allocation(&record.pod_name)?;
|
||||||
|
return Ok(());
|
||||||
|
};
|
||||||
|
reclaim_record(
|
||||||
|
parent_name,
|
||||||
|
self.parent_scope.as_ref(),
|
||||||
|
self.scope_change_sink.as_ref(),
|
||||||
|
record,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
/// Read-only cursor lookup. Returns 0 when no cursor has been set.
|
/// Read-only cursor lookup. Returns 0 when no cursor has been set.
|
||||||
pub async fn cursor(&self, pod_name: &str) -> usize {
|
pub async fn cursor(&self, pod_name: &str) -> usize {
|
||||||
self.cursors
|
self.cursors
|
||||||
|
|
@ -180,6 +244,58 @@ where
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn reclaim_record(
|
||||||
|
parent_name: &str,
|
||||||
|
parent_scope: Option<&SharedScope>,
|
||||||
|
scope_change_sink: Option<&ScopeChangeSink>,
|
||||||
|
record: &SpawnedPodRecord,
|
||||||
|
) -> io::Result<()> {
|
||||||
|
let write_rules = record
|
||||||
|
.scope_delegated
|
||||||
|
.iter()
|
||||||
|
.filter(|rule| rule.permission == Permission::Write)
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
let lock_path = pod_registry::default_registry_path()
|
||||||
|
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||||
|
let mut guard = pod_registry::LockFileGuard::open(&lock_path)
|
||||||
|
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||||
|
pod_registry::reclaim_delegated_scope(
|
||||||
|
&mut guard,
|
||||||
|
parent_name,
|
||||||
|
&record.pod_name,
|
||||||
|
&record.scope_delegated,
|
||||||
|
)
|
||||||
|
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||||
|
|
||||||
|
if let Some(scope) = parent_scope {
|
||||||
|
scope
|
||||||
|
.update(|current| current.with_removed_deny_rules(write_rules))
|
||||||
|
.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
|
||||||
|
if let Some(sink) = scope_change_sink {
|
||||||
|
let snapshot = scope.snapshot();
|
||||||
|
sink(PodScopeSnapshot {
|
||||||
|
allow: snapshot.allow_rules(),
|
||||||
|
deny: snapshot.deny_rules(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn release_child_allocation(pod_name: &str) -> io::Result<()> {
|
||||||
|
let lock_path = pod_registry::default_registry_path()
|
||||||
|
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||||
|
let mut guard = pod_registry::LockFileGuard::open(&lock_path)
|
||||||
|
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||||
|
match pod_registry::release_pod(&mut guard, pod_name) {
|
||||||
|
Ok(()) | Err(pod_registry::ScopeLockError::UnknownPod(_)) => Ok(()),
|
||||||
|
Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn write_records_to_pod_state<St>(
|
fn write_records_to_pod_state<St>(
|
||||||
store: &St,
|
store: &St,
|
||||||
pod_name: &str,
|
pod_name: &str,
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ use std::sync::{Arc, LazyLock, Mutex};
|
||||||
|
|
||||||
use llm_worker::llm_client::types::{ContentPart, Item, Role};
|
use llm_worker::llm_client::types::{ContentPart, Item, Role};
|
||||||
use llm_worker::tool::ToolOutput;
|
use llm_worker::tool::ToolOutput;
|
||||||
use manifest::{Permission, ScopeRule};
|
use manifest::{Permission, Scope, ScopeRule, SharedScope};
|
||||||
use pod::runtime::dir::{RuntimeDir, SpawnedPodRecord};
|
use pod::runtime::dir::{RuntimeDir, SpawnedPodRecord};
|
||||||
use pod::runtime::pod_registry::{self, LockFileGuard};
|
use pod::runtime::pod_registry::{self, LockFileGuard};
|
||||||
use pod::spawn::comm_tools::{
|
use pod::spawn::comm_tools::{
|
||||||
|
|
@ -383,45 +383,67 @@ async fn read_pod_output_reports_stopped_on_dead_socket() {
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn stop_pod_sends_shutdown_and_releases_scope() {
|
async fn stop_pod_sends_shutdown_and_releases_scope() {
|
||||||
let _env = EnvGuard::acquire();
|
let _env = EnvGuard::acquire();
|
||||||
let (tmp, registry, rd) = setup_registry().await;
|
let tmp = TempDir::new().unwrap();
|
||||||
|
let store_tmp = TempDir::new().unwrap();
|
||||||
|
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||||
|
let rd = Arc::new(RuntimeDir::create(tmp.path(), "spawner").await.unwrap());
|
||||||
|
let parent_scope = SharedScope::new(
|
||||||
|
Scope::writable(tmp.path())
|
||||||
|
.unwrap()
|
||||||
|
.with_added_deny_rules([ScopeRule {
|
||||||
|
target: tmp.path().to_path_buf(),
|
||||||
|
permission: Permission::Write,
|
||||||
|
recursive: true,
|
||||||
|
}])
|
||||||
|
.unwrap(),
|
||||||
|
);
|
||||||
unsafe {
|
unsafe {
|
||||||
std::env::set_var("INSOMNIA_RUNTIME_DIR", tmp.path());
|
std::env::set_var("INSOMNIA_RUNTIME_DIR", tmp.path());
|
||||||
}
|
}
|
||||||
let lock_path = tmp.path().join("pods.json");
|
let lock_path = tmp.path().join("pods.json");
|
||||||
|
|
||||||
// Seed pods.json with a top-level `spawner` allocation plus a
|
// Seed pods.json with a restored top-level `spawner` allocation whose
|
||||||
// delegated `child` allocation — mimics what SpawnPod would have
|
// scope_deny contains the delegated child path plus the live child
|
||||||
// done so StopPod has something to release.
|
// allocation — mimics a parent resumed after SpawnPod.
|
||||||
{
|
{
|
||||||
let mut g = LockFileGuard::open(&lock_path).unwrap();
|
let mut g = LockFileGuard::open(&lock_path).unwrap();
|
||||||
pod_registry::register_pod(
|
let rule = ScopeRule {
|
||||||
|
target: tmp.path().to_path_buf(),
|
||||||
|
permission: Permission::Write,
|
||||||
|
recursive: true,
|
||||||
|
};
|
||||||
|
pod_registry::register_pod_with_deny(
|
||||||
&mut g,
|
&mut g,
|
||||||
"spawner".into(),
|
"spawner".into(),
|
||||||
std::process::id(),
|
std::process::id(),
|
||||||
"/tmp/spawner.sock".into(),
|
"/tmp/spawner.sock".into(),
|
||||||
vec![ScopeRule {
|
vec![rule.clone()],
|
||||||
target: tmp.path().to_path_buf(),
|
vec![rule.clone()],
|
||||||
permission: Permission::Write,
|
|
||||||
recursive: true,
|
|
||||||
}],
|
|
||||||
session_store::new_segment_id(),
|
session_store::new_segment_id(),
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
pod_registry::delegate_scope(
|
pod_registry::register_pod(
|
||||||
&mut g,
|
&mut g,
|
||||||
"spawner",
|
|
||||||
"child".into(),
|
"child".into(),
|
||||||
std::process::id(),
|
std::process::id(),
|
||||||
"/tmp/child.sock".into(),
|
"/tmp/child.sock".into(),
|
||||||
vec![ScopeRule {
|
vec![rule],
|
||||||
target: tmp.path().to_path_buf(),
|
session_store::new_segment_id(),
|
||||||
permission: Permission::Write,
|
|
||||||
recursive: true,
|
|
||||||
}],
|
|
||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let loaded = SpawnedPodRegistry::load_from_pod_state_with_reclaim(
|
||||||
|
rd.clone(),
|
||||||
|
store.clone(),
|
||||||
|
"spawner".into(),
|
||||||
|
Some(parent_scope.clone()),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
let registry = loaded.registry;
|
||||||
|
|
||||||
let (socket, listener) = bind_mock_socket(tmp.path(), "child").await;
|
let (socket, listener) = bind_mock_socket(tmp.path(), "child").await;
|
||||||
let received = accept_one_method(listener);
|
let received = accept_one_method(listener);
|
||||||
register_child(®istry, "child", &socket, tmp.path()).await;
|
register_child(®istry, "child", &socket, tmp.path()).await;
|
||||||
|
|
@ -436,12 +458,20 @@ async fn stop_pod_sends_shutdown_and_releases_scope() {
|
||||||
let method = received.await.unwrap().expect("expected shutdown");
|
let method = received.await.unwrap().expect("expected shutdown");
|
||||||
assert!(matches!(method, Method::Shutdown));
|
assert!(matches!(method, Method::Shutdown));
|
||||||
|
|
||||||
// Allocation for `child` is gone; `spawner` remains.
|
// Allocation for `child` is gone; `spawner` remains and its restored
|
||||||
|
// dynamic deny layer has been reclaimed.
|
||||||
{
|
{
|
||||||
let g = LockFileGuard::open(&lock_path).unwrap();
|
let g = LockFileGuard::open(&lock_path).unwrap();
|
||||||
assert!(g.data().find("child").is_none(), "child still allocated");
|
assert!(g.data().find("child").is_none(), "child still allocated");
|
||||||
assert!(g.data().find("spawner").is_some(), "spawner missing");
|
let spawner = g.data().find("spawner").expect("spawner missing");
|
||||||
|
assert!(spawner.scope_deny.is_empty(), "deny not reclaimed");
|
||||||
}
|
}
|
||||||
|
assert_eq!(
|
||||||
|
parent_scope
|
||||||
|
.snapshot()
|
||||||
|
.permission_at(&tmp.path().join("file.txt")),
|
||||||
|
Some(Permission::Write)
|
||||||
|
);
|
||||||
|
|
||||||
// spawned_pods.json now lists zero children.
|
// spawned_pods.json now lists zero children.
|
||||||
let spawned = rd.path().join("spawned_pods.json");
|
let spawned = rd.path().join("spawned_pods.json");
|
||||||
|
|
@ -589,6 +619,98 @@ async fn load_from_pod_state_prunes_children_with_missing_sockets() {
|
||||||
assert_eq!(metadata.spawned_children[0].pod_name, "alive");
|
assert_eq!(metadata.spawned_children[0].pod_name, "alive");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn load_from_pod_state_reclaims_pruned_child_scope_and_registry_deny() {
|
||||||
|
let _env = EnvGuard::acquire();
|
||||||
|
let runtime_tmp = TempDir::new().unwrap();
|
||||||
|
let store_tmp = TempDir::new().unwrap();
|
||||||
|
let store = FsStore::new(store_tmp.path()).unwrap();
|
||||||
|
unsafe {
|
||||||
|
std::env::set_var("INSOMNIA_RUNTIME_DIR", runtime_tmp.path());
|
||||||
|
}
|
||||||
|
let rd = Arc::new(
|
||||||
|
RuntimeDir::create(runtime_tmp.path(), "spawner")
|
||||||
|
.await
|
||||||
|
.unwrap(),
|
||||||
|
);
|
||||||
|
let missing_rule = ScopeRule {
|
||||||
|
target: runtime_tmp.path().to_path_buf(),
|
||||||
|
permission: Permission::Write,
|
||||||
|
recursive: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut g = LockFileGuard::open(&runtime_tmp.path().join("pods.json")).unwrap();
|
||||||
|
pod_registry::register_pod_with_deny(
|
||||||
|
&mut g,
|
||||||
|
"spawner".into(),
|
||||||
|
std::process::id(),
|
||||||
|
"/tmp/spawner.sock".into(),
|
||||||
|
vec![missing_rule.clone()],
|
||||||
|
vec![missing_rule.clone()],
|
||||||
|
session_store::new_segment_id(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
pod_registry::register_pod(
|
||||||
|
&mut g,
|
||||||
|
"missing".into(),
|
||||||
|
std::process::id(),
|
||||||
|
"/tmp/missing.sock".into(),
|
||||||
|
vec![missing_rule.clone()],
|
||||||
|
session_store::new_segment_id(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let parent_scope = SharedScope::new(
|
||||||
|
Scope::writable(runtime_tmp.path())
|
||||||
|
.unwrap()
|
||||||
|
.with_added_deny_rules([missing_rule.clone()])
|
||||||
|
.unwrap(),
|
||||||
|
);
|
||||||
|
let seed = SpawnedPodRegistry::load_from_pod_state(rd.clone(), store.clone(), "spawner".into())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
seed.add(SpawnedPodRecord {
|
||||||
|
pod_name: "missing".into(),
|
||||||
|
socket_path: runtime_tmp.path().join("missing.sock"),
|
||||||
|
scope_delegated: vec![missing_rule.clone()],
|
||||||
|
callback_address: "/dev/null".into(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let loaded = SpawnedPodRegistry::load_from_pod_state_with_reclaim(
|
||||||
|
rd.clone(),
|
||||||
|
store.clone(),
|
||||||
|
"spawner".into(),
|
||||||
|
Some(parent_scope.clone()),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(loaded.reclaimed_unreachable);
|
||||||
|
assert!(loaded.registry.get("missing").await.is_none());
|
||||||
|
assert_eq!(
|
||||||
|
parent_scope
|
||||||
|
.snapshot()
|
||||||
|
.permission_at(&runtime_tmp.path().join("file.txt")),
|
||||||
|
Some(Permission::Write)
|
||||||
|
);
|
||||||
|
|
||||||
|
let g = LockFileGuard::open(&runtime_tmp.path().join("pods.json")).unwrap();
|
||||||
|
assert!(g.data().find("missing").is_none());
|
||||||
|
assert!(g.data().find("spawner").unwrap().scope_deny.is_empty());
|
||||||
|
let metadata = store
|
||||||
|
.read_by_name("spawner")
|
||||||
|
.unwrap()
|
||||||
|
.expect("spawner metadata should remain");
|
||||||
|
assert!(metadata.spawned_children.is_empty());
|
||||||
|
let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap();
|
||||||
|
let runtime_records: Vec<SpawnedPodRecord> = serde_json::from_str(&runtime_contents).unwrap();
|
||||||
|
assert!(runtime_records.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
// ListPods
|
// ListPods
|
||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user