fix: reconcile missing delegated children
This commit is contained in:
parent
e10b4ad4f0
commit
d2e80871ce
|
|
@ -45,9 +45,9 @@ pub fn register_pod(
|
|||
/// and the registration proceeds. The check is structural (deny ⊇
|
||||
/// competitor.rule), not relational — it does not verify that the
|
||||
/// competitor actually descends from this Pod's prior delegations.
|
||||
/// In practice this is safe because the canonical caller is `restore`,
|
||||
/// which derives `scope_deny` from the session's own snapshot, so any
|
||||
/// covered competitor is guaranteed to be a descendant of the original
|
||||
/// In practice this is safe because the canonical restore caller derives
|
||||
/// `scope_deny` from outstanding `pod-store` child delegations, so any
|
||||
/// covered competitor is expected to be a descendant of the original
|
||||
/// allocation. Direct callers must uphold the same invariant.
|
||||
pub fn register_pod_with_deny(
|
||||
guard: &mut LockFileGuard,
|
||||
|
|
@ -180,10 +180,11 @@ pub fn release_pod(guard: &mut LockFileGuard, pod_name: &str) -> Result<(), Scop
|
|||
|
||||
/// Reclaim a child delegation back into its parent allocation.
|
||||
///
|
||||
/// This is idempotent: missing child allocations and missing deny entries are
|
||||
/// ignored. For each delegated Write rule, at most one exact matching deny rule
|
||||
/// is removed from the parent's `scope_deny`, preserving any duplicate explicit
|
||||
/// base deny that was not owned by this child delegation.
|
||||
/// This is idempotent for missing deny entries. For each delegated Write rule,
|
||||
/// at most one exact matching deny rule is removed from the parent's `scope_deny`
|
||||
/// even when the child allocation is already absent; restore reconciliation uses
|
||||
/// that case when durable Pod-state still records an outstanding delegation but
|
||||
/// the live lock file no longer has a child allocation.
|
||||
pub fn reclaim_delegated_scope(
|
||||
guard: &mut LockFileGuard,
|
||||
parent: &str,
|
||||
|
|
@ -199,17 +200,13 @@ pub fn reclaim_delegated_scope(
|
|||
.map(|idx| guard.data().allocations[idx].delegated_from.clone())
|
||||
.unwrap_or(None);
|
||||
|
||||
let child_exists = child_idx.is_some();
|
||||
|
||||
if child_exists {
|
||||
if let Some(parent_alloc) = guard.data_mut().find_mut(parent) {
|
||||
for rule in delegated_scope
|
||||
.iter()
|
||||
.filter(|rule| rule.permission == Permission::Write)
|
||||
{
|
||||
if let Some(idx) = parent_alloc.scope_deny.iter().position(|deny| deny == rule) {
|
||||
parent_alloc.scope_deny.remove(idx);
|
||||
}
|
||||
if let Some(parent_alloc) = guard.data_mut().find_mut(parent) {
|
||||
for rule in delegated_scope
|
||||
.iter()
|
||||
.filter(|rule| rule.permission == Permission::Write)
|
||||
{
|
||||
if let Some(idx) = parent_alloc.scope_deny.iter().position(|deny| deny == rule) {
|
||||
parent_alloc.scope_deny.remove(idx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -516,15 +513,43 @@ mod tests {
|
|||
assert_eq!(a.scope_deny, vec![delegated_rule.clone()]);
|
||||
assert!(g.data().find("b").is_none());
|
||||
|
||||
reclaim_delegated_scope(&mut g, "a", "b", &[delegated_rule.clone()]).unwrap();
|
||||
reclaim_delegated_scope(&mut g, "a", "b", std::slice::from_ref(&delegated_rule)).unwrap();
|
||||
let a = g.data().find("a").unwrap();
|
||||
assert_eq!(
|
||||
a.scope_deny,
|
||||
vec![delegated_rule],
|
||||
"a repeated reclaim with no child allocation must not broaden an explicit duplicate base deny"
|
||||
assert!(
|
||||
a.scope_deny.is_empty(),
|
||||
"a missing child allocation still reclaims one matching parent deny"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reclaim_delegated_scope_removes_parent_deny_when_child_allocation_missing() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let path = dir.path().join("pods.json");
|
||||
let mut g = open_empty(&path);
|
||||
let delegated_rule = write_rule("/src/core", true);
|
||||
register_pod_with_deny(
|
||||
&mut g,
|
||||
"a".into(),
|
||||
std::process::id(),
|
||||
sock("a"),
|
||||
vec![write_rule("/src", true)],
|
||||
vec![delegated_rule.clone()],
|
||||
sid(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
reclaim_delegated_scope(
|
||||
&mut g,
|
||||
"a",
|
||||
"missing",
|
||||
std::slice::from_ref(&delegated_rule),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let a = g.data().find("a").unwrap();
|
||||
assert!(a.scope_deny.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reclaim_stale_reparents_and_removes_dead_entries() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use llm_worker::Item;
|
||||
|
|
@ -10,7 +11,8 @@ use llm_worker::llm_client::types::Role;
|
|||
use llm_worker::state::Mutable;
|
||||
use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
|
||||
use pod_store::{
|
||||
PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodSpawnedScopeRule, PodStoreError,
|
||||
PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodReclaimedChild, PodSpawnedChild,
|
||||
PodSpawnedScopeRule, PodStoreError,
|
||||
};
|
||||
use session_store::{
|
||||
LogEntry, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, to_logged,
|
||||
|
|
@ -45,9 +47,12 @@ use llm_worker::interceptor::PreRequestAction;
|
|||
use protocol::{
|
||||
AlertLevel, AlertSource, Event, RewindSummary, RewindTarget, RewindTargetId, Segment,
|
||||
};
|
||||
use tokio::net::UnixStream;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
const RESTORE_RECONCILIATION_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
|
||||
/// `(SessionId, SegmentId)` pair the Pod is currently writing to.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct SegmentLocation {
|
||||
|
|
@ -4048,10 +4053,61 @@ where
|
|||
session_id,
|
||||
segment_id,
|
||||
})?;
|
||||
pod.reconcile_restored_delegations().await?;
|
||||
drain_skill_shadows(&pod, skill_shadows);
|
||||
Ok(pod)
|
||||
}
|
||||
|
||||
async fn reconcile_restored_delegations(&mut self) -> Result<(), PodError> {
|
||||
let pod_name = self.manifest.pod.name.clone();
|
||||
let Some(metadata) = self.store.read_by_name(&pod_name)? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let mut reclaimed = Vec::new();
|
||||
for child in metadata.spawned_children {
|
||||
if restored_child_reachable(&child).await {
|
||||
continue;
|
||||
}
|
||||
let delegated_scope = spawned_child_scope_rules(&child);
|
||||
if !delegated_scope.is_empty() {
|
||||
let lock_path =
|
||||
pod_registry::default_registry_path().map_err(ScopeLockError::from)?;
|
||||
let mut guard =
|
||||
pod_registry::LockFileGuard::open(&lock_path).map_err(ScopeLockError::from)?;
|
||||
pod_registry::reclaim_delegated_scope(
|
||||
&mut guard,
|
||||
&pod_name,
|
||||
&child.pod_name,
|
||||
&delegated_scope,
|
||||
)?;
|
||||
let write_rules = delegated_scope
|
||||
.iter()
|
||||
.filter(|rule| rule.permission == Permission::Write)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
self.scope
|
||||
.update(|current| current.with_removed_deny_rules(write_rules))
|
||||
.map_err(PodError::Scope)?;
|
||||
}
|
||||
reclaimed.push(PodReclaimedChild {
|
||||
pod_name: child.pod_name,
|
||||
scope_delegated: child.scope_delegated,
|
||||
});
|
||||
}
|
||||
|
||||
if reclaimed.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.store.reclaim_spawned_children(&pod_name, reclaimed)?;
|
||||
self.push_notify(
|
||||
"Restored Pod state contained missing or unreachable delegated child Pods; their delegated write scopes were reclaimed before resume."
|
||||
.to_string(),
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Convenience: build a Pod from a single-layer TOML manifest string.
|
||||
///
|
||||
/// Parses the TOML into a [`PodManifestConfig`], converts to a
|
||||
|
|
@ -4594,6 +4650,40 @@ struct PodCommon {
|
|||
skill_shadows: Vec<workflow_crate::ShadowedSkill>,
|
||||
}
|
||||
|
||||
async fn restored_child_reachable(child: &PodSpawnedChild) -> bool {
|
||||
tokio::time::timeout(
|
||||
RESTORE_RECONCILIATION_REACHABILITY_TIMEOUT,
|
||||
UnixStream::connect(&child.socket_path),
|
||||
)
|
||||
.await
|
||||
.map(|result| result.is_ok())
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn spawned_child_scope_rules(child: &PodSpawnedChild) -> Vec<ScopeRule> {
|
||||
child
|
||||
.scope_delegated
|
||||
.iter()
|
||||
.filter_map(|rule| delegated_scope_rule_to_scope_rule(rule.clone()))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn delegated_scope_rule_to_scope_rule(rule: PodSpawnedScopeRule) -> Option<ScopeRule> {
|
||||
let permission = match rule.permission.as_str() {
|
||||
"read" => Permission::Read,
|
||||
"write" => Permission::Write,
|
||||
other => {
|
||||
warn!(permission = %other, "ignoring invalid delegated child scope permission");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
Some(ScopeRule {
|
||||
target: rule.target,
|
||||
permission,
|
||||
recursive: rule.recursive,
|
||||
})
|
||||
}
|
||||
|
||||
fn effective_restore_scope_config<St>(
|
||||
store: &St,
|
||||
manifest: &PodManifest,
|
||||
|
|
@ -4616,18 +4706,8 @@ where
|
|||
}
|
||||
|
||||
fn delegated_write_rule_to_deny(rule: PodSpawnedScopeRule) -> Option<ScopeRule> {
|
||||
match rule.permission.as_str() {
|
||||
"write" => Some(ScopeRule {
|
||||
target: rule.target,
|
||||
permission: Permission::Write,
|
||||
recursive: rule.recursive,
|
||||
}),
|
||||
"read" => None,
|
||||
other => {
|
||||
warn!(permission = %other, "ignoring invalid delegated child scope permission");
|
||||
None
|
||||
}
|
||||
}
|
||||
let rule = delegated_scope_rule_to_scope_rule(rule)?;
|
||||
(rule.permission == Permission::Write).then_some(rule)
|
||||
}
|
||||
|
||||
/// Resolve pwd / scope / LLM client / prompt catalog from a validated
|
||||
|
|
|
|||
|
|
@ -633,7 +633,7 @@ async fn load_from_pod_state_prunes_runtime_children_and_reclaims_durable_delega
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn load_from_pod_state_reclaims_pruned_child_scope_and_records_history() {
|
||||
async fn load_from_pod_state_reclaims_missing_child_scope_and_records_history() {
|
||||
let _env = EnvGuard::acquire();
|
||||
let runtime_tmp = TempDir::new().unwrap();
|
||||
let store_tmp = TempDir::new().unwrap();
|
||||
|
|
@ -667,15 +667,6 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_and_records_history() {
|
|||
session_store::new_segment_id(),
|
||||
)
|
||||
.unwrap();
|
||||
pod_registry::register_pod(
|
||||
&mut g,
|
||||
"missing".into(),
|
||||
std::process::id(),
|
||||
"/tmp/missing.sock".into(),
|
||||
vec![missing_rule.clone()],
|
||||
session_store::new_segment_id(),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let parent_scope = SharedScope::new(
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user