From e10b4ad4f022b8a47c7ea2e5496e801479d34c00 Mon Sep 17 00:00:00 2001 From: Hare Date: Sat, 30 May 2026 07:36:17 +0900 Subject: [PATCH] refactor: move scope authority to pod store --- crates/client/src/spawn.rs | 8 -- crates/pod-store/src/lib.rs | 65 +++++++++++++ crates/pod/src/controller.rs | 9 +- crates/pod/src/discovery.rs | 5 + crates/pod/src/main.rs | 12 +-- crates/pod/src/pod.rs | 123 ++++++++---------------- crates/pod/src/spawn/registry.rs | 101 ++++++++++++------- crates/pod/src/spawn/tool.rs | 13 --- crates/pod/tests/pod_comm_tools_test.rs | 30 +++--- crates/pod/tests/restore_test.rs | 36 ------- crates/pod/tests/spawn_pod_test.rs | 3 - crates/session-store/src/lib.rs | 8 +- crates/session-store/src/segment.rs | 19 +--- crates/session-store/src/segment_log.rs | 27 +----- crates/tui/src/spawn.rs | 79 +-------------- 15 files changed, 200 insertions(+), 338 deletions(-) diff --git a/crates/client/src/spawn.rs b/crates/client/src/spawn.rs index c603c7fd..401cff03 100644 --- a/crates/client/src/spawn.rs +++ b/crates/client/src/spawn.rs @@ -31,8 +31,6 @@ pub struct SpawnConfig { /// `--profile`; the Pod name is supplied through `--profile-pod-name` so /// profile evaluation stays separate from `--pod` restore semantics. pub profile: Option, - /// Optional session-scope snapshot used when restoring by session id. - pub resume_scope: Option, /// pod の current_dir。 pub cwd: PathBuf, /// `Some(id)` のとき `--session ` を付与し、当該セッションから @@ -132,12 +130,6 @@ where .arg(id.to_string()) .arg("--session-pod-name") .arg(&config.pod_name); - if let Some(scope) = &config.resume_scope { - let scope_json = serde_json::to_string(scope).map_err(|e| { - SpawnError::PodLaunchFailed(io::Error::new(io::ErrorKind::InvalidInput, e)) - })?; - command.arg("--resume-scope-json").arg(scope_json); - } } let mut child = command.spawn().map_err(SpawnError::PodLaunchFailed)?; diff --git a/crates/pod-store/src/lib.rs b/crates/pod-store/src/lib.rs index c39ea5c4..07517b4d 100644 --- a/crates/pod-store/src/lib.rs +++ b/crates/pod-store/src/lib.rs @@ -75,6 +75,15 @@ pub struct PodSpawnedChild { pub callback_address: PathBuf, } +/// One child delegation that has been reclaimed. Kept as durable audit state so +/// restore can distinguish outstanding delegated scope from already-reclaimed +/// child state without consulting session logs. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodReclaimedChild { + pub pod_name: String, + pub scope_delegated: Vec, +} + /// Persistent metadata for a Pod name. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PodMetadata { @@ -83,6 +92,8 @@ pub struct PodMetadata { pub active: Option, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub spawned_children: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub reclaimed_children: Vec, #[serde(default, skip_serializing_if = "Option::is_none")] pub resolved_manifest_snapshot: Option, } @@ -94,6 +105,7 @@ impl PodMetadata { pod_name: pod_name.into(), active, spawned_children: Vec::new(), + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, } } @@ -155,6 +167,23 @@ pub trait PodMetadataStore: Send + Sync { metadata.spawned_children = children; }) } + + /// Remove reclaimed child delegations from the outstanding set and record + /// them in durable reclaim history. + fn reclaim_spawned_children( + &self, + pod_name: &str, + reclaimed: Vec, + ) -> Result { + self.update_by_name(pod_name, |metadata| { + for reclaimed_child in &reclaimed { + metadata + .spawned_children + .retain(|child| child.pod_name != reclaimed_child.pod_name); + } + metadata.reclaimed_children.extend(reclaimed); + }) + } } /// Filesystem-backed Pod metadata store. @@ -473,4 +502,40 @@ mod tests { assert_eq!(restored.active, Some(active)); assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot)); } + + #[test] + fn reclaim_children_removes_outstanding_and_records_history() { + let tmp = tempfile::TempDir::new().unwrap(); + let store = FsPodStore::new(tmp.path()).unwrap(); + let scope = PodSpawnedScopeRule { + target: std::path::Path::new("/tmp/delegated").into(), + permission: "write".into(), + recursive: true, + }; + store + .set_spawned_children( + "agent", + vec![PodSpawnedChild { + pod_name: "child".into(), + socket_path: std::path::Path::new("/tmp/child.sock").into(), + scope_delegated: vec![scope.clone()], + callback_address: std::path::Path::new("/tmp/parent.sock").into(), + }], + ) + .unwrap(); + + store + .reclaim_spawned_children( + "agent", + vec![PodReclaimedChild { + pod_name: "child".into(), + scope_delegated: vec![scope.clone()], + }], + ) + .unwrap(); + let restored = store.read_by_name("agent").unwrap().unwrap(); + assert!(restored.spawned_children.is_empty()); + assert_eq!(restored.reclaimed_children.len(), 1); + assert_eq!(restored.reclaimed_children[0].scope_delegated, vec![scope]); + } } diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 51cf9a98..b61c35e1 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -162,14 +162,15 @@ impl PodController { pod.store().clone(), spawner_name.clone(), Some(pod.scope().clone()), - Some(pod.scope_change_sink()), ) .await?; let reclaimed_unreachable = loaded_registry.reclaimed_unreachable; let spawned_registry = loaded_registry.registry; if reclaimed_unreachable { - pod.persist_scope_snapshot() - .map_err(std::io::Error::other)?; + pod.push_notify( + "Restored Pod state contained unreachable delegated child Pods; their delegated write scopes were reclaimed before resume." + .to_string(), + ); } // Hand the alerter to the Pod so internal operations (compaction, @@ -497,7 +498,6 @@ where let pwd = pod.pwd().to_path_buf(); let task_store = pod.task_store(); let session_id_for_usage = pod.segment_id().to_string(); - let scope_change_sink = pod.scope_change_sink(); let memory_config = pod.manifest().memory.clone(); let web_config = pod.manifest().web.clone(); let spawner_name = pod.manifest().pod.name.clone(); @@ -557,7 +557,6 @@ where self_parent_socket, spawner_model, scope_handle, - scope_change_sink, )); worker.register_tool(send_to_pod_tool(spawned_registry.clone())); worker.register_tool(read_pod_output_tool(spawned_registry.clone())); diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs index bfbd79af..acceaa33 100644 --- a/crates/pod/src/discovery.rs +++ b/crates/pod/src/discovery.rs @@ -809,6 +809,7 @@ mod tests { child("child-stale", &stale_socket), child("child-pending", &pending_socket), ], + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, }; store.write(&parent).unwrap(); @@ -820,6 +821,7 @@ mod tests { active_child_segment, )), spawned_children: Vec::new(), + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); @@ -831,6 +833,7 @@ mod tests { active_child_segment, )), spawned_children: Vec::new(), + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); @@ -839,6 +842,7 @@ mod tests { pod_name: "child-pending".into(), active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)), spawned_children: Vec::new(), + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); @@ -850,6 +854,7 @@ mod tests { new_segment_id(), )), spawned_children: Vec::new(), + reclaimed_children: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index caf58305..9b9e6a06 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -2,9 +2,7 @@ use std::path::{Path, PathBuf}; use std::process::ExitCode; use clap::Parser; -use manifest::{ - NixProfileResolver, PodManifest, PodManifestConfig, ProfileSelector, ScopeConfig, paths, -}; +use manifest::{NixProfileResolver, PodManifest, PodManifestConfig, ProfileSelector, paths}; use pod::{Pod, PodController, PromptLoader}; use pod_store::{CombinedStore, FsPodStore, PodMetadataStore}; use session_store::{FsStore, SegmentId, Store}; @@ -46,10 +44,6 @@ struct Cli { #[arg(long, value_name = "NAME", requires = "session", hide = true)] session_pod_name: Option, - /// Internal typed scope snapshot for session restore launched by the TUI. - #[arg(long, value_name = "JSON", requires = "session", hide = true)] - resume_scope_json: Option, - /// Internal resolved manifest config for delegated child Pod spawning. #[arg( long, @@ -134,10 +128,6 @@ fn apply_session_restore_overrides(manifest: &mut PodManifest, cli: &Cli) -> Res if let Some(pod_name) = cli.session_pod_name.as_deref() { manifest.pod.name = pod_name.to_string(); } - if let Some(scope_json) = cli.resume_scope_json.as_deref() { - manifest.scope = serde_json::from_str::(scope_json) - .map_err(|e| format!("failed to parse --resume-scope-json: {e}"))?; - } Ok(()) } diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 91768b82..5e79fc15 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -9,10 +9,11 @@ use llm_worker::llm_client::client::LlmClient; use llm_worker::llm_client::types::Role; use llm_worker::state::Mutable; use llm_worker::{ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult}; -use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodStoreError}; +use pod_store::{ + PodActiveSegmentRef, PodMetadata, PodMetadataStore, PodSpawnedScopeRule, PodStoreError, +}; use session_store::{ - LogEntry, PodScopeSnapshot, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, - to_logged, + LogEntry, SegmentId, SessionId, Store, StoreError, SystemItem, segment_log, to_logged, }; use tracing::{info, warn}; @@ -345,10 +346,6 @@ pub struct Pod { /// Workflow descriptions. This is intentionally independent from /// summary and Knowledge residency: each section has its own gate. inject_resident_workflows: bool, - /// Latest runtime scope snapshot queued by dynamic scope changes. - /// Drained into the session log before the next turn result is - /// persisted, so resume never silently reclaims delegated writes. - pending_scope_snapshot: Arc>>, /// extract (memory.extract) reentry guard. `true` while an extract /// worker is running; subsequent triggers are skipped per spec /// (`docs/plan/memory.md` §Extract 並走防止). `Arc` so @@ -454,7 +451,6 @@ impl Pod { inject_resident_summary: self.inject_resident_summary, inject_resident_knowledge: self.inject_resident_knowledge, inject_resident_workflows: self.inject_resident_workflows, - pending_scope_snapshot: self.pending_scope_snapshot.clone(), extract_in_flight: self.extract_in_flight.clone(), consolidation_in_flight: self.consolidation_in_flight.clone(), extract_pointer: self.extract_pointer.clone(), @@ -634,7 +630,6 @@ impl Pod { inject_resident_summary: true, inject_resident_knowledge: true, inject_resident_workflows: true, - pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Arc::new(Mutex::new(None)), @@ -753,30 +748,6 @@ impl Pod { .update(|cur| cur.with_added_deny_rules(revoke.clone())) } - /// Snapshot the current runtime scope in the session log. The entry - /// is intentionally appended as soon as a session log exists: if the - /// process later exits while children keep their allocations, resume - /// can restore the narrowed scope instead of reclaiming delegated - /// writes. - pub fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> { - if self.segment_state.entries_written() == 0 { - return Ok(()); - } - let snapshot = { - let scope = self.scope.snapshot(); - PodScopeSnapshot { - allow: scope.allow_rules(), - deny: scope.deny_rules(), - } - }; - let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize"); - self.commit_entry(LogEntry::Extension { - ts: segment_log::now_millis(), - domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), - payload, - }) - } - /// Append `entry` to the session log AND publish it through the /// broadcast sink. No user-space serialization is needed across /// concurrent appenders — the kernel orders `O_APPEND` writes for @@ -796,34 +767,6 @@ impl Pod { self.sink.clone() } - /// Cloneable callback handed to dynamic-scope tools. It cannot append - /// directly to the async store from a sync tool callback, so it records - /// the latest snapshot and the controller flushes it after the tool - /// turn completes. - pub fn scope_change_sink(&self) -> Arc { - let pending = self.pending_scope_snapshot.clone(); - Arc::new(move |snapshot| { - *pending.lock().expect("pending_scope_snapshot poisoned") = Some(snapshot); - }) - } - - fn flush_pending_scope_snapshot(&mut self) -> Result<(), StoreError> { - let snapshot = self - .pending_scope_snapshot - .lock() - .expect("pending_scope_snapshot poisoned") - .take(); - if let Some(snapshot) = snapshot { - let payload = serde_json::to_value(&snapshot).expect("PodScopeSnapshot is Serialize"); - self.commit_entry(LogEntry::Extension { - ts: segment_log::now_millis(), - domain: session_store::POD_SCOPE_EXTENSION_DOMAIN.into(), - payload, - })?; - } - Ok(()) - } - /// Direct access to the underlying Worker. pub fn worker(&self) -> &Worker { self.worker.as_ref().expect("worker taken during run") @@ -2007,7 +1950,6 @@ impl Pod { compacted_from: None, }; self.commit_entry(initial)?; - self.persist_scope_snapshot()?; self.write_pod_metadata_active(loc)?; return Ok(()); } @@ -2302,8 +2244,6 @@ impl Pod { } } - self.flush_pending_scope_snapshot()?; - let turn_count = self.worker.as_ref().unwrap().turn_count(); self.commit_entry(LogEntry::TurnEnd { ts: segment_log::now_millis(), @@ -2775,7 +2715,6 @@ impl Pod { .lock() .expect("usage_history poisoned") .clear(); - self.persist_scope_snapshot()?; // Reset extract pointer alongside usage_history: the compacted // session has a fresh log with no `LogEntry::Extension` entries // yet, so a cold restore here would set extract_pointer to None @@ -3831,7 +3770,6 @@ where inject_resident_summary: true, inject_resident_knowledge: true, inject_resident_workflows: true, - pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Arc::new(Mutex::new(None)), @@ -3911,7 +3849,6 @@ where inject_resident_summary: true, inject_resident_knowledge: true, inject_resident_workflows: true, - pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Arc::new(Mutex::new(None)), @@ -4001,19 +3938,13 @@ where return Err(PodError::SegmentEmpty { segment_id }); } let mirror_entries: Vec = raw_entries.clone(); - let scope_snapshot = state - .pod_scope - .clone() - .ok_or(PodError::SegmentScopeMissing { segment_id })?; + let scope_config = effective_restore_scope_config(&store, &manifest)?; let mut common = prepare_pod_common_with_scope( &manifest, &loader, /* parse_template */ false, - ScopeConfig { - allow: scope_snapshot.allow, - deny: scope_snapshot.deny, - }, + scope_config, )?; let skill_shadows = std::mem::take(&mut common.skill_shadows); @@ -4099,7 +4030,6 @@ where inject_resident_summary: true, inject_resident_knowledge: true, inject_resident_workflows: true, - pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Arc::new(Mutex::new(extract_pointer)), @@ -4623,11 +4553,6 @@ pub enum PodError { #[error("session {segment_id} has no entries to restore")] SegmentEmpty { segment_id: SegmentId }, - #[error( - "session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope" - )] - SegmentScopeMissing { segment_id: SegmentId }, - #[error("pod metadata for {pod_name} was not found")] PodMetadataMissing { pod_name: String }, @@ -4669,6 +4594,42 @@ struct PodCommon { skill_shadows: Vec, } +fn effective_restore_scope_config( + store: &St, + manifest: &PodManifest, +) -> Result +where + St: PodMetadataStore, +{ + let mut scope = manifest.scope.clone(); + let Some(metadata) = store.read_by_name(&manifest.pod.name)? else { + return Ok(scope); + }; + for child in metadata.spawned_children { + for rule in child.scope_delegated { + if let Some(deny) = delegated_write_rule_to_deny(rule) { + scope.deny.push(deny); + } + } + } + Ok(scope) +} + +fn delegated_write_rule_to_deny(rule: PodSpawnedScopeRule) -> Option { + match rule.permission.as_str() { + "write" => Some(ScopeRule { + target: rule.target, + permission: Permission::Write, + recursive: rule.recursive, + }), + "read" => None, + other => { + warn!(permission = %other, "ignoring invalid delegated child scope permission"); + None + } + } +} + /// Resolve pwd / scope / LLM client / prompt catalog from a validated /// manifest cascade. Used by `from_manifest`, `from_manifest_spawned`, /// and `restore_from_manifest` so they share one definition of "what diff --git a/crates/pod/src/spawn/registry.rs b/crates/pod/src/spawn/registry.rs index 0a220d24..e66a2602 100644 --- a/crates/pod/src/spawn/registry.rs +++ b/crates/pod/src/spawn/registry.rs @@ -20,8 +20,9 @@ use std::sync::Arc; use std::time::Duration; use manifest::{Permission, ScopeRule, SharedScope}; -use pod_store::{PodMetadataStore, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError}; -use session_store::PodScopeSnapshot; +use pod_store::{ + PodMetadataStore, PodReclaimedChild, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError, +}; use tokio::net::UnixStream; use tokio::sync::Mutex; use tracing::warn; @@ -30,7 +31,7 @@ use crate::runtime::dir::{RuntimeDir, SpawnedPodRecord}; use crate::runtime::pod_registry; type RegistryStateWriter = Arc io::Result<()> + Send + Sync>; -type ScopeChangeSink = Arc; +type RegistryReclaimWriter = Arc io::Result<()> + Send + Sync>; const RESTORE_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500); @@ -39,9 +40,9 @@ pub struct SpawnedPodRegistry { cursors: Mutex>, runtime_dir: Arc, state_writer: Option, + reclaim_writer: Option, parent_name: Option, parent_scope: Option, - scope_change_sink: Option, } pub struct SpawnedPodRegistryLoad { @@ -56,9 +57,9 @@ impl SpawnedPodRegistry { cursors: Mutex::new(HashMap::new()), runtime_dir, state_writer: None, + reclaim_writer: None, parent_name: None, parent_scope: None, - scope_change_sink: None, }) } @@ -75,8 +76,7 @@ impl SpawnedPodRegistry { St: PodMetadataStore + Clone + Send + Sync + 'static, { let loaded = - Self::load_from_pod_state_with_reclaim(runtime_dir, store, pod_name, None, None) - .await?; + Self::load_from_pod_state_with_reclaim(runtime_dir, store, pod_name, None).await?; Ok(loaded.registry) } @@ -85,7 +85,6 @@ impl SpawnedPodRegistry { store: St, pod_name: String, parent_scope: Option, - scope_change_sink: Option, ) -> io::Result where St: PodMetadataStore + Clone + Send + Sync + 'static, @@ -97,13 +96,11 @@ impl SpawnedPodRegistry { .unwrap_or_default(); let mut records = Vec::with_capacity(persisted_children.len()); - let mut pruned = false; let mut pruned_records = Vec::new(); for child in &persisted_children { let record = match record_from_pod_state(child) { Ok(record) => record, Err(err) => { - pruned = true; warn!( error = %err, pod = %child.pod_name, @@ -115,7 +112,6 @@ impl SpawnedPodRegistry { if is_reachable(&record.socket_path).await { records.push(record); } else { - pruned = true; warn!( pod = %record.pod_name, socket = %record.socket_path.display(), @@ -126,20 +122,40 @@ impl SpawnedPodRegistry { } runtime_dir.write_spawned_pods(&records).await?; - let state_writer = pod_state_writer(store, pod_name.clone()); - // Runtime spawned-pod records are a live registry for ListPods and - // cursor/scope cleanup; durable Pod state remains the discovery source - // for later attach/restore, so do not delete unreachable children from - // Pod state just because their sockets are gone. - if metadata.is_none() || !pruned { + let state_writer = pod_state_writer(store.clone(), pod_name.clone()); + let reclaim_writer = pod_state_reclaim_writer(store.clone(), pod_name.clone()); + if metadata.is_none() { state_writer(&records)?; } let mut reclaimed_unreachable = false; + if !pruned_records.is_empty() { + let reclaimed = pruned_records + .iter() + .map(|record| PodReclaimedChild { + pod_name: record.pod_name.clone(), + scope_delegated: record + .scope_delegated + .iter() + .map(|rule| PodSpawnedScopeRule { + target: rule.target.clone(), + permission: match rule.permission { + Permission::Read => "read".to_string(), + Permission::Write => "write".to_string(), + }, + recursive: rule.recursive, + }) + .collect(), + }) + .collect(); + store + .reclaim_spawned_children(&pod_name, reclaimed) + .map_err(store_error_to_io)?; + reclaimed_unreachable = true; + } if parent_scope.is_some() { for record in &pruned_records { - reclaim_record(&pod_name, parent_scope.as_ref(), None, record)?; - reclaimed_unreachable = true; + reclaim_record(&pod_name, parent_scope.as_ref(), record)?; } } @@ -149,9 +165,9 @@ impl SpawnedPodRegistry { cursors: Mutex::new(HashMap::new()), runtime_dir, state_writer: Some(state_writer), + reclaim_writer: Some(reclaim_writer), parent_name: Some(pod_name), parent_scope, - scope_change_sink, }), reclaimed_unreachable, }) @@ -194,6 +210,9 @@ impl SpawnedPodRegistry { self.cursors.lock().await.remove(pod_name); if let Some(record) = &removed { self.reclaim_record(record)?; + if let Some(write_reclaim) = &self.reclaim_writer { + write_reclaim(record)?; + } } Ok(removed) } @@ -203,12 +222,7 @@ impl SpawnedPodRegistry { release_child_allocation(&record.pod_name)?; return Ok(()); }; - reclaim_record( - parent_name, - self.parent_scope.as_ref(), - self.scope_change_sink.as_ref(), - record, - ) + reclaim_record(parent_name, self.parent_scope.as_ref(), record) } /// Read-only cursor lookup. Returns 0 when no cursor has been set. @@ -246,10 +260,36 @@ where }) } +fn pod_state_reclaim_writer(store: St, pod_name: String) -> RegistryReclaimWriter +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + Arc::new(move |record| { + let reclaimed = PodReclaimedChild { + pod_name: record.pod_name.clone(), + scope_delegated: record + .scope_delegated + .iter() + .map(|rule| PodSpawnedScopeRule { + target: rule.target.clone(), + permission: match rule.permission { + Permission::Read => "read".to_string(), + Permission::Write => "write".to_string(), + }, + recursive: rule.recursive, + }) + .collect(), + }; + store + .reclaim_spawned_children(&pod_name, vec![reclaimed]) + .map(|_| ()) + .map_err(store_error_to_io) + }) +} + fn reclaim_record( parent_name: &str, parent_scope: Option<&SharedScope>, - scope_change_sink: Option<&ScopeChangeSink>, record: &SpawnedPodRecord, ) -> io::Result<()> { let write_rules = record @@ -275,13 +315,6 @@ fn reclaim_record( scope .update(|current| current.with_removed_deny_rules(write_rules)) .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; - if let Some(sink) = scope_change_sink { - let snapshot = scope.snapshot(); - sink(PodScopeSnapshot { - allow: snapshot.allow_rules(), - deny: snapshot.deny_rules(), - }); - } } Ok(()) diff --git a/crates/pod/src/spawn/tool.rs b/crates/pod/src/spawn/tool.rs index 0c21c5d7..89c327e1 100644 --- a/crates/pod/src/spawn/tool.rs +++ b/crates/pod/src/spawn/tool.rs @@ -18,7 +18,6 @@ use manifest::{ SharedScope, WorkerManifestConfig, }; use serde::Deserialize; -use session_store::PodScopeSnapshot; use tokio::net::UnixStream; use tokio::process::Command; use tokio::time::sleep; @@ -128,9 +127,6 @@ pub struct SpawnPodTool { /// `effective_write` semantics: Write is the only permission /// tracked across Pods, so revocation only touches Write. spawner_scope: SharedScope, - /// Called after the spawner scope has been updated so the new - /// effective scope can be persisted to the session log. - scope_changed: Arc, } impl SpawnPodTool { @@ -143,7 +139,6 @@ impl SpawnPodTool { parent_socket: Option, spawner_model: ModelManifest, spawner_scope: SharedScope, - scope_changed: Arc, ) -> Self { Self { spawner_name, @@ -154,7 +149,6 @@ impl SpawnPodTool { parent_socket, spawner_model, spawner_scope, - scope_changed, } } } @@ -250,11 +244,6 @@ impl Tool for SpawnPodTool { self.spawner_scope .update(|cur| cur.with_added_deny_rules(revoke_write.clone())) .map_err(|e| ToolError::ExecutionFailed(format!("revoke spawner scope: {e}")))?; - let current = self.spawner_scope.snapshot(); - (self.scope_changed)(PodScopeSnapshot { - allow: current.allow_rules(), - deny: current.deny_rules(), - }); } let record = SpawnedPodRecord { @@ -496,7 +485,6 @@ pub fn spawn_pod_tool( parent_socket: Option, spawner_model: ModelManifest, spawner_scope: SharedScope, - scope_changed: Arc, ) -> ToolDefinition { Arc::new(move || { let schema = schemars::schema_for!(SpawnPodInput); @@ -513,7 +501,6 @@ pub fn spawn_pod_tool( parent_socket.clone(), spawner_model.clone(), spawner_scope.clone(), - scope_changed.clone(), )); (meta, tool) }) diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index 6ea601c1..83678396 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -442,7 +442,6 @@ async fn stop_pod_sends_shutdown_and_releases_scope() { store.clone(), "spawner".into(), Some(parent_scope.clone()), - None, ) .await .unwrap(); @@ -580,13 +579,15 @@ async fn restored_registry_uses_pod_state_without_runtime_file() { .unwrap() .expect("spawner metadata should remain"); assert!(metadata.spawned_children.is_empty()); + assert_eq!(metadata.reclaimed_children.len(), 1); + assert_eq!(metadata.reclaimed_children[0].pod_name, "child"); let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap(); let runtime_records: Vec = serde_json::from_str(&runtime_contents).unwrap(); assert!(runtime_records.is_empty()); } #[tokio::test] -async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state() { +async fn load_from_pod_state_prunes_runtime_children_and_reclaims_durable_delegation() { let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); let store = CombinedStore::new( @@ -625,23 +626,14 @@ async fn load_from_pod_state_prunes_runtime_children_but_preserves_durable_state .read_by_name("spawner") .unwrap() .expect("spawner metadata should be written"); - assert_eq!(metadata.spawned_children.len(), 2); - assert!( - metadata - .spawned_children - .iter() - .any(|c| c.pod_name == "alive") - ); - assert!( - metadata - .spawned_children - .iter() - .any(|c| c.pod_name == "missing") - ); + assert_eq!(metadata.spawned_children.len(), 1); + assert_eq!(metadata.spawned_children[0].pod_name, "alive"); + assert_eq!(metadata.reclaimed_children.len(), 1); + assert_eq!(metadata.reclaimed_children[0].pod_name, "missing"); } #[tokio::test] -async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_state() { +async fn load_from_pod_state_reclaims_pruned_child_scope_and_records_history() { let _env = EnvGuard::acquire(); let runtime_tmp = TempDir::new().unwrap(); let store_tmp = TempDir::new().unwrap(); @@ -709,7 +701,6 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_st store.clone(), "spawner".into(), Some(parent_scope.clone()), - None, ) .await .unwrap(); @@ -729,8 +720,9 @@ async fn load_from_pod_state_reclaims_pruned_child_scope_without_deleting_pod_st .read_by_name("spawner") .unwrap() .expect("spawner metadata should remain"); - assert_eq!(metadata.spawned_children.len(), 1); - assert_eq!(metadata.spawned_children[0].pod_name, "missing"); + assert!(metadata.spawned_children.is_empty()); + assert_eq!(metadata.reclaimed_children.len(), 1); + assert_eq!(metadata.reclaimed_children[0].pod_name, "missing"); let runtime_contents = std::fs::read_to_string(rd.path().join("spawned_pods.json")).unwrap(); let runtime_records: Vec = serde_json::from_str(&runtime_contents).unwrap(); assert!(runtime_records.is_empty()); diff --git a/crates/pod/tests/restore_test.rs b/crates/pod/tests/restore_test.rs index 6f0d4ad7..42968967 100644 --- a/crates/pod/tests/restore_test.rs +++ b/crates/pod/tests/restore_test.rs @@ -199,39 +199,3 @@ async fn restore_from_manifest_rejects_empty_segment_log() { Ok(_) => panic!("expected empty segment log to fail"), } } - -#[tokio::test] -async fn restore_from_manifest_rejects_segment_without_scope_snapshot() { - let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); - - let store_tmp = tempfile::tempdir().unwrap(); - let store = CombinedStore::new( - FsStore::new(store_tmp.path()).unwrap(), - FsPodStore::new(store_tmp.path().join("pods")).unwrap(), - ); - let manifest = pod::PodManifest::from_toml(MINIMAL_MANIFEST_TOML).unwrap(); - - let sid = session_store::new_session_id(); - let segid = session_store::new_segment_id(); - let state = session_store::SegmentStartState { - system_prompt: None, - config: &Default::default(), - history: &[], - }; - session_store::create_segment_with_ids(&store, sid, segid, state).unwrap(); - - let result = Pod::restore_from_manifest( - sid, - segid, - manifest, - store, - pod::PromptLoader::builtins_only(), - ) - .await; - - match result { - Err(PodError::SegmentScopeMissing { segment_id }) => assert_eq!(segment_id, segid), - Err(other) => panic!("expected SegmentScopeMissing, got {other:?}"), - Ok(_) => panic!("expected missing scope snapshot to fail"), - } -} diff --git a/crates/pod/tests/spawn_pod_test.rs b/crates/pod/tests/spawn_pod_test.rs index d60cf71f..7e99e2a3 100644 --- a/crates/pod/tests/spawn_pod_test.rs +++ b/crates/pod/tests/spawn_pod_test.rs @@ -193,7 +193,6 @@ async fn spawn_pod_delegates_scope_and_sends_run() { None, dummy_model(), spawner_scope.clone(), - std::sync::Arc::new(|_| {}), ); let (_meta, tool) = def(); @@ -282,7 +281,6 @@ async fn spawn_pod_rejects_scope_outside_spawner() { None, dummy_model(), spawner_scope.clone(), - std::sync::Arc::new(|_| {}), ); let (_meta, tool) = def(); @@ -354,7 +352,6 @@ async fn spawn_pod_rolls_back_reservation_when_socket_never_appears() { None, dummy_model(), spawner_scope.clone(), - std::sync::Arc::new(|_| {}), ); let (_meta, tool) = def(); diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index dc71b4d0..de383648 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -47,13 +47,9 @@ pub use segment::{ SegmentStartState, append_entry, append_system_item, classify_history_item, create_compacted_segment, create_segment, create_segment_with_ids, ensure_head_or_fork, fork, fork_at, restore, restore_by_segment, save_config_changed, save_delta, save_extension, - save_pod_scope, save_run_completed, save_run_errored, save_turn_end, save_usage, - save_user_input, -}; -pub use segment_log::{ - LogEntry, POD_SCOPE_EXTENSION_DOMAIN, PodScopeSnapshot, RestoredState, SegmentOrigin, - collect_state, + save_run_completed, save_run_errored, save_turn_end, save_usage, save_user_input, }; +pub use segment_log::{LogEntry, RestoredState, SegmentOrigin, collect_state}; pub use store::{Store, StoreError}; pub use system_item::{SystemItem, SystemReminder, SystemReminderSource, render_pod_event}; diff --git a/crates/session-store/src/segment.rs b/crates/session-store/src/segment.rs index 0e3edb51..d776a5ac 100644 --- a/crates/session-store/src/segment.rs +++ b/crates/session-store/src/segment.rs @@ -5,7 +5,7 @@ //! functions after state-mutating operations. use crate::logged_item::{LoggedItem, to_logged}; -use crate::segment_log::{self, LogEntry, PodScopeSnapshot, SegmentOrigin}; +use crate::segment_log::{self, LogEntry, SegmentOrigin}; use crate::store::{Store, StoreError}; use crate::system_item::SystemItem; use crate::{SegmentId, SessionId}; @@ -385,23 +385,6 @@ pub fn save_extension( ) } -/// Log the Pod's latest runtime scope snapshot. -pub fn save_pod_scope( - store: &impl Store, - session_id: SessionId, - segment_id: SegmentId, - snapshot: &PodScopeSnapshot, -) -> Result<(), StoreError> { - let payload = serde_json::to_value(snapshot)?; - save_extension( - store, - session_id, - segment_id, - segment_log::POD_SCOPE_EXTENSION_DOMAIN, - payload, - ) -} - /// Log a `ConfigChanged` entry. pub fn save_config_changed( store: &impl Store, diff --git a/crates/session-store/src/segment_log.rs b/crates/session-store/src/segment_log.rs index 28696a92..ded51de8 100644 --- a/crates/session-store/src/segment_log.rs +++ b/crates/session-store/src/segment_log.rs @@ -11,7 +11,7 @@ use llm_worker::llm_client::types::{Item, RequestConfig}; use llm_worker::{UsageRecord, WorkerResult}; -use protocol::{InvokeKind, ScopeRule, Segment}; +use protocol::{InvokeKind, Segment}; use serde::{Deserialize, Serialize}; use crate::logged_item::LoggedItem; @@ -166,16 +166,6 @@ pub struct SegmentOrigin { pub at_turn_index: usize, } -/// Domain used by Pod to persist its latest effective runtime scope. -pub const POD_SCOPE_EXTENSION_DOMAIN: &str = "pod.scope"; - -/// Payload stored in `LogEntry::Extension { domain: "pod.scope", .. }`. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct PodScopeSnapshot { - pub allow: Vec, - pub deny: Vec, -} - /// State collected from log entries. #[derive(Debug, Clone)] pub struct RestoredState { @@ -199,9 +189,6 @@ pub struct RestoredState { /// `LogEntry::Extension` を replay 順に積んだもの。`(domain, payload)`。 /// session-store は domain を不透明扱いし、各ドメインが自前で fold する。 pub extensions: Vec<(String, serde_json::Value)>, - /// Latest runtime scope snapshot persisted by the Pod. `None` means - /// the segment predates scope persistence or the payload was corrupt. - pub pod_scope: Option, /// User submissions in original typed form, in submit order. /// One entry per `LogEntry::UserInput`; the K-th entry corresponds to /// the K-th `Item::user_message` derived during replay (modulo @@ -223,7 +210,6 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState { entries_count: 0, usage_history: Vec::new(), extensions: Vec::new(), - pod_scope: None, user_segments: Vec::new(), }; @@ -293,17 +279,6 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState { LogEntry::Extension { domain, payload, .. } => { - if domain == POD_SCOPE_EXTENSION_DOMAIN { - match serde_json::from_value::(payload.clone()) { - Ok(snapshot) => state.pod_scope = Some(snapshot), - Err(err) => { - tracing::warn!( - error = %err, - "discarding malformed pod.scope snapshot from segment log" - ); - } - } - } state.extensions.push((domain.clone(), payload.clone())); } } diff --git a/crates/tui/src/spawn.rs b/crates/tui/src/spawn.rs index 3265660e..3988fdcb 100644 --- a/crates/tui/src/spawn.rs +++ b/crates/tui/src/spawn.rs @@ -17,7 +17,7 @@ use std::time::Duration; use client::{SpawnConfig, spawn_pod}; use crossterm::event::{self, Event as TermEvent, KeyCode, KeyEventKind, KeyModifiers}; -use manifest::{ProfileDiscovery, ScopeConfig}; +use manifest::ProfileDiscovery; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use ratatui::layout::{Constraint, Layout}; @@ -42,8 +42,6 @@ pub enum SpawnOutcome { #[derive(Debug)] pub enum SpawnError { Io(io::Error), - Store(session_store::StoreError), - MissingResumeScope { segment_id: SegmentId }, Spawn(client::SpawnError), } @@ -51,11 +49,6 @@ impl std::fmt::Display for SpawnError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Io(e) => write!(f, "io error: {e}"), - Self::Store(e) => write!(f, "failed to read session log: {e}"), - Self::MissingResumeScope { segment_id } => write!( - f, - "session {segment_id} has no persisted scope snapshot; refusing resume without explicit scope" - ), Self::Spawn(e) => write!(f, "{e}"), } } @@ -69,12 +62,6 @@ impl From for SpawnError { } } -impl From for SpawnError { - fn from(e: session_store::StoreError) -> Self { - Self::Store(e) - } -} - impl From for SpawnError { fn from(e: client::SpawnError) -> Self { Self::Spawn(e) @@ -111,7 +98,6 @@ pub async fn run( editing: true, resume_from, resume_by_pod_name: false, - resume_scope: None, profile_choices, profile_index, }; @@ -149,10 +135,6 @@ pub async fn run( } } - if let Some(id) = form.resume_from { - form.resume_scope = Some(load_resume_scope(id).await?); - } - // Phase 2: launch pod and wait for ready line. Drop the cursor // out of the name field — subsequent frames are passive status // updates, not input — so the cursor doesn't end up parked there @@ -305,7 +287,6 @@ fn form_for_pod_name(pod_name: String, defaults: SpawnDefaults) -> Form { editing: false, resume_from: None, resume_by_pod_name: true, - resume_scope: None, profile_choices: Vec::new(), profile_index: 0, } @@ -383,7 +364,6 @@ async fn wait_for_ready( let config = SpawnConfig { pod_name: form.name.clone(), profile: form.selected_profile_selector(), - resume_scope: form.resume_scope.clone(), cwd: form.cwd.clone(), resume_from: form.resume_from, resume_by_pod_name: form.resume_by_pod_name, @@ -399,24 +379,6 @@ async fn wait_for_ready( }) } -async fn load_resume_scope(segment_id: SegmentId) -> Result { - let store_dir = manifest::paths::sessions_dir().ok_or_else(|| { - io::Error::new( - io::ErrorKind::NotFound, - "could not resolve sessions directory (set INSOMNIA_HOME, INSOMNIA_DATA_DIR, or HOME)", - ) - })?; - let store = session_store::FsStore::new(&store_dir)?; - let state = session_store::restore_by_segment(&store, segment_id)?; - let snapshot = state - .pod_scope - .ok_or(SpawnError::MissingResumeScope { segment_id })?; - Ok(ScopeConfig { - allow: snapshot.allow, - deny: snapshot.deny, - }) -} - #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum MessageKind { Info, @@ -453,10 +415,6 @@ struct Form { /// When true, launch the child with `--pod ` so the pod process /// resolves name-keyed state before falling back to fresh creation. resume_by_pod_name: bool, - /// Scope snapshot recovered from the source session log. Set only for - /// resume runs and passed through a typed internal restore flag so resume - /// does not silently broaden access. - resume_scope: Option, /// Optional Nix profile choices passed to `insomnia-pod --profile` for /// fresh spawns. This is not used for resume/attach flows because those must /// restore Pod state rather than re-evaluate a profile source. @@ -616,17 +574,6 @@ fn context_line(form: &Form) -> Line<'_> { ]); } - if form.resume_scope.is_some() { - return Line::from(vec![ - Span::raw(" "), - Span::styled("scope: ", Style::default().fg(Color::DarkGray)), - Span::styled( - "from restored session snapshot", - Style::default().fg(Color::Green), - ), - ]); - } - match form.scope_origin { ScopeOrigin::FromProfile => Line::from(vec![ Span::raw(" "), @@ -670,7 +617,6 @@ mod tests { editing: true, resume_from: None, resume_by_pod_name: false, - resume_scope: None, profile_choices: Vec::new(), profile_index: 0, } @@ -691,7 +637,6 @@ mod tests { assert_eq!(f.name_cursor, "agent".chars().count()); assert_eq!(f.resume_from, None); assert!(f.resume_by_pod_name); - assert!(f.resume_scope.is_none()); assert!(!f.editing); assert_eq!( f.message, @@ -699,28 +644,6 @@ mod tests { ); } - #[test] - fn resume_scope_snapshot_stays_on_form_for_typed_restore_flag() { - let mut f = form("agent-r"); - f.resume_from = Some(session_store::new_segment_id()); - f.resume_scope = Some(ScopeConfig { - allow: vec![manifest::ScopeRule { - target: PathBuf::from("/work/example"), - permission: manifest::Permission::Write, - recursive: true, - }], - deny: vec![manifest::ScopeRule { - target: PathBuf::from("/work/example/child"), - permission: manifest::Permission::Write, - recursive: true, - }], - }); - - let scope = f.resume_scope.as_ref().unwrap(); - assert_eq!(scope.allow[0].target, PathBuf::from("/work/example")); - assert_eq!(scope.deny[0].target, PathBuf::from("/work/example/child")); - } - #[test] fn profile_choices_use_project_registry_default() { let temp = tempfile::tempdir().unwrap();