SpawnPodツール完了

This commit is contained in:
Keisuke Hirata 2026-04-18 20:31:10 +09:00
parent e7a4b76c54
commit 73acfcb7f2
12 changed files with 972 additions and 84 deletions

1
Cargo.lock generated
View File

@ -1912,6 +1912,7 @@ dependencies = [
"minijinja",
"protocol",
"provider",
"schemars",
"serde",
"serde_json",
"session-store",

View File

@ -5,7 +5,6 @@
- [ ] Protocol の設計 → [tickets/protocol-design.md](tickets/protocol-design.md)
- [ ] パーミッション: パターンベースのツール実行制御 → [tickets/permission-extension-point.md](tickets/permission-extension-point.md)
- [ ] Pod オーケストレーション
- [ ] SpawnPod ツール: LLM から Pod を生成 → [tickets/spawn-pod-tool.md](tickets/spawn-pod-tool.md)
- [ ] Pod 間通信ツール: SendToPod / ReadPodOutput / StopPod / ListPods → [tickets/pod-comm-tools.md](tickets/pod-comm-tools.md)
- [ ] Pod 間コールバック通知 → [tickets/pod-callback.md](tickets/pod-callback.md)
- [ ] 動的 Scope 変更 → [tickets/dynamic-scope.md](tickets/dynamic-scope.md)

View File

@ -15,7 +15,7 @@ provider = { version = "0.1.0", path = "../provider" }
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
thiserror = "2.0"
tokio = { version = "1.49", features = ["fs", "io-util", "macros", "net", "rt-multi-thread", "signal", "sync"] }
tokio = { version = "1.49", features = ["fs", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "sync", "time"] }
toml = "1.1.2"
tracing = "0.1.44"
tools = { version = "0.1.0", path = "../tools" }
@ -24,6 +24,7 @@ chrono = "0.4.44"
include_dir = "0.7.4"
fs4 = { version = "0.13.1", features = ["sync"] }
libc = "0.2.185"
schemars = "1.2.1"
[dev-dependencies]
async-trait = "0.1.89"

View File

@ -12,6 +12,7 @@ use crate::pod::{Pod, PodError, PodRunResult};
use crate::runtime_dir::RuntimeDir;
use crate::shared_state::{PodSharedState, PodStatus};
use crate::socket_server::SocketServer;
use crate::spawn_pod::spawn_pod_tool;
use protocol::{ErrorCode, Event, Method, NotificationLevel, NotificationSource, RunResult, TurnResult};
// ---------------------------------------------------------------------------
@ -107,6 +108,7 @@ impl PodController {
// can build a `ScopedFs` for the builtin tools.
let scope_for_tools = pod.scope().clone();
let pwd_for_tools = pod.pwd().to_path_buf();
let spawner_name = pod.manifest().pod.name.clone();
// Register event bridge callbacks on the worker
{
@ -198,9 +200,22 @@ impl PodController {
// also handed to the Pod itself so Pod-level operations (e.g.
// context compaction) can ask which files the agent has been
// touching.
let fs = tools::ScopedFs::new(scope_for_tools, pwd_for_tools);
let fs = tools::ScopedFs::new(scope_for_tools, pwd_for_tools.clone());
let tracker = tools::Tracker::new();
worker.register_tools(tools::builtin_tools(fs, tracker.clone()));
// SpawnPod is wired here rather than in `tools::builtin_tools`
// because it needs Pod-scoped handles (this Pod's own socket
// path, runtime_dir, spawner name) that the generic tools
// crate has no access to.
let spawner_socket = runtime_dir.socket_path();
worker.register_tool(spawn_pod_tool(
spawner_name,
spawner_socket,
runtime_base.to_path_buf(),
pwd_for_tools,
runtime_dir.clone(),
));
pod.attach_tracker(tracker);
}
@ -466,15 +481,17 @@ where
manifest::ProviderKind::Gemini => "gemini",
manifest::ProviderKind::Ollama => "ollama",
};
// The tool list mirrors `builtin_tools`. A fresh `ScopedFs`/`Tracker`
// is instantiated only to invoke the factories for name extraction;
// the instances themselves are discarded.
// The tool list mirrors what `spawn()` registers on the Worker:
// builtin filesystem tools plus `SpawnPod`. `SpawnPod` is appended
// by name because constructing its factory here would require
// Pod-lifetime handles we haven't built yet (runtime_dir, socket).
let fs = tools::ScopedFs::new(pod.scope().clone(), pod.pwd().to_path_buf());
let tracker = tools::Tracker::new();
let tool_names = tools::builtin_tools(fs, tracker)
let mut tool_names: Vec<String> = tools::builtin_tools(fs, tracker)
.iter()
.map(|def| def().0.name)
.collect();
tool_names.push("SpawnPod".into());
protocol::Greeting {
pod_name: manifest.pod.name.clone(),
cwd: pod.pwd().display().to_string(),

View File

@ -5,6 +5,7 @@ pub mod runtime_dir;
pub mod scope_lock;
pub mod shared_state;
pub mod socket_server;
pub mod spawn_pod;
mod agents_md;
mod compact_state;

View File

@ -36,6 +36,17 @@ struct Cli {
/// `~/.insomnia/sessions/`.
#[arg(short, long)]
store: Option<PathBuf>,
/// Claim a scope allocation pre-registered by a spawning Pod, rather
/// than installing a new top-level allocation. Used only when this
/// process is launched by `SpawnPod`; end users should never pass it.
#[arg(long)]
adopt: bool,
/// Socket path of the spawning Pod, for delivering `Method::Notify`
/// callbacks upward. Required alongside `--adopt`.
#[arg(long, value_name = "PATH", requires = "adopt")]
callback: Option<PathBuf>,
}
fn default_store_dir() -> Result<PathBuf, std::io::Error> {
@ -140,11 +151,28 @@ async fn main() -> ExitCode {
}
};
let pod = match Pod::from_manifest(manifest, store, loader).await {
Ok(p) => p,
Err(e) => {
eprintln!("error: failed to create pod: {e}");
return ExitCode::FAILURE;
let pod = if cli.adopt {
let callback = match cli.callback.clone() {
Some(p) => p,
None => {
eprintln!("error: --adopt requires --callback");
return ExitCode::FAILURE;
}
};
match Pod::from_manifest_spawned(manifest, store, loader, callback).await {
Ok(p) => p,
Err(e) => {
eprintln!("error: failed to create spawned pod: {e}");
return ExitCode::FAILURE;
}
}
} else {
match Pod::from_manifest(manifest, store, loader).await {
Ok(p) => p,
Err(e) => {
eprintln!("error: failed to create pod: {e}");
return ExitCode::FAILURE;
}
}
};
let pod_name = pod.manifest().pod.name.clone();

View File

@ -114,6 +114,13 @@ pub struct Pod<C: LlmClient, St: Store> {
/// releases the allocation when the Pod is dropped.
#[allow(dead_code)]
scope_allocation: Option<ScopeAllocationGuard>,
/// Socket path of the spawning Pod. `Some` only for Pods built via
/// `from_manifest_spawned`. The callback is consumed by the
/// `pod-callback` layer (separate ticket) to deliver
/// `Method::Notify` back to the spawner; stored here so the Pod
/// carries the reference for the duration of its life.
#[allow(dead_code)]
callback_socket: Option<PathBuf>,
}
impl<C: LlmClient, St: Store> Pod<C, St> {
@ -156,6 +163,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
notifier: None,
pending_notifications: NotificationBuffer::new(),
scope_allocation: None,
callback_socket: None,
};
pod.apply_prune_from_manifest();
Ok(pod)
@ -206,6 +214,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
notifier: None,
pending_notifications: NotificationBuffer::new(),
scope_allocation: None,
callback_socket: None,
};
pod.apply_prune_from_manifest();
Ok(pod)
@ -936,6 +945,66 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
notifier: None,
pending_notifications: NotificationBuffer::new(),
scope_allocation: Some(scope_allocation),
callback_socket: None,
};
pod.apply_prune_from_manifest();
Ok(pod)
}
/// Build a Pod spawned by another Pod (sibling process).
///
/// Behaves like [`Pod::from_manifest`] but claims the scope
/// allocation that the spawner pre-registered via
/// [`scope_lock::delegate_scope`], rather than installing a new
/// top-level entry. `callback_socket` carries the spawner's
/// Unix-socket path so the spawned Pod can send `Method::Notify`
/// back to the spawner; it is stored but unused in the
/// `spawn-pod-tool` ticket — the receiving side lands in the
/// follow-up `pod-callback` ticket.
pub async fn from_manifest_spawned(
manifest: PodManifest,
store: St,
loader: PromptLoader,
callback_socket: PathBuf,
) -> Result<Self, PodError> {
let pwd = resolve_pwd(&manifest.pod.pwd)?;
let scope = Scope::from_config(&manifest.scope, &pwd).map_err(PodError::Scope)?;
if !scope.is_readable(&pwd) {
return Err(PodError::PwdOutsideScope { pwd });
}
let scope_allocation =
scope_lock::adopt_allocation(manifest.pod.name.clone(), std::process::id())?;
let client = provider::build_client(&manifest.provider)?;
let mut worker = Worker::new(client);
apply_worker_manifest(&mut worker, &manifest.worker);
let system_prompt_template = Some(
SystemPromptTemplate::parse(&manifest.worker.instruction, loader)
.map_err(|source| PodError::InvalidSystemPromptTemplate { source })?,
);
let session_id = session_store::new_session_id();
let mut pod = Self {
manifest,
worker: Some(worker),
store,
session_id,
head_hash: None,
pwd,
scope,
hook_builder: HookRegistryBuilder::new(),
interceptor_installed: false,
compact_state: None,
usage_tracker: Arc::new(UsageTracker::new()),
usage_history: Arc::new(Mutex::new(Vec::new())),
tracker: None,
system_prompt_template,
notifier: None,
pending_notifications: NotificationBuffer::new(),
scope_allocation: Some(scope_allocation),
callback_socket: Some(callback_socket),
};
pod.apply_prune_from_manifest();
Ok(pod)

View File

@ -1,10 +1,30 @@
use std::io;
use std::path::{Path, PathBuf};
use manifest::ScopeRule;
use serde::{Deserialize, Serialize};
use tokio::fs;
use crate::shared_state::PodSharedState;
/// One spawned-child record persisted to `spawned_pods.json`.
///
/// Written by the spawner after a successful `SpawnPod` tool call so
/// `ListPods` (future ticket) and a restored spawner can enumerate
/// their live children without re-querying `scope.lock`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpawnedPodRecord {
/// Spawned Pod's identity.
pub pod_name: String,
/// Spawned Pod's Unix socket path.
pub socket_path: PathBuf,
/// Scope allow rules delegated to the spawned Pod.
pub scope_delegated: Vec<ScopeRule>,
/// Socket path the spawned Pod was told to use for callbacks
/// (= this Pod's own socket when spawn happened).
pub callback_address: PathBuf,
}
/// Manages the Pod's runtime directory on tmpfs.
///
/// ```text
@ -60,6 +80,17 @@ impl RuntimeDir {
atomic_write(&self.path.join("history.json"), content.as_bytes()).await
}
/// Write `spawned_pods.json` atomically. The entries are the full
/// set of spawned children known to this Pod — callers pass the
/// replacement list, no incremental merge.
pub async fn write_spawned_pods(
&self,
records: &[SpawnedPodRecord],
) -> Result<(), io::Error> {
let json = serde_json::to_vec_pretty(records).map_err(io::Error::other)?;
atomic_write(&self.path.join("spawned_pods.json"), &json).await
}
/// Path to this Pod's runtime directory.
pub fn path(&self) -> &Path {
&self.path
@ -173,6 +204,30 @@ mod tests {
assert_eq!(content, "[pod]\nname = \"test\"");
}
#[tokio::test]
async fn write_spawned_pods_creates_file() {
use manifest::{Permission, ScopeRule};
let tmp = tempfile::tempdir().unwrap();
let rt = RuntimeDir::create(tmp.path(), "my-pod").await.unwrap();
let records = vec![SpawnedPodRecord {
pod_name: "child".into(),
socket_path: "/run/insomnia/child/sock".into(),
scope_delegated: vec![ScopeRule {
target: "/tmp/work".into(),
permission: Permission::Write,
recursive: true,
}],
callback_address: "/run/insomnia/my-pod/sock".into(),
}];
rt.write_spawned_pods(&records).await.unwrap();
let content = std::fs::read_to_string(rt.path().join("spawned_pods.json")).unwrap();
let parsed: Vec<SpawnedPodRecord> = serde_json::from_str(&content).unwrap();
assert_eq!(parsed.len(), 1);
assert_eq!(parsed[0].pod_name, "child");
}
#[tokio::test]
async fn write_history_creates_file() {
let tmp = tempfile::tempdir().unwrap();

View File

@ -460,6 +460,7 @@ fn pid_alive(pid: u32) -> bool {
/// Owned allocation: on drop, opens the lock file and releases this
/// Pod's entry. The guard keeps only the name + lock-file path; it
/// does not hold the `flock` for the Pod's lifetime.
#[derive(Debug)]
pub struct ScopeAllocationGuard {
pod_name: String,
lock_path: PathBuf,
@ -500,6 +501,32 @@ pub fn install_top_level(
})
}
/// Take ownership of an existing allocation that was pre-registered by
/// a spawning Pod.
///
/// The spawning flow is two-stage: the spawner calls [`delegate_scope`]
/// (with its own pid as a live placeholder), then exec's the child; the
/// child, once running, calls this function to rewrite the allocation's
/// pid to its own and claim the `ScopeAllocationGuard` so the entry is
/// released when the child exits.
pub fn adopt_allocation(
pod_name: String,
new_pid: u32,
) -> Result<ScopeAllocationGuard, ScopeLockError> {
let lock_path = default_lock_path()?;
let mut guard = LockFileGuard::open(&lock_path)?;
let alloc = guard
.data_mut()
.find_mut(&pod_name)
.ok_or_else(|| ScopeLockError::UnknownPod(pod_name.clone()))?;
alloc.pid = new_pid;
guard.save()?;
Ok(ScopeAllocationGuard {
pod_name,
lock_path,
})
}
/// Errors raised by the mutating scope-lock operations.
#[derive(Debug, thiserror::Error)]
pub enum ScopeLockError {
@ -528,8 +555,15 @@ pub enum ScopeLockError {
mod tests {
use super::*;
use manifest::Permission;
use std::sync::{LazyLock, Mutex};
use tempfile::TempDir;
/// Serialises tests that mutate `INSOMNIA_SCOPE_LOCK`. The test
/// harness runs tests on multiple threads inside a single process,
/// so env-var writes from one test would otherwise leak into a
/// parallel test's `default_lock_path()` lookup.
static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
fn write_rule(path: &str, recursive: bool) -> ScopeRule {
ScopeRule {
target: PathBuf::from(path),
@ -943,12 +977,9 @@ mod tests {
#[test]
fn scope_allocation_guard_releases_on_drop() {
let _env = ENV_LOCK.lock().unwrap();
let dir = TempDir::new().unwrap();
let lock_path = dir.path().join("scope.lock");
// Override the default path for the duration of the test.
// SAFETY: single-threaded inside each #[test]; concurrent tests
// that also touch INSOMNIA_SCOPE_LOCK are excluded by using
// per-test paths and not clearing the var until after drop.
unsafe {
std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path);
}
@ -973,6 +1004,65 @@ mod tests {
}
}
#[test]
fn adopt_allocation_rewrites_pid_and_releases_on_drop() {
let _env = ENV_LOCK.lock().unwrap();
let dir = TempDir::new().unwrap();
let lock_path = dir.path().join("scope.lock");
unsafe {
std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path);
}
// Pre-register an allocation under spawner's pid, as delegate_scope would.
{
let mut g = LockFileGuard::open(&lock_path).unwrap();
delegate_placeholder(&mut g, "child", std::process::id());
}
let child_pid = std::process::id().wrapping_add(1);
let guard = adopt_allocation("child".into(), child_pid).unwrap();
{
let g = LockFileGuard::open(&lock_path).unwrap();
let alloc = g.data().find("child").unwrap();
assert_eq!(alloc.pid, child_pid);
}
drop(guard);
{
let g = LockFileGuard::open(&lock_path).unwrap();
assert!(g.data().find("child").is_none());
}
unsafe {
std::env::remove_var("INSOMNIA_SCOPE_LOCK");
}
}
#[test]
fn adopt_allocation_errors_on_unknown_pod() {
let _env = ENV_LOCK.lock().unwrap();
let dir = TempDir::new().unwrap();
let lock_path = dir.path().join("scope.lock");
unsafe {
std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path);
}
let err = adopt_allocation("ghost".into(), 42).unwrap_err();
assert!(matches!(err, ScopeLockError::UnknownPod(ref n) if n == "ghost"));
unsafe {
std::env::remove_var("INSOMNIA_SCOPE_LOCK");
}
}
/// Mimic what the spawner does before the child comes up: push an
/// allocation for the child carrying the spawner's (live) pid as a
/// placeholder. Exists only in tests.
fn delegate_placeholder(g: &mut LockFileGuard, pod_name: &str, placeholder_pid: u32) {
g.data_mut().allocations.push(Allocation {
pod_name: pod_name.to_string(),
pid: placeholder_pid,
socket: sock(pod_name),
scope_allow: vec![write_rule("/tmp/child", true)],
delegated_from: None,
});
g.save().unwrap();
}
#[test]
fn conflict_detection_descends_to_real_owner() {
let dir = TempDir::new().unwrap();

387
crates/pod/src/spawn_pod.rs Normal file
View File

@ -0,0 +1,387 @@
//! `SpawnPod` tool — launch a new Pod process as a child of this one.
//!
//! Wires scope-lock delegation, overlay-TOML construction, subprocess
//! launch, and socket handoff into a single `Tool` implementation. When
//! the LLM calls `SpawnPod`, a fresh `pod` binary is exec'd in its own
//! process group, the scope lock is updated atomically, and the child's
//! first turn is kicked off by handing its socket a `Method::Run`.
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use manifest::{
Permission, PodManifestConfig, PodMetaConfig, ScopeConfig, ScopeRule, WorkerManifestConfig,
};
use protocol::Method;
use protocol::stream::JsonLineWriter;
use serde::Deserialize;
use tokio::net::UnixStream;
use tokio::process::Command;
use tokio::sync::Mutex;
use tokio::time::sleep;
use crate::runtime_dir::{RuntimeDir, SpawnedPodRecord};
use crate::scope_lock::{self, LockFileGuard, ScopeLockError};
const DESCRIPTION: &str = "Spawn a new Pod process to work on a delegated task. \
The spawner's write scope is reduced by the scope passed here; the spawned \
Pod receives its own socket and starts running `task` immediately. The \
spawned Pod outlives the spawner's current turn and can be contacted again \
through its socket path.";
const DEFAULT_INSTRUCTION: &str = "$insomnia/default";
/// How long we will wait for the spawned Pod's socket to become
/// connectable before treating the spawn as failed.
const SOCKET_WAIT_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct SpawnPodInput {
/// Identifier for the spawned Pod. Must be unique machine-wide.
name: String,
/// Instruction-file reference (e.g. `$insomnia/default`, `$user/my-agent`).
#[serde(default)]
instruction: Option<String>,
/// First message sent to the spawned Pod via `Method::Run`.
task: String,
/// Allow rules delegated to the spawned Pod. Must be a subset of the
/// spawner's effective write scope.
scope: Vec<ScopeRuleInput>,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct ScopeRuleInput {
/// Absolute target path. Relative paths are rejected.
target: PathBuf,
/// `"read"` or `"write"`.
permission: PermissionInput,
/// When `false`, the rule matches the target itself and its direct
/// children only. Defaults to `true`.
#[serde(default = "default_true")]
recursive: bool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema, Clone, Copy)]
#[serde(rename_all = "lowercase")]
enum PermissionInput {
Read,
Write,
}
fn default_true() -> bool {
true
}
impl From<PermissionInput> for Permission {
fn from(p: PermissionInput) -> Self {
match p {
PermissionInput::Read => Permission::Read,
PermissionInput::Write => Permission::Write,
}
}
}
/// Runtime dependencies the `SpawnPod` tool needs in order to launch a
/// child Pod and record the handoff locally. Constructed by the Pod
/// controller once per Pod lifetime.
pub struct SpawnPodTool {
/// Spawner's own pod name — becomes the spawned Pod's
/// `delegated_from` in the scope-lock registry.
spawner_name: String,
/// Path to the spawner's Unix socket. Handed to the child via
/// `--callback` so `Method::Notify` has somewhere to land.
callback_socket: PathBuf,
/// Root of the `$XDG_RUNTIME_DIR/insomnia/` tree, used to predict
/// the spawned Pod's socket path before the child has bound it.
runtime_base: PathBuf,
/// Directory the spawned Pod should run in when the LLM did not
/// override it. Defaults to the spawner's pwd — see module docs.
spawner_pwd: PathBuf,
/// Spawner's own runtime directory — target for `spawned_pods.json`.
runtime_dir: Arc<RuntimeDir>,
/// Running list of successful spawns, replayed into
/// `spawned_pods.json` on every successful `execute`.
records: Arc<Mutex<Vec<SpawnedPodRecord>>>,
}
impl SpawnPodTool {
pub fn new(
spawner_name: String,
callback_socket: PathBuf,
runtime_base: PathBuf,
spawner_pwd: PathBuf,
runtime_dir: Arc<RuntimeDir>,
) -> Self {
Self {
spawner_name,
callback_socket,
runtime_base,
spawner_pwd,
runtime_dir,
records: Arc::new(Mutex::new(Vec::new())),
}
}
}
#[async_trait]
impl Tool for SpawnPodTool {
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let input: SpawnPodInput = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid SpawnPod input: {e}")))?;
// `delegate_scope` catches this too (as `DuplicatePodName`), but
// the dedicated message is kinder to the LLM — which gets the
// error back verbatim — than the generic duplicate-name error.
if input.name == self.spawner_name {
return Err(ToolError::InvalidArgument(format!(
"spawned pod name `{}` collides with spawner's own name",
input.name
)));
}
let scope_allow = parse_scope(&input.scope)?;
let instruction = input
.instruction
.clone()
.unwrap_or_else(|| DEFAULT_INSTRUCTION.to_string());
let predicted_socket = self.runtime_base.join(&input.name).join("sock");
let lock_path = scope_lock::default_lock_path()
.map_err(|e| ToolError::ExecutionFailed(format!("scope lock path: {e}")))?;
// Reserve the allocation up front. Spawner's pid is a live
// placeholder; the child will rewrite it via `adopt_allocation`.
{
let mut guard = LockFileGuard::open(&lock_path)
.map_err(|e| ToolError::ExecutionFailed(format!("scope lock open: {e}")))?;
scope_lock::delegate_scope(
&mut guard,
&self.spawner_name,
input.name.clone(),
std::process::id(),
predicted_socket.clone(),
scope_allow.clone(),
)
.map_err(scope_lock_err_to_tool)?;
}
// `start_outcome` covers steps that happen before the child is
// observably alive (exec + socket bind). Once its socket is
// listening, the child owns the allocation and we must not roll
// it back — even if later steps (Method::Run delivery, record
// write) fail, the child is running and will release its own
// entry on exit.
let overlay_toml =
match build_overlay_toml(&input.name, &self.spawner_pwd, &instruction, &scope_allow) {
Ok(s) => s,
Err(e) => {
self.release_reservation(&lock_path, &input.name);
return Err(ToolError::ExecutionFailed(format!(
"overlay serialisation: {e}"
)));
}
};
let start_outcome = self.exec_child(&overlay_toml, &predicted_socket).await;
if let Err(e) = start_outcome {
self.release_reservation(&lock_path, &input.name);
return Err(e);
}
// Child is live. Post-start errors propagate but do not roll
// back the scope allocation — the child already owns it.
send_run(&predicted_socket, &input.task).await?;
let record = SpawnedPodRecord {
pod_name: input.name.clone(),
socket_path: predicted_socket.clone(),
scope_delegated: scope_allow,
callback_address: self.callback_socket.clone(),
};
{
let mut records = self.records.lock().await;
records.push(record);
self.runtime_dir
.write_spawned_pods(records.as_slice())
.await
.map_err(|e| {
ToolError::ExecutionFailed(format!("write spawned_pods.json: {e}"))
})?;
}
Ok(ToolOutput {
summary: format!(
"spawned pod `{}` listening on {}",
input.name,
predicted_socket.display()
),
content: None,
})
}
}
impl SpawnPodTool {
async fn exec_child(
&self,
overlay_toml: &str,
predicted_socket: &Path,
) -> Result<(), ToolError> {
let pod_command = std::env::var("INSOMNIA_POD_COMMAND").unwrap_or_else(|_| "pod".into());
let mut cmd = Command::new(&pod_command);
cmd.arg("--adopt")
.arg("--callback")
.arg(&self.callback_socket)
.arg("--overlay")
.arg(overlay_toml)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.process_group(0);
let child = cmd
.spawn()
.map_err(|e| ToolError::ExecutionFailed(format!("failed to spawn `{pod_command}`: {e}")))?;
// Default `kill_on_drop = false` keeps the process alive after
// the `Child` is dropped. We intentionally do not `.wait()` —
// when the spawner later exits, init adopts any remaining
// orphans. Lifecycle tracking lives in `spawned_pods.json`.
drop(child);
wait_for_socket(predicted_socket, SOCKET_WAIT_TIMEOUT).await
}
fn release_reservation(&self, lock_path: &Path, pod_name: &str) {
if let Ok(mut g) = LockFileGuard::open(lock_path) {
let _ = scope_lock::release_pod(&mut g, pod_name);
}
}
}
fn parse_scope(rules: &[ScopeRuleInput]) -> Result<Vec<ScopeRule>, ToolError> {
if rules.is_empty() {
return Err(ToolError::InvalidArgument("scope must not be empty".into()));
}
rules
.iter()
.map(|r| {
if !r.target.is_absolute() {
return Err(ToolError::InvalidArgument(format!(
"scope.target must be absolute: {}",
r.target.display()
)));
}
Ok(ScopeRule {
target: r.target.clone(),
permission: r.permission.into(),
recursive: r.recursive,
})
})
.collect()
}
/// Serialise the overlay TOML that gets handed to the child `pod`
/// binary via `--overlay`. `PodManifestConfig`'s `Serialize` impl is
/// the single source of truth for the on-disk manifest format.
fn build_overlay_toml(
name: &str,
pwd: &Path,
instruction: &str,
scope_allow: &[ScopeRule],
) -> Result<String, toml::ser::Error> {
let overlay = PodManifestConfig {
pod: PodMetaConfig {
name: Some(name.to_string()),
pwd: Some(pwd.to_path_buf()),
},
worker: WorkerManifestConfig {
instruction: Some(instruction.to_string()),
..Default::default()
},
scope: ScopeConfig {
allow: scope_allow.to_vec(),
deny: Vec::new(),
},
..Default::default()
};
toml::to_string(&overlay)
}
async fn wait_for_socket(path: &Path, timeout: Duration) -> Result<(), ToolError> {
let deadline = tokio::time::Instant::now() + timeout;
loop {
if path.exists() {
if let Ok(stream) = UnixStream::connect(path).await {
drop(stream);
return Ok(());
}
}
if tokio::time::Instant::now() >= deadline {
return Err(ToolError::ExecutionFailed(format!(
"spawned pod socket did not appear within {timeout:?}: {}",
path.display()
)));
}
sleep(Duration::from_millis(50)).await;
}
}
async fn send_run(socket: &Path, task: &str) -> Result<(), ToolError> {
let stream = UnixStream::connect(socket)
.await
.map_err(|e| ToolError::ExecutionFailed(format!("connect {}: {e}", socket.display())))?;
let (_reader, writer) = stream.into_split();
let mut w = JsonLineWriter::new(writer);
w.write(&Method::Run {
input: task.to_string(),
})
.await
.map_err(|e| ToolError::ExecutionFailed(format!("send Method::Run: {e}")))?;
// Drop the writer to close the socket's write half. The flush
// inside `JsonLineWriter::write` has already pushed the bytes
// across, so the child will see a complete method line followed by
// EOF.
drop(w);
Ok(())
}
fn scope_lock_err_to_tool(e: ScopeLockError) -> ToolError {
match e {
ScopeLockError::NotSubset { .. }
| ScopeLockError::WriteConflict { .. }
| ScopeLockError::DuplicatePodName(_)
| ScopeLockError::UnknownPod(_) => ToolError::InvalidArgument(e.to_string()),
ScopeLockError::Io(_) => ToolError::ExecutionFailed(e.to_string()),
}
}
/// Factory for the `SpawnPod` tool.
pub fn spawn_pod_tool(
spawner_name: String,
callback_socket: PathBuf,
runtime_base: PathBuf,
spawner_pwd: PathBuf,
runtime_dir: Arc<RuntimeDir>,
) -> ToolDefinition {
Arc::new(move || {
let schema = schemars::schema_for!(SpawnPodInput);
let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({}));
let meta = ToolMeta::new("SpawnPod")
.description(DESCRIPTION)
.input_schema(schema_value);
let tool: Arc<dyn Tool> = Arc::new(SpawnPodTool::new(
spawner_name.clone(),
callback_socket.clone(),
runtime_base.clone(),
spawner_pwd.clone(),
runtime_dir.clone(),
));
(meta, tool)
})
}

View File

@ -0,0 +1,308 @@
//! Integration tests for the `SpawnPod` tool.
//!
//! These tests exercise the tool's scope-lock delegation, subprocess
//! launch, socket handoff, and `spawned_pods.json` write without relying
//! on the real `pod` binary. `INSOMNIA_POD_COMMAND` is pointed at
//! `/bin/true` (which exits immediately) while a test-owned Unix
//! listener pre-binds the predicted socket path, so the tool sees the
//! "child" as live.
use std::path::{Path, PathBuf};
use std::sync::{LazyLock, Mutex};
use llm_worker::tool::{ToolError, ToolOutput};
use manifest::{Permission, ScopeRule};
use pod::runtime_dir::{RuntimeDir, SpawnedPodRecord};
use pod::scope_lock::{self, LockFileGuard};
use pod::spawn_pod::spawn_pod_tool;
use protocol::Method;
use protocol::stream::JsonLineReader;
use serde_json::json;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::net::UnixListener;
/// Serialises tests that mutate `INSOMNIA_SCOPE_LOCK` /
/// `INSOMNIA_POD_COMMAND` across the thread-pooled test harness.
static ENV_LOCK: LazyLock<Mutex<()>> = LazyLock::new(|| Mutex::new(()));
struct EnvGuard {
_lock: std::sync::MutexGuard<'static, ()>,
}
impl EnvGuard {
fn acquire() -> Self {
Self {
_lock: ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner()),
}
}
}
/// Set up a tempdir, point `INSOMNIA_SCOPE_LOCK` + runtime-dir base at
/// it, and install a live top-level "spawner" allocation so the tool
/// has something to delegate from. Returns the tempdir (keeps it alive
/// for the test's lifetime), runtime base, spawner socket, and the
/// spawner's runtime dir.
async fn setup_spawner(
spawner_name: &str,
allow_root: &Path,
) -> (TempDir, PathBuf, PathBuf, Arc<RuntimeDir>) {
let tmp = TempDir::new().unwrap();
let lock_path = tmp.path().join("scope.lock");
unsafe {
std::env::set_var("INSOMNIA_SCOPE_LOCK", &lock_path);
}
let runtime_base = tmp.path().join("runtime");
let spawner_rd = RuntimeDir::create(&runtime_base, spawner_name)
.await
.unwrap();
let spawner_socket = spawner_rd.socket_path();
let _guard = scope_lock::install_top_level(
spawner_name.into(),
std::process::id(),
spawner_socket.clone(),
vec![ScopeRule {
target: allow_root.to_path_buf(),
permission: Permission::Write,
recursive: true,
}],
)
.unwrap();
// Leak the guard — the spawner allocation needs to outlive the
// tool call. Dropping it would auto-release the allocation, which
// defeats the point of the test.
std::mem::forget(_guard);
(tmp, runtime_base, spawner_socket, Arc::new(spawner_rd))
}
/// Bind a Unix listener at the path the tool will predict for the
/// spawned pod. The tool only needs the socket to accept a connection
/// and receive one `Method::Run` line; the returned `UnixListener` is
/// read from by the caller in a joined task.
async fn bind_mock_pod_socket(runtime_base: &Path, pod_name: &str) -> (PathBuf, UnixListener) {
let dir = runtime_base.join(pod_name);
tokio::fs::create_dir_all(&dir).await.unwrap();
let socket = dir.join("sock");
let listener = UnixListener::bind(&socket).unwrap();
(socket, listener)
}
/// Launch a tokio task that accepts connections until one carries a
/// `Method` line, then returns it. `wait_for_socket` inside the tool
/// makes a probe connection that carries no data, so the task must
/// tolerate an empty connection and keep listening.
fn accept_one_method(
listener: UnixListener,
) -> tokio::task::JoinHandle<Option<Method>> {
tokio::spawn(async move {
loop {
let (stream, _) = listener.accept().await.ok()?;
let (reader, _writer) = stream.into_split();
let mut r = JsonLineReader::new(reader);
if let Ok(Some(method)) = r.next::<Method>().await {
return Some(method);
}
}
})
}
fn point_pod_command_at_true() {
let path = which_true();
unsafe {
std::env::set_var("INSOMNIA_POD_COMMAND", &path);
}
}
/// `/bin/true` only exists on FHS-compliant systems. On Nix, resolve it
/// via PATH so the tests work regardless of distro.
fn which_true() -> String {
for dir in std::env::var_os("PATH")
.map(|p| std::env::split_paths(&p).collect::<Vec<_>>())
.unwrap_or_default()
{
let candidate = dir.join("true");
if candidate.is_file() {
return candidate.to_string_lossy().into_owned();
}
}
"/bin/true".into()
}
fn clear_env() {
unsafe {
std::env::remove_var("INSOMNIA_SCOPE_LOCK");
std::env::remove_var("INSOMNIA_POD_COMMAND");
}
}
#[tokio::test]
async fn spawn_pod_delegates_scope_and_sends_run() {
let _env = EnvGuard::acquire();
let allow_root = TempDir::new().unwrap();
let (_tmp, runtime_base, spawner_socket, spawner_rd) =
setup_spawner("root", allow_root.path()).await;
point_pod_command_at_true();
let (_predicted_socket, listener) = bind_mock_pod_socket(&runtime_base, "child").await;
let received = accept_one_method(listener);
let def = spawn_pod_tool(
"root".into(),
spawner_socket.clone(),
runtime_base.clone(),
allow_root.path().to_path_buf(),
spawner_rd.clone(),
);
let (_meta, tool) = def();
let input = json!({
"name": "child",
"task": "hello",
"scope": [{
"target": allow_root.path().to_str().unwrap(),
"permission": "write"
}]
})
.to_string();
let output: ToolOutput = tool.execute(&input).await.unwrap();
assert!(output.summary.contains("child"), "summary: {}", output.summary);
// Verify the tool delivered Method::Run to the socket.
let method = received.await.unwrap().expect("expected one Method line");
match method {
Method::Run { input } => assert_eq!(input, "hello"),
other => panic!("expected Run, got {other:?}"),
}
// Verify scope_lock has the child allocation under `root`.
let lock_path = scope_lock::default_lock_path().unwrap();
let guard = LockFileGuard::open(&lock_path).unwrap();
let child = guard
.data()
.find("child")
.expect("child allocation missing after spawn");
assert_eq!(child.delegated_from.as_deref(), Some("root"));
drop(guard);
// Verify spawned_pods.json was written.
let spawned_file = spawner_rd.path().join("spawned_pods.json");
let contents = std::fs::read_to_string(&spawned_file).unwrap();
let records: Vec<SpawnedPodRecord> = serde_json::from_str(&contents).unwrap();
assert_eq!(records.len(), 1);
assert_eq!(records[0].pod_name, "child");
assert_eq!(records[0].callback_address, spawner_socket);
clear_env();
}
#[tokio::test]
async fn spawn_pod_rejects_scope_outside_spawner() {
let _env = EnvGuard::acquire();
let allow_root = TempDir::new().unwrap();
let outside = TempDir::new().unwrap();
let (_tmp, runtime_base, spawner_socket, spawner_rd) =
setup_spawner("root", allow_root.path()).await;
point_pod_command_at_true();
let def = spawn_pod_tool(
"root".into(),
spawner_socket,
runtime_base,
allow_root.path().to_path_buf(),
spawner_rd,
);
let (_meta, tool) = def();
// Request write access to a path the spawner doesn't own.
let input = json!({
"name": "child",
"task": "nope",
"scope": [{
"target": outside.path().to_str().unwrap(),
"permission": "write"
}]
})
.to_string();
let err = tool.execute(&input).await.unwrap_err();
match err {
ToolError::InvalidArgument(msg) => {
assert!(msg.contains("not within"), "expected NotSubset wording: {msg}");
}
other => panic!("expected InvalidArgument, got {other:?}"),
}
// The spawner's allocation is unchanged; no "child" appeared.
let lock_path = scope_lock::default_lock_path().unwrap();
let guard = LockFileGuard::open(&lock_path).unwrap();
assert!(guard.data().find("child").is_none());
clear_env();
}
#[tokio::test]
async fn spawn_pod_rolls_back_reservation_when_socket_never_appears() {
let _env = EnvGuard::acquire();
let allow_root = TempDir::new().unwrap();
let (_tmp, runtime_base, spawner_socket, spawner_rd) =
setup_spawner("root", allow_root.path()).await;
point_pod_command_at_true();
// Deliberately do NOT bind a socket at the predicted path. The
// tool's wait_for_socket should time out, triggering rollback.
// `SOCKET_WAIT_TIMEOUT` is 10s in production; we override via a
// tighter env-based lock path and just accept the wait in test.
// To keep the test fast, use a shorter wait by constructing a
// short-lived separate instance.
//
// As the tool's timeout is internal, we accept the 10s wait here —
// marked with `// slow_test`. Keep the rest of the test suite fast
// by running this test alone when iterating.
let def = spawn_pod_tool(
"root".into(),
spawner_socket,
runtime_base,
allow_root.path().to_path_buf(),
spawner_rd,
);
let (_meta, tool) = def();
let input = json!({
"name": "ghost",
"task": "will never be delivered",
"scope": [{
"target": allow_root.path().to_str().unwrap(),
"permission": "write"
}]
})
.to_string();
let err = tool.execute(&input).await.unwrap_err();
match err {
ToolError::ExecutionFailed(msg) => {
assert!(
msg.contains("socket did not appear"),
"expected socket timeout wording: {msg}"
);
}
other => panic!("expected ExecutionFailed, got {other:?}"),
}
// Rollback assertion: the reserved "ghost" allocation is gone.
let lock_path = scope_lock::default_lock_path().unwrap();
let guard = LockFileGuard::open(&lock_path).unwrap();
assert!(
guard.data().find("ghost").is_none(),
"allocation was not rolled back after socket wait timed out"
);
clear_env();
}

View File

@ -1,68 +0,0 @@
# SpawnPod ツール: LLM から Pod を生成する
## 背景
オーケストレーションの起点。LLM が「このタスクを別 Pod に任せたい」と判断したとき、`SpawnPod` ツールを呼び出して新しい Pod プロセスを起動する。
## 依存
- `tickets/scope-lock.md`: scope 分譲の記録基盤
## 仕様
### ツール定義
`SpawnPod` は通常の `Tool` trait 実装として Worker に登録される。
入力:
- `name`: spawned Pod の識別名
- `instruction`: instruction ファイル参照(省略時は `$insomnia/default`
- `task`: 最初のメッセージspawn 後に即座に run される)
- `scope`: 譲渡する scope 定義allow ルール。spawner の effective scope のサブセットでなければならない
出力:
- spawned Pod の `name` と接続先 addresssocket path
### 内部動作
1. scope lock file を flock → spawner の effective scope を確認 → 要求された scope がサブセットか検証
2. spawner の allocation に deny を追記 + 新 Pod の allocation を登録(`delegated_from` = spawner→ unlock
3. PodFactory のカスケードuser / project manifestに spawner からの overlayname, pwd, scope, instructionを重ねて PodManifest を構築
4. `pod` バイナリを独立プロセスとして起動(`Command::new(pod_command)`
5. spawned Pod の socket が利用可能になるまで待機
6. spawner の callback address を spawned Pod に渡す(`Method::Notify` 経由の受け口として)
7. `task``Method::Run` で送信
8. spawn 記録を Pod のランタイム状態に保存
### Pod 起動コマンド
- デフォルト: `pod`PATH 上のバイナリ)
- 環境変数 `INSOMNIA_POD_COMMAND` またはユーザー manifest で上書き可能
- 引数: `--overlay <toml>` で scope / instruction / name / pwd を渡す
### spawn 記録
Pod が保持する既知の Pod リスト。各エントリ:
- `name`, `name`, `socket_path`, `scope_delegated`, `callback_address`
- spawner 復帰時にこの記録を読んで再接続する
## 設計で決めること
- **spawn 記録の永続化**: session-store に載せるか、runtime_dir にファイルとして書くか
- **socket 待機のタイムアウト**: spawned Pod が socket を開くまでの待機時間
- **callback address の形式**: ローカルでは spawner の socket path、リモートでは `insomnia@host:pod-name`
## 完了条件
- LLM が `SpawnPod` ツールを呼び出すと、新しい Pod プロセスが独立して起動する
- scope lock file に分譲が記録され、spawner の effective scope が縮小する
- 要求 scope が spawner の effective scope を超えていたらツールエラー
- spawned Pod の socket に接続でき、`task` が `Method::Run` で送信される
- spawn 記録が保存され、`ListPods` で参照できる
- spawner が停止しても spawned Pod は続行する
## 範囲外
- Pod 間通信ツールSendToPod / ReadPodOutput / StopPod / ListPods`tickets/pod-comm-tools.md`
- コールバック通知は `tickets/pod-callback.md`
- リモート spawnSSH 越し)は `docs/network-peering.md` を参照