From 093a84fc836524d655b554a9c083f83df1de4ce1 Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 2 Jun 2026 19:43:20 +0900 Subject: [PATCH 1/2] feat: add peer pod handshake command --- crates/pod-store/src/lib.rs | 86 +++++ crates/pod/src/controller.rs | 29 +- crates/pod/src/discovery.rs | 341 +++++++++++++++++- crates/protocol/src/lib.rs | 23 +- crates/tui/src/app.rs | 16 + crates/tui/src/command.rs | 82 +++++ docs/design/pod-session-state.md | 6 + resources/prompts/common/pod-orchestration.md | 2 + .../artifacts/implementation-report.md | 29 ++ .../artifacts/investigation-summary.md | 24 ++ .../item.md | 2 +- .../thread.md | 37 ++ 12 files changed, 663 insertions(+), 14 deletions(-) create mode 100644 work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md create mode 100644 work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/investigation-summary.md diff --git a/crates/pod-store/src/lib.rs b/crates/pod-store/src/lib.rs index 4a2f249a..bc5ee7fd 100644 --- a/crates/pod-store/src/lib.rs +++ b/crates/pod-store/src/lib.rs @@ -84,6 +84,16 @@ pub struct PodReclaimedChild { pub scope_delegated: Vec, } +/// One peer Pod made visible by an explicit peer handshake. +/// +/// Peer visibility is intentionally separate from spawned-child delegation: it +/// does not carry filesystem scope, callback ownership, output cursors, or +/// lifecycle-notification authority. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct PodPeer { + pub pod_name: String, +} + /// Persistent metadata for a Pod name. 
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PodMetadata { @@ -94,6 +104,8 @@ pub struct PodMetadata { pub spawned_children: Vec, #[serde(default, skip_serializing_if = "Vec::is_empty")] pub reclaimed_children: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub peers: Vec, #[serde(default, skip_serializing_if = "Option::is_none")] pub resolved_manifest_snapshot: Option, } @@ -106,6 +118,7 @@ impl PodMetadata { active, spawned_children: Vec::new(), reclaimed_children: Vec::new(), + peers: Vec::new(), resolved_manifest_snapshot: None, } } @@ -168,6 +181,33 @@ pub trait PodMetadataStore: Send + Sync { }) } + /// Set peer visibility state while preserving active pointer, child state, + /// and manifest snapshot. + fn set_peers(&self, pod_name: &str, peers: Vec) -> Result { + self.update_by_name(pod_name, |metadata| { + metadata.peers = peers; + }) + } + + /// Add one peer if absent while preserving every other metadata field. + fn add_peer(&self, pod_name: &str, peer_name: &str) -> Result { + self.update_by_name(pod_name, |metadata| { + if !metadata.peers.iter().any(|peer| peer.pod_name == peer_name) { + metadata.peers.push(PodPeer { + pod_name: peer_name.to_string(), + }); + metadata.peers.sort_by(|a, b| a.pod_name.cmp(&b.pod_name)); + } + }) + } + + /// Remove one peer while preserving every other metadata field. + fn remove_peer(&self, pod_name: &str, peer_name: &str) -> Result { + self.update_by_name(pod_name, |metadata| { + metadata.peers.retain(|peer| peer.pod_name != peer_name); + }) + } + /// Remove reclaimed child delegations from the outstanding set and record /// them in durable reclaim history. 
fn reclaim_spawned_children( @@ -503,6 +543,52 @@ mod tests { assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot)); } + #[test] + fn peer_updates_preserve_active_children_and_manifest_snapshot() { + let tmp = tempfile::TempDir::new().unwrap(); + let store = FsPodStore::new(tmp.path()).unwrap(); + let active = PodActiveSegmentRef::active_segment( + session_store::new_session_id(), + session_store::new_segment_id(), + ); + let snapshot = serde_json::json!({"pod":{"name":"agent"}}); + store + .set_active("agent", Some(active.clone()), Some(snapshot.clone())) + .unwrap(); + store + .set_spawned_children( + "agent", + vec![PodSpawnedChild { + pod_name: "child".into(), + socket_path: std::path::Path::new("/tmp/child.sock").into(), + scope_delegated: vec![], + callback_address: std::path::Path::new("/tmp/parent.sock").into(), + }], + ) + .unwrap(); + store.add_peer("agent", "peer-b").unwrap(); + store.add_peer("agent", "peer-a").unwrap(); + store.add_peer("agent", "peer-a").unwrap(); + + let restored = store.read_by_name("agent").unwrap().unwrap(); + assert_eq!(restored.active, Some(active)); + assert_eq!(restored.spawned_children.len(), 1); + assert_eq!(restored.resolved_manifest_snapshot, Some(snapshot)); + assert_eq!( + restored + .peers + .iter() + .map(|peer| peer.pod_name.as_str()) + .collect::>(), + vec!["peer-a", "peer-b"] + ); + + store.remove_peer("agent", "peer-a").unwrap(); + let restored = store.read_by_name("agent").unwrap().unwrap(); + assert_eq!(restored.peers.len(), 1); + assert_eq!(restored.peers[0].pod_name, "peer-b"); + } + #[test] fn reclaim_children_removes_outstanding_and_records_history() { let tmp = tempfile::TempDir::new().unwrap(); diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 03058aa1..5cd1907b 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -8,7 +8,7 @@ use pod_store::PodMetadataStore; use session_store::Store; use tokio::sync::{broadcast, mpsc, oneshot}; -use 
crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool}; +use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool, send_to_peer_pod_tool}; use crate::ipc::alerter::Alerter; use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::server::SocketServer; @@ -561,7 +561,8 @@ where worker.register_tool(stop_pod_tool(spawned_registry.clone())); let discovery = PodDiscovery::new(pod_store, spawner_name, runtime_base, pwd, spawned_registry); worker.register_tool(list_pods_tool(discovery.clone())); - worker.register_tool(restore_pod_tool(discovery)); + worker.register_tool(restore_pod_tool(discovery.clone())); + worker.register_tool(send_to_peer_pod_tool(discovery)); pod.attach_tracker(tracker); fs_for_view } @@ -862,6 +863,26 @@ async fn controller_loop( } }, + Method::RegisterPeer { name } => match discovery.register_peer(&name) { + Ok(result) => match serde_json::to_value(result) { + Ok(result) => { + let _ = event_tx.send(Event::PeerRegistered { result }); + } + Err(error) => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::Internal, + message: format!("serialize peer registration result: {error}"), + }); + } + }, + Err(error) => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::InvalidRequest, + message: error.to_string(), + }); + } + }, + // ListCompletions is handled at the socket layer (direct // response). If it reaches the controller, ignore it. Method::ListCompletions { .. } => {} @@ -1063,10 +1084,10 @@ where notify_buffer.push_notify(message); } Some(Method::ListCompletions { .. }) => {} - Some(Method::ListPods | Method::RestorePod { .. }) => { + Some(Method::ListPods | Method::RestorePod { .. } | Method::RegisterPeer { .. 
}) => { let _ = event_tx.send(Event::Error { code: ErrorCode::AlreadyRunning, - message: "Pod discovery requests are only handled while the Pod is idle or paused" + message: "Pod discovery/control requests are only handled while the Pod is idle or paused" .into(), }); } diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs index 13c34e27..4284c488 100644 --- a/crates/pod/src/discovery.rs +++ b/crates/pod/src/discovery.rs @@ -17,9 +17,9 @@ use async_trait::async_trait; use client::PodRuntimeCommand; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use manifest::{Permission, ScopeRule}; -use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore}; -use protocol::stream::JsonLineReader; -use protocol::{Event, PodStatus}; +use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, validate_pod_name}; +use protocol::stream::{JsonLineReader, JsonLineWriter}; +use protocol::{Event, Method, PodStatus}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use session_store::{SegmentId, SessionId}; @@ -172,6 +172,41 @@ where }) } + pub fn register_peer( + &self, + peer_name: &str, + ) -> Result { + validate_pod_name(peer_name)?; + if peer_name == self.self_pod_name { + return Err(PodDiscoveryError::SelfPeer { + pod_name: peer_name.to_string(), + }); + } + let self_exists = self.store.read_by_name(&self.self_pod_name)?.is_some(); + if !self_exists { + return Err(PodDiscoveryError::StateMissing { + pod_name: self.self_pod_name.clone(), + }); + } + let peer_exists = self.store.read_by_name(peer_name)?.is_some(); + if !peer_exists { + return Err(PodDiscoveryError::MissingPod { + pod_name: peer_name.to_string(), + }); + } + + self.store.add_peer(&self.self_pod_name, peer_name)?; + if let Err(error) = self.store.add_peer(peer_name, &self.self_pod_name) { + let _ = self.store.remove_peer(&self.self_pod_name, peer_name); + return Err(PodDiscoveryError::PodStore(error)); + } + + Ok(PeerRegistrationResult { + 
source: self.self_pod_name.clone(), + peer: peer_name.to_string(), + }) + } + async fn visibility(&self) -> Result { let mut visible = BTreeMap::new(); let mut child_sockets = BTreeMap::new(); @@ -187,6 +222,11 @@ where child_sockets.insert(child.pod_name.clone(), child.socket_path.clone()); comm_registry.insert(child.pod_name.clone(), comm_info_from_spawned_child(&child)); } + for peer in metadata.peers { + visible + .entry(peer.pod_name) + .or_insert(VisibilityReason::Peer); + } } // The live in-memory registry covers just-spawned children even if a @@ -379,6 +419,7 @@ where pub enum VisibilityReason { SelfPod, SpawnedChild, + Peer, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -525,6 +566,12 @@ pub enum RestoreResult { }, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct PeerRegistrationResult { + pub source: String, + pub peer: String, +} + #[derive(Debug, thiserror::Error)] pub enum PodDiscoveryError { #[error("pod state missing for `{pod_name}`")] @@ -533,6 +580,10 @@ pub enum PodDiscoveryError { NotVisible { pod_name: String }, #[error("pod `{pod_name}` is not restorable: {reason}")] NotRestorable { pod_name: String, reason: String }, + #[error("pod `{pod_name}` cannot be registered as a peer of itself")] + SelfPeer { pod_name: String }, + #[error("pod `{pod_name}` does not exist")] + MissingPod { pod_name: String }, #[error( "pod `{pod_name}` segment {segment_id} is locked by `{owner_pod}` pid {pid} at {socket_path}" )] @@ -683,6 +734,14 @@ struct PodNameInput { name: String, } +#[derive(Debug, Deserialize, JsonSchema)] +struct SendToPeerPodInput { + /// Target peer Pod name. + name: String, + /// Text delivered to the peer as a peer notification. + message: String, +} + struct ListPodsTool { discovery: PodDiscovery, } @@ -776,6 +835,83 @@ where }) } +const SEND_TO_PEER_POD_DESCRIPTION: &str = "Send a text message to a peer Pod made visible by an explicit peer handshake. 
The message is delivered as a peer notification through the target Pod's durable notification/history path. This does not grant delegated scope, create a spawned-child output cursor, imply parent ownership, or produce child completion notifications. Fails if the target is not a visible live peer."; + +struct SendToPeerPodTool { + discovery: PodDiscovery, +} + +#[async_trait] +impl Tool for SendToPeerPodTool +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + async fn execute(&self, input_json: &str) -> Result { + let input: SendToPeerPodInput = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid SendToPeerPod input: {e}")))?; + let detail = self + .discovery + .inspect(&input.name) + .await + .map_err(discovery_error_to_tool_error)?; + if detail.visibility != VisibilityReason::Peer { + return Err(ToolError::InvalidArgument(format!( + "pod `{}` is visible as {:?}, not as a peer", + input.name, detail.visibility + ))); + } + if !detail.live.reachable { + return Err(ToolError::ExecutionFailed(format!( + "peer pod `{}` is not live/reachable; restore it before sending", + input.name + ))); + } + + let message = format!( + "[Peer message from `{}`]\n{}", + self.discovery.self_pod_name, input.message + ); + send_peer_notify(&detail.live.socket_path, message) + .await + .map_err(|error| { + ToolError::ExecutionFailed(format!("send to peer `{}`: {error}", input.name)) + })?; + + Ok(ToolOutput { + summary: format!("sent peer message to `{}`", input.name), + content: None, + }) + } +} + +pub fn send_to_peer_pod_tool(discovery: PodDiscovery) -> ToolDefinition +where + St: PodMetadataStore + Clone + Send + Sync + 'static, +{ + Arc::new(move || { + let meta = ToolMeta::new("SendToPeerPod") + .description(SEND_TO_PEER_POD_DESCRIPTION) + .input_schema(serde_json::to_value(schemars::schema_for!(SendToPeerPodInput)).unwrap()); + let tool: Arc = Arc::new(SendToPeerPodTool { + discovery: discovery.clone(), + }); + (meta, tool) + 
}) +} + +async fn send_peer_notify(socket_path: &Path, message: String) -> io::Result<()> { + let stream = tokio::time::timeout(Duration::from_secs(5), UnixStream::connect(socket_path)) + .await + .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "connect timed out"))??; + let mut writer = JsonLineWriter::new(stream); + tokio::time::timeout( + Duration::from_secs(5), + writer.write(&Method::Notify { message }), + ) + .await + .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "write timed out"))? +} + fn json_content(value: &T) -> Result { serde_json::to_string_pretty(value) .map_err(|e| ToolError::Internal(format!("serialize pod discovery output: {e}"))) @@ -785,7 +921,9 @@ fn discovery_error_to_tool_error(error: PodDiscoveryError) -> ToolError { match error { PodDiscoveryError::StateMissing { .. } | PodDiscoveryError::NotVisible { .. } - | PodDiscoveryError::NotRestorable { .. } => ToolError::InvalidArgument(error.to_string()), + | PodDiscoveryError::NotRestorable { .. } + | PodDiscoveryError::SelfPeer { .. } + | PodDiscoveryError::MissingPod { .. } => ToolError::InvalidArgument(error.to_string()), PodDiscoveryError::LockConflict { .. 
} | PodDiscoveryError::Store(_) | PodDiscoveryError::PodStore(_) @@ -805,7 +943,7 @@ mod tests { use manifest::{Permission, ScopeRule}; use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule}; use protocol::stream::JsonLineWriter; - use protocol::{Alert, AlertLevel, AlertSource, Greeting}; + use protocol::{Alert, AlertLevel, AlertSource}; use session_store::{new_segment_id, new_session_id}; use tempfile::TempDir; use tokio::net::UnixListener; @@ -844,6 +982,9 @@ mod tests { child("child-pending", &pending_socket), ], reclaimed_children: Vec::new(), + peers: vec![pod_store::PodPeer { + pod_name: "peer".into(), + }], resolved_manifest_snapshot: None, }; store.write(&parent).unwrap(); @@ -856,6 +997,7 @@ mod tests { )), spawned_children: Vec::new(), reclaimed_children: Vec::new(), + peers: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); @@ -868,6 +1010,7 @@ mod tests { )), spawned_children: Vec::new(), reclaimed_children: Vec::new(), + peers: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); @@ -877,6 +1020,7 @@ mod tests { active: Some(PodActiveSegmentRef::pending_segment(pending_session_id)), spawned_children: Vec::new(), reclaimed_children: Vec::new(), + peers: Vec::new(), resolved_manifest_snapshot: None, }) .unwrap(); @@ -889,6 +1033,19 @@ mod tests { )), spawned_children: Vec::new(), reclaimed_children: Vec::new(), + peers: Vec::new(), + resolved_manifest_snapshot: None, + }) + .unwrap(); + store + .write(&PodMetadata { + pod_name: "peer".into(), + active: None, + spawned_children: Vec::new(), + reclaimed_children: Vec::new(), + peers: vec![pod_store::PodPeer { + pod_name: "parent".into(), + }], resolved_manifest_snapshot: None, }) .unwrap(); @@ -913,14 +1070,37 @@ mod tests { let restore_tool_def = restore_pod_tool(discovery.clone()); let (restore_meta, _) = restore_tool_def(); assert_eq!(restore_meta.name, "RestorePod"); + let send_peer_tool_def = send_to_peer_pod_tool(discovery.clone()); + let (send_peer_meta, _) = 
send_peer_tool_def(); + assert_eq!(send_peer_meta.name, "SendToPeerPod"); let list = discovery.list_visible().await.unwrap(); let names: Vec<_> = list.iter().map(|p| p.pod_name.as_str()).collect(); assert_eq!( names, - vec!["child-live", "child-pending", "child-stale", "parent"] + vec![ + "child-live", + "child-pending", + "child-stale", + "parent", + "peer" + ] ); assert!(!names.contains(&"hidden")); + assert_eq!( + list.iter() + .find(|p| p.pod_name == "peer") + .unwrap() + .visibility, + VisibilityReason::Peer + ); + assert_eq!( + list.iter() + .find(|p| p.pod_name == "child-live") + .unwrap() + .visibility, + VisibilityReason::SpawnedChild + ); assert!( list.iter() .find(|p| p.pod_name == "child-live") @@ -983,6 +1163,151 @@ mod tests { live_listener.abort(); } + #[tokio::test(flavor = "current_thread")] + async fn register_peer_persists_reciprocal_metadata() { + let root = TempDir::new().unwrap(); + let store_dir = root.path().join("store"); + let runtime_base = root.path().join("runtime"); + std::fs::create_dir_all(&runtime_base).unwrap(); + let store = FsPodStore::new(&store_dir).unwrap(); + store.write(&PodMetadata::new("source", None)).unwrap(); + store.write(&PodMetadata::new("target", None)).unwrap(); + let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap()); + + let discovery = PodDiscovery::new( + store.clone(), + "source".into(), + runtime_base.clone(), + root.path().to_path_buf(), + SpawnedPodRegistry::new(runtime_dir), + ); + let result = discovery.register_peer("target").unwrap(); + assert_eq!(result.source, "source"); + assert_eq!(result.peer, "target"); + + let source = store.read_by_name("source").unwrap().unwrap(); + let target = store.read_by_name("target").unwrap().unwrap(); + assert_eq!(source.peers[0].pod_name, "target"); + assert_eq!(target.peers[0].pod_name, "source"); + + let list = discovery.list_visible().await.unwrap(); + assert_eq!( + list.iter() + .find(|item| item.pod_name == "target") + .unwrap() + 
.visibility, + VisibilityReason::Peer + ); + } + + #[tokio::test(flavor = "current_thread")] + async fn register_peer_rejects_self_and_missing_target() { + let root = TempDir::new().unwrap(); + let store_dir = root.path().join("store"); + let runtime_base = root.path().join("runtime"); + std::fs::create_dir_all(&runtime_base).unwrap(); + let store = FsPodStore::new(&store_dir).unwrap(); + store.write(&PodMetadata::new("source", None)).unwrap(); + let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap()); + let discovery = PodDiscovery::new( + store, + "source".into(), + runtime_base, + root.path().to_path_buf(), + SpawnedPodRegistry::new(runtime_dir), + ); + + let self_err = discovery.register_peer("source").unwrap_err(); + assert!(matches!(self_err, PodDiscoveryError::SelfPeer { .. })); + let missing_err = discovery.register_peer("missing").unwrap_err(); + assert!(matches!(missing_err, PodDiscoveryError::MissingPod { .. })); + } + + #[tokio::test(flavor = "current_thread")] + async fn send_to_peer_pod_delivers_notify_without_child_registry() { + let root = TempDir::new().unwrap(); + let store_dir = root.path().join("store"); + let runtime_base = root.path().join("runtime"); + std::fs::create_dir_all(runtime_base.join("target")).unwrap(); + let store = FsPodStore::new(&store_dir).unwrap(); + store + .write(&PodMetadata { + pod_name: "source".into(), + active: None, + spawned_children: Vec::new(), + reclaimed_children: Vec::new(), + peers: vec![pod_store::PodPeer { + pod_name: "target".into(), + }], + resolved_manifest_snapshot: None, + }) + .unwrap(); + store + .write(&PodMetadata { + pod_name: "target".into(), + active: None, + spawned_children: Vec::new(), + reclaimed_children: Vec::new(), + peers: vec![pod_store::PodPeer { + pod_name: "source".into(), + }], + resolved_manifest_snapshot: None, + }) + .unwrap(); + let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap()); + let discovery = 
PodDiscovery::new( + store, + "source".into(), + runtime_base.clone(), + root.path().to_path_buf(), + SpawnedPodRegistry::new(runtime_dir), + ); + + let socket = runtime_base.join("target").join("sock"); + let listener = UnixListener::bind(&socket).unwrap(); + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let target = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + let mut writer = JsonLineWriter::new(stream); + writer + .write(&Event::Snapshot { + entries: Vec::new(), + greeting: protocol::Greeting { + pod_name: "target".into(), + cwd: "/tmp".into(), + provider: "test".into(), + model: "test".into(), + scope_summary: String::new(), + tools: Vec::new(), + context_window: 0, + context_tokens: 0, + }, + status: PodStatus::Idle, + }) + .await + .unwrap(); + + let (stream, _) = listener.accept().await.unwrap(); + let mut reader = JsonLineReader::new(stream); + let method = reader.next::().await.unwrap().unwrap(); + if let Method::Notify { message } = method { + tx.send(message).await.unwrap(); + } else { + panic!("expected Notify, got {method:?}"); + } + }); + + let (_, tool) = send_to_peer_pod_tool(discovery)(); + let output = tool + .execute(r#"{"name":"target","message":"hello"}"#) + .await + .unwrap(); + assert_eq!(output.summary, "sent peer message to `target`"); + let message = rx.recv().await.unwrap(); + assert_eq!(message, "[Peer message from `source`]\nhello"); + target.await.unwrap(); + } + #[tokio::test(flavor = "current_thread")] async fn probe_socket_reads_status_after_replayed_alert() { let root = TempDir::new().unwrap(); @@ -1003,7 +1328,7 @@ mod tests { writer .write(&Event::Snapshot { entries: Vec::new(), - greeting: Greeting { + greeting: protocol::Greeting { pod_name: "alerted".into(), cwd: "/tmp".into(), provider: "test".into(), @@ -1051,7 +1376,7 @@ mod tests { let _ = writer .write(&Event::Snapshot { entries: Vec::new(), - greeting: Greeting { + greeting: protocol::Greeting { pod_name: "child-live".into(), 
cwd: "/tmp".into(), provider: "test".into(), diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index b0865de5..9eeed8b9 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -64,6 +64,13 @@ pub enum Method { RestorePod { name: String, }, + /// Register another existing Pod as a reciprocal peer of this Pod. + /// + /// This is metadata/control state only: it must not grant delegated scope, + /// spawned-child ownership, output cursors, or child lifecycle authority. + RegisterPeer { + name: String, + }, } /// Typed lifecycle events sent from a child Pod to its parent. @@ -480,6 +487,10 @@ pub enum Event { PodRestored { result: serde_json::Value, }, + /// Reply to `Method::RegisterPeer`. + PeerRegistered { + result: serde_json::Value, + }, Alert(Alert), /// Latest memory extract/consolidation lifecycle event for UI observability. /// @@ -1465,13 +1476,17 @@ mod tests { Method::RestorePod { name: "child".into(), }, + Method::RegisterPeer { + name: "peer".into(), + }, ]; for method in methods { let json = serde_json::to_string(&method).unwrap(); let decoded: Method = serde_json::from_str(&json).unwrap(); match (decoded, method) { (Method::ListPods, Method::ListPods) - | (Method::RestorePod { .. }, Method::RestorePod { .. }) => {} + | (Method::RestorePod { .. }, Method::RestorePod { .. }) + | (Method::RegisterPeer { .. }, Method::RegisterPeer { .. 
}) => {} (decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"), } } @@ -1486,6 +1501,9 @@ mod tests { Event::PodRestored { result: serde_json::json!({ "action": "already_live" }), }, + Event::PeerRegistered { + result: serde_json::json!({ "source": "self", "peer": "other" }), + }, ]; for event in events { let json = serde_json::to_string(&event).unwrap(); @@ -1497,6 +1515,9 @@ mod tests { (Event::PodRestored { result }, Event::PodRestored { result: expected }) => { assert_eq!(result, expected) } + (Event::PeerRegistered { result }, Event::PeerRegistered { result: expected }) => { + assert_eq!(result, expected) + } (decoded, expected) => panic!("decoded {decoded:?}, expected {expected:?}"), } } diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index ee16cda7..92c0780c 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -1204,6 +1204,22 @@ impl App { }); } Event::PodsListed { .. } | Event::PodRestored { .. } => {} + Event::PeerRegistered { result } => { + let source = result + .get("source") + .and_then(serde_json::Value::as_str) + .unwrap_or("this Pod"); + let peer = result + .get("peer") + .and_then(serde_json::Value::as_str) + .unwrap_or("peer Pod"); + self.flash_actionbar_notice( + format!("Peer handshake registered: `{source}` ↔ `{peer}`"), + ActionbarNoticeLevel::Info, + ActionbarNoticeSource::Tui, + Duration::from_secs(4), + ); + } Event::Shutdown => { self.mark_orphan_compacts_incomplete(); self.quit = true; diff --git a/crates/tui/src/command.rs b/crates/tui/src/command.rs index ba624924..3dee7228 100644 --- a/crates/tui/src/command.rs +++ b/crates/tui/src/command.rs @@ -156,6 +156,15 @@ impl CommandRegistry { can_execute: rewind_available, executor: rewind_command, }); + registry.register(CommandSpec { + name: "peer", + aliases: &[], + usage: "peer ", + description: "Make another existing Pod visible as a reciprocal peer.", + argument_parser: peer_args, + can_execute: peer_available, + executor: peer_command, + 
}); registry } @@ -302,6 +311,17 @@ fn rewind_args(raw: &str) -> Result { } } +fn peer_args(raw: &str) -> Result { + let args = CommandArgs::parse_whitespace(raw); + if args.argv().len() == 1 { + Ok(args) + } else { + Err(CommandDiagnostic::new( + "Invalid arguments. Usage: peer ", + )) + } +} + fn compact_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> { if !environment.connected { return Err(CommandDiagnostic::new( @@ -335,6 +355,20 @@ fn rewind_available(environment: &CommandEnvironment) -> Result<(), CommandDiagn Ok(()) } +fn peer_available(environment: &CommandEnvironment) -> Result<(), CommandDiagnostic> { + if !environment.connected { + return Err(CommandDiagnostic::new( + "Cannot register a peer before the Pod is connected.", + )); + } + if environment.running { + return Err(CommandDiagnostic::new( + "Cannot register a peer while the Pod is running.", + )); + } + Ok(()) +} + fn help_command(invocation: CommandInvocation<'_>) -> CommandExecution { if let Some(name) = invocation.args.argv().first() { let Some(command) = invocation.registry.find(name) else { @@ -394,6 +428,20 @@ fn rewind_command(invocation: CommandInvocation<'_>) -> CommandExecution { } } +fn peer_command(invocation: CommandInvocation<'_>) -> CommandExecution { + let _ = invocation.command; + let _ = invocation.environment; + let name = invocation.args.argv()[0].clone(); + CommandExecution { + method: Some(Method::RegisterPeer { name: name.clone() }), + diagnostics: vec![CommandDiagnostic::new(format!( + "peer handshake requested with `{name}`" + ))], + exit_command_mode: true, + clear_input: true, + } +} + #[cfg(test)] mod tests { use super::*; @@ -501,4 +549,38 @@ mod tests { let result = registry.dispatch("rewind", &paused); assert!(matches!(result.method, Some(Method::ListRewindTargets))); } + + #[test] + fn peer_command_returns_register_peer_method() { + let registry = CommandRegistry::builtins(); + let result = registry.dispatch("peer reviewer", &env()); + 
assert!(matches!( + result.method, + Some(Method::RegisterPeer { ref name }) if name == "reviewer" + )); + assert!(result.exit_command_mode); + assert!(result.clear_input); + assert!(result.diagnostics[0].message.contains("peer handshake")); + } + + #[test] + fn peer_invalid_arguments_are_local_diagnostic() { + let registry = CommandRegistry::builtins(); + for command in ["peer", "peer one two"] { + let result = registry.dispatch(command, &env()); + assert!(result.method.is_none()); + assert!(!result.exit_command_mode); + assert!(result.diagnostics[0].message.contains("Invalid arguments")); + } + } + + #[test] + fn peer_rejects_running() { + let registry = CommandRegistry::builtins(); + let mut running = env(); + running.running = true; + let result = registry.dispatch("peer reviewer", &running); + assert!(result.method.is_none()); + assert!(result.diagnostics[0].message.contains("running")); + } } diff --git a/docs/design/pod-session-state.md b/docs/design/pod-session-state.md index 926c83ab..59f1c3eb 100644 --- a/docs/design/pod-session-state.md +++ b/docs/design/pod-session-state.md @@ -32,6 +32,12 @@ Parent-visible children are sourced from Pod metadata, not from a transient runt Delegated write scope is a capability loan. Stopping, shutting down, or pruning a child must reclaim the parent's effective write permissions while preserving explicit base denies. +## Peer Pods + +Peer visibility is also Pod metadata, but it is distinct from spawned-child delegation. A TUI user can run `:peer ` while attached to an idle Pod to register a reciprocal peer handshake with another existing Pod. + +A peer relationship only makes the Pods mutually visible through `ListPods` with visibility source `peer`. It does not grant filesystem scope, create a child output cursor, make either Pod the other's parent, or imply child completion notifications. 
Peer messages use `SendToPeerPod`, which delivers a labeled notification into the target Pod's normal durable notification/history path. + ## Notifications are not authority Pod completion notifications are UX hints. Before treating delegated work as complete, inspect queryable evidence: child output, session/log state, worktree status, diffs, and validation output. diff --git a/resources/prompts/common/pod-orchestration.md b/resources/prompts/common/pod-orchestration.md index 06b8bdd1..293502d5 100644 --- a/resources/prompts/common/pod-orchestration.md +++ b/resources/prompts/common/pod-orchestration.md @@ -7,4 +7,6 @@ The parent does not need to keep a turn open or call tools solely to wait for a Before treating delegated work as complete, read the child output and inspect concrete evidence such as worktree status, diff, and test results. Notifications are hints, not proof of completion. +Peer Pods made visible by a handshake are not spawned children. Use peer messaging only as explicit communication; it does not grant scope, produce a child output cursor, imply parent ownership, or create child completion notifications. + This guidance is not scheduler or auto-maintain authorization. Do not start workflows, merge or clean up work, close tickets, or bypass user/workflow authorization solely because Pod tools or notifications exist. diff --git a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md new file mode 100644 index 00000000..c4bdc814 --- /dev/null +++ b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md @@ -0,0 +1,29 @@ +# Implementation report: peer Pod handshake command + +Date: 2026-06-02 + +## Investigation + +The current boundaries are documented in `artifacts/investigation-summary.md`. No escalation blocker was found. 
The main concern identified was avoiding reuse of spawned-child state (`SpawnedPodRegistry`, delegated scope, output cursors, and child completion semantics) for peer communication; the implementation therefore adds separate peer metadata and a separate peer send tool. + +## Implemented behavior + +- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`. +- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`. +- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes. +- Added `PodDiscovery::register_peer` that persists both metadata directions and rolls back the first side on ordinary second-side write failure. +- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata. +- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications. +- Added TUI command `:peer ` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`. +- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children. + +## Tests and validation run + +- `cargo test -p protocol -p pod-store -p pod -p tui --lib` +- `./tickets.sh doctor` +- `git diff --check` +- `nix build .#yoi` + +## Notes + +The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails. 
diff --git a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/investigation-summary.md b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/investigation-summary.md new file mode 100644 index 00000000..ea97ef1d --- /dev/null +++ b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/investigation-summary.md @@ -0,0 +1,24 @@ +# Investigation summary: peer Pod handshake + +Date: 2026-06-02 + +## Current authority map + +- `ListPods` / `RestorePod` are implemented in `crates/pod/src/discovery.rs` and are served both as LLM tools and protocol methods from the currently attached Pod's controller. They intentionally start from the caller Pod's visibility set rather than a host-wide Pod universe. Today the visibility set is the caller itself plus spawned children from durable Pod metadata and the live spawned-child registry. +- Pod metadata is in `crates/pod-store/src/lib.rs` as name-keyed current state (`PodMetadata`) under the Pod-state root. `spawned_children` is durable current parent/child visibility and delegation state; session JSONL remains the durable explanation for delivered context/history. +- The spawned-child registry (`SpawnedPodRegistry`) is runtime/current ownership state for `SpawnPod`, `SendToPod`, `ReadPodOutput`, and `StopPod`. It carries output cursors and delegated scope; it should not be reused for peer visibility. +- `SendToPod` in `crates/pod/src/spawn/comm_tools.rs` is explicitly spawned-child scoped: it looks up only `SpawnedPodRegistry`, sends `Method::Run`, and is paired with `ReadPodOutput` cursor semantics. Broadening it would blur child vs peer semantics, so a distinct peer-safe send surface is preferable. +- Protocol methods are in `crates/protocol/src/lib.rs`; TUI command dispatch is local parsing in `crates/tui/src/command.rs`, returning typed `Method`s that `single_pod.rs` sends to the currently attached Pod. 
This is suitable for a TUI command that asks the current Pod to perform authoritative metadata changes. +- Delivered notifications already have an explainable durable path: `Method::Notify` is queued through `NotifyBuffer`, rendered as a `SystemItem`, committed as `LogEntry::SystemItem`, and then appended to model history. A peer message can reuse that receive-side path without hidden context injection. + +## Implementation direction + +No escalation blocker found. The minimal safe design is: + +- Add a `peers` field to `PodMetadata`, separate from `spawned_children`, with serde defaults so no broad migration is needed. +- Add a controller-handled `Method::RegisterPeer { name }` that validates the target Pod state exists, rejects self, and writes reciprocal peer entries to both Pod metadata records. Normal write failures roll back the first side so successful replies do not leave one-sided visibility. +- Extend `ListPods` visibility with a `peer` reason sourced from `PodMetadata.peers`; keep spawned-child comm registry and child output cursors unchanged. +- Add a separate `SendToPeerPod` tool backed by `PodDiscovery` visibility. It only sends to visible peers, uses the target's live/restored socket from discovery, and delivers a `Method::Notify` message labeled with the sender Pod name. It does not read output, stop Pods, grant scope, or use `SpawnedPodRegistry`. +- Add TUI `:peer <name>` command that emits `Method::RegisterPeer { name }` to the attached Pod and shows local diagnostics; the controller returns a typed result event for success and normal `Event::Error` for failure. + +Crash-level atomicity for two metadata files is not provided by the existing store, but the implementation can avoid misleading partial state for ordinary validation/write failures and does not require broad schema migration or hidden history changes. 
diff --git a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md index cc033441..dab0f1e3 100644 --- a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md +++ b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md @@ -7,7 +7,7 @@ kind: task priority: P2 labels: [tui, pod, command, orchestration] created_at: 2026-06-01T13:29:55Z -updated_at: 2026-06-02T10:17:42Z +updated_at: 2026-06-02T10:42:37Z assignee: null legacy_ticket: null --- diff --git a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md index 8cdf4aec..1f084ec5 100644 --- a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md +++ b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md @@ -85,4 +85,41 @@ Validation: - Commit the implementation in the worktree when reviewable. +--- + + + +## Implementation report + +# Implementation report: peer Pod handshake command + +Date: 2026-06-02 + +## Investigation + +The current boundaries are documented in `artifacts/investigation-summary.md`. No escalation blocker was found. The main concern identified was avoiding reuse of spawned-child state (`SpawnedPodRegistry`, delegated scope, output cursors, and child completion semantics) for peer communication; the implementation therefore adds separate peer metadata and a separate peer send tool. + +## Implemented behavior + +- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`. +- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`. +- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes. 
+- Added `PodDiscovery::register_peer` that persists both metadata directions and rolls back the first side on ordinary second-side write failure. +- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata. +- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications. +- Added TUI command `:peer <name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`. +- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children. + +## Tests and validation run + +- `cargo test -p protocol -p pod-store -p pod -p tui --lib` +- `./tickets.sh doctor` +- `git diff --check` +- `nix build .#yoi` + +## Notes + +The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails. 
+ + --- From 057c2eff5fa8c43e88d2b22603cedb5af63012e3 Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 2 Jun 2026 22:23:13 +0900 Subject: [PATCH 2/2] fix: harden peer pod registration --- crates/pod/src/discovery.rs | 139 +++++++++++++++--- crates/protocol/src/lib.rs | 3 +- crates/tui/src/app.rs | 2 +- crates/tui/src/command.rs | 29 +++- docs/design/pod-session-state.md | 4 +- resources/prompts/common/pod-orchestration.md | 2 +- .../artifacts/implementation-report.md | 11 +- .../item.md | 2 +- .../thread.md | 44 ++++++ 9 files changed, 202 insertions(+), 34 deletions(-) diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs index 4284c488..dfdefb1e 100644 --- a/crates/pod/src/discovery.rs +++ b/crates/pod/src/discovery.rs @@ -18,7 +18,7 @@ use client::PodRuntimeCommand; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use manifest::{Permission, ScopeRule}; use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, validate_pod_name}; -use protocol::stream::{JsonLineReader, JsonLineWriter}; +use protocol::stream::JsonLineReader; use protocol::{Event, Method, PodStatus}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -28,6 +28,7 @@ use tokio::process::Command; use crate::runtime::dir::SpawnedPodRecord; use crate::runtime::pod_registry; +use crate::spawn::comm_tools::connect_and_send; use crate::spawn::registry::SpawnedPodRegistry; const PROBE_TIMEOUT: Duration = Duration::from_millis(500); @@ -182,14 +183,14 @@ where pod_name: peer_name.to_string(), }); } - let self_exists = self.store.read_by_name(&self.self_pod_name)?.is_some(); - if !self_exists { - return Err(PodDiscoveryError::StateMissing { + let self_metadata = self + .store + .read_by_name(&self.self_pod_name)? 
+ .ok_or_else(|| PodDiscoveryError::StateMissing { pod_name: self.self_pod_name.clone(), - }); - } - let peer_exists = self.store.read_by_name(peer_name)?.is_some(); - if !peer_exists { + })?; + let prior_self_peers = self_metadata.peers.clone(); + if self.store.read_by_name(peer_name)?.is_none() { return Err(PodDiscoveryError::MissingPod { pod_name: peer_name.to_string(), }); @@ -197,7 +198,7 @@ where self.store.add_peer(&self.self_pod_name, peer_name)?; if let Err(error) = self.store.add_peer(peer_name, &self.self_pod_name) { - let _ = self.store.remove_peer(&self.self_pod_name, peer_name); + let _ = self.store.set_peers(&self.self_pod_name, prior_self_peers); return Err(PodDiscoveryError::PodStore(error)); } @@ -804,7 +805,7 @@ where Arc::new(move || { let meta = ToolMeta::new("ListPods") .description( - "List Pods visible to this Pod from durable Pod state and the spawned-child registry. This does not expose the host-wide Pod universe.", + "List Pods visible to this Pod from durable Pod state, peer metadata, and the spawned-child registry. This does not expose the host-wide Pod universe.", ) .input_schema(serde_json::json!({ "type": "object", @@ -835,7 +836,7 @@ where }) } -const SEND_TO_PEER_POD_DESCRIPTION: &str = "Send a text message to a peer Pod made visible by an explicit peer handshake. The message is delivered as a peer notification through the target Pod's durable notification/history path. This does not grant delegated scope, create a spawned-child output cursor, imply parent ownership, or produce child completion notifications. Fails if the target is not a visible live peer."; +const SEND_TO_PEER_POD_DESCRIPTION: &str = "Send a text message to a peer Pod made visible by explicit reciprocal peer metadata. The message is delivered as a peer notification through the target Pod's durable notification/history path. This does not grant delegated scope, create a spawned-child output cursor, imply parent ownership, or produce child completion notifications. 
Fails clearly if the target is not a visible live peer; it does not auto-restore stopped peers."; struct SendToPeerPodTool { discovery: PodDiscovery, @@ -900,16 +901,7 @@ where } async fn send_peer_notify(socket_path: &Path, message: String) -> io::Result<()> { - let stream = tokio::time::timeout(Duration::from_secs(5), UnixStream::connect(socket_path)) - .await - .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "connect timed out"))??; - let mut writer = JsonLineWriter::new(stream); - tokio::time::timeout( - Duration::from_secs(5), - writer.write(&Method::Notify { message }), - ) - .await - .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "write timed out"))? + connect_and_send(socket_path, &Method::Notify { message }).await } fn json_content(value: &T) -> Result { @@ -941,7 +933,7 @@ mod tests { use std::sync::Mutex; use manifest::{Permission, ScopeRule}; - use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule}; + use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError}; use protocol::stream::JsonLineWriter; use protocol::{Alert, AlertLevel, AlertSource}; use session_store::{new_segment_id, new_session_id}; @@ -950,6 +942,40 @@ mod tests { use crate::runtime::dir::RuntimeDir; + #[derive(Clone)] + struct FailTargetPeerStore { + inner: FsPodStore, + } + + impl PodMetadataStore for FailTargetPeerStore { + fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError> { + if metadata.pod_name == "target" + && metadata.peers.iter().any(|peer| peer.pod_name == "source") + { + return Err(PodStoreError::Io(io::Error::other( + "injected target-side peer write failure", + ))); + } + self.inner.write(metadata) + } + + fn read_by_name(&self, pod_name: &str) -> Result, PodStoreError> { + self.inner.read_by_name(pod_name) + } + + fn list_names(&self) -> Result, PodStoreError> { + self.inner.list_names() + } + + fn root_dir(&self) -> Option { + self.inner.root_dir() + } + + fn delete_by_name(&self, pod_name: &str) -> Result<(), 
PodStoreError> { + self.inner.delete_by_name(pod_name) + } + } + static ENV_LOCK: Mutex<()> = Mutex::new(()); #[tokio::test(flavor = "current_thread")] @@ -1223,6 +1249,45 @@ mod tests { assert!(matches!(missing_err, PodDiscoveryError::MissingPod { .. })); } + #[tokio::test(flavor = "current_thread")] + async fn register_peer_target_failure_preserves_existing_source_peer() { + let root = TempDir::new().unwrap(); + let store_dir = root.path().join("store"); + let runtime_base = root.path().join("runtime"); + std::fs::create_dir_all(&runtime_base).unwrap(); + let inner = FsPodStore::new(&store_dir).unwrap(); + inner + .write(&PodMetadata { + pod_name: "source".into(), + active: None, + spawned_children: Vec::new(), + reclaimed_children: Vec::new(), + peers: vec![pod_store::PodPeer { + pod_name: "target".into(), + }], + resolved_manifest_snapshot: None, + }) + .unwrap(); + inner.write(&PodMetadata::new("target", None)).unwrap(); + let store = FailTargetPeerStore { inner }; + let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap()); + let discovery = PodDiscovery::new( + store.clone(), + "source".into(), + runtime_base, + root.path().to_path_buf(), + SpawnedPodRegistry::new(runtime_dir), + ); + + let err = discovery.register_peer("target").unwrap_err(); + assert!(matches!(err, PodDiscoveryError::PodStore(_))); + let source = store.read_by_name("source").unwrap().unwrap(); + assert_eq!(source.peers.len(), 1); + assert_eq!(source.peers[0].pod_name, "target"); + let target = store.read_by_name("target").unwrap().unwrap(); + assert!(target.peers.is_empty()); + } + #[tokio::test(flavor = "current_thread")] async fn send_to_peer_pod_delivers_notify_without_child_registry() { let root = TempDir::new().unwrap(); @@ -1288,7 +1353,35 @@ mod tests { .unwrap(); let (stream, _) = listener.accept().await.unwrap(); - let mut reader = JsonLineReader::new(stream); + let (reader_half, writer_half) = stream.into_split(); + let mut reader = 
JsonLineReader::new(reader_half); + let mut writer = JsonLineWriter::new(writer_half); + writer + .write(&Event::Alert(Alert { + level: AlertLevel::Warn, + source: AlertSource::Pod, + message: "connect-time alert".into(), + timestamp_ms: 0, + })) + .await + .unwrap(); + writer + .write(&Event::Snapshot { + entries: Vec::new(), + greeting: protocol::Greeting { + pod_name: "target".into(), + cwd: "/tmp".into(), + provider: "test".into(), + model: "test".into(), + scope_summary: String::new(), + tools: Vec::new(), + context_window: 0, + context_tokens: 0, + }, + status: PodStatus::Idle, + }) + .await + .unwrap(); let method = reader.next::().await.unwrap().unwrap(); if let Method::Notify { message } = method { tx.send(message).await.unwrap(); diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 9eeed8b9..80c4e4bb 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -66,7 +66,8 @@ pub enum Method { }, /// Register another existing Pod as a reciprocal peer of this Pod. /// - /// This is metadata/control state only: it must not grant delegated scope, + /// This is metadata/control state only: it does not ask the target's live + /// controller for consent, and it must not grant delegated scope, /// spawned-child ownership, output cursors, or child lifecycle authority. 
RegisterPeer { name: String, diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 92c0780c..7deaabe9 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -1214,7 +1214,7 @@ impl App { .and_then(serde_json::Value::as_str) .unwrap_or("peer Pod"); self.flash_actionbar_notice( - format!("Peer handshake registered: `{source}` ↔ `{peer}`"), + format!("Peer metadata registered: `{source}` ↔ `{peer}`"), ActionbarNoticeLevel::Info, ActionbarNoticeSource::Tui, Duration::from_secs(4), diff --git a/crates/tui/src/command.rs b/crates/tui/src/command.rs index 3dee7228..80564fd3 100644 --- a/crates/tui/src/command.rs +++ b/crates/tui/src/command.rs @@ -160,7 +160,7 @@ impl CommandRegistry { name: "peer", aliases: &[], usage: "peer ", - description: "Make another existing Pod visible as a reciprocal peer.", + description: "Register another existing Pod as a reciprocal metadata peer.", argument_parser: peer_args, can_execute: peer_available, executor: peer_command, @@ -435,7 +435,7 @@ fn peer_command(invocation: CommandInvocation<'_>) -> CommandExecution { CommandExecution { method: Some(Method::RegisterPeer { name: name.clone() }), diagnostics: vec![CommandDiagnostic::new(format!( - "peer handshake requested with `{name}`" + "peer metadata registration requested with `{name}`" ))], exit_command_mode: true, clear_input: true, @@ -560,7 +560,11 @@ mod tests { )); assert!(result.exit_command_mode); assert!(result.clear_input); - assert!(result.diagnostics[0].message.contains("peer handshake")); + assert!( + result.diagnostics[0] + .message + .contains("metadata registration") + ); } #[test] @@ -574,6 +578,25 @@ mod tests { } } + #[test] + fn peer_help_mentions_metadata_registration() { + let registry = CommandRegistry::builtins(); + let result = registry.dispatch("help peer", &env()); + assert!(result.method.is_none()); + assert!(result.diagnostics[0].message.contains("peer ")); + assert!(result.diagnostics[0].message.contains("metadata peer")); + } + + 
#[test] + fn peer_rejects_disconnected() { + let registry = CommandRegistry::builtins(); + let mut disconnected = env(); + disconnected.connected = false; + let result = registry.dispatch("peer reviewer", &disconnected); + assert!(result.method.is_none()); + assert!(result.diagnostics[0].message.contains("connected")); + } + #[test] fn peer_rejects_running() { let registry = CommandRegistry::builtins(); diff --git a/docs/design/pod-session-state.md b/docs/design/pod-session-state.md index 59f1c3eb..96c16547 100644 --- a/docs/design/pod-session-state.md +++ b/docs/design/pod-session-state.md @@ -34,9 +34,9 @@ Delegated write scope is a capability loan. Stopping, shutting down, or pruning ## Peer Pods -Peer visibility is also Pod metadata, but it is distinct from spawned-child delegation. A TUI user can run `:peer <name>` while attached to an idle Pod to register a reciprocal peer handshake with another existing Pod. +Peer visibility is also Pod metadata, but it is distinct from spawned-child delegation. A TUI user can run `:peer <name>` while attached to an idle Pod to register reciprocal peer metadata with another existing Pod. This is a metadata-level registration, not live target-controller consent. -A peer relationship only makes the Pods mutually visible through `ListPods` with visibility source `peer`. It does not grant filesystem scope, create a child output cursor, make either Pod the other's parent, or imply child completion notifications. Peer messages use `SendToPeerPod`, which delivers a labeled notification into the target Pod's normal durable notification/history path. +A peer relationship only makes the Pods mutually visible through `ListPods` with visibility source `peer`. It does not grant filesystem scope, create a child output cursor, make either Pod the other's parent, or imply child completion notifications. Peer messages use `SendToPeerPod`, which delivers a labeled notification into the target Pod's normal durable notification/history path. 
`SendToPeerPod` requires the peer to be live and fails clearly for non-live peers rather than auto-restoring them. ## Notifications are not authority diff --git a/resources/prompts/common/pod-orchestration.md b/resources/prompts/common/pod-orchestration.md index 293502d5..3abd8997 100644 --- a/resources/prompts/common/pod-orchestration.md +++ b/resources/prompts/common/pod-orchestration.md @@ -7,6 +7,6 @@ The parent does not need to keep a turn open or call tools solely to wait for a Before treating delegated work as complete, read the child output and inspect concrete evidence such as worktree status, diff, and test results. Notifications are hints, not proof of completion. -Peer Pods made visible by a handshake are not spawned children. Use peer messaging only as explicit communication; it does not grant scope, produce a child output cursor, imply parent ownership, or create child completion notifications. +Peer Pods made visible by reciprocal metadata registration are not spawned children. Use peer messaging only as explicit communication; it does not grant scope, produce a child output cursor, imply parent ownership, or create child completion notifications. Peer sends require a live peer and do not auto-restore stopped peers. This guidance is not scheduler or auto-maintain authorization. Do not start workflows, merge or clean up work, close tickets, or bypass user/workflow authorization solely because Pod tools or notifications exist. 
diff --git a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md index c4bdc814..72b40e86 100644 --- a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md +++ b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md @@ -11,12 +11,19 @@ The current boundaries are documented in `artifacts/investigation-summary.md`. N - Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`. - Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`. - Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes. -- Added `PodDiscovery::register_peer` that persists both metadata directions and rolls back the first side on ordinary second-side write failure. +- Added `PodDiscovery::register_peer` that persists both metadata directions and restores the exact prior source-side peer state on ordinary second-side write failure. - Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata. - Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications. - Added TUI command `:peer <name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`. - Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children. 
+## Reviewer blocker fixes + +- `SendToPeerPod` now reuses the existing one-shot Pod socket client path (`connect_and_send`), which drains connect-time `Alert` / `Snapshot` traffic before writing `Notify` and returns an error if method delivery fails. +- Added a regression test where the target socket emits an alert and snapshot before reading the peer `Notify`, proving the peer send drains the prelude and still delivers the message. +- Registration failure rollback now restores the exact prior source peer list instead of unconditionally removing `source -> target`; a target-side injected failure test verifies a pre-existing source relation is preserved. +- Wording now describes `:peer` as metadata-level reciprocal registration rather than live target-controller consent, and documents that `SendToPeerPod` fails for non-live peers instead of auto-restoring them. + ## Tests and validation run - `cargo test -p protocol -p pod-store -p pod -p tui --lib` @@ -26,4 +33,4 @@ The current boundaries are documented in `artifacts/investigation-summary.md`. N ## Notes -The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails. +The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by restoring the exact prior source-side peer list if the reciprocal write fails. 
diff --git a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md index dab0f1e3..01f2f64b 100644 --- a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md +++ b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md @@ -7,7 +7,7 @@ kind: task priority: P2 labels: [tui, pod, command, orchestration] created_at: 2026-06-01T13:29:55Z -updated_at: 2026-06-02T10:42:37Z +updated_at: 2026-06-02T13:18:34Z assignee: null legacy_ticket: null --- diff --git a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md index 1f084ec5..215e0cc7 100644 --- a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md +++ b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md @@ -122,4 +122,48 @@ The current boundaries are documented in `artifacts/investigation-summary.md`. N The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails. +--- + + + +## Implementation report + +# Implementation report: peer Pod handshake command + +Date: 2026-06-02 + +## Investigation + +The current boundaries are documented in `artifacts/investigation-summary.md`. No escalation blocker was found. The main concern identified was avoiding reuse of spawned-child state (`SpawnedPodRegistry`, delegated scope, output cursors, and child completion semantics) for peer communication; the implementation therefore adds separate peer metadata and a separate peer send tool. 
+ +## Implemented behavior + +- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`. +- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`. +- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes. +- Added `PodDiscovery::register_peer` that persists both metadata directions and restores the exact prior source-side peer state on ordinary second-side write failure. +- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata. +- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications. +- Added TUI command `:peer <name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`. +- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children. + +## Reviewer blocker fixes + +- `SendToPeerPod` now reuses the existing one-shot Pod socket client path (`connect_and_send`), which drains connect-time `Alert` / `Snapshot` traffic before writing `Notify` and returns an error if method delivery fails. +- Added a regression test where the target socket emits an alert and snapshot before reading the peer `Notify`, proving the peer send drains the prelude and still delivers the message. +- Registration failure rollback now restores the exact prior source peer list instead of unconditionally removing `source -> target`; a target-side injected failure test verifies a pre-existing source relation is preserved. 
+- Wording now describes `:peer` as metadata-level reciprocal registration rather than live target-controller consent, and documents that `SendToPeerPod` fails for non-live peers instead of auto-restoring them. + +## Tests and validation run + +- `cargo test -p protocol -p pod-store -p pod -p tui --lib` +- `./tickets.sh doctor` +- `git diff --check` +- `nix build .#yoi` + +## Notes + +The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by restoring the exact prior source-side peer list if the reciprocal write fails. + + ---