From 2b4bdda89c07afcebd53c9235cde13bae48e8c03 Mon Sep 17 00:00:00 2001 From: Hare Date: Thu, 28 May 2026 22:14:28 +0900 Subject: [PATCH] fix: confirm initial SpawnPod run delivery --- crates/pod/src/discovery.rs | 62 +++++++++++++++++-- crates/pod/src/spawn/comm_tools.rs | 97 ++++++++++++++++++++++++++---- crates/pod/src/spawn/tool.rs | 3 + 3 files changed, 144 insertions(+), 18 deletions(-) diff --git a/crates/pod/src/discovery.rs b/crates/pod/src/discovery.rs index 68a01e40..354cf88a 100644 --- a/crates/pod/src/discovery.rs +++ b/crates/pod/src/discovery.rs @@ -548,10 +548,20 @@ async fn probe_socket(socket_path: &Path) -> LiveInfo { Ok(Ok(stream)) => { let (r, _w) = stream.into_split(); let mut reader = JsonLineReader::new(r); - let status = match tokio::time::timeout(PROBE_TIMEOUT, reader.next::()).await { - Ok(Ok(Some(Event::Snapshot { status, .. }))) => Some(status), - _ => None, - }; + let mut status = None; + loop { + match tokio::time::timeout(PROBE_TIMEOUT, reader.next::()).await { + Ok(Ok(Some(Event::Snapshot { + status: snapshot_status, + .. + }))) => { + status = Some(snapshot_status); + break; + } + Ok(Ok(Some(Event::Alert(_)))) => continue, + Ok(Ok(Some(_))) | Ok(Ok(None)) | Ok(Err(_)) | Err(_) => break, + } + } LiveInfo { socket_path: socket_path.to_path_buf(), reachable: true, @@ -755,8 +765,8 @@ mod tests { use std::sync::Mutex; use manifest::{Permission, ScopeRule}; - use protocol::Greeting; use protocol::stream::JsonLineWriter; + use protocol::{Alert, AlertLevel, AlertSource, Greeting}; use session_store::{ FsStore, PodSpawnedChild, PodSpawnedScopeRule, new_segment_id, new_session_id, }; @@ -931,6 +941,48 @@ mod tests { live_listener.abort(); } + #[tokio::test(flavor = "current_thread")] + async fn probe_socket_reads_status_after_replayed_alert() { + let root = TempDir::new().unwrap(); + let socket = root.path().join("pod.sock"); + let listener = UnixListener::bind(&socket).unwrap(); + let handle = tokio::spawn(async move { + let (stream, _) = listener.accept().await.unwrap(); + let mut writer = JsonLineWriter::new(stream); + writer + .write(&Event::Alert(Alert { + level: AlertLevel::Warn, + source: AlertSource::Pod, + message: "replayed alert".into(), + timestamp_ms: 0, + })) + .await + .unwrap(); + writer + .write(&Event::Snapshot { + entries: Vec::new(), + greeting: Greeting { + pod_name: "alerted".into(), + cwd: "/tmp".into(), + provider: "test".into(), + model: "test".into(), + scope_summary: String::new(), + tools: Vec::new(), + context_window: 0, + context_tokens: 0, + }, + status: PodStatus::Paused, + }) + .await + .unwrap(); + }); + + let info = probe_socket(&socket).await; + assert!(info.reachable); + assert!(matches!(info.status, Some(PodStatus::Paused))); + handle.await.unwrap(); + } + fn child(name: &str, socket_path: &Path) -> PodSpawnedChild { PodSpawnedChild { pod_name: name.to_string(), diff --git a/crates/pod/src/spawn/comm_tools.rs b/crates/pod/src/spawn/comm_tools.rs index 337cc5e3..b3217980 100644 --- a/crates/pod/src/spawn/comm_tools.rs +++ b/crates/pod/src/spawn/comm_tools.rs @@ -79,6 +79,10 @@ impl Tool for SendToPodTool { "pod `{}` is already running a turn; wait for it to finish and retry", input.name )), + SendRunError::Rejected { code, message } => ToolError::ExecutionFailed(format!( + "pod `{}` rejected the run with {code:?}: {message}", + input.name + )), SendRunError::Io(msg) => { ToolError::ExecutionFailed(format!("send to `{}`: {msg}", input.name)) } @@ -370,16 +374,20 @@ pub(crate) enum SendRunError { /// Target Pod responded with `Error { AlreadyRunning }` — the /// caller can retry once the current turn ends. AlreadyRunning, - /// Any other failure (connect / write / read / unexpected EOF). + /// Target Pod explicitly rejected the run after delivery reached the + /// controller. + Rejected { code: ErrorCode, message: String }, + /// Transport, protocol, timeout, or unexpected EOF before acceptance + /// evidence was observed. Io(String), } /// Write `Method::Run` to the target and read back events until we see /// evidence that the controller accepted the run (`UserMessage`, -/// `TurnStart`, or a user-send `InvokeStart`) or rejected it with -/// `Error { AlreadyRunning }`. Any connect-time Snapshot or replayed alerts -/// that precede the response are skipped. Times out per-read so a stuck Pod -/// doesn't hang the tool. +/// `TurnStart`, or a user-send `InvokeStart`) or rejected it. The connect-time +/// event prelude is drained before sending the method so large Snapshots and +/// large Run payloads cannot block each other on the same socket. Times out +/// per operation so a stuck Pod doesn't hang the tool. pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> { let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) .await @@ -388,6 +396,31 @@ pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result let (r, w) = stream.into_split(); let mut writer = JsonLineWriter::new(w); let mut reader = JsonLineReader::new(r); + + loop { + let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::()) + .await + .map_err(|_| SendRunError::Io("read initial Snapshot timed out".into()))? + .map_err(|e| SendRunError::Io(format!("read initial Snapshot: {e}")))?; + match event { + Some(Event::Snapshot { .. }) => break, + Some(Event::Alert(_)) => continue, + Some(Event::Error { + code: ErrorCode::AlreadyRunning, + .. + }) => return Err(SendRunError::AlreadyRunning), + Some(Event::Error { code, message }) => { + return Err(SendRunError::Rejected { code, message }); + } + Some(_) => continue, + None => { + return Err(SendRunError::Io( + "connection closed before initial Snapshot".into(), + )); + } + } + } + tokio::time::timeout( SOCKET_OP_TIMEOUT, writer.write(&Method::Run { @@ -400,26 +433,23 @@ pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result loop { let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::()) .await - .map_err(|_| SendRunError::Io("read timed out".into()))? - .map_err(|e| SendRunError::Io(format!("read: {e}")))?; + .map_err(|_| SendRunError::Io("read response timed out".into()))? + .map_err(|e| SendRunError::Io(format!("read response: {e}")))?; match event { Some(Event::Error { code: ErrorCode::AlreadyRunning, .. }) => return Err(SendRunError::AlreadyRunning), Some(Event::Error { code, message }) => { - return Err(SendRunError::Io(format!( - "pod returned {code:?}: {message}" - ))); + return Err(SendRunError::Rejected { code, message }); } Some(Event::InvokeStart { kind: InvokeKind::UserSend, }) | Some(Event::UserMessage { .. }) | Some(Event::TurnStart { .. }) => return Ok(()), - // Alerts, Snapshot, and other pre-turn events can precede the - // controller's response; keep reading until the Run is accepted - // or rejected. + // Other post-Snapshot events can race with the controller's + // response; keep reading until the Run is accepted or rejected. Some(_) => continue, None => return Err(SendRunError::Io("connection closed before response".into())), } @@ -618,6 +648,47 @@ mod tests { } } + #[tokio::test] + async fn send_run_and_confirm_drains_alert_and_large_snapshot_before_large_run() { + let tmp = TempDir::new().unwrap(); + let socket = tmp.path().join("pod.sock"); + let listener = UnixListener::bind(&socket).unwrap(); + let large_snapshot_payload = "s".repeat(2 * 1024 * 1024); + let large_run_payload = "r".repeat(2 * 1024 * 1024); + let received = serve_initial_events_then_run_ack( + listener, + vec![ + Event::Alert(Alert { + level: AlertLevel::Warn, + source: AlertSource::Pod, + message: "replayed alert".into(), + timestamp_ms: 0, + }), + snapshot(vec![ + serde_json::json!({ "payload": large_snapshot_payload }), + ]), + ], + Event::InvokeStart { + kind: InvokeKind::UserSend, + }, + ); + + send_run_and_confirm(&socket, large_run_payload.clone()) + .await + .unwrap(); + + let method = received.await.unwrap().expect("expected method"); + match method { + Method::Run { input } => { + assert_eq!( + protocol::Segment::flatten_to_text(&input), + large_run_payload + ); + } + other => panic!("expected Run, got {other:?}"), + } + } + #[tokio::test] async fn send_run_and_confirm_reports_already_running() { let tmp = TempDir::new().unwrap(); diff --git a/crates/pod/src/spawn/tool.rs b/crates/pod/src/spawn/tool.rs index b7e48292..54bff15b 100644 --- a/crates/pod/src/spawn/tool.rs +++ b/crates/pod/src/spawn/tool.rs @@ -464,6 +464,9 @@ fn spawn_delivery_error(pod_name: &str, err: SendRunError) -> ToolError { SendRunError::AlreadyRunning => ToolError::ExecutionFailed(format!( "spawned pod `{pod_name}` rejected its initial task as already running; the pod remains registered and can be inspected or stopped" )), + SendRunError::Rejected { code, message } => ToolError::ExecutionFailed(format!( + "spawned pod `{pod_name}` rejected its initial task with {code:?}: {message}; the pod remains registered and can be inspected or stopped" + )), SendRunError::Io(msg) => ToolError::ExecutionFailed(format!( "spawned pod `{pod_name}` did not confirm initial task delivery: {msg}; the pod remains registered and can be inspected or stopped" )),