merge: pod tool surface cleanup

This commit is contained in:
Keisuke Hirata 2026-05-31 11:59:27 +09:00
commit 97484091b6
No known key found for this signature in database
10 changed files with 192 additions and 355 deletions

View File

@ -8,9 +8,7 @@ use pod_store::PodMetadataStore;
use session_store::Store;
use tokio::sync::{broadcast, mpsc, oneshot};
use crate::discovery::{
PodDiscovery, attach_or_restore_pod_tool, inspect_pod_tool, list_visible_pods_tool,
};
use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool};
use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::server::SocketServer;
@ -18,9 +16,7 @@ use crate::pod::{Pod, PodError, PodRunResult, SystemItemCommitter};
use crate::runtime::dir::RuntimeDir;
use crate::segment_log_sink::SegmentLogSink;
use crate::shared_state::PodSharedState;
use crate::spawn::comm_tools::{
list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool,
};
use crate::spawn::comm_tools::{read_pod_output_tool, send_to_pod_tool, stop_pod_tool};
use crate::spawn::registry::SpawnedPodRegistry;
use crate::spawn::tool::spawn_pod_tool;
use protocol::{
@ -563,12 +559,9 @@ where
worker.register_tool(send_to_pod_tool(spawned_registry.clone()));
worker.register_tool(read_pod_output_tool(spawned_registry.clone()));
worker.register_tool(stop_pod_tool(spawned_registry.clone()));
worker.register_tool(list_pods_tool(spawned_registry.clone()));
let discovery = PodDiscovery::new(pod_store, spawner_name, runtime_base, pwd, spawned_registry);
worker.register_tool(list_visible_pods_tool(discovery.clone()));
worker.register_tool(inspect_pod_tool(discovery.clone()));
worker.register_tool(attach_or_restore_pod_tool(discovery));
worker.register_tool(list_pods_tool(discovery.clone()));
worker.register_tool(restore_pod_tool(discovery));
pod.attach_tracker(tracker);
fs_for_view
}
@ -835,10 +828,10 @@ async fn controller_loop<C, St>(
break;
}
Method::ListVisiblePods => match discovery.list_visible().await {
Method::ListPods => match discovery.list_visible().await {
Ok(pods) => match serde_json::to_value(pods) {
Ok(pods) => {
let _ = event_tx.send(Event::VisiblePods { pods });
let _ = event_tx.send(Event::PodsListed { pods });
}
Err(error) => {
let _ = event_tx.send(Event::Error {
@ -855,35 +848,15 @@ async fn controller_loop<C, St>(
}
},
Method::InspectPod { name } => match discovery.inspect(&name).await {
Ok(pod) => match serde_json::to_value(pod) {
Ok(pod) => {
let _ = event_tx.send(Event::PodInspection { pod });
}
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::Internal,
message: format!("serialize pod inspection: {error}"),
});
}
},
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::InvalidRequest,
message: error.to_string(),
});
}
},
Method::AttachOrRestorePod { name } => match discovery.attach_or_restore(&name).await {
Method::RestorePod { name } => match discovery.restore(&name).await {
Ok(result) => match serde_json::to_value(result) {
Ok(result) => {
let _ = event_tx.send(Event::PodAttachRestore { result });
let _ = event_tx.send(Event::PodRestored { result });
}
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::Internal,
message: format!("serialize pod attach/restore result: {error}"),
message: format!("serialize pod restore result: {error}"),
});
}
},
@ -1096,11 +1069,7 @@ where
notify_buffer.push_notify(message);
}
Some(Method::ListCompletions { .. }) => {}
Some(
Method::ListVisiblePods
| Method::InspectPod { .. }
| Method::AttachOrRestorePod { .. },
) => {
Some(Method::ListPods | Method::RestorePod { .. }) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::AlreadyRunning,
message: "Pod discovery requests are only handled while the Pod is idle or paused"

View File

@ -1,4 +1,4 @@
//! Pod-state-backed discovery and restore/attach tools.
//! Pod-state-backed discovery and restore tools.
//!
//! This surface deliberately does not enumerate every Pod on the host. The
//! listing path starts from the caller's visibility set (the caller itself and
@ -15,6 +15,7 @@ use std::time::Duration;
use async_trait::async_trait;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use manifest::{Permission, ScopeRule};
use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore};
use protocol::stream::JsonLineReader;
use protocol::{Event, PodStatus};
@ -24,6 +25,7 @@ use session_store::{SegmentId, SessionId};
use tokio::net::UnixStream;
use tokio::process::Command;
use crate::runtime::dir::SpawnedPodRecord;
use crate::runtime::pod_registry;
use crate::spawn::registry::SpawnedPodRegistry;
@ -97,26 +99,23 @@ where
}
}
pub async fn attach_or_restore(
&self,
pod_name: &str,
) -> Result<AttachRestoreResult, PodDiscoveryError> {
match self.plan_attach_or_restore(pod_name).await? {
AttachRestorePlan::Attach {
pub async fn restore(&self, pod_name: &str) -> Result<RestoreResult, PodDiscoveryError> {
match self.plan_restore(pod_name).await? {
RestorePlan::AlreadyLive {
pod_name,
socket_path,
status,
} => Ok(AttachRestoreResult::Attached {
} => Ok(RestoreResult::AlreadyLive {
pod_name,
socket_path,
status,
}),
AttachRestorePlan::Restore {
RestorePlan::Restore {
pod_name,
socket_path,
} => {
self.spawn_restore_process(&pod_name, &socket_path).await?;
Ok(AttachRestoreResult::Restored {
Ok(RestoreResult::Restored {
pod_name,
socket_path,
})
@ -124,13 +123,10 @@ where
}
}
pub async fn plan_attach_or_restore(
&self,
pod_name: &str,
) -> Result<AttachRestorePlan, PodDiscoveryError> {
pub async fn plan_restore(&self, pod_name: &str) -> Result<RestorePlan, PodDiscoveryError> {
let detail = self.inspect(pod_name).await?;
if detail.live.reachable {
return Ok(AttachRestorePlan::Attach {
return Ok(RestorePlan::AlreadyLive {
pod_name: pod_name.to_string(),
socket_path: detail.live.socket_path,
status: detail.live.status,
@ -153,7 +149,7 @@ where
if let Some(lock) = lookup_segment_lock(segment_id)? {
let lock_live = probe_socket(&lock.socket).await;
return if lock_live.reachable {
Ok(AttachRestorePlan::Attach {
Ok(RestorePlan::AlreadyLive {
pod_name: lock.pod_name,
socket_path: lock.socket,
status: lock_live.status,
@ -169,7 +165,7 @@ where
};
}
Ok(AttachRestorePlan::Restore {
Ok(RestorePlan::Restore {
pod_name: pod_name.to_string(),
socket_path: self.default_socket_path(pod_name),
})
@ -178,6 +174,7 @@ where
async fn visibility(&self) -> Result<VisibilitySet, PodDiscoveryError> {
let mut visible = BTreeMap::new();
let mut child_sockets = BTreeMap::new();
let mut comm_registry = BTreeMap::new();
visible.insert(self.self_pod_name.clone(), VisibilityReason::SelfPod);
// Durable parent -> child state is the primary visibility source.
@ -186,7 +183,8 @@ where
visible
.entry(child.pod_name.clone())
.or_insert(VisibilityReason::SpawnedChild);
child_sockets.insert(child.pod_name, child.socket_path);
child_sockets.insert(child.pod_name.clone(), child.socket_path.clone());
comm_registry.insert(child.pod_name.clone(), comm_info_from_spawned_child(&child));
}
}
@ -197,12 +195,17 @@ where
visible
.entry(record.pod_name.clone())
.or_insert(VisibilityReason::SpawnedChild);
child_sockets.insert(record.pod_name, record.socket_path);
child_sockets.insert(record.pod_name.clone(), record.socket_path.clone());
comm_registry.insert(
record.pod_name.clone(),
CommRegistryInfo::from_record(&record),
);
}
Ok(VisibilitySet {
visible,
child_sockets,
comm_registry,
})
}
@ -222,6 +225,7 @@ where
active: detail.active,
live: detail.live,
restore: detail.restore,
comm_registry: detail.comm_registry,
spawned_children: detail.spawned_children,
error: None,
}
@ -233,6 +237,7 @@ where
active: None,
live: self.live_for_name(pod_name, None).await,
restore: RestoreInfo::not_possible("pod state missing"),
comm_registry: visibility.comm_info_for(pod_name),
spawned_children: SpawnedChildrenSummary::default(),
error: None,
},
@ -243,6 +248,7 @@ where
active: None,
live: self.live_for_name(pod_name, None).await,
restore: RestoreInfo::not_possible("pod state is unreadable"),
comm_registry: visibility.comm_info_for(pod_name),
spawned_children: SpawnedChildrenSummary::default(),
error: Some(error.to_string()),
},
@ -268,6 +274,7 @@ where
active: metadata.active.map(ActivePointer::from),
live,
restore,
comm_registry: visibility.comm_info_for(&metadata.pod_name),
spawned_children,
}
}
@ -424,6 +431,33 @@ pub struct SpawnedChildrenSummary {
pub unreachable: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CommRegistryInfo {
pub registered: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub socket_path: Option<PathBuf>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub scope_delegated: Vec<ScopeRule>,
}
impl CommRegistryInfo {
fn missing() -> Self {
Self {
registered: false,
socket_path: None,
scope_delegated: Vec::new(),
}
}
fn from_record(record: &SpawnedPodRecord) -> Self {
Self {
registered: true,
socket_path: Some(record.socket_path.clone()),
scope_delegated: record.scope_delegated.clone(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VisiblePodItem {
pub pod_name: String,
@ -433,6 +467,7 @@ pub struct VisiblePodItem {
pub active: Option<ActivePointer>,
pub live: LiveInfo,
pub restore: RestoreInfo,
pub comm_registry: CommRegistryInfo,
pub spawned_children: SpawnedChildrenSummary,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
@ -446,13 +481,14 @@ pub struct PodDetail {
pub active: Option<ActivePointer>,
pub live: LiveInfo,
pub restore: RestoreInfo,
pub comm_registry: CommRegistryInfo,
pub spawned_children: SpawnedChildrenSummary,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "action", rename_all = "snake_case")]
pub enum AttachRestorePlan {
Attach {
pub enum RestorePlan {
AlreadyLive {
pod_name: String,
socket_path: PathBuf,
#[serde(default, skip_serializing_if = "Option::is_none")]
@ -466,8 +502,8 @@ pub enum AttachRestorePlan {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "action", rename_all = "snake_case")]
pub enum AttachRestoreResult {
Attached {
pub enum RestoreResult {
AlreadyLive {
pod_name: String,
socket_path: PathBuf,
#[serde(default, skip_serializing_if = "Option::is_none")]
@ -514,6 +550,7 @@ pub enum PodDiscoveryError {
struct VisibilitySet {
visible: BTreeMap<String, VisibilityReason>,
child_sockets: BTreeMap<String, PathBuf>,
comm_registry: BTreeMap<String, CommRegistryInfo>,
}
impl VisibilitySet {
@ -527,6 +564,37 @@ impl VisibilitySet {
fn child_socket_for(&self, pod_name: &str) -> Option<PathBuf> {
self.child_sockets.get(pod_name).cloned()
}
fn comm_info_for(&self, pod_name: &str) -> CommRegistryInfo {
self.comm_registry
.get(pod_name)
.cloned()
.unwrap_or_else(CommRegistryInfo::missing)
}
}
fn comm_info_from_spawned_child(child: &pod_store::PodSpawnedChild) -> CommRegistryInfo {
let scope_delegated = child
.scope_delegated
.iter()
.filter_map(|rule| {
let permission = match rule.permission.as_str() {
"read" => Permission::Read,
"write" => Permission::Write,
_ => return None,
};
Some(ScopeRule {
target: rule.target.clone(),
permission,
recursive: rule.recursive,
})
})
.collect();
CommRegistryInfo {
registered: true,
socket_path: Some(child.socket_path.clone()),
scope_delegated,
}
}
async fn summarize_spawned_children(
@ -604,16 +672,16 @@ fn resolve_pod_command() -> PathBuf {
#[derive(Debug, Deserialize, JsonSchema)]
struct PodNameInput {
/// Pod name to inspect, attach, or restore.
/// Pod name to restore.
name: String,
}
struct ListVisiblePodsTool<St> {
struct ListPodsTool<St> {
discovery: PodDiscovery<St>,
}
#[async_trait]
impl<St> Tool for ListVisiblePodsTool<St>
impl<St> Tool for ListPodsTool<St>
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
@ -631,53 +699,28 @@ where
}
}
struct InspectPodTool<St> {
struct RestorePodTool<St> {
discovery: PodDiscovery<St>,
}
#[async_trait]
impl<St> Tool for InspectPodTool<St>
impl<St> Tool for RestorePodTool<St>
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let input: PodNameInput = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid InspectPod input: {e}")))?;
let detail = self
.discovery
.inspect(&input.name)
.await
.map_err(discovery_error_to_tool_error)?;
Ok(ToolOutput {
summary: format!("pod `{}` inspected", detail.pod_name),
content: Some(json_content(&detail)?),
})
}
}
struct AttachOrRestorePodTool<St> {
discovery: PodDiscovery<St>,
}
#[async_trait]
impl<St> Tool for AttachOrRestorePodTool<St>
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let input: PodNameInput = serde_json::from_str(input_json).map_err(|e| {
ToolError::InvalidArgument(format!("invalid AttachOrRestorePod input: {e}"))
})?;
.map_err(|e| ToolError::InvalidArgument(format!("invalid RestorePod input: {e}")))?;
let result = self
.discovery
.attach_or_restore(&input.name)
.restore(&input.name)
.await
.map_err(discovery_error_to_tool_error)?;
let summary = match &result {
AttachRestoreResult::Attached { pod_name, .. } => {
format!("pod `{pod_name}` is live; attached to existing socket")
RestoreResult::AlreadyLive { pod_name, .. } => {
format!("pod `{pod_name}` is already live")
}
AttachRestoreResult::Restored { pod_name, .. } => {
RestoreResult::Restored { pod_name, .. } => {
format!("pod `{pod_name}` restored from pod state")
}
};
@ -688,55 +731,38 @@ where
}
}
pub fn list_visible_pods_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
pub fn list_pods_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
Arc::new(move || {
let meta = ToolMeta::new("ListVisiblePods")
let meta = ToolMeta::new("ListPods")
.description(
"List Pod state entries visible to this Pod. This is state-backed and does not expose the host-wide Pod universe.",
"List Pods visible to this Pod from durable Pod state and the spawned-child registry. This does not expose the host-wide Pod universe.",
)
.input_schema(serde_json::json!({
"type": "object",
"properties": {},
"additionalProperties": false,
}));
let tool: Arc<dyn Tool> = Arc::new(ListVisiblePodsTool {
let tool: Arc<dyn Tool> = Arc::new(ListPodsTool {
discovery: discovery.clone(),
});
(meta, tool)
})
}
pub fn inspect_pod_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
pub fn restore_pod_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
Arc::new(move || {
let meta = ToolMeta::new("InspectPod")
let meta = ToolMeta::new("RestorePod")
.description(
"Inspect one visible Pod by name from durable Pod state, distinguishing missing state from not-visible Pods.",
"Restore a visible stopped/restorable Pod, or report that a visible Pod is already live. Missing state is an error.",
)
.input_schema(serde_json::to_value(schemars::schema_for!(PodNameInput)).unwrap());
let tool: Arc<dyn Tool> = Arc::new(InspectPodTool {
discovery: discovery.clone(),
});
(meta, tool)
})
}
pub fn attach_or_restore_pod_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
Arc::new(move || {
let meta = ToolMeta::new("AttachOrRestorePod")
.description(
"Attach to a visible live Pod, or restore it from Pod state when no live socket is reachable. Missing state is an error.",
)
.input_schema(serde_json::to_value(schemars::schema_for!(PodNameInput)).unwrap());
let tool: Arc<dyn Tool> = Arc::new(AttachOrRestorePodTool {
let tool: Arc<dyn Tool> = Arc::new(RestorePodTool {
discovery: discovery.clone(),
});
(meta, tool)
@ -781,7 +807,7 @@ mod tests {
static ENV_LOCK: Mutex<()> = Mutex::new(());
#[tokio::test(flavor = "current_thread")]
async fn state_backed_visibility_and_attach_restore_planning() {
async fn state_backed_visibility_and_restore_planning() {
let _env = ENV_LOCK.lock().unwrap();
let root = TempDir::new().unwrap();
let store_dir = root.path().join("store");
@ -873,6 +899,13 @@ mod tests {
registry,
);
let list_tool_def = list_pods_tool(discovery.clone());
let (list_meta, _) = list_tool_def();
assert_eq!(list_meta.name, "ListPods");
let restore_tool_def = restore_pod_tool(discovery.clone());
let (restore_meta, _) = restore_tool_def();
assert_eq!(restore_meta.name, "RestorePod");
let list = discovery.list_visible().await.unwrap();
let names: Vec<_> = list.iter().map(|p| p.pod_name.as_str()).collect();
assert_eq!(
@ -912,25 +945,16 @@ mod tests {
missing_err,
PodDiscoveryError::StateMissing { .. }
));
let hidden_restore_err = discovery
.plan_attach_or_restore("hidden")
.await
.unwrap_err();
let hidden_restore_err = discovery.plan_restore("hidden").await.unwrap_err();
assert!(matches!(
hidden_restore_err,
PodDiscoveryError::NotVisible { .. }
));
let attach_plan = discovery
.plan_attach_or_restore("child-live")
.await
.unwrap();
assert!(matches!(attach_plan, AttachRestorePlan::Attach { .. }));
let restore_plan = discovery
.plan_attach_or_restore("child-stale")
.await
.unwrap();
assert!(matches!(restore_plan, AttachRestorePlan::Restore { .. }));
let live_plan = discovery.plan_restore("child-live").await.unwrap();
assert!(matches!(live_plan, RestorePlan::AlreadyLive { .. }));
let restore_plan = discovery.plan_restore("child-stale").await.unwrap();
assert!(matches!(restore_plan, RestorePlan::Restore { .. }));
let lock_socket = runtime_base.join("lock-owner.sock");
let _guard = pod_registry::install_top_level(
@ -945,10 +969,7 @@ mod tests {
active_child_segment,
)
.unwrap();
let locked_err = discovery
.plan_attach_or_restore("child-stale")
.await
.unwrap_err();
let locked_err = discovery.plan_restore("child-stale").await.unwrap_err();
assert!(matches!(locked_err, PodDiscoveryError::LockConflict { .. }));
live_listener.abort();

View File

@ -1,7 +1,7 @@
//! Pod-to-Pod communication tools.
//!
//! Four tools in one module — `SendToPod`, `ReadPodOutput`, `StopPod`,
//! `ListPods` — all built on the same `SpawnedPodRegistry` handed in by
//! Three tools in one module: `SendToPod`, `ReadPodOutput`, `StopPod`,
//! all built on the same `SpawnedPodRegistry` handed in by
//! the controller. Each operation is request-response: connect to the
//! target's Unix socket, perform one method exchange, disconnect.
//!
@ -23,7 +23,6 @@ use session_store::LogEntry;
use tokio::net::UnixStream;
use crate::runtime::dir::SpawnedPodRecord;
use crate::runtime::pod_registry::{self, LockFileGuard};
use crate::spawn::registry::SpawnedPodRegistry;
/// Timeout applied to each socket-level operation — connect, write,
@ -244,76 +243,6 @@ pub fn stop_pod_tool(registry: Arc<SpawnedPodRegistry>) -> ToolDefinition {
})
}
// ---------------------------------------------------------------------------
// ListPods
// ---------------------------------------------------------------------------
const LIST_PODS_DESCRIPTION: &str = "List all Pods spawned by this Pod along with their reachability \
status (`alive` / `stopped`) and the scope each was granted.";
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct EmptyInput {}
struct ListPodsTool {
registry: Arc<SpawnedPodRegistry>,
}
#[async_trait]
impl Tool for ListPodsTool {
async fn execute(&self, _input_json: &str) -> Result<ToolOutput, ToolError> {
let records = self.registry.list().await;
if records.is_empty() {
return Ok(ToolOutput {
summary: "no spawned pods".into(),
content: None,
});
}
let mut lines: Vec<String> = Vec::with_capacity(records.len());
let mut stale_names: Vec<String> = Vec::new();
for record in &records {
let alive = is_reachable(&record.socket_path).await;
let status = if alive { "alive" } else { "stopped" };
let scope = summarize_scope(record);
lines.push(format!("{} [{status}] scope={scope}", record.pod_name));
if !alive {
stale_names.push(record.pod_name.clone());
}
}
// Trigger stale reclaim on unreachable pods so the lock file's
// allocation table doesn't keep growing indefinitely when
// children crash without a clean exit path.
if !stale_names.is_empty() {
if let Ok(lock_path) = pod_registry::default_registry_path()
&& let Ok(mut guard) = LockFileGuard::open(&lock_path)
{
pod_registry::reclaim_stale(&mut guard);
}
}
let summary = format!("{} pod(s) known", records.len());
Ok(ToolOutput {
summary,
content: Some(lines.join("\n")),
})
}
}
pub fn list_pods_tool(registry: Arc<SpawnedPodRegistry>) -> ToolDefinition {
Arc::new(move || {
let schema = schemars::schema_for!(EmptyInput);
let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({}));
let meta = ToolMeta::new("ListPods")
.description(LIST_PODS_DESCRIPTION)
.input_schema(schema_value);
let tool: Arc<dyn Tool> = Arc::new(ListPodsTool {
registry: registry.clone(),
});
(meta, tool)
})
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
@ -322,6 +251,29 @@ fn unknown_pod_err(name: &str) -> ToolError {
ToolError::InvalidArgument(format!("no spawned pod named `{name}`"))
}
fn summarize_scope(record: &SpawnedPodRecord) -> String {
if record.scope_delegated.is_empty() {
return "(none)".into();
}
let parts: Vec<String> = record
.scope_delegated
.iter()
.map(|rule| {
let perm = match rule.permission {
manifest::Permission::Read => "read",
manifest::Permission::Write => "write",
};
let recursive = if rule.recursive {
""
} else {
" [non-recursive]"
};
format!("{perm}:{}{recursive}", rule.target.display())
})
.collect();
parts.join(", ")
}
/// Connect with a timeout, drain the server's connect-time snapshot,
/// write one `Method` line, flush, and close.
///
@ -487,14 +439,6 @@ async fn fetch_history(socket: &Path) -> std::io::Result<Vec<serde_json::Value>>
}
}
/// Probe-connect test. Connection accepted within timeout → alive.
async fn is_reachable(socket: &Path) -> bool {
tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
.await
.map(|r| r.is_ok())
.unwrap_or(false)
}
fn extract_assistant_text(entries: &[serde_json::Value]) -> String {
let mut out = String::new();
for value in entries {
@ -536,25 +480,6 @@ fn push_assistant_text(out: &mut String, logged: session_store::LoggedItem) {
}
}
fn summarize_scope(record: &SpawnedPodRecord) -> String {
if record.scope_delegated.is_empty() {
return "(none)".into();
}
let parts: Vec<String> = record
.scope_delegated
.iter()
.map(|r| {
let perm = match r.permission {
manifest::Permission::Read => "read",
manifest::Permission::Write => "write",
};
let tag = if r.recursive { "" } else { " [non-recursive]" };
format!("{perm}:{}{tag}", r.target.display())
})
.collect();
parts.join(", ")
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,9 +1,10 @@
//! Shared registry of Pods spawned by this Pod.
//!
//! `SpawnPod` writes here; the pod-comm tools (`SendToPod`,
//! `ReadPodOutput`, `StopPod`, `ListPods`) read and mutate the same
//! instance. Runtime write-through still materialises `spawned_pods.json`,
//! but durable state lives in the spawner's Pod metadata.
//! `ReadPodOutput`, `StopPod`) read and mutate the same instance. Discovery
//! tools consult this registry together with durable Pod state. Runtime
//! write-through still materialises `spawned_pods.json`, but durable state lives
//! in the spawner's Pod metadata.
//!
//! `ReadPodOutput` additionally owns a per-spawned-pod cursor here so
//! two consecutive reads yield only new assistant text. The cursor is

View File

@ -220,8 +220,8 @@ pub struct SpawnPodTool {
/// override it. Defaults to the spawner's pwd — see module docs.
spawner_pwd: PathBuf,
/// Shared registry of spawned children, also used by the
/// pod-comm tools (`SendToPod` / `ReadPodOutput` / `StopPod` /
/// `ListPods`). Writes the list to runtime and durable Pod state on
/// pod-comm tools (`SendToPod` / `ReadPodOutput` / `StopPod`) and by
/// Pod discovery. Writes the list to runtime and durable Pod state on
/// each add.
registry: Arc<SpawnedPodRegistry>,
/// THIS Pod's own parent-callback socket, if any. After a

View File

@ -1,5 +1,5 @@
//! Integration tests for the pod-comm tools (`SendToPod`,
//! `ReadPodOutput`, `StopPod`, `ListPods`).
//! `ReadPodOutput`, `StopPod`).
//!
//! The real child Pod binary is not started. Instead each test stands
//! up a mock `UnixListener` that speaks the socket protocol directly:
@ -16,9 +16,7 @@ use llm_worker::tool::ToolOutput;
use manifest::{Permission, Scope, ScopeRule, SharedScope};
use pod::runtime::dir::{RuntimeDir, SpawnedPodRecord};
use pod::runtime::pod_registry::{self, LockFileGuard};
use pod::spawn::comm_tools::{
list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool,
};
use pod::spawn::comm_tools::{read_pod_output_tool, send_to_pod_tool, stop_pod_tool};
use pod::spawn::registry::SpawnedPodRegistry;
use pod_store::{CombinedStore, FsPodStore, PodMetadataStore};
use protocol::stream::{JsonLineReader, JsonLineWriter};
@ -544,13 +542,6 @@ async fn restored_registry_uses_pod_state_without_runtime_file() {
.await
.unwrap();
let def = list_pods_tool(restored.clone());
let (_meta, tool) = def();
let output: ToolOutput = tool.execute("{}").await.unwrap();
assert!(output.summary.contains("1 pod"), "{}", output.summary);
let body = output.content.expect("restored ListPods should list child");
assert!(body.contains("child [alive]"), "body: {body}");
let def = send_to_pod_tool(restored.clone());
let (_meta, tool) = def();
let input = json!({ "name": "child", "message": "after restart" }).to_string();
@ -718,51 +709,3 @@ async fn load_from_pod_state_reclaims_missing_child_scope_and_records_history()
let runtime_records: Vec<SpawnedPodRecord> = serde_json::from_str(&runtime_contents).unwrap();
assert!(runtime_records.is_empty());
}
// ---------------------------------------------------------------------------
// ListPods
// ---------------------------------------------------------------------------
#[tokio::test]
async fn list_pods_reports_alive_and_stopped() {
let (tmp, registry, _rd) = setup_registry().await;
// One child is reachable…
let (live_socket, listener) = bind_mock_socket(tmp.path(), "alive").await;
// Keep the listener alive by moving it into a task that never exits.
let _accept = tokio::spawn(async move {
loop {
let Ok((stream, _)) = listener.accept().await else {
return;
};
drop(stream);
}
});
register_child(&registry, "alive", &live_socket, tmp.path()).await;
// …the other is not.
let dead_socket = tmp.path().join("dead.sock");
register_child(&registry, "dead", &dead_socket, tmp.path()).await;
let def = list_pods_tool(registry);
let (_meta, tool) = def();
let output: ToolOutput = tool.execute("{}").await.unwrap();
assert!(output.summary.contains("2 pod"), "{}", output.summary);
let body = output.content.expect("list_pods should populate content");
assert!(body.contains("alive [alive]"), "body: {body}");
assert!(body.contains("dead [stopped]"), "body: {body}");
}
#[tokio::test]
async fn list_pods_empty_when_nothing_registered() {
let (_tmp, registry, _rd) = setup_registry().await;
let def = list_pods_tool(registry);
let (_meta, tool) = def();
let output: ToolOutput = tool.execute("{}").await.unwrap();
assert!(
output.summary.contains("no spawned pods"),
"{}",
output.summary
);
assert!(output.content.is_none());
}

View File

@ -56,16 +56,12 @@ pub enum Method {
kind: CompletionKind,
prefix: String,
},
/// List Pods visible to this Pod from durable Pod state. This is not a
/// host-wide Pod universe query.
ListVisiblePods,
/// Inspect one Pod by name if its state exists and it is visible to this Pod.
InspectPod {
name: String,
},
/// Attach to a visible live Pod, or restore it from durable Pod state when
/// it is not live. Missing state and not-visible state are distinct errors.
AttachOrRestorePod {
/// List Pods visible to this Pod from durable Pod state and the spawned-child
/// registry. This is not a host-wide Pod universe query.
ListPods,
/// Restore a visible stopped/restorable Pod, or report that it is already
/// live. Missing state and not-visible state are distinct errors.
RestorePod {
name: String,
},
}
@ -474,18 +470,14 @@ pub enum Event {
input: Vec<Segment>,
summary: RewindSummary,
},
/// Reply to `Method::ListVisiblePods`. Payload is a stable JSON value so
/// the Pod crate can evolve discovery fields without introducing a protocol
/// Reply to `Method::ListPods`. Payload is a stable JSON value so the Pod
/// crate can evolve discovery fields without introducing a protocol
/// dependency on session-store.
VisiblePods {
PodsListed {
pods: serde_json::Value,
},
/// Reply to `Method::InspectPod`.
PodInspection {
pod: serde_json::Value,
},
/// Reply to `Method::AttachOrRestorePod`.
PodAttachRestore {
/// Reply to `Method::RestorePod`.
PodRestored {
result: serde_json::Value,
},
Alert(Alert),
@ -1469,11 +1461,8 @@ mod tests {
#[test]
fn pod_discovery_methods_roundtrip() {
let methods = [
Method::ListVisiblePods,
Method::InspectPod {
name: "child".into(),
},
Method::AttachOrRestorePod {
Method::ListPods,
Method::RestorePod {
name: "child".into(),
},
];
@ -1481,9 +1470,8 @@ mod tests {
let json = serde_json::to_string(&method).unwrap();
let decoded: Method = serde_json::from_str(&json).unwrap();
match (decoded, method) {
(Method::ListVisiblePods, Method::ListVisiblePods)
| (Method::InspectPod { .. }, Method::InspectPod { .. })
| (Method::AttachOrRestorePod { .. }, Method::AttachOrRestorePod { .. }) => {}
(Method::ListPods, Method::ListPods)
| (Method::RestorePod { .. }, Method::RestorePod { .. }) => {}
(decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
}
}
@ -1492,30 +1480,23 @@ mod tests {
#[test]
fn pod_discovery_events_roundtrip() {
let events = [
Event::VisiblePods {
Event::PodsListed {
pods: serde_json::json!([{ "pod_name": "child" }]),
},
Event::PodInspection {
pod: serde_json::json!({ "pod_name": "child" }),
},
Event::PodAttachRestore {
result: serde_json::json!({ "action": "attach" }),
Event::PodRestored {
result: serde_json::json!({ "action": "already_live" }),
},
];
for event in events {
let json = serde_json::to_string(&event).unwrap();
let decoded: Event = serde_json::from_str(&json).unwrap();
match (decoded, event) {
(Event::VisiblePods { pods }, Event::VisiblePods { pods: expected }) => {
(Event::PodsListed { pods }, Event::PodsListed { pods: expected }) => {
assert_eq!(pods, expected)
}
(Event::PodInspection { pod }, Event::PodInspection { pod: expected }) => {
assert_eq!(pod, expected)
(Event::PodRestored { result }, Event::PodRestored { result: expected }) => {
assert_eq!(result, expected)
}
(
Event::PodAttachRestore { result },
Event::PodAttachRestore { result: expected },
) => assert_eq!(result, expected),
(decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
}
}

View File

@ -1204,9 +1204,7 @@ impl App {
message,
});
}
Event::VisiblePods { .. }
| Event::PodInspection { .. }
| Event::PodAttachRestore { .. } => {}
Event::PodsListed { .. } | Event::PodRestored { .. } => {}
Event::Shutdown => {
self.mark_orphan_compacts_incomplete();
self.quit = true;

View File

@ -262,7 +262,7 @@ fn draw(f: &mut Frame<'_>, list: &PodList) {
Span::styled("[↑/↓]", Style::default().fg(Color::DarkGray)),
Span::raw(" select "),
Span::styled("[enter]", Style::default().fg(Color::Green)),
Span::raw(" attach/restore "),
Span::raw(" open/restore "),
Span::styled("[esc]", Style::default().fg(Color::Yellow)),
Span::raw(" cancel"),
])),

View File

@ -64,11 +64,11 @@ Pod の制御・監視に使う JSONL ベースのメッセージプロトコル
- context/session 制御: `Compact`, `ListRewindTargets`, `RewindTo`
- typed injection / child lifecycle: `Notify`, `PodEvent`
- client 補助: `ListCompletions`
- Pod visibility / attach: `ListVisiblePods`, `InspectPod`, `AttachOrRestorePod`
- Pod visibility / restore: `ListPods`, `RestorePod`
- **Pod → Client (`Event`)**
- accepted input / history seed: `Snapshot`, `UserMessage`, `SystemItem`, `SegmentRotated`
- generation stream: `TurnStart`, `TurnEnd`, `LlmCallStart`, `LlmCallEnd`, retry/continuation events, `Text*`, `Thinking*`, `ToolCall*`, `ToolResult`, `Usage`, `RunEnd`
- control replies: completions, rewind, visible Pod / inspect / attach results
- control replies: completions, rewind, visible Pod list / restore results
- operational status: `Status`, `Alert`, `MemoryWorker`, `Compact*`, `Error`, `Shutdown`
- リクエストとレスポンスの紐付けを一般化した RPC にはしない。多くの状態は broadcast event と Pod status で観測する
- 一部の reply例: completionsは要求 socket にだけ返る。broadcast event と request-local reply の違いは enum variant のコメントを正とする
@ -146,8 +146,7 @@ Pod が操作できるファイルパスの制御。
| File / shell | `Read`, `Write`, `Edit`, `Glob`, `Grep`, `Bash` | workspace ファイル操作と shell 実行。file tools は `ScopedFs` と read-before-edit tracker を通る。`Bash` は permission policy と出力退避で制御する |
| Task | `TaskCreate`, `TaskUpdate`, `TaskList`, `TaskGet` | セッション内の短期 task 状態管理 |
| Memory / Knowledge | `MemoryQuery`, `MemoryRead`, `MemoryWrite`, `MemoryEdit`, `MemoryDelete`, `KnowledgeQuery` | manifest の memory 設定が有効な時に登録される durable memory / knowledge 操作 |
| Pod orchestration | `SpawnPod`, `SendToPod`, `ReadPodOutput`, `StopPod`, `ListPods` | child Pod の起動・通信・停止・一覧 |
| Visible Pod state | `ListVisiblePods`, `InspectPod`, `AttachOrRestorePod` | durable Pod state と visibility に基づく Pod inspection / attach / restore |
| Pod orchestration | `SpawnPod`, `SendToPod`, `ReadPodOutput`, `StopPod`, `ListPods`, `RestorePod` | child / visible Pod の起動・通信・停止・一覧・復元 |
| Web | `WebSearch`, `WebFetch` | manifest/env で明示設定された provider 経由の bounded web access |
すべての tool call は manifest tool permission と scope/policy のチェックを通る。ファイル write scope、Pod delegation、memory layout、web provider 設定はそれぞれ別の authority を持ち、UI 表示だけで権限を広げない。