fix: harden peer pod registration

This commit is contained in:
Keisuke Hirata 2026-06-02 22:23:13 +09:00
parent 093a84fc83
commit 057c2eff5f
No known key found for this signature in database
9 changed files with 202 additions and 34 deletions

View File

@ -18,7 +18,7 @@ use client::PodRuntimeCommand;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use manifest::{Permission, ScopeRule}; use manifest::{Permission, ScopeRule};
use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, validate_pod_name}; use pod_store::{PodActiveSegmentRef, PodMetadata, PodMetadataStore, validate_pod_name};
use protocol::stream::{JsonLineReader, JsonLineWriter}; use protocol::stream::JsonLineReader;
use protocol::{Event, Method, PodStatus}; use protocol::{Event, Method, PodStatus};
use schemars::JsonSchema; use schemars::JsonSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -28,6 +28,7 @@ use tokio::process::Command;
use crate::runtime::dir::SpawnedPodRecord; use crate::runtime::dir::SpawnedPodRecord;
use crate::runtime::pod_registry; use crate::runtime::pod_registry;
use crate::spawn::comm_tools::connect_and_send;
use crate::spawn::registry::SpawnedPodRegistry; use crate::spawn::registry::SpawnedPodRegistry;
const PROBE_TIMEOUT: Duration = Duration::from_millis(500); const PROBE_TIMEOUT: Duration = Duration::from_millis(500);
@ -182,14 +183,14 @@ where
pod_name: peer_name.to_string(), pod_name: peer_name.to_string(),
}); });
} }
let self_exists = self.store.read_by_name(&self.self_pod_name)?.is_some(); let self_metadata = self
if !self_exists { .store
return Err(PodDiscoveryError::StateMissing { .read_by_name(&self.self_pod_name)?
.ok_or_else(|| PodDiscoveryError::StateMissing {
pod_name: self.self_pod_name.clone(), pod_name: self.self_pod_name.clone(),
}); })?;
} let prior_self_peers = self_metadata.peers.clone();
let peer_exists = self.store.read_by_name(peer_name)?.is_some(); if self.store.read_by_name(peer_name)?.is_none() {
if !peer_exists {
return Err(PodDiscoveryError::MissingPod { return Err(PodDiscoveryError::MissingPod {
pod_name: peer_name.to_string(), pod_name: peer_name.to_string(),
}); });
@ -197,7 +198,7 @@ where
self.store.add_peer(&self.self_pod_name, peer_name)?; self.store.add_peer(&self.self_pod_name, peer_name)?;
if let Err(error) = self.store.add_peer(peer_name, &self.self_pod_name) { if let Err(error) = self.store.add_peer(peer_name, &self.self_pod_name) {
let _ = self.store.remove_peer(&self.self_pod_name, peer_name); let _ = self.store.set_peers(&self.self_pod_name, prior_self_peers);
return Err(PodDiscoveryError::PodStore(error)); return Err(PodDiscoveryError::PodStore(error));
} }
@ -804,7 +805,7 @@ where
Arc::new(move || { Arc::new(move || {
let meta = ToolMeta::new("ListPods") let meta = ToolMeta::new("ListPods")
.description( .description(
"List Pods visible to this Pod from durable Pod state and the spawned-child registry. This does not expose the host-wide Pod universe.", "List Pods visible to this Pod from durable Pod state, peer metadata, and the spawned-child registry. This does not expose the host-wide Pod universe.",
) )
.input_schema(serde_json::json!({ .input_schema(serde_json::json!({
"type": "object", "type": "object",
@ -835,7 +836,7 @@ where
}) })
} }
const SEND_TO_PEER_POD_DESCRIPTION: &str = "Send a text message to a peer Pod made visible by an explicit peer handshake. The message is delivered as a peer notification through the target Pod's durable notification/history path. This does not grant delegated scope, create a spawned-child output cursor, imply parent ownership, or produce child completion notifications. Fails if the target is not a visible live peer."; const SEND_TO_PEER_POD_DESCRIPTION: &str = "Send a text message to a peer Pod made visible by explicit reciprocal peer metadata. The message is delivered as a peer notification through the target Pod's durable notification/history path. This does not grant delegated scope, create a spawned-child output cursor, imply parent ownership, or produce child completion notifications. Fails clearly if the target is not a visible live peer; it does not auto-restore stopped peers.";
struct SendToPeerPodTool<St> { struct SendToPeerPodTool<St> {
discovery: PodDiscovery<St>, discovery: PodDiscovery<St>,
@ -900,16 +901,7 @@ where
} }
async fn send_peer_notify(socket_path: &Path, message: String) -> io::Result<()> { async fn send_peer_notify(socket_path: &Path, message: String) -> io::Result<()> {
let stream = tokio::time::timeout(Duration::from_secs(5), UnixStream::connect(socket_path)) connect_and_send(socket_path, &Method::Notify { message }).await
.await
.map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "connect timed out"))??;
let mut writer = JsonLineWriter::new(stream);
tokio::time::timeout(
Duration::from_secs(5),
writer.write(&Method::Notify { message }),
)
.await
.map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "write timed out"))?
} }
fn json_content<T: Serialize>(value: &T) -> Result<String, ToolError> { fn json_content<T: Serialize>(value: &T) -> Result<String, ToolError> {
@ -941,7 +933,7 @@ mod tests {
use std::sync::Mutex; use std::sync::Mutex;
use manifest::{Permission, ScopeRule}; use manifest::{Permission, ScopeRule};
use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule}; use pod_store::{FsPodStore, PodSpawnedChild, PodSpawnedScopeRule, PodStoreError};
use protocol::stream::JsonLineWriter; use protocol::stream::JsonLineWriter;
use protocol::{Alert, AlertLevel, AlertSource}; use protocol::{Alert, AlertLevel, AlertSource};
use session_store::{new_segment_id, new_session_id}; use session_store::{new_segment_id, new_session_id};
@ -950,6 +942,40 @@ mod tests {
use crate::runtime::dir::RuntimeDir; use crate::runtime::dir::RuntimeDir;
#[derive(Clone)]
struct FailTargetPeerStore {
inner: FsPodStore,
}
impl PodMetadataStore for FailTargetPeerStore {
fn write(&self, metadata: &PodMetadata) -> Result<(), PodStoreError> {
if metadata.pod_name == "target"
&& metadata.peers.iter().any(|peer| peer.pod_name == "source")
{
return Err(PodStoreError::Io(io::Error::other(
"injected target-side peer write failure",
)));
}
self.inner.write(metadata)
}
fn read_by_name(&self, pod_name: &str) -> Result<Option<PodMetadata>, PodStoreError> {
self.inner.read_by_name(pod_name)
}
fn list_names(&self) -> Result<Vec<String>, PodStoreError> {
self.inner.list_names()
}
fn root_dir(&self) -> Option<PathBuf> {
self.inner.root_dir()
}
fn delete_by_name(&self, pod_name: &str) -> Result<(), PodStoreError> {
self.inner.delete_by_name(pod_name)
}
}
static ENV_LOCK: Mutex<()> = Mutex::new(()); static ENV_LOCK: Mutex<()> = Mutex::new(());
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
@ -1223,6 +1249,45 @@ mod tests {
assert!(matches!(missing_err, PodDiscoveryError::MissingPod { .. })); assert!(matches!(missing_err, PodDiscoveryError::MissingPod { .. }));
} }
#[tokio::test(flavor = "current_thread")]
async fn register_peer_target_failure_preserves_existing_source_peer() {
let root = TempDir::new().unwrap();
let store_dir = root.path().join("store");
let runtime_base = root.path().join("runtime");
std::fs::create_dir_all(&runtime_base).unwrap();
let inner = FsPodStore::new(&store_dir).unwrap();
inner
.write(&PodMetadata {
pod_name: "source".into(),
active: None,
spawned_children: Vec::new(),
reclaimed_children: Vec::new(),
peers: vec![pod_store::PodPeer {
pod_name: "target".into(),
}],
resolved_manifest_snapshot: None,
})
.unwrap();
inner.write(&PodMetadata::new("target", None)).unwrap();
let store = FailTargetPeerStore { inner };
let runtime_dir = Arc::new(RuntimeDir::create(&runtime_base, "source").await.unwrap());
let discovery = PodDiscovery::new(
store.clone(),
"source".into(),
runtime_base,
root.path().to_path_buf(),
SpawnedPodRegistry::new(runtime_dir),
);
let err = discovery.register_peer("target").unwrap_err();
assert!(matches!(err, PodDiscoveryError::PodStore(_)));
let source = store.read_by_name("source").unwrap().unwrap();
assert_eq!(source.peers.len(), 1);
assert_eq!(source.peers[0].pod_name, "target");
let target = store.read_by_name("target").unwrap().unwrap();
assert!(target.peers.is_empty());
}
#[tokio::test(flavor = "current_thread")] #[tokio::test(flavor = "current_thread")]
async fn send_to_peer_pod_delivers_notify_without_child_registry() { async fn send_to_peer_pod_delivers_notify_without_child_registry() {
let root = TempDir::new().unwrap(); let root = TempDir::new().unwrap();
@ -1288,7 +1353,35 @@ mod tests {
.unwrap(); .unwrap();
let (stream, _) = listener.accept().await.unwrap(); let (stream, _) = listener.accept().await.unwrap();
let mut reader = JsonLineReader::new(stream); let (reader_half, writer_half) = stream.into_split();
let mut reader = JsonLineReader::new(reader_half);
let mut writer = JsonLineWriter::new(writer_half);
writer
.write(&Event::Alert(Alert {
level: AlertLevel::Warn,
source: AlertSource::Pod,
message: "connect-time alert".into(),
timestamp_ms: 0,
}))
.await
.unwrap();
writer
.write(&Event::Snapshot {
entries: Vec::new(),
greeting: protocol::Greeting {
pod_name: "target".into(),
cwd: "/tmp".into(),
provider: "test".into(),
model: "test".into(),
scope_summary: String::new(),
tools: Vec::new(),
context_window: 0,
context_tokens: 0,
},
status: PodStatus::Idle,
})
.await
.unwrap();
let method = reader.next::<Method>().await.unwrap().unwrap(); let method = reader.next::<Method>().await.unwrap().unwrap();
if let Method::Notify { message } = method { if let Method::Notify { message } = method {
tx.send(message).await.unwrap(); tx.send(message).await.unwrap();

View File

@ -66,7 +66,8 @@ pub enum Method {
}, },
/// Register another existing Pod as a reciprocal peer of this Pod. /// Register another existing Pod as a reciprocal peer of this Pod.
/// ///
/// This is metadata/control state only: it must not grant delegated scope, /// This is metadata/control state only: it does not ask the target's live
/// controller for consent, and it must not grant delegated scope,
/// spawned-child ownership, output cursors, or child lifecycle authority. /// spawned-child ownership, output cursors, or child lifecycle authority.
RegisterPeer { RegisterPeer {
name: String, name: String,

View File

@ -1214,7 +1214,7 @@ impl App {
.and_then(serde_json::Value::as_str) .and_then(serde_json::Value::as_str)
.unwrap_or("peer Pod"); .unwrap_or("peer Pod");
self.flash_actionbar_notice( self.flash_actionbar_notice(
format!("Peer handshake registered: `{source}` ↔ `{peer}`"), format!("Peer metadata registered: `{source}` ↔ `{peer}`"),
ActionbarNoticeLevel::Info, ActionbarNoticeLevel::Info,
ActionbarNoticeSource::Tui, ActionbarNoticeSource::Tui,
Duration::from_secs(4), Duration::from_secs(4),

View File

@ -160,7 +160,7 @@ impl CommandRegistry {
name: "peer", name: "peer",
aliases: &[], aliases: &[],
usage: "peer <pod-name>", usage: "peer <pod-name>",
description: "Make another existing Pod visible as a reciprocal peer.", description: "Register another existing Pod as a reciprocal metadata peer.",
argument_parser: peer_args, argument_parser: peer_args,
can_execute: peer_available, can_execute: peer_available,
executor: peer_command, executor: peer_command,
@ -435,7 +435,7 @@ fn peer_command(invocation: CommandInvocation<'_>) -> CommandExecution {
CommandExecution { CommandExecution {
method: Some(Method::RegisterPeer { name: name.clone() }), method: Some(Method::RegisterPeer { name: name.clone() }),
diagnostics: vec![CommandDiagnostic::new(format!( diagnostics: vec![CommandDiagnostic::new(format!(
"peer handshake requested with `{name}`" "peer metadata registration requested with `{name}`"
))], ))],
exit_command_mode: true, exit_command_mode: true,
clear_input: true, clear_input: true,
@ -560,7 +560,11 @@ mod tests {
)); ));
assert!(result.exit_command_mode); assert!(result.exit_command_mode);
assert!(result.clear_input); assert!(result.clear_input);
assert!(result.diagnostics[0].message.contains("peer handshake")); assert!(
result.diagnostics[0]
.message
.contains("metadata registration")
);
} }
#[test] #[test]
@ -574,6 +578,25 @@ mod tests {
} }
} }
#[test]
fn peer_help_mentions_metadata_registration() {
let registry = CommandRegistry::builtins();
let result = registry.dispatch("help peer", &env());
assert!(result.method.is_none());
assert!(result.diagnostics[0].message.contains("peer <pod-name>"));
assert!(result.diagnostics[0].message.contains("metadata peer"));
}
#[test]
fn peer_rejects_disconnected() {
let registry = CommandRegistry::builtins();
let mut disconnected = env();
disconnected.connected = false;
let result = registry.dispatch("peer reviewer", &disconnected);
assert!(result.method.is_none());
assert!(result.diagnostics[0].message.contains("connected"));
}
#[test] #[test]
fn peer_rejects_running() { fn peer_rejects_running() {
let registry = CommandRegistry::builtins(); let registry = CommandRegistry::builtins();

View File

@ -34,9 +34,9 @@ Delegated write scope is a capability loan. Stopping, shutting down, or pruning
## Peer Pods ## Peer Pods
Peer visibility is also Pod metadata, but it is distinct from spawned-child delegation. A TUI user can run `:peer <pod-name>` while attached to an idle Pod to register a reciprocal peer handshake with another existing Pod. Peer visibility is also Pod metadata, but it is distinct from spawned-child delegation. A TUI user can run `:peer <pod-name>` while attached to an idle Pod to register reciprocal peer metadata with another existing Pod. This is a metadata-level registration, not live target-controller consent.
A peer relationship only makes the Pods mutually visible through `ListPods` with visibility source `peer`. It does not grant filesystem scope, create a child output cursor, make either Pod the other's parent, or imply child completion notifications. Peer messages use `SendToPeerPod`, which delivers a labeled notification into the target Pod's normal durable notification/history path. A peer relationship only makes the Pods mutually visible through `ListPods` with visibility source `peer`. It does not grant filesystem scope, create a child output cursor, make either Pod the other's parent, or imply child completion notifications. Peer messages use `SendToPeerPod`, which delivers a labeled notification into the target Pod's normal durable notification/history path. `SendToPeerPod` requires the peer to be live and fails clearly for non-live peers rather than auto-restoring them.
## Notifications are not authority ## Notifications are not authority

View File

@ -7,6 +7,6 @@ The parent does not need to keep a turn open or call tools solely to wait for a
Before treating delegated work as complete, read the child output and inspect concrete evidence such as worktree status, diff, and test results. Notifications are hints, not proof of completion. Before treating delegated work as complete, read the child output and inspect concrete evidence such as worktree status, diff, and test results. Notifications are hints, not proof of completion.
Peer Pods made visible by a handshake are not spawned children. Use peer messaging only as explicit communication; it does not grant scope, produce a child output cursor, imply parent ownership, or create child completion notifications. Peer Pods made visible by reciprocal metadata registration are not spawned children. Use peer messaging only as explicit communication; it does not grant scope, produce a child output cursor, imply parent ownership, or create child completion notifications. Peer sends require a live peer and do not auto-restore stopped peers.
This guidance is not scheduler or auto-maintain authorization. Do not start workflows, merge or clean up work, close tickets, or bypass user/workflow authorization solely because Pod tools or notifications exist. This guidance is not scheduler or auto-maintain authorization. Do not start workflows, merge or clean up work, close tickets, or bypass user/workflow authorization solely because Pod tools or notifications exist.

View File

@ -11,12 +11,19 @@ The current boundaries are documented in `artifacts/investigation-summary.md`. N
- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`. - Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`.
- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`. - Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`.
- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes. - Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes.
- Added `PodDiscovery::register_peer` that persists both metadata directions and rolls back the first side on ordinary second-side write failure. - Added `PodDiscovery::register_peer` that persists both metadata directions and restores the exact prior source-side peer state on ordinary second-side write failure.
- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata. - Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata.
- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications. - Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications.
- Added TUI command `:peer <pod-name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`. - Added TUI command `:peer <pod-name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`.
- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children. - Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children.
## Reviewer blocker fixes
- `SendToPeerPod` now reuses the existing one-shot Pod socket client path (`connect_and_send`), which drains connect-time `Alert` / `Snapshot` traffic before writing `Notify` and returns an error if method delivery fails.
- Added a regression test where the target socket emits an alert and snapshot before reading the peer `Notify`, proving the peer send drains the prelude and still delivers the message.
- Registration failure rollback now restores the exact prior source peer list instead of unconditionally removing `source -> target`; a target-side injected failure test verifies a pre-existing source relation is preserved.
- Wording now describes `:peer` as metadata-level reciprocal registration rather than live target-controller consent, and documents that `SendToPeerPod` fails for non-live peers instead of auto-restoring them.
## Tests and validation run ## Tests and validation run
- `cargo test -p protocol -p pod-store -p pod -p tui --lib` - `cargo test -p protocol -p pod-store -p pod -p tui --lib`
@ -26,4 +33,4 @@ The current boundaries are documented in `artifacts/investigation-summary.md`. N
## Notes ## Notes
The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails. The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by restoring the exact prior source-side peer list if the reciprocal write fails.

View File

@ -7,7 +7,7 @@ kind: task
priority: P2 priority: P2
labels: [tui, pod, command, orchestration] labels: [tui, pod, command, orchestration]
created_at: 2026-06-01T13:29:55Z created_at: 2026-06-01T13:29:55Z
updated_at: 2026-06-02T10:42:37Z updated_at: 2026-06-02T13:18:34Z
assignee: null assignee: null
legacy_ticket: null legacy_ticket: null
--- ---

View File

@ -122,4 +122,48 @@ The current boundaries are documented in `artifacts/investigation-summary.md`. N
The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails. The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by rolling back the first write if the reciprocal write fails.
---
<!-- event: implementation_report author: hare at: 2026-06-02T13:18:34Z -->
## Implementation report
# Implementation report: peer Pod handshake command
Date: 2026-06-02
## Investigation
The current boundaries are documented in `artifacts/investigation-summary.md`. No escalation blocker was found. The main concern identified was avoiding reuse of spawned-child state (`SpawnedPodRegistry`, delegated scope, output cursors, and child completion semantics) for peer communication; the implementation therefore adds separate peer metadata and a separate peer send tool.
## Implemented behavior
- Added reciprocal peer metadata to `PodMetadata` as `peers`, separate from `spawned_children` and `reclaimed_children`.
- Added protocol `Method::RegisterPeer { name }` and `Event::PeerRegistered { result }`.
- Added controller handling for `RegisterPeer`, idle/paused only, validating an existing target Pod and rejecting self-handshakes.
- Added `PodDiscovery::register_peer` that persists both metadata directions and restores the exact prior source-side peer state on ordinary second-side write failure.
- Extended `ListPods` visibility to include `VisibilityReason::Peer`; a successful handshake makes both Pods see each other as `peer` through Pod metadata.
- Added `SendToPeerPod` as a distinct LLM tool. It only sends to visible live peer Pods, delivers `Method::Notify` with a source label, and does not use child delegation, output cursors, parent ownership, or child completion notifications.
- Added TUI command `:peer <pod-name>` for idle attached Pods. Success is reported through a transient actionbar notice when the controller returns `PeerRegistered`.
- Documented peer semantics in `docs/design/pod-session-state.md` and added prompt guidance that peer Pods are not spawned children.
## Reviewer blocker fixes
- `SendToPeerPod` now reuses the existing one-shot Pod socket client path (`connect_and_send`), which drains connect-time `Alert` / `Snapshot` traffic before writing `Notify` and returns an error if method delivery fails.
- Added a regression test where the target socket emits an alert and snapshot before reading the peer `Notify`, proving the peer send drains the prelude and still delivers the message.
- Registration failure rollback now restores the exact prior source peer list instead of unconditionally removing `source -> target`; a target-side injected failure test verifies a pre-existing source relation is preserved.
- Wording now describes `:peer` as metadata-level reciprocal registration rather than live target-controller consent, and documents that `SendToPeerPod` fails for non-live peers instead of auto-restoring them.
## Tests and validation run
- `cargo test -p protocol -p pod-store -p pod -p tui --lib`
- `./tickets.sh doctor`
- `git diff --check`
- `nix build .#yoi`
## Notes
The two-file reciprocal metadata update is not crash-transactional because the existing Pod metadata store has no multi-record transaction boundary. The implementation avoids successful replies with one-sided state for normal validation/write failures by restoring the exact prior source-side peer list if the reciprocal write fails.
--- ---