feat: add peer pod handshake command

This commit is contained in:
Keisuke Hirata 2026-06-02 19:43:20 +09:00
parent b51c32401c
commit 093a84fc83
No known key found for this signature in database
12 changed files with 663 additions and 14 deletions

View File

@ -84,6 +84,16 @@ pub struct PodReclaimedChild {
pub scope_delegated: Vec<PodSpawnedScopeRule>, pub scope_delegated: Vec<PodSpawnedScopeRule>,
} }
/// One peer Pod made visible by an explicit peer handshake.
///
/// Peer visibility is intentionally separate from spawned-child delegation: it
/// does not carry filesystem scope, callback ownership, output cursors, or
/// lifecycle-notification authority.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodPeer {
/// Name of the peer Pod. This is the only data a peer entry carries; the
/// store's `add_peer` and `remove_peer` helpers match entries on it.
pub pod_name: String,
}
/// Persistent metadata for a Pod name. /// Persistent metadata for a Pod name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodMetadata { pub struct PodMetadata {
@ -94,6 +104,8 @@ pub struct PodMetadata {
pub spawned_children: Vec<PodSpawnedChild>, pub spawned_children: Vec<PodSpawnedChild>,
#[serde(default, skip_serializing_if = "Vec::is_empty")] #[serde(default, skip_serializing_if = "Vec::is_empty")]
pub reclaimed_children: Vec<PodReclaimedChild>, pub reclaimed_children: Vec<PodReclaimedChild>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub peers: Vec<PodPeer>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub resolved_manifest_snapshot: Option<serde_json::Value>, pub resolved_manifest_snapshot: Option<serde_json::Value>,
} }
@ -106,6 +118,7 @@ impl PodMetadata {
active, active,
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: Vec::new(),
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
} }
} }
@ -168,6 +181,33 @@ pub trait PodMetadataStore: Send + Sync {
}) })
} }
/// Replace the complete peer list stored for `pod_name`.
///
/// Only the `peers` field is assigned inside the update closure, so the
/// active pointer, child state, and manifest snapshot are left as they were.
fn set_peers(&self, pod_name: &str, peers: Vec<PodPeer>) -> Result<PodMetadata, PodStoreError> {
    self.update_by_name(pod_name, |metadata| metadata.peers = peers)
}
/// Register `peer_name` as a peer of `pod_name` unless it is already listed.
///
/// A repeated registration leaves the metadata untouched. After a new entry
/// is pushed, the peer list is re-sorted by Pod name. No other metadata
/// field is modified.
fn add_peer(&self, pod_name: &str, peer_name: &str) -> Result<PodMetadata, PodStoreError> {
    self.update_by_name(pod_name, |metadata| {
        let already_listed = metadata
            .peers
            .iter()
            .any(|existing| existing.pod_name == peer_name);
        if already_listed {
            return;
        }
        metadata.peers.push(PodPeer {
            pod_name: peer_name.to_string(),
        });
        metadata
            .peers
            .sort_by(|left, right| left.pod_name.cmp(&right.pod_name));
    })
}
/// Drop every peer entry named `peer_name` from `pod_name`'s metadata.
///
/// Entries with any other name are kept, so removing an unknown peer changes
/// nothing. No other metadata field is modified.
fn remove_peer(&self, pod_name: &str, peer_name: &str) -> Result<PodMetadata, PodStoreError> {
    self.update_by_name(pod_name, |metadata| {
        metadata
            .peers
            .retain(|existing| existing.pod_name != peer_name)
    })
}
/// Remove reclaimed child delegations from the outstanding set and record /// Remove reclaimed child delegations from the outstanding set and record
/// them in durable reclaim history. /// them in durable reclaim history.
fn reclaim_spawned_children( fn reclaim_spawned_children(
@ -503,6 +543,52 @@ mod tests {
assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot)); assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot));
} }
#[test]
fn peer_updates_preserve_active_children_and_manifest_snapshot() {
    let store_root = tempfile::TempDir::new().unwrap();
    let store = FsPodStore::new(store_root.path()).unwrap();

    // Seed every non-peer field first so the assertions below can detect a
    // peer update that clobbers unrelated metadata.
    let active_ref = PodActiveSegmentRef::active_segment(
        session_store::new_session_id(),
        session_store::new_segment_id(),
    );
    let manifest = serde_json::json!({"pod":{"name":"agent"}});
    store
        .set_active("agent", Some(active_ref.clone()), Some(manifest.clone()))
        .unwrap();
    let child = PodSpawnedChild {
        pod_name: "child".into(),
        socket_path: std::path::Path::new("/tmp/child.sock").into(),
        scope_delegated: vec![],
        callback_address: std::path::Path::new("/tmp/parent.sock").into(),
    };
    store.set_spawned_children("agent", vec![child]).unwrap();

    // Out-of-order inserts plus one duplicate: the stored list must come back
    // sorted and without the repeated entry.
    for peer_name in ["peer-b", "peer-a", "peer-a"] {
        store.add_peer("agent", peer_name).unwrap();
    }

    let after_add = store.read_by_name("agent").unwrap().unwrap();
    assert_eq!(after_add.active, Some(active_ref));
    assert_eq!(after_add.spawned_children.len(), 1);
    assert_eq!(after_add.resolved_manifest_snapshot, Some(manifest));
    let peer_names: Vec<&str> = after_add
        .peers
        .iter()
        .map(|peer| peer.pod_name.as_str())
        .collect();
    assert_eq!(peer_names, vec!["peer-a", "peer-b"]);

    store.remove_peer("agent", "peer-a").unwrap();
    let after_remove = store.read_by_name("agent").unwrap().unwrap();
    assert_eq!(after_remove.peers.len(), 1);
    assert_eq!(after_remove.peers[0].pod_name, "peer-b");
}
#[test] #[test]
fn reclaim_children_removes_outstanding_and_records_history() { fn reclaim_children_removes_outstanding_and_records_history() {
let tmp = tempfile::TempDir::new().unwrap(); let tmp = tempfile::TempDir::new().unwrap();

View File

@ -8,7 +8,7 @@ use pod_store::PodMetadataStore;
use session_store::Store; use session_store::Store;
use tokio::sync::{broadcast, mpsc, oneshot}; use tokio::sync::{broadcast, mpsc, oneshot};
use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool}; use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool, send_to_peer_pod_tool};
use crate::ipc::alerter::Alerter; use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::server::SocketServer; use crate::ipc::server::SocketServer;
@ -561,7 +561,8 @@ where
worker.register_tool(stop_pod_tool(spawned_registry.clone())); worker.register_tool(stop_pod_tool(spawned_registry.clone()));
let discovery = PodDiscovery::new(pod_store, spawner_name, runtime_base, pwd, spawned_registry); let discovery = PodDiscovery::new(pod_store, spawner_name, runtime_base, pwd, spawned_registry);
worker.register_tool(list_pods_tool(discovery.clone())); worker.register_tool(list_pods_tool(discovery.clone()));
worker.register_tool(restore_pod_tool(discovery)); worker.register_tool(restore_pod_tool(discovery.clone()));
worker.register_tool(send_to_peer_pod_tool(discovery));
pod.attach_tracker(tracker); pod.attach_tracker(tracker);
fs_for_view fs_for_view
} }
@ -862,6 +863,26 @@ async fn controller_loop<C, St>(
} }
}, },
Method::RegisterPeer { name } => match discovery.register_peer(&name) {
Ok(result) => match serde_json::to_value(result) {
Ok(result) => {
let _ = event_tx.send(Event::PeerRegistered { result });
}
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::Internal,
message: format!("serialize peer registration result: {error}"),
});
}
},
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::InvalidRequest,
message: error.to_string(),
});
}
},
// ListCompletions is handled at the socket layer (direct // ListCompletions is handled at the socket layer (direct
// response). If it reaches the controller, ignore it. // response). If it reaches the controller, ignore it.
Method::ListCompletions { .. } => {} Method::ListCompletions { .. } => {}
@ -1063,10 +1084,10 @@ where
notify_buffer.push_notify(message); notify_buffer.push_notify(message);
} }
Some(Method::ListCompletions { .. }) => {} Some(Method::ListCompletions { .. }) => {}
Some(Method::ListPods | Method::RestorePod { .. }) => { Some(Method::ListPods | Method::RestorePod { .. } | Method::RegisterPeer { .. }) => {
let _ = event_tx.send(Event::Error { let _ = event_tx.send(Event::Error {
code: ErrorCode::AlreadyRunning, code: ErrorCode::AlreadyRunning,
message: "Pod discovery requests are only handled while the Pod is idle or paused" message: "Pod discovery/control requests are only handled while the Pod is idle or paused"
.into(), .into(),
}); });
} }

View File

@ -17,9 +17,9 @@ use async_trait::async_trait;
use client::PodRuntimeCommand; use client::PodRuntimeCommand;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use manifest::{Permission, ScopeRule}; use manifest::{Permission, ScopeRule};
use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore}; use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, validate_pod_name};
use protocol::stream::JsonLineReader; use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{Event, PodStatus}; use protocol::{Event, Method, PodStatus};
use schemars::JsonSchema; use schemars::JsonSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use session_store::{SegmentId, SessionId}; use session_store::{SegmentId, SessionId};
@ -172,6 +172,41 @@ where
}) })
} }
/// Register `peer_name` as a reciprocal peer of this Pod.
///
/// The link is written to both Pods' metadata: first `peer_name` is added to
/// this Pod's peer list, then this Pod is added to the peer's list.
///
/// # Errors
///
/// - the error produced by `validate_pod_name` when `peer_name` is invalid;
/// - [`PodDiscoveryError::SelfPeer`] when `peer_name` is this Pod's own name;
/// - [`PodDiscoveryError::StateMissing`] when this Pod has no stored metadata;
/// - [`PodDiscoveryError::MissingPod`] when the peer has no stored metadata;
/// - a store error when either metadata write fails.
///
/// If the second write fails, the first one is rolled back on a best-effort
/// basis, but only when this call actually created it. A link that existed
/// before the call is never removed by a failed re-registration.
pub fn register_peer(
    &self,
    peer_name: &str,
) -> Result<PeerRegistrationResult, PodDiscoveryError> {
    validate_pod_name(peer_name)?;
    if peer_name == self.self_pod_name {
        return Err(PodDiscoveryError::SelfPeer {
            pod_name: peer_name.to_string(),
        });
    }

    let Some(self_metadata) = self.store.read_by_name(&self.self_pod_name)? else {
        return Err(PodDiscoveryError::StateMissing {
            pod_name: self.self_pod_name.clone(),
        });
    };
    // Remember whether the outgoing half of the link predates this call, so
    // a rollback below cannot delete state this call did not create.
    let link_preexisted = self_metadata
        .peers
        .iter()
        .any(|peer| peer.pod_name == peer_name);

    if self.store.read_by_name(peer_name)?.is_none() {
        return Err(PodDiscoveryError::MissingPod {
            pod_name: peer_name.to_string(),
        });
    }

    self.store.add_peer(&self.self_pod_name, peer_name)?;
    if let Err(error) = self.store.add_peer(peer_name, &self.self_pod_name) {
        if !link_preexisted {
            // Best effort: the reciprocal write already failed, so a failing
            // rollback is ignored and the original error is reported.
            let _ = self.store.remove_peer(&self.self_pod_name, peer_name);
        }
        return Err(PodDiscoveryError::PodStore(error));
    }

    Ok(PeerRegistrationResult {
        source: self.self_pod_name.clone(),
        peer: peer_name.to_string(),
    })
}
async fn visibility(&self) -> Result<VisibilitySet, PodDiscoveryError> { async fn visibility(&self) -> Result<VisibilitySet, PodDiscoveryError> {
let mut visible = BTreeMap::new(); let mut visible = BTreeMap::new();
let mut child_sockets = BTreeMap::new(); let mut child_sockets = BTreeMap::new();
@ -187,6 +222,11 @@ where
child_sockets.insert(child.pod_name.clone(), child.socket_path.clone()); child_sockets.insert(child.pod_name.clone(), child.socket_path.clone());
comm_registry.insert(child.pod_name.clone(), comm_info_from_spawned_child(&child)); comm_registry.insert(child.pod_name.clone(), comm_info_from_spawned_child(&child));
} }
for peer in metadata.peers {
visible
.entry(peer.pod_name)
.or_insert(VisibilityReason::Peer);
}
} }
// The live in-memory registry covers just-spawned children even if a // The live in-memory registry covers just-spawned children even if a
@ -379,6 +419,7 @@ where
pub enum VisibilityReason { pub enum VisibilityReason {
SelfPod, SelfPod,
SpawnedChild, SpawnedChild,
Peer,
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@ -525,6 +566,12 @@ pub enum RestoreResult {
}, },
} }
/// Outcome of a successful `register_peer` call.
///
/// The controller serializes this value to JSON and sends it back as the
/// `result` payload of `Event::PeerRegistered`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PeerRegistrationResult {
/// Name of the Pod that handled the registration (the discovery's own Pod).
pub source: String,
/// Name of the Pod that was registered as its peer.
pub peer: String,
}
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum PodDiscoveryError { pub enum PodDiscoveryError {
#[error("pod state missing for `{pod_name}`")] #[error("pod state missing for `{pod_name}`")]
@ -533,6 +580,10 @@ pub enum PodDiscoveryError {
NotVisible { pod_name: String }, NotVisible { pod_name: String },
#[error("pod `{pod_name}` is not restorable: {reason}")] #[error("pod `{pod_name}` is not restorable: {reason}")]
NotRestorable { pod_name: String, reason: String }, NotRestorable { pod_name: String, reason: String },
#[error("pod `{pod_name}` cannot be registered as a peer of itself")]
SelfPeer { pod_name: String },
#[error("pod `{pod_name}` does not exist")]
MissingPod { pod_name: String },
#[error( #[error(
"pod `{pod_name}` segment {segment_id} is locked by `{owner_pod}` pid {pid} at {socket_path}" "pod `{pod_name}` segment {segment_id} is locked by `{owner_pod}` pid {pid} at {socket_path}"
)] )]
@ -683,6 +734,14 @@ struct PodNameInput {
name: String, name: String,
} }
// Arguments of the `SendToPeerPod` tool, deserialized from the tool's JSON
// input string.
// NOTE(review): this type derives `JsonSchema`, so the `///` field comments
// below presumably end up as descriptions in the generated input schema —
// treat them as tool-facing text and confirm before rewording them.
#[derive(Debug, Deserialize, JsonSchema)]
struct SendToPeerPodInput {
/// Target peer Pod name.
name: String,
/// Text delivered to the peer as a peer notification.
message: String,
}
struct ListPodsTool<St> { struct ListPodsTool<St> {
discovery: PodDiscovery<St>, discovery: PodDiscovery<St>,
} }
@ -776,6 +835,83 @@ where
}) })
} }
/// Description text attached to the `SendToPeerPod` tool metadata.
const SEND_TO_PEER_POD_DESCRIPTION: &str = "Send a text message to a peer Pod made visible by an explicit peer handshake. The message is delivered as a peer notification through the target Pod's durable notification/history path. This does not grant delegated scope, create a spawned-child output cursor, imply parent ownership, or produce child completion notifications. Fails if the target is not a visible live peer.";
/// Tool that sends a text notification to a Pod visible as a peer.
///
/// It holds the discovery handle used to inspect the target and to read this
/// Pod's own name for the message label.
struct SendToPeerPodTool<St> {
discovery: PodDiscovery<St>,
}
#[async_trait]
impl<St> Tool for SendToPeerPodTool<St>
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let input: SendToPeerPodInput = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid SendToPeerPod input: {e}")))?;
let detail = self
.discovery
.inspect(&input.name)
.await
.map_err(discovery_error_to_tool_error)?;
if detail.visibility != VisibilityReason::Peer {
return Err(ToolError::InvalidArgument(format!(
"pod `{}` is visible as {:?}, not as a peer",
input.name, detail.visibility
)));
}
if !detail.live.reachable {
return Err(ToolError::ExecutionFailed(format!(
"peer pod `{}` is not live/reachable; restore it before sending",
input.name
)));
}
let message = format!(
"[Peer message from `{}`]\n{}",
self.discovery.self_pod_name, input.message
);
send_peer_notify(&detail.live.socket_path, message)
.await
.map_err(|error| {
ToolError::ExecutionFailed(format!("send to peer `{}`: {error}", input.name))
})?;
Ok(ToolOutput {
summary: format!("sent peer message to `{}`", input.name),
content: None,
})
}
}
/// Build the `SendToPeerPod` tool definition backed by `discovery`.
///
/// The returned factory yields, on each call, the tool metadata (name,
/// description, input schema) together with a tool instance that owns a
/// clone of `discovery`.
pub fn send_to_peer_pod_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
where
    St: PodMetadataStore + Clone + Send + Sync + 'static,
{
    let factory = move || {
        let input_schema =
            serde_json::to_value(schemars::schema_for!(SendToPeerPodInput)).unwrap();
        let meta = ToolMeta::new("SendToPeerPod")
            .description(SEND_TO_PEER_POD_DESCRIPTION)
            .input_schema(input_schema);
        let tool: Arc<dyn Tool> = Arc::new(SendToPeerPodTool {
            discovery: discovery.clone(),
        });
        (meta, tool)
    };
    Arc::new(factory)
}
/// Connect to the Unix socket at `socket_path` and write one
/// `Method::Notify` carrying `message`.
///
/// The connect and the write are each bounded by a five-second timeout. An
/// elapsed timeout is reported as `io::ErrorKind::TimedOut`; any other I/O
/// failure is returned unchanged.
async fn send_peer_notify(socket_path: &Path, message: String) -> io::Result<()> {
    const STEP_TIMEOUT: Duration = Duration::from_secs(5);

    let stream = match tokio::time::timeout(STEP_TIMEOUT, UnixStream::connect(socket_path)).await {
        Ok(connected) => connected?,
        Err(_) => {
            return Err(io::Error::new(
                io::ErrorKind::TimedOut,
                "connect timed out",
            ));
        }
    };

    let mut writer = JsonLineWriter::new(stream);
    match tokio::time::timeout(STEP_TIMEOUT, writer.write(&Method::Notify { message })).await {
        Ok(written) => written,
        Err(_) => Err(io::Error::new(io::ErrorKind::TimedOut, "write timed out")),
    }
}
fn json_content<T: Serialize>(value: &T) -> Result<String, ToolError> { fn json_content<T: Serialize>(value: &T) -> Result<String, ToolError> {
serde_json::to_string_pretty(value) serde_json::to_string_pretty(value)
.map_err(|e| ToolError::Internal(format!("serialize pod discovery output: {e}"))) .map_err(|e| ToolError::Internal(format!("serialize pod discovery output: {e}")))
@ -785,7 +921,9 @@ fn discovery_error_to_tool_error(error: PodDiscoveryError) -> ToolError {
match error { match error {
PodDiscoveryError::StateMissing { .. } PodDiscoveryError::StateMissing { .. }
| PodDiscoveryError::NotVisible { .. } | PodDiscoveryError::NotVisible { .. }
| PodDiscoveryError::NotRestorable { .. } => ToolError::InvalidArgument(error.to_string()), | PodDiscoveryError::NotRestorable { .. }
| PodDiscoveryError::SelfPeer { .. }
| PodDiscoveryError::MissingPod { .. } => ToolError::InvalidArgument(error.to_string()),
PodDiscoveryError::LockConflict { .. } PodDiscoveryError::LockConflict { .. }
| PodDiscoveryError::Store(_) | PodDiscoveryError::Store(_)
| PodDiscoveryError::PodStore(_) | PodDiscoveryError::PodStore(_)
@ -805,7 +943,7 @@ mod tests {
use manifest::{Permission, ScopeRule}; use manifest::{Permission, ScopeRule};
use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule}; use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule};
use protocol::stream::JsonLineWriter; use protocol::stream::JsonLineWriter;
use protocol::{Alert, AlertLevel, AlertSource, Greeting}; use protocol::{Alert, AlertLevel, AlertSource};
use session_store::{new_segment_id, new_session_id}; use session_store::{new_segment_id, new_session_id};
use tempfile::TempDir; use tempfile::TempDir;
use tokio::net::UnixListener; use tokio::net::UnixListener;
@ -844,6 +982,9 @@ mod tests {
child("child-pending", &pending_socket), child("child-pending", &pending_socket),
], ],
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: vec![pod_store::PodPeer {
pod_name: "peer".into(),
}],
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
}; };
store.write(&parent).unwrap(); store.write(&parent).unwrap();
@ -856,6 +997,7 @@ mod tests {
)), )),
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: Vec::new(),
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
}) })
.unwrap(); .unwrap();
@ -868,6 +1010,7 @@ mod tests {
)), )),
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: Vec::new(),
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
}) })
.unwrap(); .unwrap();
@ -877,6 +1020,7 @@ mod tests {
active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)), active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)),
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: Vec::new(),
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
}) })
.unwrap(); .unwrap();
@ -889,6 +1033,19 @@ mod tests {
)), )),
spawned_children: Vec::new(), spawned_children: Vec::new(),
reclaimed_children: Vec::new(), reclaimed_children: Vec::new(),
peers: Vec::new(),
resolved_manifest_snapshot: None,
})
.unwrap();
store
.write(&PodMetadata {
pod_name: "peer".into(),
active: None,
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: vec![pod_store::PodPeer {
pod_name: "parent".into(),
}],
resolved_manifest_snapshot: None, resolved_manifest_snapshot: None,
}) })
.unwrap(); .unwrap();
@ -913,14 +1070,37 @@ mod tests {
let restore_tool_def = restore_pod_tool(discovery.clone()); let restore_tool_def = restore_pod_tool(discovery.clone());
let (restore_meta, _) = restore_tool_def(); let (restore_meta, _) = restore_tool_def();
assert_eq!(restore_meta.name, "RestorePod"); assert_eq!(restore_meta.name, "RestorePod");
let send_peer_tool_def = send_to_peer_pod_tool(discovery.clone());
let (send_peer_meta, _) = send_peer_tool_def();
assert_eq!(send_peer_meta.name, "SendToPeerPod");
let list = discovery.list_visible().await.unwrap(); let list = discovery.list_visible().await.unwrap();
let names: Vec<_> = list.iter().map(|p| p.pod_name.as_str()).collect(); let names: Vec<_> = list.iter().map(|p| p.pod_name.as_str()).collect();
assert_eq!( assert_eq!(
names, names,
vec!["child-live", "child-pending", "child-stale", "parent"] vec![
"child-live",
"child-pending",
"child-stale",
"parent",
"peer"
]
); );
assert!(!names.contains(&"hidden")); assert!(!names.contains(&"hidden"));
assert_eq!(
list.iter()
.find(|p| p.pod_name == "peer")
.unwrap()
.visibility,
VisibilityReason::Peer
);
assert_eq!(
list.iter()
.find(|p| p.pod_name == "child-live")
.unwrap()
.visibility,
VisibilityReason::SpawnedChild
);
assert!( assert!(
list.iter() list.iter()
.find(|p| p.pod_name == "child-live") .find(|p| p.pod_name == "child-live")
@ -983,6 +1163,151 @@ mod tests {
live_listener.abort(); live_listener.abort();
} }
#[tokio::test(flavor = "current_thread")]
async fn register_peer_persists_reciprocal_metadata() {
    let root = TempDir::new().unwrap();
    let runtime_base = root.path().join("runtime");
    std::fs::create_dir_all(&runtime_base).unwrap();

    // Both Pods must already exist in the store for the handshake to succeed.
    let store = FsPodStore::new(&root.path().join("store")).unwrap();
    for pod_name in ["source", "target"] {
        store.write(&PodMetadata::new(pod_name, None)).unwrap();
    }

    let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap());
    let discovery = PodDiscovery::new(
        store.clone(),
        "source".into(),
        runtime_base.clone(),
        root.path().to_path_buf(),
        SpawnedPodRegistry::new(runtime_dir),
    );

    let registration = discovery.register_peer("target").unwrap();
    assert_eq!(registration.source, "source");
    assert_eq!(registration.peer, "target");

    // The link must be recorded on both sides.
    let source_meta = store.read_by_name("source").unwrap().unwrap();
    let target_meta = store.read_by_name("target").unwrap().unwrap();
    assert_eq!(source_meta.peers[0].pod_name, "target");
    assert_eq!(target_meta.peers[0].pod_name, "source");

    // The new peer must also show up in the visible list, tagged as a peer.
    let visible = discovery.list_visible().await.unwrap();
    let target_entry = visible
        .iter()
        .find(|item| item.pod_name == "target")
        .unwrap();
    assert_eq!(target_entry.visibility, VisibilityReason::Peer);
}
#[tokio::test(flavor = "current_thread")]
async fn register_peer_rejects_self_and_missing_target() {
    let root = TempDir::new().unwrap();
    let runtime_base = root.path().join("runtime");
    std::fs::create_dir_all(&runtime_base).unwrap();

    // Only the source Pod exists; "missing" is never written to the store.
    let store = FsPodStore::new(&root.path().join("store")).unwrap();
    store.write(&PodMetadata::new("source", None)).unwrap();

    let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap());
    let discovery = PodDiscovery::new(
        store,
        "source".into(),
        runtime_base,
        root.path().to_path_buf(),
        SpawnedPodRegistry::new(runtime_dir),
    );

    let self_outcome = discovery.register_peer("source");
    assert!(matches!(
        self_outcome,
        Err(PodDiscoveryError::SelfPeer { .. })
    ));

    let missing_outcome = discovery.register_peer("missing");
    assert!(matches!(
        missing_outcome,
        Err(PodDiscoveryError::MissingPod { .. })
    ));
}
// End-to-end check of the `SendToPeerPod` tool against a fake peer socket:
// "source" and "target" are stored as mutual peers, no spawned children are
// registered, and the labeled message must arrive as `Method::Notify`.
#[tokio::test(flavor = "current_thread")]
async fn send_to_peer_pod_delivers_notify_without_child_registry() {
let root = TempDir::new().unwrap();
let store_dir = root.path().join("store");
let runtime_base = root.path().join("runtime");
std::fs::create_dir_all(runtime_base.join("target")).unwrap();
let store = FsPodStore::new(&store_dir).unwrap();
// Persist the peer link on both sides directly, without going through
// `register_peer`.
store
.write(&PodMetadata {
pod_name: "source".into(),
active: None,
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: vec![pod_store::PodPeer {
pod_name: "target".into(),
}],
resolved_manifest_snapshot: None,
})
.unwrap();
store
.write(&PodMetadata {
pod_name: "target".into(),
active: None,
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: vec![pod_store::PodPeer {
pod_name: "source".into(),
}],
resolved_manifest_snapshot: None,
})
.unwrap();
let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap());
let discovery = PodDiscovery::new(
store,
"source".into(),
runtime_base.clone(),
root.path().to_path_buf(),
SpawnedPodRegistry::new(runtime_dir),
);
// Fake target Pod: a listener bound at `<runtime_base>/target/sock`.
let socket = runtime_base.join("target").join("sock");
let listener = UnixListener::bind(&socket).unwrap();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let target = tokio::spawn(async move {
// The fake serves exactly two connections, in this order. The first one
// is answered with an idle `Snapshot` — presumably the liveness probe
// issued by `inspect`; confirm against the probe implementation.
let (stream, _) = listener.accept().await.unwrap();
let mut writer = JsonLineWriter::new(stream);
writer
.write(&Event::Snapshot {
entries: Vec::new(),
greeting: protocol::Greeting {
pod_name: "target".into(),
cwd: "/tmp".into(),
provider: "test".into(),
model: "test".into(),
scope_summary: String::new(),
tools: Vec::new(),
context_window: 0,
context_tokens: 0,
},
status: PodStatus::Idle,
})
.await
.unwrap();
// The second connection must carry the notification itself; any other
// method fails the test.
let (stream, _) = listener.accept().await.unwrap();
let mut reader = JsonLineReader::new(stream);
let method = reader.next::<Method>().await.unwrap().unwrap();
if let Method::Notify { message } = method {
tx.send(message).await.unwrap();
} else {
panic!("expected Notify, got {method:?}");
}
});
let (_, tool) = send_to_peer_pod_tool(discovery)();
let output = tool
.execute(r#"{"name":"target","message":"hello"}"#)
.await
.unwrap();
assert_eq!(output.summary, "sent peer message to `target`");
// The delivered text carries the sender label followed by the raw message.
let message = rx.recv().await.unwrap();
assert_eq!(message, "[Peer message from `source`]\nhello");
target.await.unwrap();
}
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
async fn probe_socket_reads_status_after_replayed_alert() { async fn probe_socket_reads_status_after_replayed_alert() {
let root = TempDir::new().unwrap(); let root = TempDir::new().unwrap();
@ -1003,7 +1328,7 @@ mod tests {
writer writer
.write(&Event::Snapshot { .write(&Event::Snapshot {
entries: Vec::new(), entries: Vec::new(),
greeting: Greeting { greeting: protocol::Greeting {
pod_name: "alerted".into(), pod_name: "alerted".into(),
cwd: "/tmp".into(), cwd: "/tmp".into(),
provider: "test".into(), provider: "test".into(),
@ -1051,7 +1376,7 @@ mod tests {
let _ = writer let _ = writer
.write(&Event::Snapshot { .write(&Event::Snapshot {
entries: Vec::new(), entries: Vec::new(),
greeting: Greeting { greeting: protocol::Greeting {
pod_name: "child-live".into(), pod_name: "child-live".into(),
cwd: "/tmp".into(), cwd: "/tmp".into(),
provider: "test".into(), provider: "test".into(),

View File

@ -64,6 +64,13 @@ pub enum Method {
RestorePod { RestorePod {
name: String, name: String,
}, },
/// Register another existing Pod as a reciprocal peer of this Pod.
///
/// This is metadata/control state only: it must not grant delegated scope,
/// spawned-child ownership, output cursors, or child lifecycle authority.
RegisterPeer {
name: String,
},
} }
/// Typed lifecycle events sent from a child Pod to its parent. /// Typed lifecycle events sent from a child Pod to its parent.
@ -480,6 +487,10 @@ pub enum Event {
PodRestored { PodRestored {
result: serde_json::Value, result: serde_json::Value,
}, },
/// Reply to `Method::RegisterPeer`.
PeerRegistered {
result: serde_json::Value,
},
Alert(Alert), Alert(Alert),
/// Latest memory extract/consolidation lifecycle event for UI observability. /// Latest memory extract/consolidation lifecycle event for UI observability.
/// ///
@ -1465,13 +1476,17 @@ mod tests {
Method::RestorePod { Method::RestorePod {
name: "child".into(), name: "child".into(),
}, },
Method::RegisterPeer {
name: "peer".into(),
},
]; ];
for method in methods { for method in methods {
let json = serde_json::to_string(&method).unwrap(); let json = serde_json::to_string(&method).unwrap();
let decoded: Method = serde_json::from_str(&json).unwrap(); let decoded: Method = serde_json::from_str(&json).unwrap();
match (decoded, method) { match (decoded, method) {
(Method::ListPods, Method::ListPods) (Method::ListPods, Method::ListPods)
| (Method::RestorePod { .. }, Method::RestorePod { .. }) => {} | (Method::RestorePod { .. }, Method::RestorePod { .. })
| (Method::RegisterPeer { .. }, Method::RegisterPeer { .. }) => {}
(decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"), (decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
} }
} }
@ -1486,6 +1501,9 @@ mod tests {
Event::PodRestored { Event::PodRestored {
result: serde_json::json!({ "action": "already_live" }), result: serde_json::json!({ "action": "already_live" }),
}, },
Event::PeerRegistered {
result: serde_json::json!({ "source": "self", "peer": "other" }),
},
]; ];
for event in events { for event in events {
let json = serde_json::to_string(&event).unwrap(); let json = serde_json::to_string(&event).unwrap();
@ -1497,6 +1515,9 @@ mod tests {
(Event::PodRestored { result }, Event::PodRestored { result: expected }) => { (Event::PodRestored { result }, Event::PodRestored { result: expected }) => {
assert_eq!(result, expected) assert_eq!(result, expected)
} }
(Event::PeerRegistered { result }, Event::PeerRegistered { result: expected }) => {
assert_eq!(result, expected)
}
(decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"), (decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
} }
} }

View File

@ -1204,6 +1204,22 @@ impl App {
}); });
} }
Event::PodsListed { .. } | Event::PodRestored { .. } => {} Event::PodsListed { .. } | Event::PodRestored { .. } => {}
Event::PeerRegistered { result } => {
let source = result
.get("source")
.and_then(serde_json::Value::as_str)
.unwrap_or("this Pod");
let peer = result
.get("peer")
.and_then(serde_json::Value::as_str)
.unwrap_or("peer Pod");
self.flash_actionbar_notice(
format!("Peer handshake registered: `{source}` ↔ `{peer}`"),
ActionbarNoticeLevel::Info,
ActionbarNoticeSource::Tui,
Duration::from_secs(4),
);
}
Event::Shutdown => { Event::Shutdown => {
self.mark_orphan_compacts_incomplete(); self.mark_orphan_compacts_incomplete();
self.quit = true; self.quit = true;

View File

@ -156,6 +156,15 @@ impl CommandRegistry {
can_execute: rewind_available, can_execute: rewind_available,
executor: rewind_command, executor: rewind_command,
}); });
registry.register(CommandSpec {
name: "peer",
aliases: &[],
usage: "peer <pod-name>",
description: "Make another existing Pod visible as a reciprocal peer.",
argument_parser: peer_args,
can_execute: peer_available,
executor: peer_command,
});
registry registry
} }
@ -302,6 +311,17 @@ fn rewind_args(raw: &str) -> Result<CommandArgs, CommandDiagnostic> {
} }
} }
/// Parse the arguments of `peer`, which takes exactly one Pod name.
///
/// Any other argument count yields a usage diagnostic.
fn peer_args(raw: &str) -> Result<CommandArgs, CommandDiagnostic> {
    let parsed = CommandArgs::parse_whitespace(raw);
    let argc = parsed.argv().len();
    match argc {
        1 => Ok(parsed),
        _ => Err(CommandDiagnostic::new(
            "Invalid arguments. Usage: peer <pod-name>",
        )),
    }
}
fn compact_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> { fn compact_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> {
if !environment.connected { if !environment.connected {
return Err(CommandDiagnostic::new( return Err(CommandDiagnostic::new(
@ -335,6 +355,20 @@ fn rewind_available(environment: &CommandEnvironment) -> Result<(), CommandDiagn
Ok(()) Ok(())
} }
/// Gate for the `peer` command: the client must be connected and the Pod
/// must not be running.
///
/// The connection check comes first, so a disconnected client always gets
/// the "not connected" diagnostic.
fn peer_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> {
    let blocker = if !environment.connected {
        Some("Cannot register a peer before the Pod is connected.")
    } else if environment.running {
        Some("Cannot register a peer while the Pod is running.")
    } else {
        None
    };
    match blocker {
        Some(reason) => Err(CommandDiagnostic::new(reason)),
        None => Ok(()),
    }
}
fn help_command(invocation: CommandInvocation<'_>) -> CommandExecution { fn help_command(invocation: CommandInvocation<'_>) -> CommandExecution {
if let Some(name) = invocation.args.argv().first() { if let Some(name) = invocation.args.argv().first() {
let Some(command) = invocation.registry.find(name) else { let Some(command) = invocation.registry.find(name) else {
@ -394,6 +428,20 @@ fn rewind_command(invocation: CommandInvocation<'_>) -> CommandExecution {
} }
} }
/// Executor for `peer <pod-name>`: emit a `RegisterPeer` request for the
/// named Pod and a local diagnostic confirming the request.
///
/// Indexing `argv()[0]` assumes the registry has already accepted the
/// arguments through `peer_args` (exactly one argument).
fn peer_command(invocation: CommandInvocation<'_>) -> CommandExecution {
    // Neither the matched command spec nor the environment is needed here.
    let _ = invocation.command;
    let _ = invocation.environment;

    let peer_name = invocation.args.argv()[0].clone();
    let notice = CommandDiagnostic::new(format!(
        "peer handshake requested with `{peer_name}`"
    ));
    CommandExecution {
        method: Some(Method::RegisterPeer { name: peer_name }),
        diagnostics: vec![notice],
        exit_command_mode: true,
        clear_input: true,
    }
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -501,4 +549,38 @@ mod tests {
let result = registry.dispatch("rewind", &paused); let result = registry.dispatch("rewind", &paused);
assert!(matches!(result.method, Some(Method::ListRewindTargets))); assert!(matches!(result.method, Some(Method::ListRewindTargets)));
} }
#[test]
fn peer_command_returns_register_peer_method() {
    let registry = CommandRegistry::builtins();
    let outcome = registry.dispatch("peer reviewer", &env());

    // The single argument must be forwarded verbatim as the peer name.
    let requests_reviewer = matches!(
        outcome.method,
        Some(Method::RegisterPeer { ref name }) if name == "reviewer"
    );
    assert!(requests_reviewer);
    assert!(outcome.exit_command_mode);
    assert!(outcome.clear_input);

    let first_notice = &outcome.diagnostics[0];
    assert!(first_notice.message.contains("peer handshake"));
}
#[test]
fn peer_invalid_arguments_are_local_diagnostic() {
    let registry = CommandRegistry::builtins();
    // Zero arguments and two arguments are both rejected without producing a
    // method.
    let malformed_inputs = ["peer", "peer one two"];
    for input in malformed_inputs {
        let outcome = registry.dispatch(input, &env());
        assert!(outcome.method.is_none());
        assert!(!outcome.exit_command_mode);
        let first_notice = &outcome.diagnostics[0];
        assert!(first_notice.message.contains("Invalid arguments"));
    }
}
#[test]
fn peer_rejects_running() {
    // With the running flag set, `peer` must yield no method and a diagnostic
    // that mentions "running".
    let registry = CommandRegistry::builtins();
    let running_env = {
        let mut environment = env();
        environment.running = true;
        environment
    };

    let outcome = registry.dispatch("peer reviewer", &running_env);
    assert!(outcome.method.is_none());

    let first_diagnostic = &outcome.diagnostics[0];
    assert!(first_diagnostic.message.contains("running"));
}
} }

View File

@ -32,6 +32,12 @@ Parent-visible children are sourced from Pod metadata, not from a transient runt
Delegated write scope is a capability loan. Stopping, shutting down, or pruning a child must reclaim the parent's effective write permissions while preserving explicit base denies. Delegated write scope is a capability loan. Stopping, shutting down, or pruning a child must reclaim the parent's effective write permissions while preserving explicit base denies.
## Peer Pods
Peer visibility is also Pod metadata, but it is distinct from spawned-child delegation. A TUI user can run `:peer <pod-name>` while attached to a connected Pod that is not running (idle or paused) to register a reciprocal peer handshake with another existing Pod.
A peer relationship only makes the Pods mutually visible through `ListPods` with visibility source `peer`. It does not grant filesystem scope, create a child output cursor, make either Pod the other's parent, or imply child completion notifications. Peer messages use `SendToPeerPod`, which delivers a labeled notification into the target Pod's normal durable notification/history path.
## Notifications are not authority ## Notifications are not authority
Pod completion notifications are UX hints. Before treating delegated work as complete, inspect queryable evidence: child output, session/log state, worktree status, diffs, and validation output. Pod completion notifications are UX hints. Before treating delegated work as complete, inspect queryable evidence: child output, session/log state, worktree status, diffs, and validation output.

View File

@ -7,4 +7,6 @@ The parent does not need to keep a turn open or call tools solely to wait for a
Before treating delegated work as complete, read the child output and inspect concrete evidence such as worktree status, diff, and test results. Notifications are hints, not proof of completion. Before treating delegated work as complete, read the child output and inspect concrete evidence such as worktree status, diff, and test results. Notifications are hints, not proof of completion.
Peer Pods made visible by a handshake are not spawned children. Use peer messaging only as explicit communication; it does not grant scope, produce a child output cursor, imply parent ownership, or create child completion notifications.
This guidance is not scheduler or auto-maintain authorization. Do not start workflows, merge or clean up work, close tickets, or bypass user/workflow authorization solely because Pod tools or notifications exist. This guidance is not scheduler or auto-maintain authorization. Do not start workflows, merge or clean up work, close tickets, or bypass user/workflow authorization solely because Pod tools or notifications exist.

View File

@ -0,0 +1,29 @@
# Implementation report: peer Pod handshake command
Date: 2026-06-02
## Investigation
The current boundaries are documented in `artifacts/investigation-summary.md`. No escalation blocker was found. The main concern identified was avoiding reuse of spawned-child state (`SpawnedPodRegistry`, delegated scope, output cursors, and child completion semantics) for peer communication; the implementation therefore adds separate peer metadata and a separate peer send tool.
## Implemented behavior
- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`.
- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`.
- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes.
- Added `PodDiscovery::register_peer` that persists both metadata directions and rolls back the first side on ordinary second-side write failure.
- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata.
- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications.
- Added TUI command `:peer <pod-name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`.
- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children.
## Tests and validation run
- `cargo test -p protocol -p pod-store -p pod -p tui --lib`
- `./tickets.sh doctor`
- `git diff --check`
- `nix build .#yoi`
## Notes
The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails.

View File

@ -0,0 +1,24 @@
# Investigation summary: peer Pod handshake
Date: 2026-06-02
## Current authority map
- `ListPods` / `RestorePod` are implemented in `crates/pod/src/discovery.rs` and are served both as LLM tools and protocol methods from the currently attached Pod's controller. They intentionally start from the caller Pod's visibility set rather than a host-wide Pod universe. Today the visibility set is the caller itself plus spawned children from durable Pod metadata and the live spawned-child registry.
- Pod metadata is in `crates/pod-store/src/lib.rs` as name-keyed current state (`PodMetadata`) under the Pod-state root. `spawned_children` is durable current parent/child visibility and delegation state; session JSONL remains the durable explanation for delivered context/history.
- The spawned-child registry (`SpawnedPodRegistry`) is runtime/current ownership state for `SpawnPod`, `SendToPod`, `ReadPodOutput`, and `StopPod`. It carries output cursors and delegated scope; it should not be reused for peer visibility.
- `SendToPod` in `crates/pod/src/spawn/comm_tools.rs` is explicitly spawned-child scoped: it looks up only `SpawnedPodRegistry`, sends `Method::Run`, and is paired with `ReadPodOutput` cursor semantics. Broadening it would blur child vs peer semantics, so a distinct peer-safe send surface is preferable.
- Protocol methods are in `crates/protocol/src/lib.rs`; TUI command dispatch is local parsing in `crates/tui/src/command.rs`, returning typed `Method`s that `single_pod.rs` sends to the currently attached Pod. This is suitable for a TUI command that asks the current Pod to perform authoritative metadata changes.
- Delivered notifications already have an explainable durable path: `Method::Notify` is queued through `NotifyBuffer`, rendered as a `SystemItem`, committed as `LogEntry::SystemItem`, and then appended to model history. A peer message can reuse that receive-side path without hidden context injection.
## Implementation direction
No escalation blocker found. The minimal safe design is:
- Add a `peers` field to `PodMetadata`, separate from `spawned_children`, with serde defaults so no broad migration is needed.
- Add a controller-handled `Method::RegisterPeer { name }` that validates the target Pod state exists, rejects self, and writes reciprocal peer entries to both Pod metadata records. Normal write failures roll back the first side so successful replies do not leave one-sided visibility.
- Extend `ListPods` visibility with a `peer` reason sourced from `PodMetadata.peers`; keep spawned-child comm registry and child output cursors unchanged.
- Add a separate `SendToPeerPod` tool backed by `PodDiscovery` visibility. It only sends to visible peers, uses the target's live/restored socket from discovery, and delivers a `Method::Notify` message labeled with the sender Pod name. It does not read output, stop Pods, grant scope, or use `SpawnedPodRegistry`.
- Add TUI `:peer <name>` command that emits `Method::RegisterPeer { name }` to the attached Pod and shows local diagnostics; the controller returns a typed result event for success and normal `Event::Error` for failure.
Crash-level atomicity for two metadata files is not provided by the existing store, but the implementation can avoid misleading partial state for ordinary validation/write failures and does not require broad schema migration or hidden history changes.

View File

@ -7,7 +7,7 @@ kind: task
priority: P2 priority: P2
labels: [tui, pod, command, orchestration] labels: [tui, pod, command, orchestration]
created_at: 2026-06-01T13:29:55Z created_at: 2026-06-01T13:29:55Z
updated_at: 2026-06-02T10:17:42Z updated_at: 2026-06-02T10:42:37Z
assignee: null assignee: null
legacy_ticket: null legacy_ticket: null
--- ---

View File

@ -85,4 +85,41 @@ Validation:
- Commit the implementation in the worktree when reviewable. - Commit the implementation in the worktree when reviewable.
---
<!-- event: implementation_report author: hare at: 2026-06-02T10:42:37Z -->
## Implementation report
# Implementation report: peer Pod handshake command
Date: 2026-06-02
## Investigation
The current boundaries are documented in `artifacts/investigation-summary.md`. No escalation blocker was found. The main concern identified was avoiding reuse of spawned-child state (`SpawnedPodRegistry`, delegated scope, output cursors, and child completion semantics) for peer communication; the implementation therefore adds separate peer metadata and a separate peer send tool.
## Implemented behavior
- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`.
- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`.
- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes.
- Added `PodDiscovery::register_peer` that persists both metadata directions and rolls back the first side on ordinary second-side write failure.
- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata.
- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications.
- Added TUI command `:peer <pod-name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`.
- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children.
## Tests and validation run
- `cargo test -p protocol -p pod-store -p pod -p tui --lib`
- `./tickets.sh doctor`
- `git diff --check`
- `nix build .#yoi`
## Notes
The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails.
--- ---