//! Shared registry of Pods spawned by this Pod. //! //! `SpawnPod` writes here; the pod-comm tools (`SendToPod`, //! `ReadPodOutput`, `StopPod`) read and mutate the same instance. Discovery //! tools consult this registry together with durable Pod state. Runtime //! write-through still materialises `spawned_pods.json`, but durable state lives //! in the spawner's Pod metadata. //! //! `ReadPodOutput` additionally owns a per-spawned-pod cursor here so //! two consecutive reads yield only new assistant text. The cursor is //! an item-index into the child's history; push-only history makes //! index stable across reads. //! //! Cursors intentionally do not persist; a restored registry starts with //! fresh read positions. use std::collections::HashMap; use std::io; use std::path::Path; use std::sync::Arc; use std::time::Duration; use manifest::{Permission, ScopeRule, SharedScope}; use pod_store::{ PodMetadataStore, PodReclaimedChild, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError, }; use tokio::net::UnixStream; use tokio::sync::Mutex; use tracing::warn; use crate::runtime::dir::{RuntimeDir, SpawnedPodRecord}; use crate::runtime::pod_registry; type RegistryStateWriter = Arc io::Result<()> + Send + Sync>; type RegistryReclaimWriter = Arc io::Result<()> + Send + Sync>; const RESTORE_REACHABILITY_TIMEOUT: Duration = Duration::from_millis(500); pub struct SpawnedPodRegistry { records: Mutex>, cursors: Mutex>, runtime_dir: Arc, state_writer: Option, reclaim_writer: Option, parent_name: Option, parent_scope: Option, } pub struct SpawnedPodRegistryLoad { pub registry: Arc, pub reclaimed_unreachable: bool, } impl SpawnedPodRegistry { pub fn new(runtime_dir: Arc) -> Arc { Arc::new(Self { records: Mutex::new(Vec::new()), cursors: Mutex::new(HashMap::new()), runtime_dir, state_writer: None, reclaim_writer: None, parent_name: None, parent_scope: None, }) } /// Build a registry from the spawner's durable Pod state, pruning child /// records whose socket path is already gone. The surviving list is /// written through to both `spawned_pods.json` and Pod state so runtime /// and durable views start aligned. pub async fn load_from_pod_state( runtime_dir: Arc, store: St, pod_name: String, ) -> io::Result> where St: PodMetadataStore + Clone + Send + Sync + 'static, { let loaded = Self::load_from_pod_state_with_reclaim(runtime_dir, store, pod_name, None).await?; Ok(loaded.registry) } pub async fn load_from_pod_state_with_reclaim( runtime_dir: Arc, store: St, pod_name: String, parent_scope: Option, ) -> io::Result where St: PodMetadataStore + Clone + Send + Sync + 'static, { let metadata = store.read_by_name(&pod_name).map_err(store_error_to_io)?; let persisted_children = metadata .as_ref() .map(|m| m.spawned_children.clone()) .unwrap_or_default(); let mut records = Vec::with_capacity(persisted_children.len()); let mut pruned_records = Vec::new(); for child in &persisted_children { let record = match record_from_pod_state(child) { Ok(record) => record, Err(err) => { warn!( error = %err, pod = %child.pod_name, "dropping corrupt persisted spawned-pod record" ); continue; } }; if is_reachable(&record.socket_path).await { records.push(record); } else { warn!( pod = %record.pod_name, socket = %record.socket_path.display(), "dropping unreachable persisted spawned-pod record" ); pruned_records.push(record); } } runtime_dir.write_spawned_pods(&records).await?; let state_writer = pod_state_writer(store.clone(), pod_name.clone()); let reclaim_writer = pod_state_reclaim_writer(store.clone(), pod_name.clone()); if metadata.is_none() { state_writer(&records)?; } let mut reclaimed_unreachable = false; if !pruned_records.is_empty() { let reclaimed = pruned_records .iter() .map(|record| PodReclaimedChild { pod_name: record.pod_name.clone(), scope_delegated: record .scope_delegated .iter() .map(|rule| PodSpawnedScopeRule { target: rule.target.clone(), permission: match rule.permission { Permission::Read => "read".to_string(), Permission::Write => "write".to_string(), }, recursive: rule.recursive, }) .collect(), }) .collect(); store .reclaim_spawned_children(&pod_name, reclaimed) .map_err(store_error_to_io)?; reclaimed_unreachable = true; } if parent_scope.is_some() { for record in &pruned_records { reclaim_record(&pod_name, parent_scope.as_ref(), record)?; } } Ok(SpawnedPodRegistryLoad { registry: Arc::new(Self { records: Mutex::new(records), cursors: Mutex::new(HashMap::new()), runtime_dir, state_writer: Some(state_writer), reclaim_writer: Some(reclaim_writer), parent_name: Some(pod_name), parent_scope, }), reclaimed_unreachable, }) } /// Append a new record and persist the full list. Returns an I/O /// error if either persisted write fails; the in-memory state is still /// updated in that case — the next successful write will reconcile. pub async fn add(&self, record: SpawnedPodRecord) -> io::Result<()> { let mut records = self.records.lock().await; records.push(record); self.persist_records(records.as_slice()).await } /// Look up a record by pod name. Cloned so callers can drop the lock. pub async fn get(&self, pod_name: &str) -> Option { self.records .lock() .await .iter() .find(|r| r.pod_name == pod_name) .cloned() } pub async fn list(&self) -> Vec { self.records.lock().await.clone() } /// Remove the record for `pod_name`, persist, clear its cursor, and /// reclaim any delegated Write scope owned by that child. Returns the /// removed record (if any). pub async fn remove(&self, pod_name: &str) -> io::Result> { let removed = { let mut records = self.records.lock().await; let idx = records.iter().position(|r| r.pod_name == pod_name); let removed = idx.map(|i| records.remove(i)); self.persist_records(records.as_slice()).await?; removed }; self.cursors.lock().await.remove(pod_name); if let Some(record) = &removed { self.reclaim_record(record)?; if let Some(write_reclaim) = &self.reclaim_writer { write_reclaim(record)?; } } Ok(removed) } fn reclaim_record(&self, record: &SpawnedPodRecord) -> io::Result<()> { let Some(parent_name) = &self.parent_name else { release_child_allocation(&record.pod_name)?; return Ok(()); }; reclaim_record(parent_name, self.parent_scope.as_ref(), record) } /// Read-only cursor lookup. Returns 0 when no cursor has been set. pub async fn cursor(&self, pod_name: &str) -> usize { self.cursors .lock() .await .get(pod_name) .copied() .unwrap_or(0) } pub async fn set_cursor(&self, pod_name: &str, cursor: usize) { self.cursors .lock() .await .insert(pod_name.to_string(), cursor); } async fn persist_records(&self, records: &[SpawnedPodRecord]) -> io::Result<()> { self.runtime_dir.write_spawned_pods(records).await?; if let Some(write_state) = &self.state_writer { write_state(records)?; } Ok(()) } } fn pod_state_writer(store: St, pod_name: String) -> RegistryStateWriter where St: PodMetadataStore + Clone + Send + Sync + 'static, { Arc::new(move |records| { write_records_to_pod_state(&store, &pod_name, records).map_err(store_error_to_io) }) } fn pod_state_reclaim_writer(store: St, pod_name: String) -> RegistryReclaimWriter where St: PodMetadataStore + Clone + Send + Sync + 'static, { Arc::new(move |record| { let reclaimed = PodReclaimedChild { pod_name: record.pod_name.clone(), scope_delegated: record .scope_delegated .iter() .map(|rule| PodSpawnedScopeRule { target: rule.target.clone(), permission: match rule.permission { Permission::Read => "read".to_string(), Permission::Write => "write".to_string(), }, recursive: rule.recursive, }) .collect(), }; store .reclaim_spawned_children(&pod_name, vec![reclaimed]) .map(|_| ()) .map_err(store_error_to_io) }) } fn reclaim_record( parent_name: &str, parent_scope: Option<&SharedScope>, record: &SpawnedPodRecord, ) -> io::Result<()> { let write_rules = record .scope_delegated .iter() .filter(|rule| rule.permission == Permission::Write) .cloned() .collect::>(); let lock_path = pod_registry::default_registry_path() .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; let mut guard = pod_registry::LockFileGuard::open(&lock_path) .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; pod_registry::reclaim_delegated_scope( &mut guard, parent_name, &record.pod_name, &record.scope_delegated, ) .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; if let Some(scope) = parent_scope { scope .update(|current| current.with_removed_deny_rules(write_rules)) .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; } Ok(()) } fn release_child_allocation(pod_name: &str) -> io::Result<()> { let lock_path = pod_registry::default_registry_path() .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; let mut guard = pod_registry::LockFileGuard::open(&lock_path) .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; match pod_registry::release_pod(&mut guard, pod_name) { Ok(()) | Err(pod_registry::ScopeLockError::UnknownPod(_)) => Ok(()), Err(err) => Err(io::Error::new(io::ErrorKind::Other, err)), } } fn write_records_to_pod_state( store: &St, pod_name: &str, records: &[SpawnedPodRecord], ) -> Result<(), PodStoreError> where St: PodMetadataStore, { let children = records .iter() .map(record_to_pod_state) .collect::, _>>()?; store.set_spawned_children(pod_name, children)?; Ok(()) } fn record_to_pod_state(record: &SpawnedPodRecord) -> Result { Ok(PodSpawnedChild { pod_name: record.pod_name.clone(), socket_path: record.socket_path.clone(), scope_delegated: record .scope_delegated .iter() .map(|rule| PodSpawnedScopeRule { target: rule.target.clone(), permission: match rule.permission { Permission::Read => "read".to_string(), Permission::Write => "write".to_string(), }, recursive: rule.recursive, }) .collect(), callback_address: record.callback_address.clone(), }) } fn record_from_pod_state(child: &PodSpawnedChild) -> Result { Ok(SpawnedPodRecord { pod_name: child.pod_name.clone(), socket_path: child.socket_path.clone(), scope_delegated: child .scope_delegated .iter() .map(|rule| { Ok(ScopeRule { target: rule.target.clone(), permission: match rule.permission.as_str() { "read" => Permission::Read, "write" => Permission::Write, other => { return Err(serde_json::Error::io(io::Error::new( io::ErrorKind::InvalidData, format!("invalid permission `{other}`"), ))); } }, recursive: rule.recursive, }) }) .collect::, _>>()?, callback_address: child.callback_address.clone(), }) } fn store_error_to_io(error: PodStoreError) -> io::Error { io::Error::other(error) } async fn is_reachable(socket: &Path) -> bool { tokio::time::timeout(RESTORE_REACHABILITY_TIMEOUT, UnixStream::connect(socket)) .await .map(|result| result.is_ok()) .unwrap_or(false) }