From 057c2eff5fa8c43e88d2b22603cedb5af63012e3 Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 2 Jun 2026 22:23:13 +0900 Subject: [PATCH] fix: harden peer pod registration --- crates/pod/src/discovery.rs | 139 +++++++++++++++--- crates/protocol/src/lib.rs | 3 +- crates/tui/src/app.rs | 2 +- crates/tui/src/command.rs | 29 +++- docs/design/pod-session-state.md | 4 +- resources/prompts/common/pod-orchestration.md | 2 +- .../artifacts/implementation-report.md | 11 +- .../item.md | 2 +- .../thread.md | 44 ++++++ 9 files changed, 202 insertions(+), 34 deletions(-) diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs index 4284c488..dfdefb1e 100644 --- a/crates/pod/src/discovery.rs +++ b/crates/pod/src/discovery.rs @@ -18,7 +18,7 @@ use client::PodRuntimeCommand; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use manifest::{Permission, ScopeRule}; use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, validate_pod_name}; -use protocol::stream::{JsonLineReader, JsonLineWriter}; +use protocol::stream::JsonLineReader; use protocol::{Event, Method, PodStatus}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -28,6 +28,7 @@ use tokio::process::Command; use crate::runtime::dir::SpawnedPodRecord; use crate::runtime::pod_registry; +use crate::spawn::comm_tools::connect_and_send; use crate::spawn::registry::SpawnedPodRegistry; const PROBE_TIMEOUT: Duration = Duration::from_millis(500); @@ -182,14 +183,14 @@ where pod_name: peer_name.to_string(), }); } - let self_exists = self.store.read_by_name(&self.self_pod_name)?.is_some(); - if !self_exists { - return Err(PodDiscoveryError::StateMissing { + let self_metadata = self + .store + .read_by_name(&self.self_pod_name)? + .ok_or_else(|| PodDiscoveryError::StateMissing { pod_name: self.self_pod_name.clone(), - }); - } - let peer_exists = self.store.read_by_name(peer_name)?.is_some(); - if !peer_exists { + })?; + let prior_self_peers = self_metadata.peers.clone(); + if self.store.read_by_name(peer_name)?.is_none() { return Err(PodDiscoveryError::MissingPod { pod_name: peer_name.to_string(), }); @@ -197,7 +198,7 @@ where self.store.add_peer(&self.self_pod_name, peer_name)?; if let Err(error) = self.store.add_peer(peer_name, &self.self_pod_name) { - let _ = self.store.remove_peer(&self.self_pod_name, peer_name); + let _ = self.store.set_peers(&self.self_pod_name, prior_self_peers); return Err(PodDiscoveryError::PodStore(error)); } @@ -804,7 +805,7 @@ where Arc::new(move || { let meta = ToolMeta::new("ListPods") .description( - "List Pods visible to this Pod from durable Pod state and the spawned-child registry. This does not expose the host-wide Pod universe.", + "List Pods visible to this Pod from durable Pod state, peer metadata, and the spawned-child registry. This does not expose the host-wide Pod universe.", ) .input_schema(serde_json::json!({ "type": "object", @@ -835,7 +836,7 @@ where }) } -const SEND_TO_PEER_POD_DESCRIPTION: &str = "Send a text message to a peer Pod made visible by an explicit peer handshake. The message is delivered as a peer notification through the target Pod's durable notification/history path. This does not grant delegated scope, create a spawned-child output cursor, imply parent ownership, or produce child completion notifications. Fails if the target is not a visible live peer."; +const SEND_TO_PEER_POD_DESCRIPTION: &str = "Send a text message to a peer Pod made visible by explicit reciprocal peer metadata. The message is delivered as a peer notification through the target Pod's durable notification/history path. This does not grant delegated scope, create a spawned-child output cursor, imply parent ownership, or produce child completion notifications. Fails clearly if the target is not a visible live peer; it does not auto-restore stopped peers."; struct SendToPeerPodTool { discovery: PodDiscovery, @@ -900,16 +901,7 @@ where } async fn send_peer_notify(socket_path: &Path, message: String) -> io::Result<()> { - let stream = tokio::time::timeout(Duration::from_secs(5), UnixStream::connect(socket_path)) - .await - .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "connect timed out"))??; - let mut writer = JsonLineWriter::new(stream); - tokio::time::timeout( - Duration::from_secs(5), - writer.write(&Method::Notify { message }), - ) - .await - .map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "write timed out"))? + connect_and_send(socket_path, &Method::Notify { message }).await } fn json_content(value: &T) -> Result { @@ -941,7 +933,7 @@ mod tests { use std::sync::Mutex; use manifest::{Permission, ScopeRule}; - use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule}; + use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError}; use protocol::stream::JsonLineWriter; use protocol::{Alert, AlertLevel, AlertSource}; use session_store::{new_segment_id, new_session_id}; @@ -950,6 +942,40 @@ mod tests { use crate::runtime::dir::RuntimeDir; + #[derive(Clone)] + struct FailTargetPeerStore { + inner: FsPodStore, + } + + impl PodMetadataStore for FailTargetPeerStore { + fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError> { + if metadata.pod_name == "target" + && metadata.peers.iter().any(|peer| peer.pod_name == "source") + { + return Err(PodStoreError::Io(io::Error::other( + "injected target-side peer write failure", + ))); + } + self.inner.write(metadata) + } + + fn read_by_name(&self, pod_name: &str) -> Result, PodStoreError> { + self.inner.read_by_name(pod_name) + } + + fn list_names(&self) -> Result, PodStoreError> { + self.inner.list_names() + } + + fn root_dir(&self) -> Option { + self.inner.root_dir() + } + + fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError> { + self.inner.delete_by_name(pod_name) + } + } + static ENV_LOCK: Mutex<()> = Mutex::new(()); #[tokio::test(flavor = "current_thread")] @@ -1223,6 +1249,45 @@ mod tests { assert!(matches!(missing_err, PodDiscoveryError::MissingPod { .. })); } + #[tokio::test(flavor = "current_thread")] + async fn register_peer_target_failure_preserves_existing_source_peer() { + let root = TempDir::new().unwrap(); + let store_dir = root.path().join("store"); + let runtime_base = root.path().join("runtime"); + std::fs::create_dir_all(&runtime_base).unwrap(); + let inner = FsPodStore::new(&store_dir).unwrap(); + inner + .write(&PodMetadata { + pod_name: "source".into(), + active: None, + spawned_children: Vec::new(), + reclaimed_children: Vec::new(), + peers: vec![pod_store::PodPeer { + pod_name: "target".into(), + }], + resolved_manifest_snapshot: None, + }) + .unwrap(); + inner.write(&PodMetadata::new("target", None)).unwrap(); + let store = FailTargetPeerStore { inner }; + let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap()); + let discovery = PodDiscovery::new( + store.clone(), + "source".into(), + runtime_base, + root.path().to_path_buf(), + SpawnedPodRegistry::new(runtime_dir), + ); + + let err = discovery.register_peer("target").unwrap_err(); + assert!(matches!(err, PodDiscoveryError::PodStore(_))); + let source = store.read_by_name("source").unwrap().unwrap(); + assert_eq!(source.peers.len(), 1); + assert_eq!(source.peers[0].pod_name, "target"); + let target = store.read_by_name("target").unwrap().unwrap(); + assert!(target.peers.is_empty()); + } + #[tokio::test(flavor = "current_thread")] async fn send_to_peer_pod_delivers_notify_without_child_registry() { let root = TempDir::new().unwrap(); @@ -1288,7 +1353,35 @@ mod tests { .unwrap(); let (stream, _) = listener.accept().await.unwrap(); - let mut reader = JsonLineReader::new(stream); + let (reader_half, writer_half) = stream.into_split(); + let mut reader = JsonLineReader::new(reader_half); + let mut writer = JsonLineWriter::new(writer_half); + writer + .write(&Event::Alert(Alert { + level: AlertLevel::Warn, + source: AlertSource::Pod, + message: "connect-time alert".into(), + timestamp_ms: 0, + })) + .await + .unwrap(); + writer + .write(&Event::Snapshot { + entries: Vec::new(), + greeting: protocol::Greeting { + pod_name: "target".into(), + cwd: "/tmp".into(), + provider: "test".into(), + model: "test".into(), + scope_summary: String::new(), + tools: Vec::new(), + context_window: 0, + context_tokens: 0, + }, + status: PodStatus::Idle, + }) + .await + .unwrap(); let method = reader.next::().await.unwrap().unwrap(); if let Method::Notify { message } = method { tx.send(message).await.unwrap(); diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 9eeed8b9..80c4e4bb 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -66,7 +66,8 @@ pub enum Method { }, /// Register another existing Pod as a reciprocal peer of this Pod. /// - /// This is metadata/control state only: it must not grant delegated scope, + /// This is metadata/control state only: it does not ask the target's live + /// controller for consent, and it must not grant delegated scope, /// spawned-child ownership, output cursors, or child lifecycle authority. RegisterPeer { name: String, diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 92c0780c..7deaabe9 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -1214,7 +1214,7 @@ impl App { .and_then(serde_json::Value::as_str) .unwrap_or("peer Pod"); self.flash_actionbar_notice( - format!("Peer handshake registered: `{source}` ↔ `{peer}`"), + format!("Peer metadata registered: `{source}` ↔ `{peer}`"), ActionbarNoticeLevel::Info, ActionbarNoticeSource::Tui, Duration::from_secs(4), diff --git a/crates/tui/src/command.rs b/crates/tui/src/command.rs index 3dee7228..80564fd3 100644 --- a/crates/tui/src/command.rs +++ b/crates/tui/src/command.rs @@ -160,7 +160,7 @@ impl CommandRegistry { name: "peer", aliases: &[], usage: "peer ", - description: "Make another existing Pod visible as a reciprocal peer.", + description: "Register another existing Pod as a reciprocal metadata peer.", argument_parser: peer_args, can_execute: peer_available, executor: peer_command, @@ -435,7 +435,7 @@ fn peer_command(invocation: CommandInvocation<'_>) -> CommandExecution { CommandExecution { method: Some(Method::RegisterPeer { name: name.clone() }), diagnostics: vec![CommandDiagnostic::new(format!( - "peer handshake requested with `{name}`" + "peer metadata registration requested with `{name}`" ))], exit_command_mode: true, clear_input: true, @@ -560,7 +560,11 @@ mod tests { )); assert!(result.exit_command_mode); assert!(result.clear_input); - assert!(result.diagnostics[0].message.contains("peer handshake")); + assert!( + result.diagnostics[0] + .message + .contains("metadata registration") + ); } #[test] @@ -574,6 +578,25 @@ mod tests { } } + #[test] + fn peer_help_mentions_metadata_registration() { + let registry = CommandRegistry::builtins(); + let result = registry.dispatch("help peer", &env()); + assert!(result.method.is_none()); + assert!(result.diagnostics[0].message.contains("peer ")); + assert!(result.diagnostics[0].message.contains("metadata peer")); + } + + #[test] + fn peer_rejects_disconnected() { + let registry = CommandRegistry::builtins(); + let mut disconnected = env(); + disconnected.connected = false; + let result = registry.dispatch("peer reviewer", &disconnected); + assert!(result.method.is_none()); + assert!(result.diagnostics[0].message.contains("connected")); + } + #[test] fn peer_rejects_running() { let registry = CommandRegistry::builtins(); diff --git a/docs/design/pod-session-state.md b/docs/design/pod-session-state.md index 59f1c3eb..96c16547 100644 --- a/docs/design/pod-session-state.md +++ b/docs/design/pod-session-state.md @@ -34,9 +34,9 @@ Delegated write scope is a capability loan. Stopping, shutting down, or pruning ## Peer Pods -Peer visibility is also Pod metadata, but it is distinct from spawned-child delegation. A TUI user can run `:peer ` while attached to an idle Pod to register a reciprocal peer handshake with another existing Pod. +Peer visibility is also Pod metadata, but it is distinct from spawned-child delegation. A TUI user can run `:peer ` while attached to an idle Pod to register reciprocal peer metadata with another existing Pod. This is a metadata-level registration, not live target-controller consent. -A peer relationship only makes the Pods mutually visible through `ListPods` with visibility source `peer`. It does not grant filesystem scope, create a child output cursor, make either Pod the other's parent, or imply child completion notifications. Peer messages use `SendToPeerPod`, which delivers a labeled notification into the target Pod's normal durable notification/history path. +A peer relationship only makes the Pods mutually visible through `ListPods` with visibility source `peer`. It does not grant filesystem scope, create a child output cursor, make either Pod the other's parent, or imply child completion notifications. Peer messages use `SendToPeerPod`, which delivers a labeled notification into the target Pod's normal durable notification/history path. `SendToPeerPod` requires the peer to be live and fails clearly for non-live peers rather than auto-restoring them. ## Notifications are not authority diff --git a/resources/prompts/common/pod-orchestration.md b/resources/prompts/common/pod-orchestration.md index 293502d5..3abd8997 100644 --- a/resources/prompts/common/pod-orchestration.md +++ b/resources/prompts/common/pod-orchestration.md @@ -7,6 +7,6 @@ The parent does not need to keep a turn open or call tools solely to wait for a Before treating delegated work as complete, read the child output and inspect concrete evidence such as worktree status, diff, and test results. Notifications are hints, not proof of completion. -Peer Pods made visible by a handshake are not spawned children. Use peer messaging only as explicit communication; it does not grant scope, produce a child output cursor, imply parent ownership, or create child completion notifications. +Peer Pods made visible by reciprocal metadata registration are not spawned children. Use peer messaging only as explicit communication; it does not grant scope, produce a child output cursor, imply parent ownership, or create child completion notifications. Peer sends require a live peer and do not auto-restore stopped peers. This guidance is not scheduler or auto-maintain authorization. Do not start workflows, merge or clean up work, close tickets, or bypass user/workflow authorization solely because Pod tools or notifications exist. diff --git a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md index c4bdc814..72b40e86 100644 --- a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md +++ b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/artifacts/implementation-report.md @@ -11,12 +11,19 @@ The current boundaries are documented in `artifacts/investigation-summary.md`. N - Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`. - Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`. - Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes. -- Added `PodDiscovery::register_peer` that persists both metadata directions and rolls back the first side on ordinary second-side write failure. +- Added `PodDiscovery::register_peer` that persists both metadata directions and restores the exact prior source-side peer state on ordinary second-side write failure. - Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata. - Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications. - Added TUI command `:peer ` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`. - Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children. +## Reviewer blocker fixes + +- `SendToPeerPod` now reuses the existing one-shot Pod socket client path (`connect_and_send`), which drains connect-time `Alert` / `Snapshot` traffic before writing `Notify` and returns an error if method delivery fails. +- Added a regression test where the target socket emits an alert and snapshot before reading the peer `Notify`, proving the peer send drains the prelude and still delivers the message. +- Registration failure rollback now restores the exact prior source peer list instead of unconditionally removing `source -> target`; a target-side injected failure test verifies a pre-existing source relation is preserved. +- Wording now describes `:peer` as metadata-level reciprocal registration rather than live target-controller consent, and documents that `SendToPeerPod` fails for non-live peers instead of auto-restoring them. + ## Tests and validation run - `cargo test -p protocol -p pod-store -p pod -p tui --lib` @@ -26,4 +33,4 @@ The current boundaries are documented in `artifacts/investigation-summary.md`. N ## Notes -The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails. +The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by restoring the exact prior source-side peer list if the reciprocal write fails. diff --git a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md index dab0f1e3..01f2f64b 100644 --- a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md +++ b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/item.md @@ -7,7 +7,7 @@ kind: task priority: P2 labels: [tui, pod, command, orchestration] created_at: 2026-06-01T13:29:55Z -updated_at: 2026-06-02T10:42:37Z +updated_at: 2026-06-02T13:18:34Z assignee: null legacy_ticket: null --- diff --git a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md index 1f084ec5..215e0cc7 100644 --- a/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md +++ b/work-items/open/20260601-132955-tui-peer-pod-handshake-command/thread.md @@ -122,4 +122,48 @@ The current boundaries are documented in `artifacts/investigation-summary.md`. N The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails. +--- + + + +## Implementation report + +# Implementation report: peer Pod handshake command + +Date: 2026-06-02 + +## Investigation + +The current boundaries are documented in `artifacts/investigation-summary.md`. No escalation blocker was found. The main concern identified was avoiding reuse of spawned-child state (`SpawnedPodRegistry`, delegated scope, output cursors, and child completion semantics) for peer communication; the implementation therefore adds separate peer metadata and a separate peer send tool. + +## Implemented behavior + +- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`. +- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`. +- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes. +- Added `PodDiscovery::register_peer` that persists both metadata directions and restores the exact prior source-side peer state on ordinary second-side write failure. +- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata. +- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications. +- Added TUI command `:peer ` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`. +- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children. + +## Reviewer blocker fixes + +- `SendToPeerPod` now reuses the existing one-shot Pod socket client path (`connect_and_send`), which drains connect-time `Alert` / `Snapshot` traffic before writing `Notify` and returns an error if method delivery fails. +- Added a regression test where the target socket emits an alert and snapshot before reading the peer `Notify`, proving the peer send drains the prelude and still delivers the message. +- Registration failure rollback now restores the exact prior source peer list instead of unconditionally removing `source -> target`; a target-side injected failure test verifies a pre-existing source relation is preserved. +- Wording now describes `:peer` as metadata-level reciprocal registration rather than live target-controller consent, and documents that `SendToPeerPod` fails for non-live peers instead of auto-restoring them. + +## Tests and validation run + +- `cargo test -p protocol -p pod-store -p pod -p tui --lib` +- `./tickets.sh doctor` +- `git diff --check` +- `nix build .#yoi` + +## Notes + +The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by restoring the exact prior source-side peer list if the reciprocal write fails. + + ---