merge: peer pod handshake command

This commit is contained in:
Keisuke Hirata 2026-06-02 22:53:09 +09:00
commit 3750a350fd
No known key found for this signature in database
12 changed files with 832 additions and 15 deletions

View File

@ -84,6 +84,16 @@ pub struct PodReclaimedChild {
pub scope_delegated: Vec<PodSpawnedScopeRule>,
}
/// One peer Pod made visible by an explicit peer handshake.
///
/// Peer visibility is intentionally separate from spawned-child delegation: it
/// does not carry filesystem scope, callback ownership, output cursors, or
/// lifecycle-notification authority.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodPeer {
pub pod_name: String,
}
/// Persistent metadata for a Pod name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PodMetadata {
@ -94,6 +104,8 @@ pub struct PodMetadata {
pub spawned_children: Vec<PodSpawnedChild>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub reclaimed_children: Vec<PodReclaimedChild>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub peers: Vec<PodPeer>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resolved_manifest_snapshot: Option<serde_json::Value>,
}
@ -106,6 +118,7 @@ impl PodMetadata {
active,
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: Vec::new(),
resolved_manifest_snapshot: None,
}
}
@ -168,6 +181,33 @@ pub trait PodMetadataStore: Send + Sync {
})
}
/// Set peer visibility state while preserving active pointer, child state,
/// and manifest snapshot.
fn set_peers(&self, pod_name: &str, peers: Vec<PodPeer>) -> Result<PodMetadata, PodStoreError> {
self.update_by_name(pod_name, |metadata| {
metadata.peers = peers;
})
}
/// Add one peer if absent while preserving every other metadata field.
fn add_peer(&self, pod_name: &str, peer_name: &str) -> Result<PodMetadata, PodStoreError> {
self.update_by_name(pod_name, |metadata| {
if !metadata.peers.iter().any(|peer| peer.pod_name == peer_name) {
metadata.peers.push(PodPeer {
pod_name: peer_name.to_string(),
});
metadata.peers.sort_by(|a, b| a.pod_name.cmp(&b.pod_name));
}
})
}
/// Remove one peer while preserving every other metadata field.
fn remove_peer(&self, pod_name: &str, peer_name: &str) -> Result<PodMetadata, PodStoreError> {
self.update_by_name(pod_name, |metadata| {
metadata.peers.retain(|peer| peer.pod_name != peer_name);
})
}
/// Remove reclaimed child delegations from the outstanding set and record
/// them in durable reclaim history.
fn reclaim_spawned_children(
@ -503,6 +543,52 @@ mod tests {
assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot));
}
#[test]
fn peer_updates_preserve_active_children_and_manifest_snapshot() {
let tmp = tempfile::TempDir::new().unwrap();
let store = FsPodStore::new(tmp.path()).unwrap();
let active = PodActiveSegmentRef::active_segment(
session_store::new_session_id(),
session_store::new_segment_id(),
);
let snapshot = serde_json::json!({"pod":{"name":"agent"}});
store
.set_active("agent", Some(active.clone()), Some(snapshot.clone()))
.unwrap();
store
.set_spawned_children(
"agent",
vec![PodSpawnedChild {
pod_name: "child".into(),
socket_path: std::path::Path::new("/tmp/child.sock").into(),
scope_delegated: vec![],
callback_address: std::path::Path::new("/tmp/parent.sock").into(),
}],
)
.unwrap();
store.add_peer("agent", "peer-b").unwrap();
store.add_peer("agent", "peer-a").unwrap();
store.add_peer("agent", "peer-a").unwrap();
let restored = store.read_by_name("agent").unwrap().unwrap();
assert_eq!(restored.active, Some(active));
assert_eq!(restored.spawned_children.len(), 1);
assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot));
assert_eq!(
restored
.peers
.iter()
.map(|peer| peer.pod_name.as_str())
.collect::<Vec<_>>(),
vec!["peer-a", "peer-b"]
);
store.remove_peer("agent", "peer-a").unwrap();
let restored = store.read_by_name("agent").unwrap().unwrap();
assert_eq!(restored.peers.len(), 1);
assert_eq!(restored.peers[0].pod_name, "peer-b");
}
#[test]
fn reclaim_children_removes_outstanding_and_records_history() {
let tmp = tempfile::TempDir::new().unwrap();

View File

@ -8,7 +8,7 @@ use pod_store::PodMetadataStore;
use session_store::Store;
use tokio::sync::{broadcast, mpsc, oneshot};
use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool};
use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool, send_to_peer_pod_tool};
use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::server::SocketServer;
@ -561,7 +561,8 @@ where
worker.register_tool(stop_pod_tool(spawned_registry.clone()));
let discovery = PodDiscovery::new(pod_store, spawner_name, runtime_base, pwd, spawned_registry);
worker.register_tool(list_pods_tool(discovery.clone()));
worker.register_tool(restore_pod_tool(discovery));
worker.register_tool(restore_pod_tool(discovery.clone()));
worker.register_tool(send_to_peer_pod_tool(discovery));
pod.attach_tracker(tracker);
fs_for_view
}
@ -862,6 +863,26 @@ async fn controller_loop<C, St>(
}
},
Method::RegisterPeer { name } => match discovery.register_peer(&name) {
Ok(result) => match serde_json::to_value(result) {
Ok(result) => {
let _ = event_tx.send(Event::PeerRegistered { result });
}
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::Internal,
message: format!("serialize peer registration result: {error}"),
});
}
},
Err(error) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::InvalidRequest,
message: error.to_string(),
});
}
},
// ListCompletions is handled at the socket layer (direct
// response). If it reaches the controller, ignore it.
Method::ListCompletions { .. } => {}
@ -1063,10 +1084,10 @@ where
notify_buffer.push_notify(message);
}
Some(Method::ListCompletions { .. }) => {}
Some(Method::ListPods | Method::RestorePod { .. }) => {
Some(Method::ListPods | Method::RestorePod { .. } | Method::RegisterPeer { .. }) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::AlreadyRunning,
message: "Pod discovery requests are only handled while the Pod is idle or paused"
message: "Pod discovery/control requests are only handled while the Pod is idle or paused"
.into(),
});
}

View File

@ -17,9 +17,9 @@ use async_trait::async_trait;
use client::PodRuntimeCommand;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use manifest::{Permission, ScopeRule};
use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore};
use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, validate_pod_name};
use protocol::stream::JsonLineReader;
use protocol::{Event, PodStatus};
use protocol::{Event, Method, PodStatus};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use session_store::{SegmentId, SessionId};
@ -28,6 +28,7 @@ use tokio::process::Command;
use crate::runtime::dir::SpawnedPodRecord;
use crate::runtime::pod_registry;
use crate::spawn::comm_tools::connect_and_send;
use crate::spawn::registry::SpawnedPodRegistry;
const PROBE_TIMEOUT: Duration = Duration::from_millis(500);
@ -172,6 +173,41 @@ where
})
}
pub fn register_peer(
&self,
peer_name: &str,
) -> Result<PeerRegistrationResult, PodDiscoveryError> {
validate_pod_name(peer_name)?;
if peer_name == self.self_pod_name {
return Err(PodDiscoveryError::SelfPeer {
pod_name: peer_name.to_string(),
});
}
let self_metadata = self
.store
.read_by_name(&self.self_pod_name)?
.ok_or_else(|| PodDiscoveryError::StateMissing {
pod_name: self.self_pod_name.clone(),
})?;
let prior_self_peers = self_metadata.peers.clone();
if self.store.read_by_name(peer_name)?.is_none() {
return Err(PodDiscoveryError::MissingPod {
pod_name: peer_name.to_string(),
});
}
self.store.add_peer(&self.self_pod_name, peer_name)?;
if let Err(error) = self.store.add_peer(peer_name, &self.self_pod_name) {
let _ = self.store.set_peers(&self.self_pod_name, prior_self_peers);
return Err(PodDiscoveryError::PodStore(error));
}
Ok(PeerRegistrationResult {
source: self.self_pod_name.clone(),
peer: peer_name.to_string(),
})
}
async fn visibility(&self) -> Result<VisibilitySet, PodDiscoveryError> {
let mut visible = BTreeMap::new();
let mut child_sockets = BTreeMap::new();
@ -187,6 +223,11 @@ where
child_sockets.insert(child.pod_name.clone(), child.socket_path.clone());
comm_registry.insert(child.pod_name.clone(), comm_info_from_spawned_child(&child));
}
for peer in metadata.peers {
visible
.entry(peer.pod_name)
.or_insert(VisibilityReason::Peer);
}
}
// The live in-memory registry covers just-spawned children even if a
@ -379,6 +420,7 @@ where
pub enum VisibilityReason {
SelfPod,
SpawnedChild,
Peer,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@ -525,6 +567,12 @@ pub enum RestoreResult {
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PeerRegistrationResult {
pub source: String,
pub peer: String,
}
#[derive(Debug, thiserror::Error)]
pub enum PodDiscoveryError {
#[error("pod state missing for `{pod_name}`")]
@ -533,6 +581,10 @@ pub enum PodDiscoveryError {
NotVisible { pod_name: String },
#[error("pod `{pod_name}` is not restorable: {reason}")]
NotRestorable { pod_name: String, reason: String },
#[error("pod `{pod_name}` cannot be registered as a peer of itself")]
SelfPeer { pod_name: String },
#[error("pod `{pod_name}` does not exist")]
MissingPod { pod_name: String },
#[error(
"pod `{pod_name}` segment {segment_id} is locked by `{owner_pod}` pid {pid} at {socket_path}"
)]
@ -683,6 +735,14 @@ struct PodNameInput {
name: String,
}
#[derive(Debug, Deserialize, JsonSchema)]
struct SendToPeerPodInput {
/// Target peer Pod name.
name: String,
/// Text delivered to the peer as a peer notification.
message: String,
}
struct ListPodsTool<St> {
discovery: PodDiscovery<St>,
}
@ -745,7 +805,7 @@ where
Arc::new(move || {
let meta = ToolMeta::new("ListPods")
.description(
"List Pods visible to this Pod from durable Pod state and the spawned-child registry. This does not expose the host-wide Pod universe.",
"List Pods visible to this Pod from durable Pod state, peer metadata, and the spawned-child registry. This does not expose the host-wide Pod universe.",
)
.input_schema(serde_json::json!({
"type": "object",
@ -776,6 +836,74 @@ where
})
}
const SEND_TO_PEER_POD_DESCRIPTION: &str = "Send a text message to a peer Pod made visible by explicit reciprocal peer metadata. The message is delivered as a peer notification through the target Pod's durable notification/history path. This does not grant delegated scope, create a spawned-child output cursor, imply parent ownership, or produce child completion notifications. Fails clearly if the target is not a visible live peer; it does not auto-restore stopped peers.";
struct SendToPeerPodTool<St> {
discovery: PodDiscovery<St>,
}
#[async_trait]
impl<St> Tool for SendToPeerPodTool<St>
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let input: SendToPeerPodInput = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid SendToPeerPod input: {e}")))?;
let detail = self
.discovery
.inspect(&input.name)
.await
.map_err(discovery_error_to_tool_error)?;
if detail.visibility != VisibilityReason::Peer {
return Err(ToolError::InvalidArgument(format!(
"pod `{}` is visible as {:?}, not as a peer",
input.name, detail.visibility
)));
}
if !detail.live.reachable {
return Err(ToolError::ExecutionFailed(format!(
"peer pod `{}` is not live/reachable; restore it before sending",
input.name
)));
}
let message = format!(
"[Peer message from `{}`]\n{}",
self.discovery.self_pod_name, input.message
);
send_peer_notify(&detail.live.socket_path, message)
.await
.map_err(|error| {
ToolError::ExecutionFailed(format!("send to peer `{}`: {error}", input.name))
})?;
Ok(ToolOutput {
summary: format!("sent peer message to `{}`", input.name),
content: None,
})
}
}
pub fn send_to_peer_pod_tool<St>(discovery: PodDiscovery<St>) -> ToolDefinition
where
St: PodMetadataStore + Clone + Send + Sync + 'static,
{
Arc::new(move || {
let meta = ToolMeta::new("SendToPeerPod")
.description(SEND_TO_PEER_POD_DESCRIPTION)
.input_schema(serde_json::to_value(schemars::schema_for!(SendToPeerPodInput)).unwrap());
let tool: Arc<dyn Tool> = Arc::new(SendToPeerPodTool {
discovery: discovery.clone(),
});
(meta, tool)
})
}
async fn send_peer_notify(socket_path: &Path, message: String) -> io::Result<()> {
connect_and_send(socket_path, &Method::Notify { message }).await
}
fn json_content<T: Serialize>(value: &T) -> Result<String, ToolError> {
serde_json::to_string_pretty(value)
.map_err(|e| ToolError::Internal(format!("serialize pod discovery output: {e}")))
@ -785,7 +913,9 @@ fn discovery_error_to_tool_error(error: PodDiscoveryError) -> ToolError {
match error {
PodDiscoveryError::StateMissing { .. }
| PodDiscoveryError::NotVisible { .. }
| PodDiscoveryError::NotRestorable { .. } => ToolError::InvalidArgument(error.to_string()),
| PodDiscoveryError::NotRestorable { .. }
| PodDiscoveryError::SelfPeer { .. }
| PodDiscoveryError::MissingPod { .. } => ToolError::InvalidArgument(error.to_string()),
PodDiscoveryError::LockConflict { .. }
| PodDiscoveryError::Store(_)
| PodDiscoveryError::PodStore(_)
@ -803,15 +933,49 @@ mod tests {
use std::sync::Mutex;
use manifest::{Permission, ScopeRule};
use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule};
use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError};
use protocol::stream::JsonLineWriter;
use protocol::{Alert, AlertLevel, AlertSource, Greeting};
use protocol::{Alert, AlertLevel, AlertSource};
use session_store::{new_segment_id, new_session_id};
use tempfile::TempDir;
use tokio::net::UnixListener;
use crate::runtime::dir::RuntimeDir;
#[derive(Clone)]
struct FailTargetPeerStore {
inner: FsPodStore,
}
impl PodMetadataStore for FailTargetPeerStore {
fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError> {
if metadata.pod_name == "target"
&& metadata.peers.iter().any(|peer| peer.pod_name == "source")
{
return Err(PodStoreError::Io(io::Error::other(
"injected target-side peer write failure",
)));
}
self.inner.write(metadata)
}
fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, PodStoreError> {
self.inner.read_by_name(pod_name)
}
fn list_names(&self) -> Result<Vec<String>, PodStoreError> {
self.inner.list_names()
}
fn root_dir(&self) -> Option<PathBuf> {
self.inner.root_dir()
}
fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError> {
self.inner.delete_by_name(pod_name)
}
}
static ENV_LOCK: Mutex<()> = Mutex::new(());
#[tokio::test(flavor = "current_thread")]
@ -844,6 +1008,9 @@ mod tests {
child("child-pending", &pending_socket),
],
reclaimed_children: Vec::new(),
peers: vec![pod_store::PodPeer {
pod_name: "peer".into(),
}],
resolved_manifest_snapshot: None,
};
store.write(&parent).unwrap();
@ -856,6 +1023,7 @@ mod tests {
)),
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: Vec::new(),
resolved_manifest_snapshot: None,
})
.unwrap();
@ -868,6 +1036,7 @@ mod tests {
)),
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: Vec::new(),
resolved_manifest_snapshot: None,
})
.unwrap();
@ -877,6 +1046,7 @@ mod tests {
active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)),
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: Vec::new(),
resolved_manifest_snapshot: None,
})
.unwrap();
@ -889,6 +1059,19 @@ mod tests {
)),
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: Vec::new(),
resolved_manifest_snapshot: None,
})
.unwrap();
store
.write(&PodMetadata {
pod_name: "peer".into(),
active: None,
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: vec![pod_store::PodPeer {
pod_name: "parent".into(),
}],
resolved_manifest_snapshot: None,
})
.unwrap();
@ -913,14 +1096,37 @@ mod tests {
let restore_tool_def = restore_pod_tool(discovery.clone());
let (restore_meta, _) = restore_tool_def();
assert_eq!(restore_meta.name, "RestorePod");
let send_peer_tool_def = send_to_peer_pod_tool(discovery.clone());
let (send_peer_meta, _) = send_peer_tool_def();
assert_eq!(send_peer_meta.name, "SendToPeerPod");
let list = discovery.list_visible().await.unwrap();
let names: Vec<_> = list.iter().map(|p| p.pod_name.as_str()).collect();
assert_eq!(
names,
vec!["child-live", "child-pending", "child-stale", "parent"]
vec![
"child-live",
"child-pending",
"child-stale",
"parent",
"peer"
]
);
assert!(!names.contains(&"hidden"));
assert_eq!(
list.iter()
.find(|p| p.pod_name == "peer")
.unwrap()
.visibility,
VisibilityReason::Peer
);
assert_eq!(
list.iter()
.find(|p| p.pod_name == "child-live")
.unwrap()
.visibility,
VisibilityReason::SpawnedChild
);
assert!(
list.iter()
.find(|p| p.pod_name == "child-live")
@ -983,6 +1189,218 @@ mod tests {
live_listener.abort();
}
#[tokio::test(flavor = "current_thread")]
async fn register_peer_persists_reciprocal_metadata() {
let root = TempDir::new().unwrap();
let store_dir = root.path().join("store");
let runtime_base = root.path().join("runtime");
std::fs::create_dir_all(&runtime_base).unwrap();
let store = FsPodStore::new(&store_dir).unwrap();
store.write(&PodMetadata::new("source", None)).unwrap();
store.write(&PodMetadata::new("target", None)).unwrap();
let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap());
let discovery = PodDiscovery::new(
store.clone(),
"source".into(),
runtime_base.clone(),
root.path().to_path_buf(),
SpawnedPodRegistry::new(runtime_dir),
);
let result = discovery.register_peer("target").unwrap();
assert_eq!(result.source, "source");
assert_eq!(result.peer, "target");
let source = store.read_by_name("source").unwrap().unwrap();
let target = store.read_by_name("target").unwrap().unwrap();
assert_eq!(source.peers[0].pod_name, "target");
assert_eq!(target.peers[0].pod_name, "source");
let list = discovery.list_visible().await.unwrap();
assert_eq!(
list.iter()
.find(|item| item.pod_name == "target")
.unwrap()
.visibility,
VisibilityReason::Peer
);
}
#[tokio::test(flavor = "current_thread")]
async fn register_peer_rejects_self_and_missing_target() {
let root = TempDir::new().unwrap();
let store_dir = root.path().join("store");
let runtime_base = root.path().join("runtime");
std::fs::create_dir_all(&runtime_base).unwrap();
let store = FsPodStore::new(&store_dir).unwrap();
store.write(&PodMetadata::new("source", None)).unwrap();
let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap());
let discovery = PodDiscovery::new(
store,
"source".into(),
runtime_base,
root.path().to_path_buf(),
SpawnedPodRegistry::new(runtime_dir),
);
let self_err = discovery.register_peer("source").unwrap_err();
assert!(matches!(self_err, PodDiscoveryError::SelfPeer { .. }));
let missing_err = discovery.register_peer("missing").unwrap_err();
assert!(matches!(missing_err, PodDiscoveryError::MissingPod { .. }));
}
#[tokio::test(flavor = "current_thread")]
async fn register_peer_target_failure_preserves_existing_source_peer() {
let root = TempDir::new().unwrap();
let store_dir = root.path().join("store");
let runtime_base = root.path().join("runtime");
std::fs::create_dir_all(&runtime_base).unwrap();
let inner = FsPodStore::new(&store_dir).unwrap();
inner
.write(&PodMetadata {
pod_name: "source".into(),
active: None,
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: vec![pod_store::PodPeer {
pod_name: "target".into(),
}],
resolved_manifest_snapshot: None,
})
.unwrap();
inner.write(&PodMetadata::new("target", None)).unwrap();
let store = FailTargetPeerStore { inner };
let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap());
let discovery = PodDiscovery::new(
store.clone(),
"source".into(),
runtime_base,
root.path().to_path_buf(),
SpawnedPodRegistry::new(runtime_dir),
);
let err = discovery.register_peer("target").unwrap_err();
assert!(matches!(err, PodDiscoveryError::PodStore(_)));
let source = store.read_by_name("source").unwrap().unwrap();
assert_eq!(source.peers.len(), 1);
assert_eq!(source.peers[0].pod_name, "target");
let target = store.read_by_name("target").unwrap().unwrap();
assert!(target.peers.is_empty());
}
#[tokio::test(flavor = "current_thread")]
async fn send_to_peer_pod_delivers_notify_without_child_registry() {
let root = TempDir::new().unwrap();
let store_dir = root.path().join("store");
let runtime_base = root.path().join("runtime");
std::fs::create_dir_all(runtime_base.join("target")).unwrap();
let store = FsPodStore::new(&store_dir).unwrap();
store
.write(&PodMetadata {
pod_name: "source".into(),
active: None,
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: vec![pod_store::PodPeer {
pod_name: "target".into(),
}],
resolved_manifest_snapshot: None,
})
.unwrap();
store
.write(&PodMetadata {
pod_name: "target".into(),
active: None,
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: vec![pod_store::PodPeer {
pod_name: "source".into(),
}],
resolved_manifest_snapshot: None,
})
.unwrap();
let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap());
let discovery = PodDiscovery::new(
store,
"source".into(),
runtime_base.clone(),
root.path().to_path_buf(),
SpawnedPodRegistry::new(runtime_dir),
);
let socket = runtime_base.join("target").join("sock");
let listener = UnixListener::bind(&socket).unwrap();
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let target = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut writer = JsonLineWriter::new(stream);
writer
.write(&Event::Snapshot {
entries: Vec::new(),
greeting: protocol::Greeting {
pod_name: "target".into(),
cwd: "/tmp".into(),
provider: "test".into(),
model: "test".into(),
scope_summary: String::new(),
tools: Vec::new(),
context_window: 0,
context_tokens: 0,
},
status: PodStatus::Idle,
})
.await
.unwrap();
let (stream, _) = listener.accept().await.unwrap();
let (reader_half, writer_half) = stream.into_split();
let mut reader = JsonLineReader::new(reader_half);
let mut writer = JsonLineWriter::new(writer_half);
writer
.write(&Event::Alert(Alert {
level: AlertLevel::Warn,
source: AlertSource::Pod,
message: "connect-time alert".into(),
timestamp_ms: 0,
}))
.await
.unwrap();
writer
.write(&Event::Snapshot {
entries: Vec::new(),
greeting: protocol::Greeting {
pod_name: "target".into(),
cwd: "/tmp".into(),
provider: "test".into(),
model: "test".into(),
scope_summary: String::new(),
tools: Vec::new(),
context_window: 0,
context_tokens: 0,
},
status: PodStatus::Idle,
})
.await
.unwrap();
let method = reader.next::<Method>().await.unwrap().unwrap();
if let Method::Notify { message } = method {
tx.send(message).await.unwrap();
} else {
panic!("expected Notify, got {method:?}");
}
});
let (_, tool) = send_to_peer_pod_tool(discovery)();
let output = tool
.execute(r#"{"name":"target","message":"hello"}"#)
.await
.unwrap();
assert_eq!(output.summary, "sent peer message to `target`");
let message = rx.recv().await.unwrap();
assert_eq!(message, "[Peer message from `source`]\nhello");
target.await.unwrap();
}
#[tokio::test(flavor = "current_thread")]
async fn probe_socket_reads_status_after_replayed_alert() {
let root = TempDir::new().unwrap();
@ -1003,7 +1421,7 @@ mod tests {
writer
.write(&Event::Snapshot {
entries: Vec::new(),
greeting: Greeting {
greeting: protocol::Greeting {
pod_name: "alerted".into(),
cwd: "/tmp".into(),
provider: "test".into(),
@ -1051,7 +1469,7 @@ mod tests {
let _ = writer
.write(&Event::Snapshot {
entries: Vec::new(),
greeting: Greeting {
greeting: protocol::Greeting {
pod_name: "child-live".into(),
cwd: "/tmp".into(),
provider: "test".into(),

View File

@ -64,6 +64,14 @@ pub enum Method {
RestorePod {
name: String,
},
/// Register another existing Pod as a reciprocal peer of this Pod.
///
/// This is metadata/control state only: it does not ask the target's live
/// controller for consent, and it must not grant delegated scope,
/// spawned-child ownership, output cursors, or child lifecycle authority.
RegisterPeer {
name: String,
},
}
/// Typed lifecycle events sent from a child Pod to its parent.
@ -480,6 +488,10 @@ pub enum Event {
PodRestored {
result: serde_json::Value,
},
/// Reply to `Method::RegisterPeer`.
PeerRegistered {
result: serde_json::Value,
},
Alert(Alert),
/// Latest memory extract/consolidation lifecycle event for UI observability.
///
@ -1465,13 +1477,17 @@ mod tests {
Method::RestorePod {
name: "child".into(),
},
Method::RegisterPeer {
name: "peer".into(),
},
];
for method in methods {
let json = serde_json::to_string(&method).unwrap();
let decoded: Method = serde_json::from_str(&json).unwrap();
match (decoded, method) {
(Method::ListPods, Method::ListPods)
| (Method::RestorePod { .. }, Method::RestorePod { .. }) => {}
| (Method::RestorePod { .. }, Method::RestorePod { .. })
| (Method::RegisterPeer { .. }, Method::RegisterPeer { .. }) => {}
(decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
}
}
@ -1486,6 +1502,9 @@ mod tests {
Event::PodRestored {
result: serde_json::json!({ "action": "already_live" }),
},
Event::PeerRegistered {
result: serde_json::json!({ "source": "self", "peer": "other" }),
},
];
for event in events {
let json = serde_json::to_string(&event).unwrap();
@ -1497,6 +1516,9 @@ mod tests {
(Event::PodRestored { result }, Event::PodRestored { result: expected }) => {
assert_eq!(result, expected)
}
(Event::PeerRegistered { result }, Event::PeerRegistered { result: expected }) => {
assert_eq!(result, expected)
}
(decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"),
}
}

View File

@ -1204,6 +1204,22 @@ impl App {
});
}
Event::PodsListed { .. } | Event::PodRestored { .. } => {}
Event::PeerRegistered { result } => {
let source = result
.get("source")
.and_then(serde_json::Value::as_str)
.unwrap_or("this Pod");
let peer = result
.get("peer")
.and_then(serde_json::Value::as_str)
.unwrap_or("peer Pod");
self.flash_actionbar_notice(
format!("Peer metadata registered: `{source}` ↔ `{peer}`"),
ActionbarNoticeLevel::Info,
ActionbarNoticeSource::Tui,
Duration::from_secs(4),
);
}
Event::Shutdown => {
self.mark_orphan_compacts_incomplete();
self.quit = true;

View File

@ -156,6 +156,15 @@ impl CommandRegistry {
can_execute: rewind_available,
executor: rewind_command,
});
registry.register(CommandSpec {
name: "peer",
aliases: &[],
usage: "peer <pod-name>",
description: "Register another existing Pod as a reciprocal metadata peer.",
argument_parser: peer_args,
can_execute: peer_available,
executor: peer_command,
});
registry
}
@ -302,6 +311,17 @@ fn rewind_args(raw: &str) -> Result<CommandArgs, CommandDiagnostic> {
}
}
fn peer_args(raw: &str) -> Result<CommandArgs, CommandDiagnostic> {
let args = CommandArgs::parse_whitespace(raw);
if args.argv().len() == 1 {
Ok(args)
} else {
Err(CommandDiagnostic::new(
"Invalid arguments. Usage: peer <pod-name>",
))
}
}
fn compact_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> {
if !environment.connected {
return Err(CommandDiagnostic::new(
@ -335,6 +355,20 @@ fn rewind_available(environment: &CommandEnvironment) -> Result<(), CommandDiagn
Ok(())
}
fn peer_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> {
if !environment.connected {
return Err(CommandDiagnostic::new(
"Cannot register a peer before the Pod is connected.",
));
}
if environment.running {
return Err(CommandDiagnostic::new(
"Cannot register a peer while the Pod is running.",
));
}
Ok(())
}
fn help_command(invocation: CommandInvocation<'_>) -> CommandExecution {
if let Some(name) = invocation.args.argv().first() {
let Some(command) = invocation.registry.find(name) else {
@ -394,6 +428,20 @@ fn rewind_command(invocation: CommandInvocation<'_>) -> CommandExecution {
}
}
fn peer_command(invocation: CommandInvocation<'_>) -> CommandExecution {
let _ = invocation.command;
let _ = invocation.environment;
let name = invocation.args.argv()[0].clone();
CommandExecution {
method: Some(Method::RegisterPeer { name: name.clone() }),
diagnostics: vec![CommandDiagnostic::new(format!(
"peer metadata registration requested with `{name}`"
))],
exit_command_mode: true,
clear_input: true,
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -501,4 +549,61 @@ mod tests {
let result = registry.dispatch("rewind", &paused);
assert!(matches!(result.method, Some(Method::ListRewindTargets)));
}
#[test]
fn peer_command_returns_register_peer_method() {
let registry = CommandRegistry::builtins();
let result = registry.dispatch("peer reviewer", &env());
assert!(matches!(
result.method,
Some(Method::RegisterPeer { ref name }) if name == "reviewer"
));
assert!(result.exit_command_mode);
assert!(result.clear_input);
assert!(
result.diagnostics[0]
.message
.contains("metadata registration")
);
}
#[test]
fn peer_invalid_arguments_are_local_diagnostic() {
let registry = CommandRegistry::builtins();
for command in ["peer", "peer one two"] {
let result = registry.dispatch(command, &env());
assert!(result.method.is_none());
assert!(!result.exit_command_mode);
assert!(result.diagnostics[0].message.contains("Invalid arguments"));
}
}
#[test]
fn peer_help_mentions_metadata_registration() {
let registry = CommandRegistry::builtins();
let result = registry.dispatch("help peer", &env());
assert!(result.method.is_none());
assert!(result.diagnostics[0].message.contains("peer <pod-name>"));
assert!(result.diagnostics[0].message.contains("metadata peer"));
}
#[test]
fn peer_rejects_disconnected() {
let registry = CommandRegistry::builtins();
let mut disconnected = env();
disconnected.connected = false;
let result = registry.dispatch("peer reviewer", &disconnected);
assert!(result.method.is_none());
assert!(result.diagnostics[0].message.contains("connected"));
}
#[test]
fn peer_rejects_running() {
let registry = CommandRegistry::builtins();
let mut running = env();
running.running = true;
let result = registry.dispatch("peer reviewer", &running);
assert!(result.method.is_none());
assert!(result.diagnostics[0].message.contains("running"));
}
}

View File

@ -32,6 +32,12 @@ Parent-visible children are sourced from Pod metadata, not from a transient runt
Delegated write scope is a capability loan. Stopping, shutting down, or pruning a child must reclaim the parent's effective write permissions while preserving explicit base denies.
## Peer Pods
Peer visibility is also Pod metadata, but it is distinct from spawned-child delegation. A TUI user can run `:peer <pod-name>` while attached to an idle Pod to register reciprocal peer metadata with another existing Pod. This is a metadata-level registration, not live target-controller consent.
A peer relationship only makes the Pods mutually visible through `ListPods` with visibility source `peer`. It does not grant filesystem scope, create a child output cursor, make either Pod the other's parent, or imply child completion notifications. Peer messages use `SendToPeerPod`, which delivers a labeled notification into the target Pod's normal durable notification/history path. `SendToPeerPod` requires the peer to be live and fails clearly for non-live peers rather than auto-restoring them.
## Notifications are not authority
Pod completion notifications are UX hints. Before treating delegated work as complete, inspect queryable evidence: child output, session/log state, worktree status, diffs, and validation output.

View File

@ -7,4 +7,6 @@ The parent does not need to keep a turn open or call tools solely to wait for a
Before treating delegated work as complete, read the child output and inspect concrete evidence such as worktree status, diff, and test results. Notifications are hints, not proof of completion.
Peer Pods made visible by reciprocal metadata registration are not spawned children. Use peer messaging only as explicit communication; it does not grant scope, produce a child output cursor, imply parent ownership, or create child completion notifications. Peer sends require a live peer and do not auto-restore stopped peers.
This guidance is not scheduler or auto-maintain authorization. Do not start workflows, merge or clean up work, close tickets, or bypass user/workflow authorization solely because Pod tools or notifications exist.

View File

@ -0,0 +1,36 @@
# Implementation report: peer Pod handshake command
Date: 2026-06-02
## Investigation
The current boundaries are documented in `artifacts/investigation-summary.md`. No escalation blocker was found. The main concern identified was avoiding reuse of spawned-child state (`SpawnedPodRegistry`, delegated scope, output cursors, and child completion semantics) for peer communication; the implementation therefore adds separate peer metadata and a separate peer send tool.
## Implemented behavior
- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`.
- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`.
- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes.
- Added `PodDiscovery::register_peer` that persists both metadata directions and restores the exact prior source-side peer state on ordinary second-side write failure.
- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata.
- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications.
- Added TUI command `:peer <pod-name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`.
- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children.
## Reviewer blocker fixes
- `SendToPeerPod` now reuses the existing one-shot Pod socket client path (`connect_and_send`), which drains connect-time `Alert` / `Snapshot` traffic before writing `Notify` and returns an error if method delivery fails.
- Added a regression test where the target socket emits an alert and snapshot before reading the peer `Notify`, proving the peer send drains the prelude and still delivers the message.
- Registration failure rollback now restores the exact prior source peer list instead of unconditionally removing `source -> target`; a target-side injected failure test verifies a pre-existing source relation is preserved.
- Wording now describes `:peer` as metadata-level reciprocal registration rather than live target-controller consent, and documents that `SendToPeerPod` fails for non-live peers instead of auto-restoring them.
## Tests and validation run
- `cargo test -p protocol -p pod-store -p pod -p tui --lib`
- `./tickets.sh doctor`
- `git diff --check`
- `nix build .#yoi`
## Notes
The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by restoring the exact prior source-side peer list if the reciprocal write fails.

View File

@ -0,0 +1,24 @@
# Investigation summary: peer Pod handshake
Date: 2026-06-02
## Current authority map
- `ListPods` / `RestorePod` are implemented in `crates/pod/src/discovery.rs` and are served both as LLM tools and protocol methods from the currently attached Pod's controller. They intentionally start from the caller Pod's visibility set rather than a host-wide Pod universe. Today the visibility set is the caller itself plus spawned children from durable Pod metadata and the live spawned-child registry.
- Pod metadata is in `crates/pod-store/src/lib.rs` as name-keyed current state (`PodMetadata`) under the Pod-state root. `spawned_children` is durable current parent/child visibility and delegation state; session JSONL remains the durable explanation for delivered context/history.
- The spawned-child registry (`SpawnedPodRegistry`) is runtime/current ownership state for `SpawnPod`, `SendToPod`, `ReadPodOutput`, and `StopPod`. It carries output cursors and delegated scope; it should not be reused for peer visibility.
- `SendToPod` in `crates/pod/src/spawn/comm_tools.rs` is explicitly spawned-child scoped: it looks up only `SpawnedPodRegistry`, sends `Method::Run`, and is paired with `ReadPodOutput` cursor semantics. Broadening it would blur child vs peer semantics, so a distinct peer-safe send surface is preferable.
- Protocol methods are in `crates/protocol/src/lib.rs`; TUI command dispatch is local parsing in `crates/tui/src/command.rs`, returning typed `Method`s that `single_pod.rs` sends to the currently attached Pod. This is suitable for a TUI command that asks the current Pod to perform authoritative metadata changes.
- Delivered notifications already have an explainable durable path: `Method::Notify` is queued through `NotifyBuffer`, rendered as a `SystemItem`, committed as `LogEntry::SystemItem`, and then appended to model history. A peer message can reuse that receive-side path without hidden context injection.
## Implementation direction
No escalation blocker found. The minimal safe design is:
- Add a `peers` field to `PodMetadata`, separate from `spawned_children`, with serde defaults so no broad migration is needed.
- Add a controller-handled `Method::RegisterPeer { name }` that validates the target Pod state exists, rejects self, and writes reciprocal peer entries to both Pod metadata records. Normal write failures roll back the first side so successful replies do not leave one-sided visibility.
- Extend `ListPods` visibility with a `peer` reason sourced from `PodMetadata.peers`; keep spawned-child comm registry and child output cursors unchanged.
- Add a separate `SendToPeerPod` tool backed by `PodDiscovery` visibility. It only sends to visible peers, uses the target's live/restored socket from discovery, and delivers a `Method::Notify` message labeled with the sender Pod name. It does not read output, stop Pods, grant scope, or use `SpawnedPodRegistry`.
- Add TUI `:peer <name>` command that emits `Method::RegisterPeer { name }` to the attached Pod and shows local diagnostics; the controller returns a typed result event for success and normal `Event::Error` for failure.
Crash-level atomicity for two metadata files is not provided by the existing store, but the implementation can avoid misleading partial state for ordinary validation/write failures and does not require broad schema migration or hidden history changes.

View File

@ -7,7 +7,7 @@ kind: task
priority: P2
labels: [tui, pod, command, orchestration]
created_at: 2026-06-01T13:29:55Z
updated_at: 2026-06-02T10:17:42Z
updated_at: 2026-06-02T13:18:34Z
assignee: null
legacy_ticket: null
---

View File

@ -85,4 +85,85 @@ Validation:
- Commit the implementation in the worktree when reviewable.
---
<!-- event: implementation_report author: hare at: 2026-06-02T10:42:37Z -->
## Implementation report
# Implementation report: peer Pod handshake command
Date: 2026-06-02
## Investigation
The current boundaries are documented in `artifacts/investigation-summary.md`. No escalation blocker was found. The main concern identified was avoiding reuse of spawned-child state (`SpawnedPodRegistry`, delegated scope, output cursors, and child completion semantics) for peer communication; the implementation therefore adds separate peer metadata and a separate peer send tool.
## Implemented behavior
- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`.
- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`.
- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes.
- Added `PodDiscovery::register_peer` that persists both metadata directions and rolls back the first side on ordinary second-side write failure.
- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata.
- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications.
- Added TUI command `:peer <pod-name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`.
- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children.
## Tests and validation run
- `cargo test -p protocol -p pod-store -p pod -p tui --lib`
- `./tickets.sh doctor`
- `git diff --check`
- `nix build .#yoi`
## Notes
The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails.
---
<!-- event: implementation_report author: hare at: 2026-06-02T13:18:34Z -->
## Implementation report
# Implementation report: peer Pod handshake command
Date: 2026-06-02
## Investigation
The current boundaries are documented in `artifacts/investigation-summary.md`. No escalation blocker was found. The main concern identified was avoiding reuse of spawned-child state (`SpawnedPodRegistry`, delegated scope, output cursors, and child completion semantics) for peer communication; the implementation therefore adds separate peer metadata and a separate peer send tool.
## Implemented behavior
- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`.
- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`.
- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes.
- Added `PodDiscovery::register_peer` that persists both metadata directions and restores the exact prior source-side peer state on ordinary second-side write failure.
- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata.
- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications.
- Added TUI command `:peer <pod-name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`.
- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children.
## Reviewer blocker fixes
- `SendToPeerPod` now reuses the existing one-shot Pod socket client path (`connect_and_send`), which drains connect-time `Alert` / `Snapshot` traffic before writing `Notify` and returns an error if method delivery fails.
- Added a regression test where the target socket emits an alert and snapshot before reading the peer `Notify`, proving the peer send drains the prelude and still delivers the message.
- Registration failure rollback now restores the exact prior source peer list instead of unconditionally removing `source -> target`; a target-side injected failure test verifies a pre-existing source relation is preserved.
- Wording now describes `:peer` as metadata-level reciprocal registration rather than live target-controller consent, and documents that `SendToPeerPod` fails for non-live peers instead of auto-restoring them.
## Tests and validation run
- `cargo test -p protocol -p pod-store -p pod -p tui --lib`
- `./tickets.sh doctor`
- `git diff --check`
- `nix build .#yoi`
## Notes
The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by restoring the exact prior source-side peer list if the reciprocal write fails.
---