From 5fa060a74875ee0ec065887f7c864ecac06e9a19 Mon Sep 17 00:00:00 2001 From: Hare Date: Wed, 29 Apr 2026 20:14:34 +0900 Subject: [PATCH] =?UTF-8?q?pod-registry=E3=81=AE=E3=83=A2=E3=82=B8?= =?UTF-8?q?=E3=83=A5=E3=83=BC=E3=83=AB=E5=88=86=E5=89=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/pod-registry/src/conflict.rs | 203 ++++ crates/pod-registry/src/error.rs | 34 + crates/pod-registry/src/lib.rs | 1419 +------------------------- crates/pod-registry/src/lifecycle.rs | 317 ++++++ crates/pod-registry/src/mutate.rs | 567 ++++++++++ crates/pod-registry/src/table.rs | 259 +++++ crates/pod-registry/src/test_util.rs | 97 ++ 7 files changed, 1491 insertions(+), 1405 deletions(-) create mode 100644 crates/pod-registry/src/conflict.rs create mode 100644 crates/pod-registry/src/error.rs create mode 100644 crates/pod-registry/src/lifecycle.rs create mode 100644 crates/pod-registry/src/mutate.rs create mode 100644 crates/pod-registry/src/table.rs create mode 100644 crates/pod-registry/src/test_util.rs diff --git a/crates/pod-registry/src/conflict.rs b/crates/pod-registry/src/conflict.rs new file mode 100644 index 00000000..60ee1e27 --- /dev/null +++ b/crates/pod-registry/src/conflict.rs @@ -0,0 +1,203 @@ +//! Pure functions that decide whether scope rules collide. +//! +//! These helpers are read-only over [`LockFile`]; they never touch the +//! file or the lock itself. The mutating operations in [`crate::mutate`] +//! call them under the [`crate::LockFileGuard`]. + +use manifest::{Permission, ScopeRule}; + +use crate::table::{Allocation, LockFile}; + +/// Whether `a` and `b` claim any overlapping concrete path. +/// +/// Recursive rules cover `target/**`; non-recursive rules cover the +/// target itself and its direct children. The four cases enumerate +/// when those coverage sets intersect. +pub(crate) fn rules_overlap(a: &ScopeRule, b: &ScopeRule) -> bool { + match (a.recursive, b.recursive) { + (true, true) => a.target.starts_with(&b.target) || b.target.starts_with(&a.target), + (true, false) => { + // a covers a.target/**; b covers {b.target, b.target/*}. + b.target.starts_with(&a.target) || a.target.parent() == Some(b.target.as_path()) + } + (false, true) => { + a.target.starts_with(&b.target) || b.target.parent() == Some(a.target.as_path()) + } + (false, false) => { + a.target == b.target + || a.target.parent() == Some(b.target.as_path()) + || b.target.parent() == Some(a.target.as_path()) + } + } +} + +/// Does `cover` fully contain `inner`'s claimed paths? +fn covers_fully(cover: &ScopeRule, inner: &ScopeRule) -> bool { + if cover.permission < inner.permission { + return false; + } + if cover.recursive { + inner.target.starts_with(&cover.target) + } else { + inner.target == cover.target && !inner.recursive + } +} + +/// Check whether `rule` is contained in `parent`'s effective write +/// scope: its allow set covers `rule`, and no child of `parent` has +/// already taken a piece that would overlap `rule`. +pub fn is_within_effective_write(lock: &LockFile, parent: &str, rule: &ScopeRule) -> bool { + let Some(alloc) = lock.find(parent) else { + return false; + }; + if rule.permission != Permission::Write { + return alloc.scope_allow.iter().any(|r| covers_fully(r, rule)); + } + let covered = alloc + .scope_allow + .iter() + .filter(|r| r.permission == Permission::Write) + .any(|r| covers_fully(r, rule)); + if !covered { + return false; + } + let child_conflict = lock + .allocations + .iter() + .filter(|a| a.delegated_from.as_deref() == Some(parent)) + .flat_map(|a| a.scope_allow.iter()) + .filter(|r| r.permission == Permission::Write) + .any(|r| rules_overlap(r, rule)); + !child_conflict +} + +/// Find the Pod that actually owns a write scope overlapping `rule`. +/// +/// Walks the delegation tree: if an allocation overlaps `rule`, we +/// descend into its children and return the deepest overlapping node +/// as the true owner. `exempt` names a Pod whose ownership is +/// permitted (used during delegation: the spawner itself is allowed +/// to still own the rule's region because it is handing it down). +pub fn find_conflict_owner( + lock: &LockFile, + rule: &ScopeRule, + exempt: Option<&str>, +) -> Option { + if rule.permission != Permission::Write { + return None; + } + for alloc in lock + .allocations + .iter() + .filter(|a| a.delegated_from.is_none()) + { + if let Some(owner) = find_conflict_in_subtree(lock, alloc, rule) { + if Some(owner.as_str()) == exempt { + continue; + } + return Some(owner); + } + } + None +} + +fn find_conflict_in_subtree( + lock: &LockFile, + alloc: &Allocation, + rule: &ScopeRule, +) -> Option { + let overlaps_here = alloc + .scope_allow + .iter() + .filter(|r| r.permission == Permission::Write) + .any(|r| rules_overlap(r, rule)); + if !overlaps_here { + return None; + } + for child in lock + .allocations + .iter() + .filter(|a| a.delegated_from.as_deref() == Some(alloc.pod_name.as_str())) + { + if let Some(owner) = find_conflict_in_subtree(lock, child, rule) { + return Some(owner); + } + } + Some(alloc.pod_name.clone()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_util::*; + use crate::{ScopeLockError, delegate_scope, register_pod}; + use tempfile::TempDir; + + #[test] + fn rules_overlap_prefix_relation() { + assert!(rules_overlap( + &write_rule("/src", true), + &write_rule("/src/core", true) + )); + assert!(rules_overlap( + &write_rule("/src/core", true), + &write_rule("/src", true), + )); + assert!(!rules_overlap( + &write_rule("/src", true), + &write_rule("/docs", true), + )); + } + + #[test] + fn rules_overlap_non_recursive() { + assert!(!rules_overlap( + &write_rule("/src", false), + &write_rule("/src/a/b", true), + )); + assert!(rules_overlap( + &write_rule("/src", false), + &write_rule("/src/child", false), + )); + } + + #[test] + fn conflict_detection_descends_to_real_owner() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + delegate_scope( + &mut g, + "a", + "b".into(), + std::process::id(), + sock("b"), + vec![write_rule("/src/core", true)], + ) + .unwrap(); + // A different top-level Pod trying to register /src/core/x + // should be blamed on B (deepest owner), not A. + let err = register_pod( + &mut g, + "x".into(), + std::process::id(), + sock("x"), + vec![write_rule("/src/core/x", true)], + sid(), + ) + .unwrap_err(); + match err { + ScopeLockError::WriteConflict { competitor, .. } => assert_eq!(competitor, "b"), + other => panic!("expected WriteConflict, got {other:?}"), + } + } +} diff --git a/crates/pod-registry/src/error.rs b/crates/pod-registry/src/error.rs new file mode 100644 index 00000000..05807ab6 --- /dev/null +++ b/crates/pod-registry/src/error.rs @@ -0,0 +1,34 @@ +//! Error type for mutating pod-registry operations. + +use std::io; +use std::path::PathBuf; + +use manifest::ScopeRule; +use session_store::SessionId; + +/// Errors raised by the mutating pod-registry operations. +#[derive(Debug, thiserror::Error)] +pub enum ScopeLockError { + #[error("I/O error on pods.json: {0}")] + Io(#[from] io::Error), + #[error("pod name `{0}` is already registered")] + DuplicatePodName(String), + #[error("requested scope `{}` conflicts with pod `{competitor}`", .rule.target.display())] + WriteConflict { competitor: String, rule: ScopeRule }, + #[error( + "requested scope `{}` is not within spawner `{spawner}`'s effective scope", + .rule.target.display() + )] + NotSubset { spawner: String, rule: ScopeRule }, + #[error("pod `{0}` is not registered")] + UnknownPod(String), + #[error( + "session {session_id} is already held by pod `{pod_name}` at {}", + .socket.display() + )] + SessionConflict { + session_id: SessionId, + pod_name: String, + socket: PathBuf, + }, +} diff --git a/crates/pod-registry/src/lib.rs b/crates/pod-registry/src/lib.rs index 690cbdfa..ede20133 100644 --- a/crates/pod-registry/src/lib.rs +++ b/crates/pod-registry/src/lib.rs @@ -13,1411 +13,20 @@ //! recovery rides on the next Pod that opens the file — no background //! reaper. -use std::fs::{DirBuilder, File, OpenOptions}; -use std::io::{self, Read, Seek, SeekFrom, Write}; -use std::os::unix::fs::{DirBuilderExt, OpenOptionsExt}; -use std::path::{Path, PathBuf}; - -use fs4::fs_std::FileExt; -use manifest::{Permission, ScopeRule, paths}; -use serde::{Deserialize, Serialize}; -use session_store::SessionId; - -/// On-disk representation of the allocation table. -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -pub struct LockFile { - #[serde(default)] - pub allocations: Vec, -} - -/// One Pod's scope allocation. -/// -/// `scope_allow` is the full set of allow rules the Pod was granted. -/// Portions delegated out to child Pods are **not** subtracted in -/// storage — the effective write scope is derived on the fly by -/// removing rules owned by any Pod whose `delegated_from` points to -/// this one. Keeping the raw allow set makes reparenting (stale -/// reclaim) trivial. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Allocation { - /// Pod name — also the identity used throughout orchestration. - pub pod_name: String, - /// Owning process. Checked with `kill(pid, 0)` for stale detection. - pub pid: u32, - /// Pod's Unix socket path. - pub socket: PathBuf, - /// Allow rules granted to this Pod (write + read). - pub scope_allow: Vec, - /// Name of the Pod that delegated scope to this one, or `None` for - /// a top-level Pod started directly by a human. - pub delegated_from: Option, - /// Session ID this Pod is currently writing to. `None` means this - /// is a pre-reservation made by a spawner via [`delegate_scope`] - /// before the child has come up; the child fills it in at - /// [`adopt_allocation`] time. - #[serde(default)] - pub session_id: Option, -} - -impl LockFile { - pub fn find(&self, pod_name: &str) -> Option<&Allocation> { - self.allocations.iter().find(|a| a.pod_name == pod_name) - } - - pub fn find_mut(&mut self, pod_name: &str) -> Option<&mut Allocation> { - self.allocations.iter_mut().find(|a| a.pod_name == pod_name) - } - - /// Find the allocation currently writing to `session_id`. Skips - /// pre-reservations whose `session_id` is still `None`. - pub fn find_by_session(&self, session_id: SessionId) -> Option<&Allocation> { - self.allocations - .iter() - .find(|a| a.session_id == Some(session_id)) - } -} - -/// Default on-disk path: `/pods.json` resolved via -/// [`manifest::paths::pod_registry_path`]. Tests should point this -/// elsewhere by setting `INSOMNIA_HOME` or `INSOMNIA_RUNTIME_DIR` to a -/// tempdir. -pub fn default_registry_path() -> io::Result { - paths::pod_registry_path().ok_or_else(|| { - io::Error::new( - io::ErrorKind::NotFound, - "could not resolve pods.json path (no INSOMNIA_HOME / \ - INSOMNIA_RUNTIME_DIR / XDG_RUNTIME_DIR / HOME)", - ) - }) -} - -/// RAII guard over an exclusively-locked lock file. -/// -/// The file is kept open for the lifetime of the guard; `flock(LOCK_EX)` -/// is released automatically on drop. Mutations go through -/// [`LockFileGuard::data_mut`] and are committed with -/// [`LockFileGuard::save`] before dropping — callers who mutate but -/// never call `save` leave the table unchanged, which is the right -/// behaviour for error paths. -pub struct LockFileGuard { - file: File, - data: LockFile, -} - -impl LockFileGuard { - /// Open the lock file at `path` (creating it + parent dirs if - /// needed), acquire an exclusive `flock`, then parse the contents. - /// - /// An empty file is treated as an empty allocation table. - /// - /// File is created with mode `0600` and its parent directory with - /// mode `0700` so no other user on the machine can read the - /// allocation table. Existing files/directories are left alone. - pub fn open(path: &Path) -> io::Result { - if let Some(parent) = path.parent() { - DirBuilder::new() - .recursive(true) - .mode(0o700) - .create(parent)?; - } - let file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(false) - .mode(0o600) - .open(path)?; - FileExt::lock_exclusive(&file)?; - let mut this = Self { - file, - data: LockFile::default(), - }; - this.reload()?; - Ok(this) - } - - fn reload(&mut self) -> io::Result<()> { - self.file.seek(SeekFrom::Start(0))?; - let mut buf = String::new(); - self.file.read_to_string(&mut buf)?; - self.data = if buf.trim().is_empty() { - LockFile::default() - } else { - serde_json::from_str(&buf).map_err(|e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!("pods.json parse error: {e}"), - ) - })? - }; - Ok(()) - } - - pub fn data(&self) -> &LockFile { - &self.data - } - - pub fn data_mut(&mut self) -> &mut LockFile { - &mut self.data - } - - /// Serialise `self.data` back to the file (truncate + rewrite). - pub fn save(&mut self) -> io::Result<()> { - let json = serde_json::to_vec_pretty(&self.data).map_err(io::Error::other)?; - self.file.seek(SeekFrom::Start(0))?; - self.file.set_len(0)?; - self.file.write_all(&json)?; - self.file.sync_data()?; - Ok(()) - } -} - -impl Drop for LockFileGuard { - fn drop(&mut self) { - let _ = FileExt::unlock(&self.file); - } -} - -/// Whether `a` and `b` claim any overlapping concrete path. -/// -/// Recursive rules cover `target/**`; non-recursive rules cover the -/// target itself and its direct children. The four cases enumerate -/// when those coverage sets intersect. -pub(crate) fn rules_overlap(a: &ScopeRule, b: &ScopeRule) -> bool { - match (a.recursive, b.recursive) { - (true, true) => a.target.starts_with(&b.target) || b.target.starts_with(&a.target), - (true, false) => { - // a covers a.target/**; b covers {b.target, b.target/*}. - b.target.starts_with(&a.target) || a.target.parent() == Some(b.target.as_path()) - } - (false, true) => { - a.target.starts_with(&b.target) || b.target.parent() == Some(a.target.as_path()) - } - (false, false) => { - a.target == b.target - || a.target.parent() == Some(b.target.as_path()) - || b.target.parent() == Some(a.target.as_path()) - } - } -} - -/// Does `cover` fully contain `inner`'s claimed paths? -fn covers_fully(cover: &ScopeRule, inner: &ScopeRule) -> bool { - if cover.permission < inner.permission { - return false; - } - if cover.recursive { - inner.target.starts_with(&cover.target) - } else { - inner.target == cover.target && !inner.recursive - } -} - -/// Check whether `rule` is contained in `parent`'s effective write -/// scope: its allow set covers `rule`, and no child of `parent` has -/// already taken a piece that would overlap `rule`. -pub fn is_within_effective_write(lock: &LockFile, parent: &str, rule: &ScopeRule) -> bool { - let Some(alloc) = lock.find(parent) else { - return false; - }; - if rule.permission != Permission::Write { - return alloc.scope_allow.iter().any(|r| covers_fully(r, rule)); - } - let covered = alloc - .scope_allow - .iter() - .filter(|r| r.permission == Permission::Write) - .any(|r| covers_fully(r, rule)); - if !covered { - return false; - } - let child_conflict = lock - .allocations - .iter() - .filter(|a| a.delegated_from.as_deref() == Some(parent)) - .flat_map(|a| a.scope_allow.iter()) - .filter(|r| r.permission == Permission::Write) - .any(|r| rules_overlap(r, rule)); - !child_conflict -} - -/// Find the Pod that actually owns a write scope overlapping `rule`. -/// -/// Walks the delegation tree: if an allocation overlaps `rule`, we -/// descend into its children and return the deepest overlapping node -/// as the true owner. `exempt` names a Pod whose ownership is -/// permitted (used during delegation: the spawner itself is allowed -/// to still own the rule's region because it is handing it down). -pub fn find_conflict_owner( - lock: &LockFile, - rule: &ScopeRule, - exempt: Option<&str>, -) -> Option { - if rule.permission != Permission::Write { - return None; - } - for alloc in lock - .allocations - .iter() - .filter(|a| a.delegated_from.is_none()) - { - if let Some(owner) = find_conflict_in_subtree(lock, alloc, rule) { - if Some(owner.as_str()) == exempt { - continue; - } - return Some(owner); - } - } - None -} - -fn find_conflict_in_subtree( - lock: &LockFile, - alloc: &Allocation, - rule: &ScopeRule, -) -> Option { - let overlaps_here = alloc - .scope_allow - .iter() - .filter(|r| r.permission == Permission::Write) - .any(|r| rules_overlap(r, rule)); - if !overlaps_here { - return None; - } - for child in lock - .allocations - .iter() - .filter(|a| a.delegated_from.as_deref() == Some(alloc.pod_name.as_str())) - { - if let Some(owner) = find_conflict_in_subtree(lock, child, rule) { - return Some(owner); - } - } - Some(alloc.pod_name.clone()) -} - -// --------------------------------------------------------------------------- -// Mutating operations -// --------------------------------------------------------------------------- - -/// Register a top-level Pod (started directly by a human, no -/// delegation parent). Reclaims stale entries before checking -/// conflicts so a crashed Pod's allocation doesn't block the new one. -/// -/// Rejects when another live allocation is already writing to -/// `session_id`, so two `restore_from_manifest` calls under different -/// `pod_name`s cannot both grab the same session log. -pub fn register_pod( - guard: &mut LockFileGuard, - pod_name: String, - pid: u32, - socket: PathBuf, - scope_allow: Vec, - session_id: SessionId, -) -> Result<(), ScopeLockError> { - reclaim_stale(guard); - if guard.data().find(&pod_name).is_some() { - return Err(ScopeLockError::DuplicatePodName(pod_name)); - } - if let Some(existing) = guard.data().find_by_session(session_id) { - return Err(ScopeLockError::SessionConflict { - session_id, - pod_name: existing.pod_name.clone(), - socket: existing.socket.clone(), - }); - } - for rule in scope_allow - .iter() - .filter(|r| r.permission == Permission::Write) - { - if let Some(competitor) = find_conflict_owner(guard.data(), rule, None) { - return Err(ScopeLockError::WriteConflict { - competitor, - rule: rule.clone(), - }); - } - } - guard.data_mut().allocations.push(Allocation { - pod_name, - pid, - socket, - scope_allow, - delegated_from: None, - session_id: Some(session_id), - }); - guard.save()?; - Ok(()) -} - -/// Register a spawned Pod whose scope is delegated from `spawner`. -/// The requested scope must be within `spawner`'s effective write -/// scope; overlap with any Pod other than `spawner` is a conflict. -pub fn delegate_scope( - guard: &mut LockFileGuard, - spawner: &str, - spawned: String, - pid: u32, - socket: PathBuf, - scope_allow: Vec, -) -> Result<(), ScopeLockError> { - reclaim_stale(guard); - if guard.data().find(&spawned).is_some() { - return Err(ScopeLockError::DuplicatePodName(spawned)); - } - if guard.data().find(spawner).is_none() { - return Err(ScopeLockError::UnknownPod(spawner.into())); - } - for rule in &scope_allow { - if !is_within_effective_write(guard.data(), spawner, rule) { - return Err(ScopeLockError::NotSubset { - spawner: spawner.into(), - rule: rule.clone(), - }); - } - if rule.permission == Permission::Write { - if let Some(competitor) = find_conflict_owner(guard.data(), rule, Some(spawner)) { - return Err(ScopeLockError::WriteConflict { - competitor, - rule: rule.clone(), - }); - } - } - } - guard.data_mut().allocations.push(Allocation { - pod_name: spawned, - pid, - socket, - scope_allow, - delegated_from: Some(spawner.into()), - // Pre-reservation. The child fills in its own session_id when - // it calls `adopt_allocation` after the worker is built. - session_id: None, - }); - guard.save()?; - Ok(()) -} - -/// Remove a Pod's allocation. Surviving children are reparented to -/// the removed Pod's own `delegated_from`, so the delegation tree -/// stays connected. -pub fn release_pod(guard: &mut LockFileGuard, pod_name: &str) -> Result<(), ScopeLockError> { - let idx = guard - .data() - .allocations - .iter() - .position(|a| a.pod_name == pod_name); - let Some(idx) = idx else { - return Err(ScopeLockError::UnknownPod(pod_name.into())); - }; - let removed = guard.data().allocations[idx].clone(); - for alloc in guard.data_mut().allocations.iter_mut() { - if alloc.delegated_from.as_deref() == Some(pod_name) { - alloc.delegated_from.clone_from(&removed.delegated_from); - } - } - guard.data_mut().allocations.remove(idx); - guard.save()?; - Ok(()) -} - -/// Remove allocations whose PID is dead, reparenting children to the -/// dead Pod's `delegated_from`. Idempotent and best-effort — I/O -/// errors on save are swallowed so a crashed Pod's entry never blocks -/// forward progress. -pub fn reclaim_stale(guard: &mut LockFileGuard) { - reclaim_stale_with(guard, pid_alive); -} - -/// Test seam: stale reclaim with a caller-supplied liveness probe. -pub fn reclaim_stale_with(guard: &mut LockFileGuard, mut is_alive: impl FnMut(u32) -> bool) { - let dead: Vec = guard - .data() - .allocations - .iter() - .filter(|a| !is_alive(a.pid)) - .map(|a| a.pod_name.clone()) - .collect(); - if dead.is_empty() { - return; - } - for name in &dead { - let Some(idx) = guard - .data() - .allocations - .iter() - .position(|a| a.pod_name == *name) - else { - continue; - }; - let removed = guard.data().allocations[idx].clone(); - for alloc in guard.data_mut().allocations.iter_mut() { - if alloc.delegated_from.as_deref() == Some(name.as_str()) { - alloc.delegated_from.clone_from(&removed.delegated_from); - } - } - guard.data_mut().allocations.remove(idx); - } - let _ = guard.save(); -} - -/// `kill(pid, 0)` — returns true if the process exists (even when we -/// don't own it), false only on ESRCH. -fn pid_alive(pid: u32) -> bool { - if pid == 0 { - return false; - } - let ret = unsafe { libc::kill(pid as libc::pid_t, 0) }; - if ret == 0 { - return true; - } - io::Error::last_os_error() - .raw_os_error() - .map(|e| e != libc::ESRCH) - .unwrap_or(false) -} - -// --------------------------------------------------------------------------- -// Lifecycle guard -// --------------------------------------------------------------------------- - -/// Owned allocation: on drop, opens the lock file and releases this -/// Pod's entry. The guard keeps only the name + lock-file path; it -/// does not hold the `flock` for the Pod's lifetime. -#[derive(Debug)] -pub struct ScopeAllocationGuard { - pod_name: String, - lock_path: PathBuf, -} - -impl ScopeAllocationGuard { - pub fn pod_name(&self) -> &str { - &self.pod_name - } - - pub fn lock_path(&self) -> &Path { - &self.lock_path - } -} - -impl Drop for ScopeAllocationGuard { - fn drop(&mut self) { - if let Ok(mut guard) = LockFileGuard::open(&self.lock_path) { - let _ = release_pod(&mut guard, &self.pod_name); - } - } -} - -/// Open the default lock file, register a top-level Pod, and return a -/// guard that will release the allocation on drop. -pub fn install_top_level( - pod_name: String, - pid: u32, - socket: PathBuf, - scope_allow: Vec, - session_id: SessionId, -) -> Result { - let lock_path = default_registry_path()?; - let mut guard = LockFileGuard::open(&lock_path)?; - register_pod( - &mut guard, - pod_name.clone(), - pid, - socket, - scope_allow, - session_id, - )?; - Ok(ScopeAllocationGuard { - pod_name, - lock_path, - }) -} - -/// Take ownership of an existing allocation that was pre-registered by -/// a spawning Pod. -/// -/// The spawning flow is two-stage: the spawner calls [`delegate_scope`] -/// (with its own pid as a live placeholder, `session_id = None`), then -/// exec's the child; the child, once running, calls this function to -/// rewrite the allocation's pid + session_id to its own and claim the -/// `ScopeAllocationGuard` so the entry is released when the child -/// exits. -pub fn adopt_allocation( - pod_name: String, - new_pid: u32, - session_id: SessionId, -) -> Result { - let lock_path = default_registry_path()?; - let mut guard = LockFileGuard::open(&lock_path)?; - let alloc = guard - .data_mut() - .find_mut(&pod_name) - .ok_or_else(|| ScopeLockError::UnknownPod(pod_name.clone()))?; - alloc.pid = new_pid; - alloc.session_id = Some(session_id); - guard.save()?; - Ok(ScopeAllocationGuard { - pod_name, - lock_path, - }) -} - -/// Rewrite the `session_id` recorded for `pod_name` to -/// `new_session_id`. -/// -/// The Pod's in-memory `session_id` can change underneath the -/// allocation in two normal places: -/// -/// - `Pod::compact` mints a fresh session and swaps it in. -/// - `session_store::ensure_head_or_fork` auto-forks when another -/// writer has advanced the store head behind our back. -/// -/// Both paths must call this so subsequent `lookup_session` queries -/// find the live session id, not the old one. Without this update a -/// concurrent `restore_from_manifest(new_id)` would see "no live -/// writer" and proceed to register a competing allocation on the -/// session this Pod just moved into. -/// -/// The lock is opened once and the allocation is rewritten inside the -/// guard, so the session_id collision check is atomic with the -/// rewrite. -pub fn update_session(pod_name: &str, new_session_id: SessionId) -> Result<(), ScopeLockError> { - let lock_path = default_registry_path()?; - let mut guard = LockFileGuard::open(&lock_path)?; - if let Some(other) = guard.data().find_by_session(new_session_id) { - if other.pod_name != pod_name { - return Err(ScopeLockError::SessionConflict { - session_id: new_session_id, - pod_name: other.pod_name.clone(), - socket: other.socket.clone(), - }); - } - } - let alloc = guard - .data_mut() - .find_mut(pod_name) - .ok_or_else(|| ScopeLockError::UnknownPod(pod_name.into()))?; - alloc.session_id = Some(new_session_id); - guard.save()?; - Ok(()) -} - -/// Information about a Pod that currently holds an allocation for a -/// given session. -#[derive(Debug, Clone)] -pub struct SessionLockInfo { - pub pod_name: String, - pub socket: PathBuf, - pub pid: u32, -} - -/// Open the default lock file, reclaim stale entries, and return the -/// allocation currently writing to `session_id`, if any. -/// -/// Used by `Pod::restore_from_manifest` to refuse a resume that would -/// race a live writer on the same source session. -pub fn lookup_session(session_id: SessionId) -> Result, ScopeLockError> { - let lock_path = default_registry_path()?; - let mut guard = LockFileGuard::open(&lock_path)?; - reclaim_stale(&mut guard); - Ok(guard.data().find_by_session(session_id).map(|a| { - SessionLockInfo { - pod_name: a.pod_name.clone(), - socket: a.socket.clone(), - pid: a.pid, - } - })) -} - -/// Errors raised by the mutating pod-registry operations. -#[derive(Debug, thiserror::Error)] -pub enum ScopeLockError { - #[error("I/O error on pods.json: {0}")] - Io(#[from] io::Error), - #[error("pod name `{0}` is already registered")] - DuplicatePodName(String), - #[error("requested scope `{}` conflicts with pod `{competitor}`", .rule.target.display())] - WriteConflict { competitor: String, rule: ScopeRule }, - #[error( - "requested scope `{}` is not within spawner `{spawner}`'s effective scope", - .rule.target.display() - )] - NotSubset { spawner: String, rule: ScopeRule }, - #[error("pod `{0}` is not registered")] - UnknownPod(String), - #[error( - "session {session_id} is already held by pod `{pod_name}` at {}", - .socket.display() - )] - SessionConflict { - session_id: SessionId, - pod_name: String, - socket: PathBuf, - }, -} +mod conflict; +mod error; +mod lifecycle; +mod mutate; +mod table; #[cfg(test)] -mod tests { - use super::*; - use manifest::Permission; - use std::sync::{LazyLock, Mutex, MutexGuard}; - use tempfile::TempDir; +mod test_util; - /// Serialises tests that mutate runtime-dir env vars. The test - /// harness runs tests on multiple threads inside a single process, - /// so env-var writes from one test would otherwise leak into a - /// parallel test's `default_lock_path()` lookup. - fn sid() -> SessionId { - session_store::new_session_id() - } - - static ENV_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); - - /// Sandbox `INSOMNIA_RUNTIME_DIR` to a tempdir for the duration of - /// a test; restore the previous value (and any `INSOMNIA_HOME` / - /// `XDG_RUNTIME_DIR` that would otherwise outrank it) on drop. - struct RuntimeDirSandbox { - prev_runtime: Option, - prev_home: Option, - prev_xdg: Option, - _guard: MutexGuard<'static, ()>, - } - - impl RuntimeDirSandbox { - fn new(dir: &Path) -> Self { - let guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); - let prev_runtime = std::env::var("INSOMNIA_RUNTIME_DIR").ok(); - let prev_home = std::env::var("INSOMNIA_HOME").ok(); - let prev_xdg = std::env::var("XDG_RUNTIME_DIR").ok(); - // SAFETY: ENV_LOCK serialises env writes across this test - // module; other modules that touch env vars rely on their - // own lock or `serial_test`. - unsafe { - std::env::remove_var("INSOMNIA_HOME"); - std::env::remove_var("XDG_RUNTIME_DIR"); - std::env::set_var("INSOMNIA_RUNTIME_DIR", dir); - } - Self { - prev_runtime, - prev_home, - prev_xdg, - _guard: guard, - } - } - } - - impl Drop for RuntimeDirSandbox { - fn drop(&mut self) { - unsafe { - match &self.prev_runtime { - Some(v) => std::env::set_var("INSOMNIA_RUNTIME_DIR", v), - None => std::env::remove_var("INSOMNIA_RUNTIME_DIR"), - } - match &self.prev_home { - Some(v) => std::env::set_var("INSOMNIA_HOME", v), - None => std::env::remove_var("INSOMNIA_HOME"), - } - match &self.prev_xdg { - Some(v) => std::env::set_var("XDG_RUNTIME_DIR", v), - None => std::env::remove_var("XDG_RUNTIME_DIR"), - } - } - } - } - - fn write_rule(path: &str, recursive: bool) -> ScopeRule { - ScopeRule { - target: PathBuf::from(path), - permission: Permission::Write, - recursive, - } - } - - fn sock(name: &str) -> PathBuf { - PathBuf::from(format!("/tmp/{name}.sock")) - } - - fn open_empty(path: &Path) -> LockFileGuard { - LockFileGuard::open(path).unwrap() - } - - #[test] - fn open_creates_empty_lock_file() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let guard = LockFileGuard::open(&path).unwrap(); - assert!(guard.data().allocations.is_empty()); - assert!(path.exists()); - } - - #[test] - fn open_creates_file_with_owner_only_permissions() { - use std::os::unix::fs::PermissionsExt; - let dir = TempDir::new().unwrap(); - let parent = dir.path().join("insomnia"); - let path = parent.join("pods.json"); - let _guard = LockFileGuard::open(&path).unwrap(); - let file_mode = std::fs::metadata(&path).unwrap().permissions().mode() & 0o777; - assert_eq!(file_mode, 0o600, "file mode = {file_mode:o}"); - let dir_mode = std::fs::metadata(&parent).unwrap().permissions().mode() & 0o777; - assert_eq!(dir_mode, 0o700, "dir mode = {dir_mode:o}"); - } - - #[test] - fn save_and_reopen_roundtrip() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - { - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - } - let guard = LockFileGuard::open(&path).unwrap(); - assert_eq!(guard.data().allocations.len(), 1); - assert_eq!(guard.data().allocations[0].pod_name, "a"); - } - - #[test] - fn rules_overlap_prefix_relation() { - assert!(rules_overlap( - &write_rule("/src", true), - &write_rule("/src/core", true) - )); - assert!(rules_overlap( - &write_rule("/src/core", true), - &write_rule("/src", true), - )); - assert!(!rules_overlap( - &write_rule("/src", true), - &write_rule("/docs", true), - )); - } - - #[test] - fn rules_overlap_non_recursive() { - assert!(!rules_overlap( - &write_rule("/src", false), - &write_rule("/src/a/b", true), - )); - assert!(rules_overlap( - &write_rule("/src", false), - &write_rule("/src/child", false), - )); - } - - #[test] - fn register_detects_write_conflict() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - let err = register_pod( - &mut g, - "b".into(), - std::process::id(), - sock("b"), - vec![write_rule("/src/core", true)], - sid(), - ) - .unwrap_err(); - match err { - ScopeLockError::WriteConflict { competitor, .. } => assert_eq!(competitor, "a"), - other => panic!("expected WriteConflict, got {other:?}"), - } - } - - #[test] - fn duplicate_pod_name_rejected() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - let err = register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a2"), - vec![write_rule("/docs", true)], - sid(), - ) - .unwrap_err(); - assert!(matches!(err, ScopeLockError::DuplicatePodName(ref n) if n == "a")); - } - - #[test] - fn delegate_must_be_subset() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - let err = delegate_scope( - &mut g, - "a", - "b".into(), - std::process::id(), - sock("b"), - vec![write_rule("/docs", true)], - ) - .unwrap_err(); - assert!(matches!(err, ScopeLockError::NotSubset { .. })); - } - - #[test] - fn delegate_succeeds_within_parent_scope() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - delegate_scope( - &mut g, - "a", - "b".into(), - std::process::id(), - sock("b"), - vec![write_rule("/src/core", true)], - ) - .unwrap(); - assert_eq!(g.data().allocations.len(), 2); - // A's effective write no longer covers /src/core because B has it. - assert!(!is_within_effective_write( - g.data(), - "a", - &write_rule("/src/core", true) - )); - // A still covers its own uninvolved areas. - assert!(is_within_effective_write( - g.data(), - "a", - &write_rule("/src/other", true) - )); - } - - #[test] - fn delegate_rejects_sibling_overlap() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - delegate_scope( - &mut g, - "a", - "b".into(), - std::process::id(), - sock("b"), - vec![write_rule("/src/core", true)], - ) - .unwrap(); - // Sibling C from A tries to take /src/core/sub — already under B's scope. - let err = delegate_scope( - &mut g, - "a", - "c".into(), - std::process::id(), - sock("c"), - vec![write_rule("/src/core/sub", true)], - ) - .unwrap_err(); - // NotSubset fires first because /src/core is no longer in A's effective. - assert!(matches!(err, ScopeLockError::NotSubset { .. })); - } - - #[test] - fn release_reparents_children() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - delegate_scope( - &mut g, - "a", - "b".into(), - std::process::id(), - sock("b"), - vec![write_rule("/src/core", true)], - ) - .unwrap(); - delegate_scope( - &mut g, - "b", - "d".into(), - std::process::id(), - sock("d"), - vec![write_rule("/src/core/x", true)], - ) - .unwrap(); - release_pod(&mut g, "b").unwrap(); - // D should now list A as its delegated_from. - let d = g.data().find("d").unwrap(); - assert_eq!(d.delegated_from.as_deref(), Some("a")); - assert!(g.data().find("b").is_none()); - } - - #[test] - fn reclaim_stale_reparents_and_removes_dead_entries() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - delegate_scope( - &mut g, - "a", - "b".into(), - std::process::id(), - sock("b"), - vec![write_rule("/src/core", true)], - ) - .unwrap(); - delegate_scope( - &mut g, - "b", - "d".into(), - std::process::id(), - sock("d"), - vec![write_rule("/src/core/x", true)], - ) - .unwrap(); - // Simulate B crashing by rewriting its pid to one the probe - // will treat as dead. - let fake_dead_pid: u32 = 0xffff_fff0; - for alloc in g.data_mut().allocations.iter_mut() { - if alloc.pod_name == "b" { - alloc.pid = fake_dead_pid; - } - } - reclaim_stale_with(&mut g, |pid| pid != fake_dead_pid); - assert!(g.data().find("b").is_none()); - let d = g.data().find("d").unwrap(); - assert_eq!(d.delegated_from.as_deref(), Some("a")); - } - - fn read_rule(path: &str, recursive: bool) -> ScopeRule { - ScopeRule { - target: PathBuf::from(path), - permission: Permission::Read, - recursive, - } - } - - #[test] - fn read_rules_do_not_conflict_with_write() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - // B only reads under the same tree — allowed. - register_pod( - &mut g, - "b".into(), - std::process::id(), - sock("b"), - vec![read_rule("/src", true)], - sid(), - ) - .unwrap(); - assert_eq!(g.data().allocations.len(), 2); - } - - #[test] - fn releasing_pod_reopens_scope_for_fresh_registration() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - release_pod(&mut g, "a").unwrap(); - register_pod( - &mut g, - "b".into(), - std::process::id(), - sock("b"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - } - - #[test] - fn delegated_scope_returns_to_parent_on_release() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - delegate_scope( - &mut g, - "a", - "b".into(), - std::process::id(), - sock("b"), - vec![write_rule("/src/core", true)], - ) - .unwrap(); - assert!(!is_within_effective_write( - g.data(), - "a", - &write_rule("/src/core", true) - )); - release_pod(&mut g, "b").unwrap(); - // /src/core is back in A's effective write scope. - assert!(is_within_effective_write( - g.data(), - "a", - &write_rule("/src/core", true) - )); - } - - #[test] - fn scope_allocation_guard_releases_on_drop() { - let dir = TempDir::new().unwrap(); - let _sandbox = RuntimeDirSandbox::new(dir.path()); - let lock_path = dir.path().join("pods.json"); - let guard = install_top_level( - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - { - let g = LockFileGuard::open(&lock_path).unwrap(); - assert!(g.data().find("a").is_some()); - } - drop(guard); - { - let g = LockFileGuard::open(&lock_path).unwrap(); - assert!(g.data().find("a").is_none()); - } - } - - #[test] - fn adopt_allocation_rewrites_pid_and_releases_on_drop() { - let dir = TempDir::new().unwrap(); - let _sandbox = RuntimeDirSandbox::new(dir.path()); - let lock_path = dir.path().join("pods.json"); - // Pre-register an allocation under spawner's pid, as delegate_scope would. - { - let mut g = LockFileGuard::open(&lock_path).unwrap(); - delegate_placeholder(&mut g, "child", std::process::id()); - } - let child_pid = std::process::id().wrapping_add(1); - let guard = adopt_allocation("child".into(), child_pid, sid()).unwrap(); - { - let g = LockFileGuard::open(&lock_path).unwrap(); - let alloc = g.data().find("child").unwrap(); - assert_eq!(alloc.pid, child_pid); - } - drop(guard); - { - let g = LockFileGuard::open(&lock_path).unwrap(); - assert!(g.data().find("child").is_none()); - } - } - - #[test] - fn adopt_allocation_errors_on_unknown_pod() { - let dir = TempDir::new().unwrap(); - let _sandbox = RuntimeDirSandbox::new(dir.path()); - let err = adopt_allocation("ghost".into(), 42, sid()).unwrap_err(); - assert!(matches!(err, ScopeLockError::UnknownPod(ref n) if n == "ghost")); - } - - /// Mimic what the spawner does before the child comes up: push an - /// allocation for the child carrying the spawner's (live) pid as a - /// placeholder. Exists only in tests. - fn delegate_placeholder(g: &mut LockFileGuard, pod_name: &str, placeholder_pid: u32) { - g.data_mut().allocations.push(Allocation { - pod_name: pod_name.to_string(), - pid: placeholder_pid, - socket: sock(pod_name), - scope_allow: vec![write_rule("/tmp/child", true)], - delegated_from: None, - session_id: None, - }); - g.save().unwrap(); - } - - #[test] - fn conflict_detection_descends_to_real_owner() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - register_pod( - &mut g, - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/src", true)], - sid(), - ) - .unwrap(); - delegate_scope( - &mut g, - "a", - "b".into(), - std::process::id(), - sock("b"), - vec![write_rule("/src/core", true)], - ) - .unwrap(); - // A different top-level Pod trying to register /src/core/x - // should be blamed on B (deepest owner), not A. - let err = register_pod( - &mut g, - "x".into(), - std::process::id(), - sock("x"), - vec![write_rule("/src/core/x", true)], - sid(), - ) - .unwrap_err(); - match err { - ScopeLockError::WriteConflict { competitor, .. } => assert_eq!(competitor, "b"), - other => panic!("expected WriteConflict, got {other:?}"), - } - } - - #[test] - fn find_by_session_skips_none_placeholders() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - // Pre-reservation: delegate_scope leaves session_id = None - // until adopt_allocation rewrites it. find_by_session must not - // match those placeholders, otherwise a freshly-spawning child - // would shadow itself before it has even chosen a session. - register_pod( - &mut g, - "parent".into(), - std::process::id(), - sock("parent"), - vec![write_rule("/p", true)], - sid(), - ) - .unwrap(); - delegate_scope( - &mut g, - "parent", - "child".into(), - std::process::id(), - sock("child"), - vec![write_rule("/p/sub", true)], - ) - .unwrap(); - - let target_session = sid(); - // The placeholder allocation has session_id = None and must - // not be returned for any lookup. - assert!(g.data().find_by_session(target_session).is_none()); - - // After adopt-style rewrite, the same allocation is now found. - g.data_mut() - .find_mut("child") - .unwrap() - .session_id = Some(target_session); - let found = g.data().find_by_session(target_session).unwrap(); - assert_eq!(found.pod_name, "child"); - } - - #[test] - fn register_pod_rejects_session_id_collision() { - let dir = TempDir::new().unwrap(); - let path = dir.path().join("pods.json"); - let mut g = open_empty(&path); - let shared_session = sid(); - register_pod( - &mut g, - "first".into(), - std::process::id(), - sock("first"), - vec![write_rule("/work/a", true)], - shared_session, - ) - .unwrap(); - // Second registration tries to grab the same session_id under - // a different pod_name. Without the SessionConflict check both - // would succeed and race on the same jsonl. - let err = register_pod( - &mut g, - "second".into(), - std::process::id(), - sock("second"), - vec![write_rule("/work/b", true)], - shared_session, - ) - .unwrap_err(); - match err { - ScopeLockError::SessionConflict { - session_id, - pod_name, - .. - } => { - assert_eq!(session_id, shared_session); - assert_eq!(pod_name, "first"); - } - other => panic!("expected SessionConflict, got {other:?}"), - } - } - - #[test] - fn lookup_session_returns_live_writer_info() { - let dir = TempDir::new().unwrap(); - let _sandbox = RuntimeDirSandbox::new(dir.path()); - let s = sid(); - let guard = install_top_level( - "live".into(), - std::process::id(), - sock("live"), - vec![write_rule("/work", true)], - s, - ) - .unwrap(); - let info = lookup_session(s).unwrap().expect("expected live writer"); - assert_eq!(info.pod_name, "live"); - assert_eq!(info.socket, sock("live")); - drop(guard); - // After the guard's release, the lookup goes back to None. - assert!(lookup_session(s).unwrap().is_none()); - } - - #[test] - fn update_session_rewrites_allocation_session_id() { - let dir = TempDir::new().unwrap(); - let _sandbox = RuntimeDirSandbox::new(dir.path()); - let original = sid(); - let updated = sid(); - let _guard = install_top_level( - "p".into(), - std::process::id(), - sock("p"), - vec![write_rule("/work", true)], - original, - ) - .unwrap(); - update_session("p", updated).unwrap(); - // lookup against the original is now empty, the updated id wins. - assert!(lookup_session(original).unwrap().is_none()); - assert_eq!(lookup_session(updated).unwrap().unwrap().pod_name, "p"); - } - - #[test] - fn update_session_rejects_when_target_already_held() { - let dir = TempDir::new().unwrap(); - let _sandbox = RuntimeDirSandbox::new(dir.path()); - let s_a = sid(); - let s_b = sid(); - let _g_a = install_top_level( - "a".into(), - std::process::id(), - sock("a"), - vec![write_rule("/work/a", true)], - s_a, - ) - .unwrap(); - let _g_b = install_top_level( - "b".into(), - std::process::id(), - sock("b"), - vec![write_rule("/work/b", true)], - s_b, - ) - .unwrap(); - // `a` cannot adopt b's live session id. - let err = update_session("a", s_b).unwrap_err(); - match err { - ScopeLockError::SessionConflict { - pod_name, - session_id, - .. - } => { - assert_eq!(pod_name, "b"); - assert_eq!(session_id, s_b); - } - other => panic!("expected SessionConflict, got {other:?}"), - } - } -} +pub use conflict::{find_conflict_owner, is_within_effective_write}; +pub use error::ScopeLockError; +pub use lifecycle::{ + ScopeAllocationGuard, SessionLockInfo, adopt_allocation, install_top_level, lookup_session, + update_session, +}; +pub use mutate::{delegate_scope, reclaim_stale, reclaim_stale_with, register_pod, release_pod}; +pub use table::{Allocation, LockFile, LockFileGuard, default_registry_path}; diff --git a/crates/pod-registry/src/lifecycle.rs b/crates/pod-registry/src/lifecycle.rs new file mode 100644 index 00000000..5f887866 --- /dev/null +++ b/crates/pod-registry/src/lifecycle.rs @@ -0,0 +1,317 @@ +//! Owned-allocation guards and the high-level entry points that open +//! the default registry path, mutate it, and return a guard that cleans +//! up on drop. + +use std::path::{Path, PathBuf}; + +use manifest::ScopeRule; +use session_store::SessionId; + +use crate::error::ScopeLockError; +use crate::mutate::{register_pod, release_pod}; +use crate::table::{LockFileGuard, default_registry_path}; + +/// Owned allocation: on drop, opens the lock file and releases this +/// Pod's entry. The guard keeps only the name + lock-file path; it +/// does not hold the `flock` for the Pod's lifetime. +#[derive(Debug)] +pub struct ScopeAllocationGuard { + pod_name: String, + lock_path: PathBuf, +} + +impl ScopeAllocationGuard { + pub fn pod_name(&self) -> &str { + &self.pod_name + } + + pub fn lock_path(&self) -> &Path { + &self.lock_path + } +} + +impl Drop for ScopeAllocationGuard { + fn drop(&mut self) { + if let Ok(mut guard) = LockFileGuard::open(&self.lock_path) { + let _ = release_pod(&mut guard, &self.pod_name); + } + } +} + +/// Open the default lock file, register a top-level Pod, and return a +/// guard that will release the allocation on drop. +pub fn install_top_level( + pod_name: String, + pid: u32, + socket: PathBuf, + scope_allow: Vec, + session_id: SessionId, +) -> Result { + let lock_path = default_registry_path()?; + let mut guard = LockFileGuard::open(&lock_path)?; + register_pod( + &mut guard, + pod_name.clone(), + pid, + socket, + scope_allow, + session_id, + )?; + Ok(ScopeAllocationGuard { + pod_name, + lock_path, + }) +} + +/// Take ownership of an existing allocation that was pre-registered by +/// a spawning Pod. +/// +/// The spawning flow is two-stage: the spawner calls +/// [`crate::delegate_scope`] (with its own pid as a live placeholder, +/// `session_id = None`), then exec's the child; the child, once +/// running, calls this function to rewrite the allocation's pid + +/// session_id to its own and claim the [`ScopeAllocationGuard`] so +/// the entry is released when the child exits. +pub fn adopt_allocation( + pod_name: String, + new_pid: u32, + session_id: SessionId, +) -> Result { + let lock_path = default_registry_path()?; + let mut guard = LockFileGuard::open(&lock_path)?; + let alloc = guard + .data_mut() + .find_mut(&pod_name) + .ok_or_else(|| ScopeLockError::UnknownPod(pod_name.clone()))?; + alloc.pid = new_pid; + alloc.session_id = Some(session_id); + guard.save()?; + Ok(ScopeAllocationGuard { + pod_name, + lock_path, + }) +} + +/// Rewrite the `session_id` recorded for `pod_name` to +/// `new_session_id`. +/// +/// The Pod's in-memory `session_id` can change underneath the +/// allocation in two normal places: +/// +/// - `Pod::compact` mints a fresh session and swaps it in. +/// - `session_store::ensure_head_or_fork` auto-forks when another +/// writer has advanced the store head behind our back. +/// +/// Both paths must call this so subsequent [`lookup_session`] queries +/// find the live session id, not the old one. Without this update a +/// concurrent `restore_from_manifest(new_id)` would see "no live +/// writer" and proceed to register a competing allocation on the +/// session this Pod just moved into. +/// +/// The lock is opened once and the allocation is rewritten inside the +/// guard, so the session_id collision check is atomic with the +/// rewrite. +pub fn update_session(pod_name: &str, new_session_id: SessionId) -> Result<(), ScopeLockError> { + let lock_path = default_registry_path()?; + let mut guard = LockFileGuard::open(&lock_path)?; + if let Some(other) = guard.data().find_by_session(new_session_id) { + if other.pod_name != pod_name { + return Err(ScopeLockError::SessionConflict { + session_id: new_session_id, + pod_name: other.pod_name.clone(), + socket: other.socket.clone(), + }); + } + } + let alloc = guard + .data_mut() + .find_mut(pod_name) + .ok_or_else(|| ScopeLockError::UnknownPod(pod_name.into()))?; + alloc.session_id = Some(new_session_id); + guard.save()?; + Ok(()) +} + +/// Information about a Pod that currently holds an allocation for a +/// given session. +#[derive(Debug, Clone)] +pub struct SessionLockInfo { + pub pod_name: String, + pub socket: PathBuf, + pub pid: u32, +} + +/// Open the default lock file, reclaim stale entries, and return the +/// allocation currently writing to `session_id`, if any. +/// +/// Used by `Pod::restore_from_manifest` to refuse a resume that would +/// race a live writer on the same source session. +pub fn lookup_session(session_id: SessionId) -> Result, ScopeLockError> { + let lock_path = default_registry_path()?; + let mut guard = LockFileGuard::open(&lock_path)?; + crate::mutate::reclaim_stale(&mut guard); + Ok(guard.data().find_by_session(session_id).map(|a| { + SessionLockInfo { + pod_name: a.pod_name.clone(), + socket: a.socket.clone(), + pid: a.pid, + } + })) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::table::Allocation; + use crate::test_util::*; + use tempfile::TempDir; + + /// Mimic what the spawner does before the child comes up: push an + /// allocation for the child carrying the spawner's (live) pid as a + /// placeholder. Exists only in tests. + fn delegate_placeholder(g: &mut LockFileGuard, pod_name: &str, placeholder_pid: u32) { + g.data_mut().allocations.push(Allocation { + pod_name: pod_name.to_string(), + pid: placeholder_pid, + socket: sock(pod_name), + scope_allow: vec![write_rule("/tmp/child", true)], + delegated_from: None, + session_id: None, + }); + g.save().unwrap(); + } + + #[test] + fn scope_allocation_guard_releases_on_drop() { + let dir = TempDir::new().unwrap(); + let _sandbox = RuntimeDirSandbox::new(dir.path()); + let lock_path = dir.path().join("pods.json"); + let guard = install_top_level( + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + { + let g = LockFileGuard::open(&lock_path).unwrap(); + assert!(g.data().find("a").is_some()); + } + drop(guard); + { + let g = LockFileGuard::open(&lock_path).unwrap(); + assert!(g.data().find("a").is_none()); + } + } + + #[test] + fn adopt_allocation_rewrites_pid_and_releases_on_drop() { + let dir = TempDir::new().unwrap(); + let _sandbox = RuntimeDirSandbox::new(dir.path()); + let lock_path = dir.path().join("pods.json"); + // Pre-register an allocation under spawner's pid, as delegate_scope would. + { + let mut g = LockFileGuard::open(&lock_path).unwrap(); + delegate_placeholder(&mut g, "child", std::process::id()); + } + let child_pid = std::process::id().wrapping_add(1); + let guard = adopt_allocation("child".into(), child_pid, sid()).unwrap(); + { + let g = LockFileGuard::open(&lock_path).unwrap(); + let alloc = g.data().find("child").unwrap(); + assert_eq!(alloc.pid, child_pid); + } + drop(guard); + { + let g = LockFileGuard::open(&lock_path).unwrap(); + assert!(g.data().find("child").is_none()); + } + } + + #[test] + fn adopt_allocation_errors_on_unknown_pod() { + let dir = TempDir::new().unwrap(); + let _sandbox = RuntimeDirSandbox::new(dir.path()); + let err = adopt_allocation("ghost".into(), 42, sid()).unwrap_err(); + assert!(matches!(err, ScopeLockError::UnknownPod(ref n) if n == "ghost")); + } + + #[test] + fn lookup_session_returns_live_writer_info() { + let dir = TempDir::new().unwrap(); + let _sandbox = RuntimeDirSandbox::new(dir.path()); + let s = sid(); + let guard = install_top_level( + "live".into(), + std::process::id(), + sock("live"), + vec![write_rule("/work", true)], + s, + ) + .unwrap(); + let info = lookup_session(s).unwrap().expect("expected live writer"); + assert_eq!(info.pod_name, "live"); + assert_eq!(info.socket, sock("live")); + drop(guard); + // After the guard's release, the lookup goes back to None. + assert!(lookup_session(s).unwrap().is_none()); + } + + #[test] + fn update_session_rewrites_allocation_session_id() { + let dir = TempDir::new().unwrap(); + let _sandbox = RuntimeDirSandbox::new(dir.path()); + let original = sid(); + let updated = sid(); + let _guard = install_top_level( + "p".into(), + std::process::id(), + sock("p"), + vec![write_rule("/work", true)], + original, + ) + .unwrap(); + update_session("p", updated).unwrap(); + // lookup against the original is now empty, the updated id wins. + assert!(lookup_session(original).unwrap().is_none()); + assert_eq!(lookup_session(updated).unwrap().unwrap().pod_name, "p"); + } + + #[test] + fn update_session_rejects_when_target_already_held() { + let dir = TempDir::new().unwrap(); + let _sandbox = RuntimeDirSandbox::new(dir.path()); + let s_a = sid(); + let s_b = sid(); + let _g_a = install_top_level( + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/work/a", true)], + s_a, + ) + .unwrap(); + let _g_b = install_top_level( + "b".into(), + std::process::id(), + sock("b"), + vec![write_rule("/work/b", true)], + s_b, + ) + .unwrap(); + // `a` cannot adopt b's live session id. + let err = update_session("a", s_b).unwrap_err(); + match err { + ScopeLockError::SessionConflict { + pod_name, + session_id, + .. + } => { + assert_eq!(pod_name, "b"); + assert_eq!(session_id, s_b); + } + other => panic!("expected SessionConflict, got {other:?}"), + } + } +} diff --git a/crates/pod-registry/src/mutate.rs b/crates/pod-registry/src/mutate.rs new file mode 100644 index 00000000..643adc95 --- /dev/null +++ b/crates/pod-registry/src/mutate.rs @@ -0,0 +1,567 @@ +//! Mutating operations over the allocation table. All of these expect +//! the caller to hold a [`LockFileGuard`] for the registry's lock file. + +use std::io; +use std::path::PathBuf; + +use manifest::{Permission, ScopeRule}; +use session_store::SessionId; + +use crate::conflict::{find_conflict_owner, is_within_effective_write}; +use crate::error::ScopeLockError; +use crate::table::{Allocation, LockFileGuard}; + +/// Register a top-level Pod (started directly by a human, no +/// delegation parent). Reclaims stale entries before checking +/// conflicts so a crashed Pod's allocation doesn't block the new one. +/// +/// Rejects when another live allocation is already writing to +/// `session_id`, so two `restore_from_manifest` calls under different +/// `pod_name`s cannot both grab the same session log. +pub fn register_pod( + guard: &mut LockFileGuard, + pod_name: String, + pid: u32, + socket: PathBuf, + scope_allow: Vec, + session_id: SessionId, +) -> Result<(), ScopeLockError> { + reclaim_stale(guard); + if guard.data().find(&pod_name).is_some() { + return Err(ScopeLockError::DuplicatePodName(pod_name)); + } + if let Some(existing) = guard.data().find_by_session(session_id) { + return Err(ScopeLockError::SessionConflict { + session_id, + pod_name: existing.pod_name.clone(), + socket: existing.socket.clone(), + }); + } + for rule in scope_allow + .iter() + .filter(|r| r.permission == Permission::Write) + { + if let Some(competitor) = find_conflict_owner(guard.data(), rule, None) { + return Err(ScopeLockError::WriteConflict { + competitor, + rule: rule.clone(), + }); + } + } + guard.data_mut().allocations.push(Allocation { + pod_name, + pid, + socket, + scope_allow, + delegated_from: None, + session_id: Some(session_id), + }); + guard.save()?; + Ok(()) +} + +/// Register a spawned Pod whose scope is delegated from `spawner`. +/// The requested scope must be within `spawner`'s effective write +/// scope; overlap with any Pod other than `spawner` is a conflict. +pub fn delegate_scope( + guard: &mut LockFileGuard, + spawner: &str, + spawned: String, + pid: u32, + socket: PathBuf, + scope_allow: Vec, +) -> Result<(), ScopeLockError> { + reclaim_stale(guard); + if guard.data().find(&spawned).is_some() { + return Err(ScopeLockError::DuplicatePodName(spawned)); + } + if guard.data().find(spawner).is_none() { + return Err(ScopeLockError::UnknownPod(spawner.into())); + } + for rule in &scope_allow { + if !is_within_effective_write(guard.data(), spawner, rule) { + return Err(ScopeLockError::NotSubset { + spawner: spawner.into(), + rule: rule.clone(), + }); + } + if rule.permission == Permission::Write { + if let Some(competitor) = find_conflict_owner(guard.data(), rule, Some(spawner)) { + return Err(ScopeLockError::WriteConflict { + competitor, + rule: rule.clone(), + }); + } + } + } + guard.data_mut().allocations.push(Allocation { + pod_name: spawned, + pid, + socket, + scope_allow, + delegated_from: Some(spawner.into()), + // Pre-reservation. The child fills in its own session_id when + // it calls `adopt_allocation` after the worker is built. + session_id: None, + }); + guard.save()?; + Ok(()) +} + +/// Remove a Pod's allocation. Surviving children are reparented to +/// the removed Pod's own `delegated_from`, so the delegation tree +/// stays connected. +pub fn release_pod(guard: &mut LockFileGuard, pod_name: &str) -> Result<(), ScopeLockError> { + let idx = guard + .data() + .allocations + .iter() + .position(|a| a.pod_name == pod_name); + let Some(idx) = idx else { + return Err(ScopeLockError::UnknownPod(pod_name.into())); + }; + let removed = guard.data().allocations[idx].clone(); + for alloc in guard.data_mut().allocations.iter_mut() { + if alloc.delegated_from.as_deref() == Some(pod_name) { + alloc.delegated_from.clone_from(&removed.delegated_from); + } + } + guard.data_mut().allocations.remove(idx); + guard.save()?; + Ok(()) +} + +/// Remove allocations whose PID is dead, reparenting children to the +/// dead Pod's `delegated_from`. Idempotent and best-effort — I/O +/// errors on save are swallowed so a crashed Pod's entry never blocks +/// forward progress. +pub fn reclaim_stale(guard: &mut LockFileGuard) { + reclaim_stale_with(guard, pid_alive); +} + +/// Test seam: stale reclaim with a caller-supplied liveness probe. +pub fn reclaim_stale_with(guard: &mut LockFileGuard, mut is_alive: impl FnMut(u32) -> bool) { + let dead: Vec = guard + .data() + .allocations + .iter() + .filter(|a| !is_alive(a.pid)) + .map(|a| a.pod_name.clone()) + .collect(); + if dead.is_empty() { + return; + } + for name in &dead { + let Some(idx) = guard + .data() + .allocations + .iter() + .position(|a| a.pod_name == *name) + else { + continue; + }; + let removed = guard.data().allocations[idx].clone(); + for alloc in guard.data_mut().allocations.iter_mut() { + if alloc.delegated_from.as_deref() == Some(name.as_str()) { + alloc.delegated_from.clone_from(&removed.delegated_from); + } + } + guard.data_mut().allocations.remove(idx); + } + let _ = guard.save(); +} + +/// `kill(pid, 0)` — returns true if the process exists (even when we +/// don't own it), false only on ESRCH. +fn pid_alive(pid: u32) -> bool { + if pid == 0 { + return false; + } + let ret = unsafe { libc::kill(pid as libc::pid_t, 0) }; + if ret == 0 { + return true; + } + io::Error::last_os_error() + .raw_os_error() + .map(|e| e != libc::ESRCH) + .unwrap_or(false) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::is_within_effective_write; + use crate::test_util::*; + use tempfile::TempDir; + + #[test] + fn register_detects_write_conflict() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + let err = register_pod( + &mut g, + "b".into(), + std::process::id(), + sock("b"), + vec![write_rule("/src/core", true)], + sid(), + ) + .unwrap_err(); + match err { + ScopeLockError::WriteConflict { competitor, .. } => assert_eq!(competitor, "a"), + other => panic!("expected WriteConflict, got {other:?}"), + } + } + + #[test] + fn duplicate_pod_name_rejected() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + let err = register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a2"), + vec![write_rule("/docs", true)], + sid(), + ) + .unwrap_err(); + assert!(matches!(err, ScopeLockError::DuplicatePodName(ref n) if n == "a")); + } + + #[test] + fn delegate_must_be_subset() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + let err = delegate_scope( + &mut g, + "a", + "b".into(), + std::process::id(), + sock("b"), + vec![write_rule("/docs", true)], + ) + .unwrap_err(); + assert!(matches!(err, ScopeLockError::NotSubset { .. })); + } + + #[test] + fn delegate_succeeds_within_parent_scope() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + delegate_scope( + &mut g, + "a", + "b".into(), + std::process::id(), + sock("b"), + vec![write_rule("/src/core", true)], + ) + .unwrap(); + assert_eq!(g.data().allocations.len(), 2); + // A's effective write no longer covers /src/core because B has it. + assert!(!is_within_effective_write( + g.data(), + "a", + &write_rule("/src/core", true) + )); + // A still covers its own uninvolved areas. + assert!(is_within_effective_write( + g.data(), + "a", + &write_rule("/src/other", true) + )); + } + + #[test] + fn delegate_rejects_sibling_overlap() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + delegate_scope( + &mut g, + "a", + "b".into(), + std::process::id(), + sock("b"), + vec![write_rule("/src/core", true)], + ) + .unwrap(); + // Sibling C from A tries to take /src/core/sub — already under B's scope. + let err = delegate_scope( + &mut g, + "a", + "c".into(), + std::process::id(), + sock("c"), + vec![write_rule("/src/core/sub", true)], + ) + .unwrap_err(); + // NotSubset fires first because /src/core is no longer in A's effective. + assert!(matches!(err, ScopeLockError::NotSubset { .. })); + } + + #[test] + fn release_reparents_children() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + delegate_scope( + &mut g, + "a", + "b".into(), + std::process::id(), + sock("b"), + vec![write_rule("/src/core", true)], + ) + .unwrap(); + delegate_scope( + &mut g, + "b", + "d".into(), + std::process::id(), + sock("d"), + vec![write_rule("/src/core/x", true)], + ) + .unwrap(); + release_pod(&mut g, "b").unwrap(); + // D should now list A as its delegated_from. + let d = g.data().find("d").unwrap(); + assert_eq!(d.delegated_from.as_deref(), Some("a")); + assert!(g.data().find("b").is_none()); + } + + #[test] + fn reclaim_stale_reparents_and_removes_dead_entries() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + delegate_scope( + &mut g, + "a", + "b".into(), + std::process::id(), + sock("b"), + vec![write_rule("/src/core", true)], + ) + .unwrap(); + delegate_scope( + &mut g, + "b", + "d".into(), + std::process::id(), + sock("d"), + vec![write_rule("/src/core/x", true)], + ) + .unwrap(); + // Simulate B crashing by rewriting its pid to one the probe + // will treat as dead. + let fake_dead_pid: u32 = 0xffff_fff0; + for alloc in g.data_mut().allocations.iter_mut() { + if alloc.pod_name == "b" { + alloc.pid = fake_dead_pid; + } + } + reclaim_stale_with(&mut g, |pid| pid != fake_dead_pid); + assert!(g.data().find("b").is_none()); + let d = g.data().find("d").unwrap(); + assert_eq!(d.delegated_from.as_deref(), Some("a")); + } + + #[test] + fn read_rules_do_not_conflict_with_write() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + // B only reads under the same tree — allowed. + register_pod( + &mut g, + "b".into(), + std::process::id(), + sock("b"), + vec![read_rule("/src", true)], + sid(), + ) + .unwrap(); + assert_eq!(g.data().allocations.len(), 2); + } + + #[test] + fn releasing_pod_reopens_scope_for_fresh_registration() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + release_pod(&mut g, "a").unwrap(); + register_pod( + &mut g, + "b".into(), + std::process::id(), + sock("b"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + } + + #[test] + fn delegated_scope_returns_to_parent_on_release() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + delegate_scope( + &mut g, + "a", + "b".into(), + std::process::id(), + sock("b"), + vec![write_rule("/src/core", true)], + ) + .unwrap(); + assert!(!is_within_effective_write( + g.data(), + "a", + &write_rule("/src/core", true) + )); + release_pod(&mut g, "b").unwrap(); + // /src/core is back in A's effective write scope. + assert!(is_within_effective_write( + g.data(), + "a", + &write_rule("/src/core", true) + )); + } + + #[test] + fn register_pod_rejects_session_id_collision() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + let shared_session = sid(); + register_pod( + &mut g, + "first".into(), + std::process::id(), + sock("first"), + vec![write_rule("/work/a", true)], + shared_session, + ) + .unwrap(); + // Second registration tries to grab the same session_id under + // a different pod_name. Without the SessionConflict check both + // would succeed and race on the same jsonl. + let err = register_pod( + &mut g, + "second".into(), + std::process::id(), + sock("second"), + vec![write_rule("/work/b", true)], + shared_session, + ) + .unwrap_err(); + match err { + ScopeLockError::SessionConflict { + session_id, + pod_name, + .. + } => { + assert_eq!(session_id, shared_session); + assert_eq!(pod_name, "first"); + } + other => panic!("expected SessionConflict, got {other:?}"), + } + } +} diff --git a/crates/pod-registry/src/table.rs b/crates/pod-registry/src/table.rs new file mode 100644 index 00000000..5a6424d8 --- /dev/null +++ b/crates/pod-registry/src/table.rs @@ -0,0 +1,259 @@ +//! On-disk allocation table and the `flock`-protected guard. + +use std::fs::{DirBuilder, File, OpenOptions}; +use std::io::{self, Read, Seek, SeekFrom, Write}; +use std::os::unix::fs::{DirBuilderExt, OpenOptionsExt}; +use std::path::{Path, PathBuf}; + +use fs4::fs_std::FileExt; +use manifest::{ScopeRule, paths}; +use serde::{Deserialize, Serialize}; +use session_store::SessionId; + +/// On-disk representation of the allocation table. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct LockFile { + #[serde(default)] + pub allocations: Vec, +} + +/// One Pod's scope allocation. +/// +/// `scope_allow` is the full set of allow rules the Pod was granted. +/// Portions delegated out to child Pods are **not** subtracted in +/// storage — the effective write scope is derived on the fly by +/// removing rules owned by any Pod whose `delegated_from` points to +/// this one. Keeping the raw allow set makes reparenting (stale +/// reclaim) trivial. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Allocation { + /// Pod name — also the identity used throughout orchestration. + pub pod_name: String, + /// Owning process. Checked with `kill(pid, 0)` for stale detection. + pub pid: u32, + /// Pod's Unix socket path. + pub socket: PathBuf, + /// Allow rules granted to this Pod (write + read). + pub scope_allow: Vec, + /// Name of the Pod that delegated scope to this one, or `None` for + /// a top-level Pod started directly by a human. + pub delegated_from: Option, + /// Session ID this Pod is currently writing to. `None` means this + /// is a pre-reservation made by a spawner via [`crate::delegate_scope`] + /// before the child has come up; the child fills it in at + /// [`crate::adopt_allocation`] time. + #[serde(default)] + pub session_id: Option, +} + +impl LockFile { + pub fn find(&self, pod_name: &str) -> Option<&Allocation> { + self.allocations.iter().find(|a| a.pod_name == pod_name) + } + + pub fn find_mut(&mut self, pod_name: &str) -> Option<&mut Allocation> { + self.allocations.iter_mut().find(|a| a.pod_name == pod_name) + } + + /// Find the allocation currently writing to `session_id`. Skips + /// pre-reservations whose `session_id` is still `None`. + pub fn find_by_session(&self, session_id: SessionId) -> Option<&Allocation> { + self.allocations + .iter() + .find(|a| a.session_id == Some(session_id)) + } +} + +/// Default on-disk path: `/pods.json` resolved via +/// [`manifest::paths::pod_registry_path`]. Tests should point this +/// elsewhere by setting `INSOMNIA_HOME` or `INSOMNIA_RUNTIME_DIR` to a +/// tempdir. +pub fn default_registry_path() -> io::Result { + paths::pod_registry_path().ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + "could not resolve pods.json path (no INSOMNIA_HOME / \ + INSOMNIA_RUNTIME_DIR / XDG_RUNTIME_DIR / HOME)", + ) + }) +} + +/// RAII guard over an exclusively-locked lock file. +/// +/// The file is kept open for the lifetime of the guard; `flock(LOCK_EX)` +/// is released automatically on drop. Mutations go through +/// [`LockFileGuard::data_mut`] and are committed with +/// [`LockFileGuard::save`] before dropping — callers who mutate but +/// never call `save` leave the table unchanged, which is the right +/// behaviour for error paths. +pub struct LockFileGuard { + file: File, + data: LockFile, +} + +impl LockFileGuard { + /// Open the lock file at `path` (creating it + parent dirs if + /// needed), acquire an exclusive `flock`, then parse the contents. + /// + /// An empty file is treated as an empty allocation table. + /// + /// File is created with mode `0600` and its parent directory with + /// mode `0700` so no other user on the machine can read the + /// allocation table. Existing files/directories are left alone. + pub fn open(path: &Path) -> io::Result { + if let Some(parent) = path.parent() { + DirBuilder::new() + .recursive(true) + .mode(0o700) + .create(parent)?; + } + let file = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(false) + .mode(0o600) + .open(path)?; + FileExt::lock_exclusive(&file)?; + let mut this = Self { + file, + data: LockFile::default(), + }; + this.reload()?; + Ok(this) + } + + fn reload(&mut self) -> io::Result<()> { + self.file.seek(SeekFrom::Start(0))?; + let mut buf = String::new(); + self.file.read_to_string(&mut buf)?; + self.data = if buf.trim().is_empty() { + LockFile::default() + } else { + serde_json::from_str(&buf).map_err(|e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("pods.json parse error: {e}"), + ) + })? + }; + Ok(()) + } + + pub fn data(&self) -> &LockFile { + &self.data + } + + pub fn data_mut(&mut self) -> &mut LockFile { + &mut self.data + } + + /// Serialise `self.data` back to the file (truncate + rewrite). + pub fn save(&mut self) -> io::Result<()> { + let json = serde_json::to_vec_pretty(&self.data).map_err(io::Error::other)?; + self.file.seek(SeekFrom::Start(0))?; + self.file.set_len(0)?; + self.file.write_all(&json)?; + self.file.sync_data()?; + Ok(()) + } +} + +impl Drop for LockFileGuard { + fn drop(&mut self) { + let _ = FileExt::unlock(&self.file); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::register_pod; + use crate::test_util::*; + use tempfile::TempDir; + + #[test] + fn open_creates_empty_lock_file() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let guard = LockFileGuard::open(&path).unwrap(); + assert!(guard.data().allocations.is_empty()); + assert!(path.exists()); + } + + #[test] + fn open_creates_file_with_owner_only_permissions() { + use std::os::unix::fs::PermissionsExt; + let dir = TempDir::new().unwrap(); + let parent = dir.path().join("insomnia"); + let path = parent.join("pods.json"); + let _guard = LockFileGuard::open(&path).unwrap(); + let file_mode = std::fs::metadata(&path).unwrap().permissions().mode() & 0o777; + assert_eq!(file_mode, 0o600, "file mode = {file_mode:o}"); + let dir_mode = std::fs::metadata(&parent).unwrap().permissions().mode() & 0o777; + assert_eq!(dir_mode, 0o700, "dir mode = {dir_mode:o}"); + } + + #[test] + fn save_and_reopen_roundtrip() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + { + let mut g = open_empty(&path); + register_pod( + &mut g, + "a".into(), + std::process::id(), + sock("a"), + vec![write_rule("/src", true)], + sid(), + ) + .unwrap(); + } + let guard = LockFileGuard::open(&path).unwrap(); + assert_eq!(guard.data().allocations.len(), 1); + assert_eq!(guard.data().allocations[0].pod_name, "a"); + } + + #[test] + fn find_by_session_skips_none_placeholders() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("pods.json"); + let mut g = open_empty(&path); + // Pre-reservation: delegate_scope leaves session_id = None + // until adopt_allocation rewrites it. find_by_session must not + // match those placeholders, otherwise a freshly-spawning child + // would shadow itself before it has even chosen a session. + register_pod( + &mut g, + "parent".into(), + std::process::id(), + sock("parent"), + vec![write_rule("/p", true)], + sid(), + ) + .unwrap(); + crate::delegate_scope( + &mut g, + "parent", + "child".into(), + std::process::id(), + sock("child"), + vec![write_rule("/p/sub", true)], + ) + .unwrap(); + + let target_session = sid(); + // The placeholder allocation has session_id = None and must + // not be returned for any lookup. + assert!(g.data().find_by_session(target_session).is_none()); + + // After adopt-style rewrite, the same allocation is now found. + g.data_mut() + .find_mut("child") + .unwrap() + .session_id = Some(target_session); + let found = g.data().find_by_session(target_session).unwrap(); + assert_eq!(found.pod_name, "child"); + } +} diff --git a/crates/pod-registry/src/test_util.rs b/crates/pod-registry/src/test_util.rs new file mode 100644 index 00000000..5d1609fd --- /dev/null +++ b/crates/pod-registry/src/test_util.rs @@ -0,0 +1,97 @@ +//! Shared test helpers for the pod-registry crate. +//! +//! Visible to all `#[cfg(test)]` modules under `crate::test_util::*`. + +use std::path::{Path, PathBuf}; +use std::sync::{LazyLock, Mutex, MutexGuard}; + +use manifest::{Permission, ScopeRule}; +use session_store::SessionId; + +use crate::table::LockFileGuard; + +pub(crate) fn sid() -> SessionId { + session_store::new_session_id() +} + +/// Serialises tests that mutate runtime-dir env vars. The test +/// harness runs tests on multiple threads inside a single process, +/// so env-var writes from one test would otherwise leak into a +/// parallel test's `default_registry_path()` lookup. +pub(crate) static ENV_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); + +/// Sandbox `INSOMNIA_RUNTIME_DIR` to a tempdir for the duration of +/// a test; restore the previous value (and any `INSOMNIA_HOME` / +/// `XDG_RUNTIME_DIR` that would otherwise outrank it) on drop. +pub(crate) struct RuntimeDirSandbox { + prev_runtime: Option, + prev_home: Option, + prev_xdg: Option, + _guard: MutexGuard<'static, ()>, +} + +impl RuntimeDirSandbox { + pub(crate) fn new(dir: &Path) -> Self { + let guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()); + let prev_runtime = std::env::var("INSOMNIA_RUNTIME_DIR").ok(); + let prev_home = std::env::var("INSOMNIA_HOME").ok(); + let prev_xdg = std::env::var("XDG_RUNTIME_DIR").ok(); + // SAFETY: ENV_LOCK serialises env writes across this test + // module; other modules that touch env vars rely on their + // own lock or `serial_test`. + unsafe { + std::env::remove_var("INSOMNIA_HOME"); + std::env::remove_var("XDG_RUNTIME_DIR"); + std::env::set_var("INSOMNIA_RUNTIME_DIR", dir); + } + Self { + prev_runtime, + prev_home, + prev_xdg, + _guard: guard, + } + } +} + +impl Drop for RuntimeDirSandbox { + fn drop(&mut self) { + unsafe { + match &self.prev_runtime { + Some(v) => std::env::set_var("INSOMNIA_RUNTIME_DIR", v), + None => std::env::remove_var("INSOMNIA_RUNTIME_DIR"), + } + match &self.prev_home { + Some(v) => std::env::set_var("INSOMNIA_HOME", v), + None => std::env::remove_var("INSOMNIA_HOME"), + } + match &self.prev_xdg { + Some(v) => std::env::set_var("XDG_RUNTIME_DIR", v), + None => std::env::remove_var("XDG_RUNTIME_DIR"), + } + } + } +} + +pub(crate) fn write_rule(path: &str, recursive: bool) -> ScopeRule { + ScopeRule { + target: PathBuf::from(path), + permission: Permission::Write, + recursive, + } +} + +pub(crate) fn read_rule(path: &str, recursive: bool) -> ScopeRule { + ScopeRule { + target: PathBuf::from(path), + permission: Permission::Read, + recursive, + } +} + +pub(crate) fn sock(name: &str) -> PathBuf { + PathBuf::from(format!("/tmp/{name}.sock")) +} + +pub(crate) fn open_empty(path: &Path) -> LockFileGuard { + LockFileGuard::open(path).unwrap() +}