diff --git a/Cargo.lock b/Cargo.lock index 1d4e0307..2169080c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1912,6 +1912,7 @@ dependencies = [ "minijinja", "protocol", "provider", + "schemars", "serde", "serde_json", "session-store", diff --git a/TODO.md b/TODO.md index ecf33eb3..4de94b20 100644 --- a/TODO.md +++ b/TODO.md @@ -5,7 +5,6 @@ - [ ] Protocol の設計 → [tickets/protocol-design.md](tickets/protocol-design.md) - [ ] パーミッション: パターンベースのツール実行制御 → [tickets/permission-extension-point.md](tickets/permission-extension-point.md) - [ ] Pod オーケストレーション - - [ ] SpawnPod ツール: LLM から Pod を生成 → [tickets/spawn-pod-tool.md](tickets/spawn-pod-tool.md) - [ ] Pod 間通信ツール: SendToPod / ReadPodOutput / StopPod / ListPods → [tickets/pod-comm-tools.md](tickets/pod-comm-tools.md) - [ ] Pod 間コールバック通知 → [tickets/pod-callback.md](tickets/pod-callback.md) - [ ] 動的 Scope 変更 → [tickets/dynamic-scope.md](tickets/dynamic-scope.md) diff --git a/crates/pod/Cargo.toml b/crates/pod/Cargo.toml index 7e30e2b9..71a78071 100644 --- a/crates/pod/Cargo.toml +++ b/crates/pod/Cargo.toml @@ -15,7 +15,7 @@ provider = { version = "0.1.0", path = "../provider" } serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.149" thiserror = "2.0" -tokio = { version = "1.49", features = ["fs", "io-util", "macros", "net", "rt-multi-thread", "signal", "sync"] } +tokio = { version = "1.49", features = ["fs", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "sync", "time"] } toml = "1.1.2" tracing = "0.1.44" tools = { version = "0.1.0", path = "../tools" } @@ -24,6 +24,7 @@ chrono = "0.4.44" include_dir = "0.7.4" fs4 = { version = "0.13.1", features = ["sync"] } libc = "0.2.185" +schemars = "1.2.1" [dev-dependencies] async-trait = "0.1.89" diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 71385316..c0a95631 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -12,6 +12,7 @@ use crate::pod::{Pod, PodError, PodRunResult}; use 
crate::runtime_dir::RuntimeDir; use crate::shared_state::{PodSharedState, PodStatus}; use crate::socket_server::SocketServer; +use crate::spawn_pod::spawn_pod_tool; use protocol::{ErrorCode, Event, Method, NotificationLevel, NotificationSource, RunResult, TurnResult}; // --------------------------------------------------------------------------- @@ -107,6 +108,7 @@ impl PodController { // can build a `ScopedFs` for the builtin tools. let scope_for_tools = pod.scope().clone(); let pwd_for_tools = pod.pwd().to_path_buf(); + let spawner_name = pod.manifest().pod.name.clone(); // Register event bridge callbacks on the worker { @@ -198,9 +200,22 @@ impl PodController { // also handed to the Pod itself so Pod-level operations (e.g. // context compaction) can ask which files the agent has been // touching. - let fs = tools::ScopedFs::new(scope_for_tools, pwd_for_tools); + let fs = tools::ScopedFs::new(scope_for_tools, pwd_for_tools.clone()); let tracker = tools::Tracker::new(); worker.register_tools(tools::builtin_tools(fs, tracker.clone())); + + // SpawnPod is wired here rather than in `tools::builtin_tools` + // because it needs Pod-scoped handles (this Pod's own socket + // path, runtime_dir, spawner name) that the generic tools + // crate has no access to. + let spawner_socket = runtime_dir.socket_path(); + worker.register_tool(spawn_pod_tool( + spawner_name, + spawner_socket, + runtime_base.to_path_buf(), + pwd_for_tools, + runtime_dir.clone(), + )); pod.attach_tracker(tracker); } @@ -466,15 +481,17 @@ where manifest::ProviderKind::Gemini => "gemini", manifest::ProviderKind::Ollama => "ollama", }; - // The tool list mirrors `builtin_tools`. A fresh `ScopedFs`/`Tracker` - // is instantiated only to invoke the factories for name extraction; - // the instances themselves are discarded. + // The tool list mirrors what `spawn()` registers on the Worker: + // builtin filesystem tools plus `SpawnPod`. 
`SpawnPod` is appended + // by name because constructing its factory here would require + // Pod-lifetime handles we haven't built yet (runtime_dir, socket). let fs = tools::ScopedFs::new(pod.scope().clone(), pod.pwd().to_path_buf()); let tracker = tools::Tracker::new(); - let tool_names = tools::builtin_tools(fs, tracker) + let mut tool_names: Vec = tools::builtin_tools(fs, tracker) .iter() .map(|def| def().0.name) .collect(); + tool_names.push("SpawnPod".into()); protocol::Greeting { pod_name: manifest.pod.name.clone(), cwd: pod.pwd().display().to_string(), diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index adf8b26a..fd72103d 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -5,6 +5,7 @@ pub mod runtime_dir; pub mod scope_lock; pub mod shared_state; pub mod socket_server; +pub mod spawn_pod; mod agents_md; mod compact_state; diff --git a/crates/pod/src/main.rs b/crates/pod/src/main.rs index 484d084d..4b71737a 100644 --- a/crates/pod/src/main.rs +++ b/crates/pod/src/main.rs @@ -36,6 +36,17 @@ struct Cli { /// `~/.insomnia/sessions/`. #[arg(short, long)] store: Option, + + /// Claim a scope allocation pre-registered by a spawning Pod, rather + /// than installing a new top-level allocation. Used only when this + /// process is launched by `SpawnPod`; end users should never pass it. + #[arg(long)] + adopt: bool, + + /// Socket path of the spawning Pod, for delivering `Method::Notify` + /// callbacks upward. Required alongside `--adopt`. 
+ #[arg(long, value_name = "PATH", requires = "adopt")] + callback: Option, } fn default_store_dir() -> Result { @@ -140,11 +151,28 @@ async fn main() -> ExitCode { } }; - let pod = match Pod::from_manifest(manifest, store, loader).await { - Ok(p) => p, - Err(e) => { - eprintln!("error: failed to create pod: {e}"); - return ExitCode::FAILURE; + let pod = if cli.adopt { + let callback = match cli.callback.clone() { + Some(p) => p, + None => { + eprintln!("error: --adopt requires --callback"); + return ExitCode::FAILURE; + } + }; + match Pod::from_manifest_spawned(manifest, store, loader, callback).await { + Ok(p) => p, + Err(e) => { + eprintln!("error: failed to create spawned pod: {e}"); + return ExitCode::FAILURE; + } + } + } else { + match Pod::from_manifest(manifest, store, loader).await { + Ok(p) => p, + Err(e) => { + eprintln!("error: failed to create pod: {e}"); + return ExitCode::FAILURE; + } } }; let pod_name = pod.manifest().pod.name.clone(); diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 8288373c..cac8eab1 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -114,6 +114,13 @@ pub struct Pod { /// releases the allocation when the Pod is dropped. #[allow(dead_code)] scope_allocation: Option, + /// Socket path of the spawning Pod. `Some` only for Pods built via + /// `from_manifest_spawned`. The callback is consumed by the + /// `pod-callback` layer (separate ticket) to deliver + /// `Method::Notify` back to the spawner; stored here so the Pod + /// carries the reference for the duration of its life. 
+ #[allow(dead_code)] + callback_socket: Option, } impl Pod { @@ -156,6 +163,7 @@ impl Pod { notifier: None, pending_notifications: NotificationBuffer::new(), scope_allocation: None, + callback_socket: None, }; pod.apply_prune_from_manifest(); Ok(pod) @@ -206,6 +214,7 @@ impl Pod { notifier: None, pending_notifications: NotificationBuffer::new(), scope_allocation: None, + callback_socket: None, }; pod.apply_prune_from_manifest(); Ok(pod) @@ -936,6 +945,66 @@ impl Pod, St> { notifier: None, pending_notifications: NotificationBuffer::new(), scope_allocation: Some(scope_allocation), + callback_socket: None, + }; + pod.apply_prune_from_manifest(); + Ok(pod) + } + + /// Build a Pod spawned by another Pod (sibling process). + /// + /// Behaves like [`Pod::from_manifest`] but claims the scope + /// allocation that the spawner pre-registered via + /// [`scope_lock::delegate_scope`], rather than installing a new + /// top-level entry. `callback_socket` carries the spawner's + /// Unix-socket path so the spawned Pod can send `Method::Notify` + /// back to the spawner; it is stored but unused in the + /// `spawn-pod-tool` ticket — the receiving side lands in the + /// follow-up `pod-callback` ticket. 
+ pub async fn from_manifest_spawned( + manifest: PodManifest, + store: St, + loader: PromptLoader, + callback_socket: PathBuf, + ) -> Result { + let pwd = resolve_pwd(&manifest.pod.pwd)?; + let scope = Scope::from_config(&manifest.scope, &pwd).map_err(PodError::Scope)?; + if !scope.is_readable(&pwd) { + return Err(PodError::PwdOutsideScope { pwd }); + } + + let scope_allocation = + scope_lock::adopt_allocation(manifest.pod.name.clone(), std::process::id())?; + + let client = provider::build_client(&manifest.provider)?; + let mut worker = Worker::new(client); + apply_worker_manifest(&mut worker, &manifest.worker); + + let system_prompt_template = Some( + SystemPromptTemplate::parse(&manifest.worker.instruction, loader) + .map_err(|source| PodError::InvalidSystemPromptTemplate { source })?, + ); + + let session_id = session_store::new_session_id(); + let mut pod = Self { + manifest, + worker: Some(worker), + store, + session_id, + head_hash: None, + pwd, + scope, + hook_builder: HookRegistryBuilder::new(), + interceptor_installed: false, + compact_state: None, + usage_tracker: Arc::new(UsageTracker::new()), + usage_history: Arc::new(Mutex::new(Vec::new())), + tracker: None, + system_prompt_template, + notifier: None, + pending_notifications: NotificationBuffer::new(), + scope_allocation: Some(scope_allocation), + callback_socket: Some(callback_socket), }; pod.apply_prune_from_manifest(); Ok(pod) diff --git a/crates/pod/src/runtime_dir.rs b/crates/pod/src/runtime_dir.rs index 3a12da99..2028a95c 100644 --- a/crates/pod/src/runtime_dir.rs +++ b/crates/pod/src/runtime_dir.rs @@ -1,10 +1,30 @@ use std::io; use std::path::{Path, PathBuf}; +use manifest::ScopeRule; +use serde::{Deserialize, Serialize}; use tokio::fs; use crate::shared_state::PodSharedState; +/// One spawned-child record persisted to `spawned_pods.json`. 
+/// +/// Written by the spawner after a successful `SpawnPod` tool call so +/// `ListPods` (future ticket) and a restored spawner can enumerate +/// their live children without re-querying `scope.lock`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SpawnedPodRecord { + /// Spawned Pod's identity. + pub pod_name: String, + /// Spawned Pod's Unix socket path. + pub socket_path: PathBuf, + /// Scope allow rules delegated to the spawned Pod. + pub scope_delegated: Vec, + /// Socket path the spawned Pod was told to use for callbacks + /// (= this Pod's own socket when spawn happened). + pub callback_address: PathBuf, +} + /// Manages the Pod's runtime directory on tmpfs. /// /// ```text @@ -60,6 +80,17 @@ impl RuntimeDir { atomic_write(&self.path.join("history.json"), content.as_bytes()).await } + /// Write `spawned_pods.json` atomically. The entries are the full + /// set of spawned children known to this Pod — callers pass the + /// replacement list, no incremental merge. + pub async fn write_spawned_pods( + &self, + records: &[SpawnedPodRecord], + ) -> Result<(), io::Error> { + let json = serde_json::to_vec_pretty(records).map_err(io::Error::other)?; + atomic_write(&self.path.join("spawned_pods.json"), &json).await + } + /// Path to this Pod's runtime directory. 
pub fn path(&self) -> &Path { &self.path @@ -173,6 +204,30 @@ mod tests { assert_eq!(content, "[pod]\nname = \"test\""); } + #[tokio::test] + async fn write_spawned_pods_creates_file() { + use manifest::{Permission, ScopeRule}; + let tmp = tempfile::tempdir().unwrap(); + let rt = RuntimeDir::create(tmp.path(), "my-pod").await.unwrap(); + + let records = vec![SpawnedPodRecord { + pod_name: "child".into(), + socket_path: "/run/insomnia/child/sock".into(), + scope_delegated: vec![ScopeRule { + target: "/tmp/work".into(), + permission: Permission::Write, + recursive: true, + }], + callback_address: "/run/insomnia/my-pod/sock".into(), + }]; + rt.write_spawned_pods(&records).await.unwrap(); + + let content = std::fs::read_to_string(rt.path().join("spawned_pods.json")).unwrap(); + let parsed: Vec = serde_json::from_str(&content).unwrap(); + assert_eq!(parsed.len(), 1); + assert_eq!(parsed[0].pod_name, "child"); + } + #[tokio::test] async fn write_history_creates_file() { let tmp = tempfile::tempdir().unwrap(); diff --git a/crates/pod/src/scope_lock.rs b/crates/pod/src/scope_lock.rs index 23702568..2f1f3913 100644 --- a/crates/pod/src/scope_lock.rs +++ b/crates/pod/src/scope_lock.rs @@ -460,6 +460,7 @@ fn pid_alive(pid: u32) -> bool { /// Owned allocation: on drop, opens the lock file and releases this /// Pod's entry. The guard keeps only the name + lock-file path; it /// does not hold the `flock` for the Pod's lifetime. +#[derive(Debug)] pub struct ScopeAllocationGuard { pod_name: String, lock_path: PathBuf, @@ -500,6 +501,32 @@ pub fn install_top_level( }) } +/// Take ownership of an existing allocation that was pre-registered by +/// a spawning Pod. 
+/// +/// The spawning flow is two-stage: the spawner calls [`delegate_scope`] +/// (with its own pid as a live placeholder), then exec's the child; the +/// child, once running, calls this function to rewrite the allocation's +/// pid to its own and claim the `ScopeAllocationGuard` so the entry is +/// released when the child exits. +pub fn adopt_allocation( + pod_name: String, + new_pid: u32, +) -> Result { + let lock_path = default_lock_path()?; + let mut guard = LockFileGuard::open(&lock_path)?; + let alloc = guard + .data_mut() + .find_mut(&pod_name) + .ok_or_else(|| ScopeLockError::UnknownPod(pod_name.clone()))?; + alloc.pid = new_pid; + guard.save()?; + Ok(ScopeAllocationGuard { + pod_name, + lock_path, + }) +} + /// Errors raised by the mutating scope-lock operations. #[derive(Debug, thiserror::Error)] pub enum ScopeLockError { @@ -528,8 +555,15 @@ pub enum ScopeLockError { mod tests { use super::*; use manifest::Permission; + use std::sync::{LazyLock, Mutex}; use tempfile::TempDir; + /// Serialises tests that mutate `INSOMNIA_SCOPE_LOCK`. The test + /// harness runs tests on multiple threads inside a single process, + /// so env-var writes from one test would otherwise leak into a + /// parallel test's `default_lock_path()` lookup. + static ENV_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); + fn write_rule(path: &str, recursive: bool) -> ScopeRule { ScopeRule { target: PathBuf::from(path), @@ -943,12 +977,9 @@ mod tests { #[test] fn scope_allocation_guard_releases_on_drop() { + let _env = ENV_LOCK.lock().unwrap(); let dir = TempDir::new().unwrap(); let lock_path = dir.path().join("scope.lock"); - // Override the default path for the duration of the test. - // SAFETY: single-threaded inside each #[test]; concurrent tests - // that also touch INSOMNIA_SCOPE_LOCK are excluded by using - // per-test paths and not clearing the var until after drop. 
unsafe { std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path); } @@ -973,6 +1004,65 @@ mod tests { } } + #[test] + fn adopt_allocation_rewrites_pid_and_releases_on_drop() { + let _env = ENV_LOCK.lock().unwrap(); + let dir = TempDir::new().unwrap(); + let lock_path = dir.path().join("scope.lock"); + unsafe { + std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path); + } + // Pre-register an allocation under spawner's pid, as delegate_scope would. + { + let mut g = LockFileGuard::open(&lock_path).unwrap(); + delegate_placeholder(&mut g, "child", std::process::id()); + } + let child_pid = std::process::id().wrapping_add(1); + let guard = adopt_allocation("child".into(), child_pid).unwrap(); + { + let g = LockFileGuard::open(&lock_path).unwrap(); + let alloc = g.data().find("child").unwrap(); + assert_eq!(alloc.pid, child_pid); + } + drop(guard); + { + let g = LockFileGuard::open(&lock_path).unwrap(); + assert!(g.data().find("child").is_none()); + } + unsafe { + std::env::remove_var("INSOMNIA_SCOPE_LOCK"); + } + } + + #[test] + fn adopt_allocation_errors_on_unknown_pod() { + let _env = ENV_LOCK.lock().unwrap(); + let dir = TempDir::new().unwrap(); + let lock_path = dir.path().join("scope.lock"); + unsafe { + std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path); + } + let err = adopt_allocation("ghost".into(), 42).unwrap_err(); + assert!(matches!(err, ScopeLockError::UnknownPod(ref n) if n == "ghost")); + unsafe { + std::env::remove_var("INSOMNIA_SCOPE_LOCK"); + } + } + + /// Mimic what the spawner does before the child comes up: push an + /// allocation for the child carrying the spawner's (live) pid as a + /// placeholder. Exists only in tests. 
+ fn delegate_placeholder(g: &mut LockFileGuard, pod_name: &str, placeholder_pid: u32) { + g.data_mut().allocations.push(Allocation { + pod_name: pod_name.to_string(), + pid: placeholder_pid, + socket: sock(pod_name), + scope_allow: vec![write_rule("/tmp/child", true)], + delegated_from: None, + }); + g.save().unwrap(); + } + #[test] fn conflict_detection_descends_to_real_owner() { let dir = TempDir::new().unwrap(); diff --git a/crates/pod/src/spawn_pod.rs b/crates/pod/src/spawn_pod.rs new file mode 100644 index 00000000..564cda7c --- /dev/null +++ b/crates/pod/src/spawn_pod.rs @@ -0,0 +1,387 @@ +//! `SpawnPod` tool — launch a new Pod process as a child of this one. +//! +//! Wires scope-lock delegation, overlay-TOML construction, subprocess +//! launch, and socket handoff into a single `Tool` implementation. When +//! the LLM calls `SpawnPod`, a fresh `pod` binary is exec'd in its own +//! process group, the scope lock is updated atomically, and the child's +//! first turn is kicked off by handing its socket a `Method::Run`. + +use std::path::{Path, PathBuf}; +use std::process::Stdio; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use manifest::{ + Permission, PodManifestConfig, PodMetaConfig, ScopeConfig, ScopeRule, WorkerManifestConfig, +}; +use protocol::Method; +use protocol::stream::JsonLineWriter; +use serde::Deserialize; +use tokio::net::UnixStream; +use tokio::process::Command; +use tokio::sync::Mutex; +use tokio::time::sleep; + +use crate::runtime_dir::{RuntimeDir, SpawnedPodRecord}; +use crate::scope_lock::{self, LockFileGuard, ScopeLockError}; + +const DESCRIPTION: &str = "Spawn a new Pod process to work on a delegated task. \ +The spawner's write scope is reduced by the scope passed here; the spawned \ +Pod receives its own socket and starts running `task` immediately. 
The \ +spawned Pod outlives the spawner's current turn and can be contacted again \ +through its socket path."; + +const DEFAULT_INSTRUCTION: &str = "$insomnia/default"; + +/// How long we will wait for the spawned Pod's socket to become +/// connectable before treating the spawn as failed. +const SOCKET_WAIT_TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct SpawnPodInput { + /// Identifier for the spawned Pod. Must be unique machine-wide. + name: String, + /// Instruction-file reference (e.g. `$insomnia/default`, `$user/my-agent`). + #[serde(default)] + instruction: Option, + /// First message sent to the spawned Pod via `Method::Run`. + task: String, + /// Allow rules delegated to the spawned Pod. Must be a subset of the + /// spawner's effective write scope. + scope: Vec, +} + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct ScopeRuleInput { + /// Absolute target path. Relative paths are rejected. + target: PathBuf, + /// `"read"` or `"write"`. + permission: PermissionInput, + /// When `false`, the rule matches the target itself and its direct + /// children only. Defaults to `true`. + #[serde(default = "default_true")] + recursive: bool, +} + +#[derive(Debug, Deserialize, schemars::JsonSchema, Clone, Copy)] +#[serde(rename_all = "lowercase")] +enum PermissionInput { + Read, + Write, +} + +fn default_true() -> bool { + true +} + +impl From for Permission { + fn from(p: PermissionInput) -> Self { + match p { + PermissionInput::Read => Permission::Read, + PermissionInput::Write => Permission::Write, + } + } +} + +/// Runtime dependencies the `SpawnPod` tool needs in order to launch a +/// child Pod and record the handoff locally. Constructed by the Pod +/// controller once per Pod lifetime. +pub struct SpawnPodTool { + /// Spawner's own pod name — becomes the spawned Pod's + /// `delegated_from` in the scope-lock registry. + spawner_name: String, + /// Path to the spawner's Unix socket. 
Handed to the child via + /// `--callback` so `Method::Notify` has somewhere to land. + callback_socket: PathBuf, + /// Root of the `$XDG_RUNTIME_DIR/insomnia/` tree, used to predict + /// the spawned Pod's socket path before the child has bound it. + runtime_base: PathBuf, + /// Directory the spawned Pod should run in when the LLM did not + /// override it. Defaults to the spawner's pwd — see module docs. + spawner_pwd: PathBuf, + /// Spawner's own runtime directory — target for `spawned_pods.json`. + runtime_dir: Arc, + /// Running list of successful spawns, replayed into + /// `spawned_pods.json` on every successful `execute`. + records: Arc>>, +} + +impl SpawnPodTool { + pub fn new( + spawner_name: String, + callback_socket: PathBuf, + runtime_base: PathBuf, + spawner_pwd: PathBuf, + runtime_dir: Arc, + ) -> Self { + Self { + spawner_name, + callback_socket, + runtime_base, + spawner_pwd, + runtime_dir, + records: Arc::new(Mutex::new(Vec::new())), + } + } +} + +#[async_trait] +impl Tool for SpawnPodTool { + async fn execute(&self, input_json: &str) -> Result { + let input: SpawnPodInput = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid SpawnPod input: {e}")))?; + + // `delegate_scope` catches this too (as `DuplicatePodName`), but + // the dedicated message is kinder to the LLM — which gets the + // error back verbatim — than the generic duplicate-name error. 
+ if input.name == self.spawner_name { + return Err(ToolError::InvalidArgument(format!( + "spawned pod name `{}` collides with spawner's own name", + input.name + ))); + } + + let scope_allow = parse_scope(&input.scope)?; + + let instruction = input + .instruction + .clone() + .unwrap_or_else(|| DEFAULT_INSTRUCTION.to_string()); + + let predicted_socket = self.runtime_base.join(&input.name).join("sock"); + let lock_path = scope_lock::default_lock_path() + .map_err(|e| ToolError::ExecutionFailed(format!("scope lock path: {e}")))?; + + // Reserve the allocation up front. Spawner's pid is a live + // placeholder; the child will rewrite it via `adopt_allocation`. + { + let mut guard = LockFileGuard::open(&lock_path) + .map_err(|e| ToolError::ExecutionFailed(format!("scope lock open: {e}")))?; + scope_lock::delegate_scope( + &mut guard, + &self.spawner_name, + input.name.clone(), + std::process::id(), + predicted_socket.clone(), + scope_allow.clone(), + ) + .map_err(scope_lock_err_to_tool)?; + } + + // `start_outcome` covers steps that happen before the child is + // observably alive (exec + socket bind). Once its socket is + // listening, the child owns the allocation and we must not roll + // it back — even if later steps (Method::Run delivery, record + // write) fail, the child is running and will release its own + // entry on exit. + let overlay_toml = + match build_overlay_toml(&input.name, &self.spawner_pwd, &instruction, &scope_allow) { + Ok(s) => s, + Err(e) => { + self.release_reservation(&lock_path, &input.name); + return Err(ToolError::ExecutionFailed(format!( + "overlay serialisation: {e}" + ))); + } + }; + + let start_outcome = self.exec_child(&overlay_toml, &predicted_socket).await; + if let Err(e) = start_outcome { + self.release_reservation(&lock_path, &input.name); + return Err(e); + } + + // Child is live. Post-start errors propagate but do not roll + // back the scope allocation — the child already owns it. 
+ send_run(&predicted_socket, &input.task).await?; + + let record = SpawnedPodRecord { + pod_name: input.name.clone(), + socket_path: predicted_socket.clone(), + scope_delegated: scope_allow, + callback_address: self.callback_socket.clone(), + }; + { + let mut records = self.records.lock().await; + records.push(record); + self.runtime_dir + .write_spawned_pods(records.as_slice()) + .await + .map_err(|e| { + ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}")) + })?; + } + + Ok(ToolOutput { + summary: format!( + "spawned pod `{}` listening on {}", + input.name, + predicted_socket.display() + ), + content: None, + }) + } +} + +impl SpawnPodTool { + async fn exec_child( + &self, + overlay_toml: &str, + predicted_socket: &Path, + ) -> Result<(), ToolError> { + let pod_command = std::env::var("INSOMNIA_POD_COMMAND").unwrap_or_else(|_| "pod".into()); + + let mut cmd = Command::new(&pod_command); + cmd.arg("--adopt") + .arg("--callback") + .arg(&self.callback_socket) + .arg("--overlay") + .arg(overlay_toml) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .process_group(0); + + let child = cmd + .spawn() + .map_err(|e| ToolError::ExecutionFailed(format!("failed to spawn `{pod_command}`: {e}")))?; + + // Default `kill_on_drop = false` keeps the process alive after + // the `Child` is dropped. We intentionally do not `.wait()` — + // when the spawner later exits, init adopts any remaining + // orphans. Lifecycle tracking lives in `spawned_pods.json`. 
+ drop(child); + + wait_for_socket(predicted_socket, SOCKET_WAIT_TIMEOUT).await + } + + fn release_reservation(&self, lock_path: &Path, pod_name: &str) { + if let Ok(mut g) = LockFileGuard::open(lock_path) { + let _ = scope_lock::release_pod(&mut g, pod_name); + } + } +} + +fn parse_scope(rules: &[ScopeRuleInput]) -> Result, ToolError> { + if rules.is_empty() { + return Err(ToolError::InvalidArgument("scope must not be empty".into())); + } + rules + .iter() + .map(|r| { + if !r.target.is_absolute() { + return Err(ToolError::InvalidArgument(format!( + "scope.target must be absolute: {}", + r.target.display() + ))); + } + Ok(ScopeRule { + target: r.target.clone(), + permission: r.permission.into(), + recursive: r.recursive, + }) + }) + .collect() +} + +/// Serialise the overlay TOML that gets handed to the child `pod` +/// binary via `--overlay`. `PodManifestConfig`'s `Serialize` impl is +/// the single source of truth for the on-disk manifest format. +fn build_overlay_toml( + name: &str, + pwd: &Path, + instruction: &str, + scope_allow: &[ScopeRule], +) -> Result { + let overlay = PodManifestConfig { + pod: PodMetaConfig { + name: Some(name.to_string()), + pwd: Some(pwd.to_path_buf()), + }, + worker: WorkerManifestConfig { + instruction: Some(instruction.to_string()), + ..Default::default() + }, + scope: ScopeConfig { + allow: scope_allow.to_vec(), + deny: Vec::new(), + }, + ..Default::default() + }; + toml::to_string(&overlay) +} + +async fn wait_for_socket(path: &Path, timeout: Duration) -> Result<(), ToolError> { + let deadline = tokio::time::Instant::now() + timeout; + loop { + if path.exists() { + if let Ok(stream) = UnixStream::connect(path).await { + drop(stream); + return Ok(()); + } + } + if tokio::time::Instant::now() >= deadline { + return Err(ToolError::ExecutionFailed(format!( + "spawned pod socket did not appear within {timeout:?}: {}", + path.display() + ))); + } + sleep(Duration::from_millis(50)).await; + } +} + +async fn send_run(socket: &Path, 
task: &str) -> Result<(), ToolError> { + let stream = UnixStream::connect(socket) + .await + .map_err(|e| ToolError::ExecutionFailed(format!("connect {}: {e}", socket.display())))?; + let (_reader, writer) = stream.into_split(); + let mut w = JsonLineWriter::new(writer); + w.write(&Method::Run { + input: task.to_string(), + }) + .await + .map_err(|e| ToolError::ExecutionFailed(format!("send Method::Run: {e}")))?; + // Drop the writer to close the socket's write half. The flush + // inside `JsonLineWriter::write` has already pushed the bytes + // across, so the child will see a complete method line followed by + // EOF. + drop(w); + Ok(()) +} + +fn scope_lock_err_to_tool(e: ScopeLockError) -> ToolError { + match e { + ScopeLockError::NotSubset { .. } + | ScopeLockError::WriteConflict { .. } + | ScopeLockError::DuplicatePodName(_) + | ScopeLockError::UnknownPod(_) => ToolError::InvalidArgument(e.to_string()), + ScopeLockError::Io(_) => ToolError::ExecutionFailed(e.to_string()), + } +} + +/// Factory for the `SpawnPod` tool. +pub fn spawn_pod_tool( + spawner_name: String, + callback_socket: PathBuf, + runtime_base: PathBuf, + spawner_pwd: PathBuf, + runtime_dir: Arc, +) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(SpawnPodInput); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("SpawnPod") + .description(DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(SpawnPodTool::new( + spawner_name.clone(), + callback_socket.clone(), + runtime_base.clone(), + spawner_pwd.clone(), + runtime_dir.clone(), + )); + (meta, tool) + }) +} diff --git a/crates/pod/tests/spawn_pod_test.rs b/crates/pod/tests/spawn_pod_test.rs new file mode 100644 index 00000000..efcecd6d --- /dev/null +++ b/crates/pod/tests/spawn_pod_test.rs @@ -0,0 +1,308 @@ +//! Integration tests for the `SpawnPod` tool. +//! +//! These tests exercise the tool's scope-lock delegation, subprocess +//! 
launch, socket handoff, and `spawned_pods.json` write without relying +//! on the real `pod` binary. `INSOMNIA_POD_COMMAND` is pointed at +//! `/bin/true` (which exits immediately) while a test-owned Unix +//! listener pre-binds the predicted socket path, so the tool sees the +//! "child" as live. + +use std::path::{Path, PathBuf}; +use std::sync::{LazyLock, Mutex}; + +use llm_worker::tool::{ToolError, ToolOutput}; +use manifest::{Permission, ScopeRule}; +use pod::runtime_dir::{RuntimeDir, SpawnedPodRecord}; +use pod::scope_lock::{self, LockFileGuard}; +use pod::spawn_pod::spawn_pod_tool; +use protocol::Method; +use protocol::stream::JsonLineReader; +use serde_json::json; +use std::sync::Arc; +use tempfile::TempDir; +use tokio::net::UnixListener; + +/// Serialises tests that mutate `INSOMNIA_SCOPE_LOCK` / +/// `INSOMNIA_POD_COMMAND` across the thread-pooled test harness. +static ENV_LOCK: LazyLock> = LazyLock::new(|| Mutex::new(())); + +struct EnvGuard { + _lock: std::sync::MutexGuard<'static, ()>, +} + +impl EnvGuard { + fn acquire() -> Self { + Self { + _lock: ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()), + } + } +} + +/// Set up a tempdir, point `INSOMNIA_SCOPE_LOCK` + runtime-dir base at +/// it, and install a live top-level "spawner" allocation so the tool +/// has something to delegate from. Returns the tempdir (keeps it alive +/// for the test's lifetime), runtime base, spawner socket, and the +/// spawner's runtime dir. 
+async fn setup_spawner( + spawner_name: &str, + allow_root: &Path, +) -> (TempDir, PathBuf, PathBuf, Arc) { + let tmp = TempDir::new().unwrap(); + let lock_path = tmp.path().join("scope.lock"); + unsafe { + std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path); + } + + let runtime_base = tmp.path().join("runtime"); + let spawner_rd = RuntimeDir::create(&runtime_base, spawner_name) + .await + .unwrap(); + let spawner_socket = spawner_rd.socket_path(); + + let _guard = scope_lock::install_top_level( + spawner_name.into(), + std::process::id(), + spawner_socket.clone(), + vec![ScopeRule { + target: allow_root.to_path_buf(), + permission: Permission::Write, + recursive: true, + }], + ) + .unwrap(); + // Leak the guard — the spawner allocation needs to outlive the + // tool call. Dropping it would auto-release the allocation, which + // defeats the point of the test. + std::mem::forget(_guard); + + (tmp, runtime_base, spawner_socket, Arc::new(spawner_rd)) +} + +/// Bind a Unix listener at the path the tool will predict for the +/// spawned pod. The tool only needs the socket to accept a connection +/// and receive one `Method::Run` line; the returned `UnixListener` is +/// read from by the caller in a joined task. +async fn bind_mock_pod_socket(runtime_base: &Path, pod_name: &str) -> (PathBuf, UnixListener) { + let dir = runtime_base.join(pod_name); + tokio::fs::create_dir_all(&dir).await.unwrap(); + let socket = dir.join("sock"); + let listener = UnixListener::bind(&socket).unwrap(); + (socket, listener) +} + +/// Launch a tokio task that accepts connections until one carries a +/// `Method` line, then returns it. `wait_for_socket` inside the tool +/// makes a probe connection that carries no data, so the task must +/// tolerate an empty connection and keep listening. 
+fn accept_one_method( + listener: UnixListener, +) -> tokio::task::JoinHandle> { + tokio::spawn(async move { + loop { + let (stream, _) = listener.accept().await.ok()?; + let (reader, _writer) = stream.into_split(); + let mut r = JsonLineReader::new(reader); + if let Ok(Some(method)) = r.next::().await { + return Some(method); + } + } + }) +} + +fn point_pod_command_at_true() { + let path = which_true(); + unsafe { + std::env::set_var("INSOMNIA_POD_COMMAND", &path); + } +} + +/// `/bin/true` only exists on FHS-compliant systems. On Nix, resolve it +/// via PATH so the tests work regardless of distro. +fn which_true() -> String { + for dir in std::env::var_os("PATH") + .map(|p| std::env::split_paths(&p).collect::>()) + .unwrap_or_default() + { + let candidate = dir.join("true"); + if candidate.is_file() { + return candidate.to_string_lossy().into_owned(); + } + } + "/bin/true".into() +} + +fn clear_env() { + unsafe { + std::env::remove_var("INSOMNIA_SCOPE_LOCK"); + std::env::remove_var("INSOMNIA_POD_COMMAND"); + } +} + +#[tokio::test] +async fn spawn_pod_delegates_scope_and_sends_run() { + let _env = EnvGuard::acquire(); + + let allow_root = TempDir::new().unwrap(); + let (_tmp, runtime_base, spawner_socket, spawner_rd) = + setup_spawner("root", allow_root.path()).await; + point_pod_command_at_true(); + + let (_predicted_socket, listener) = bind_mock_pod_socket(&runtime_base, "child").await; + let received = accept_one_method(listener); + + let def = spawn_pod_tool( + "root".into(), + spawner_socket.clone(), + runtime_base.clone(), + allow_root.path().to_path_buf(), + spawner_rd.clone(), + ); + let (_meta, tool) = def(); + + let input = json!({ + "name": "child", + "task": "hello", + "scope": [{ + "target": allow_root.path().to_str().unwrap(), + "permission": "write" + }] + }) + .to_string(); + + let output: ToolOutput = tool.execute(&input).await.unwrap(); + assert!(output.summary.contains("child"), "summary: {}", output.summary); + + // Verify the tool delivered 
Method::Run to the socket. + let method = received.await.unwrap().expect("expected one Method line"); + match method { + Method::Run { input } => assert_eq!(input, "hello"), + other => panic!("expected Run, got {other:?}"), + } + + // Verify scope_lock has the child allocation under `root`. + let lock_path = scope_lock::default_lock_path().unwrap(); + let guard = LockFileGuard::open(&lock_path).unwrap(); + let child = guard + .data() + .find("child") + .expect("child allocation missing after spawn"); + assert_eq!(child.delegated_from.as_deref(), Some("root")); + drop(guard); + + // Verify spawned_pods.json was written. + let spawned_file = spawner_rd.path().join("spawned_pods.json"); + let contents = std::fs::read_to_string(&spawned_file).unwrap(); + let records: Vec = serde_json::from_str(&contents).unwrap(); + assert_eq!(records.len(), 1); + assert_eq!(records[0].pod_name, "child"); + assert_eq!(records[0].callback_address, spawner_socket); + + clear_env(); +} + +#[tokio::test] +async fn spawn_pod_rejects_scope_outside_spawner() { + let _env = EnvGuard::acquire(); + + let allow_root = TempDir::new().unwrap(); + let outside = TempDir::new().unwrap(); + let (_tmp, runtime_base, spawner_socket, spawner_rd) = + setup_spawner("root", allow_root.path()).await; + point_pod_command_at_true(); + + let def = spawn_pod_tool( + "root".into(), + spawner_socket, + runtime_base, + allow_root.path().to_path_buf(), + spawner_rd, + ); + let (_meta, tool) = def(); + + // Request write access to a path the spawner doesn't own. 
+ let input = json!({ + "name": "child", + "task": "nope", + "scope": [{ + "target": outside.path().to_str().unwrap(), + "permission": "write" + }] + }) + .to_string(); + + let err = tool.execute(&input).await.unwrap_err(); + match err { + ToolError::InvalidArgument(msg) => { + assert!(msg.contains("not within"), "expected NotSubset wording: {msg}"); + } + other => panic!("expected InvalidArgument, got {other:?}"), + } + + // The spawner's allocation is unchanged; no "child" appeared. + let lock_path = scope_lock::default_lock_path().unwrap(); + let guard = LockFileGuard::open(&lock_path).unwrap(); + assert!(guard.data().find("child").is_none()); + + clear_env(); +} + +#[tokio::test] +async fn spawn_pod_rolls_back_reservation_when_socket_never_appears() { + let _env = EnvGuard::acquire(); + + let allow_root = TempDir::new().unwrap(); + let (_tmp, runtime_base, spawner_socket, spawner_rd) = + setup_spawner("root", allow_root.path()).await; + point_pod_command_at_true(); + + // Deliberately do NOT bind a socket at the predicted path. The + // tool's wait_for_socket should time out, triggering rollback. + // `SOCKET_WAIT_TIMEOUT` is 10s in production; we override via a + // tighter env-based lock path and just accept the wait in test. + // To keep the test fast, use a shorter wait by constructing a + // short-lived separate instance. + // + // As the tool's timeout is internal, we accept the 10s wait here — + // marked with `// slow_test`. Keep the rest of the test suite fast + // by running this test alone when iterating. 
+ + let def = spawn_pod_tool( + "root".into(), + spawner_socket, + runtime_base, + allow_root.path().to_path_buf(), + spawner_rd, + ); + let (_meta, tool) = def(); + + let input = json!({ + "name": "ghost", + "task": "will never be delivered", + "scope": [{ + "target": allow_root.path().to_str().unwrap(), + "permission": "write" + }] + }) + .to_string(); + + let err = tool.execute(&input).await.unwrap_err(); + match err { + ToolError::ExecutionFailed(msg) => { + assert!( + msg.contains("socket did not appear"), + "expected socket timeout wording: {msg}" + ); + } + other => panic!("expected ExecutionFailed, got {other:?}"), + } + + // Rollback assertion: the reserved "ghost" allocation is gone. + let lock_path = scope_lock::default_lock_path().unwrap(); + let guard = LockFileGuard::open(&lock_path).unwrap(); + assert!( + guard.data().find("ghost").is_none(), + "allocation was not rolled back after socket wait timed out" + ); + + clear_env(); +} diff --git a/tickets/spawn-pod-tool.md b/tickets/spawn-pod-tool.md deleted file mode 100644 index 643e5e35..00000000 --- a/tickets/spawn-pod-tool.md +++ /dev/null @@ -1,68 +0,0 @@ -# SpawnPod ツール: LLM から Pod を生成する - -## 背景 - -オーケストレーションの起点。LLM が「このタスクを別 Pod に任せたい」と判断したとき、`SpawnPod` ツールを呼び出して新しい Pod プロセスを起動する。 - -## 依存 - -- `tickets/scope-lock.md`: scope 分譲の記録基盤 - -## 仕様 - -### ツール定義 - -`SpawnPod` は通常の `Tool` trait 実装として Worker に登録される。 - -入力: -- `name`: spawned Pod の識別名 -- `instruction`: instruction ファイル参照(省略時は `$insomnia/default`) -- `task`: 最初のメッセージ(spawn 後に即座に run される) -- `scope`: 譲渡する scope 定義(allow ルール)。spawner の effective scope のサブセットでなければならない - -出力: -- spawned Pod の `name` と接続先 address(socket path) - -### 内部動作 - -1. scope lock file を flock → spawner の effective scope を確認 → 要求された scope がサブセットか検証 -2. spawner の allocation に deny を追記 + 新 Pod の allocation を登録(`delegated_from` = spawner)→ unlock -3. PodFactory のカスケード(user / project manifest)に spawner からの overlay(name, pwd, scope, instruction)を重ねて PodManifest を構築 -4. 
`pod` バイナリを独立プロセスとして起動(`Command::new(pod_command)`)
-5. spawned Pod の socket が利用可能になるまで待機
-6. spawner の callback address を spawned Pod に渡す(`Method::Notify` 経由の受け口として)
-7. `task` を `Method::Run` で送信
-8. spawn 記録を Pod のランタイム状態に保存
-
-### Pod 起動コマンド
-
-- デフォルト: `pod`(PATH 上のバイナリ)
-- 環境変数 `INSOMNIA_POD_COMMAND` またはユーザー manifest で上書き可能
-- 引数: `--overlay ` で scope / instruction / name / pwd を渡す
-
-### spawn 記録
-
-Pod が保持する既知の Pod リスト。各エントリ:
-- `name`, `name`, `socket_path`, `scope_delegated`, `callback_address`
-- spawner 復帰時にこの記録を読んで再接続する
-
-## 設計で決めること
-
-- **spawn 記録の永続化**: session-store に載せるか、runtime_dir にファイルとして書くか
-- **socket 待機のタイムアウト**: spawned Pod が socket を開くまでの待機時間
-- **callback address の形式**: ローカルでは spawner の socket path、リモートでは `insomnia@host:pod-name`
-
-## 完了条件
-
-- LLM が `SpawnPod` ツールを呼び出すと、新しい Pod プロセスが独立して起動する
-- scope lock file に分譲が記録され、spawner の effective scope が縮小する
-- 要求 scope が spawner の effective scope を超えていたらツールエラー
-- spawned Pod の socket に接続でき、`task` が `Method::Run` で送信される
-- spawn 記録が保存され、`ListPods` で参照できる
-- spawner が停止しても spawned Pod は続行する
-
-## 範囲外
-
-- Pod 間通信ツール(SendToPod / ReadPodOutput / StopPod / ListPods)は `tickets/pod-comm-tools.md`
-- コールバック通知は `tickets/pod-callback.md`
-- リモート spawn(SSH 越し)は `docs/network-peering.md` を参照