From 28ad8f01ec81ffcea3f0314c1b0d411c8a5b689f Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 26 May 2026 08:37:24 +0900 Subject: [PATCH] fix: confirm SpawnPod initial run delivery --- crates/pod/src/spawn/comm_tools.rs | 104 ++++++++++++++++++++++++++--- crates/pod/src/spawn/tool.rs | 35 ++++------ crates/pod/tests/spawn_pod_test.rs | 18 +++-- 3 files changed, 120 insertions(+), 37 deletions(-) diff --git a/crates/pod/src/spawn/comm_tools.rs b/crates/pod/src/spawn/comm_tools.rs index 9b2f0318..337cc5e3 100644 --- a/crates/pod/src/spawn/comm_tools.rs +++ b/crates/pod/src/spawn/comm_tools.rs @@ -17,7 +17,7 @@ use async_trait::async_trait; use llm_worker::llm_client::types::{ContentPart, Item, Role}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use protocol::stream::{JsonLineReader, JsonLineWriter}; -use protocol::{ErrorCode, Event, Method}; +use protocol::{ErrorCode, Event, InvokeKind, Method}; use serde::Deserialize; use session_store::LogEntry; use tokio::net::UnixStream; @@ -365,7 +365,8 @@ where } /// Failure modes distinguished by `SendToPod`. -enum SendRunError { +#[derive(Debug)] +pub(crate) enum SendRunError { /// Target Pod responded with `Error { AlreadyRunning }` — the /// caller can retry once the current turn ends. AlreadyRunning, @@ -374,10 +375,12 @@ enum SendRunError { } /// Write `Method::Run` to the target and read back events until we see -/// either `TurnStart` (accepted) or `Error { AlreadyRunning }` -/// (rejected). Any replayed alerts that precede the response are -/// skipped. Times out per-read so a stuck Pod doesn't hang the tool. -async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> { +/// evidence that the controller accepted the run (`UserMessage`, +/// `TurnStart`, or a user-send `InvokeStart`) or rejected it with +/// `Error { AlreadyRunning }`. Any connect-time Snapshot or replayed alerts +/// that precede the response are skipped. Times out per-read so a stuck Pod +/// doesn't hang the tool. +pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> { let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) .await .map_err(|_| SendRunError::Io("connect timed out".into()))? @@ -404,10 +407,19 @@ async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRu code: ErrorCode::AlreadyRunning, .. }) => return Err(SendRunError::AlreadyRunning), - Some(Event::TurnStart { .. }) => return Ok(()), - // Alerts and other pre-turn events are replayed to new - // subscribers; keep reading until the controller's response - // to our `Run` shows up. + Some(Event::Error { code, message }) => { + return Err(SendRunError::Io(format!( + "pod returned {code:?}: {message}" + ))); + } + Some(Event::InvokeStart { + kind: InvokeKind::UserSend, + }) + | Some(Event::UserMessage { .. }) + | Some(Event::TurnStart { .. }) => return Ok(()), + // Alerts, Snapshot, and other pre-turn events can precede the + // controller's response; keep reading until the Run is accepted + // or rejected. Some(_) => continue, None => return Err(SendRunError::Io("connection closed before response".into())), } @@ -555,6 +567,78 @@ mod tests { }) } + fn serve_initial_events_then_run_ack( + listener: UnixListener, + initial_events: Vec, + ack: Event, + ) -> JoinHandle> { + tokio::spawn(async move { + let (stream, _) = listener.accept().await.ok()?; + let (r, w) = stream.into_split(); + let mut reader = JsonLineReader::new(r); + let mut writer = JsonLineWriter::new(w); + for event in initial_events { + writer.write(&event).await.ok()?; + } + let method = reader.next::().await.ok().flatten()?; + writer.write(&ack).await.ok()?; + Some(method) + }) + } + + #[tokio::test] + async fn send_run_and_confirm_keeps_connection_open_until_user_message_ack() { + let tmp = TempDir::new().unwrap(); + let socket = tmp.path().join("pod.sock"); + let listener = UnixListener::bind(&socket).unwrap(); + let received = serve_initial_events_then_run_ack( + listener, + vec![ + Event::Alert(Alert { + level: AlertLevel::Warn, + source: AlertSource::Pod, + message: "replayed alert".into(), + timestamp_ms: 0, + }), + snapshot(Vec::new()), + ], + Event::UserMessage { + segments: vec![protocol::Segment::text("hello")], + }, + ); + + send_run_and_confirm(&socket, "hello".into()).await.unwrap(); + + let method = received.await.unwrap().expect("expected method"); + match method { + Method::Run { input } => { + assert_eq!(protocol::Segment::flatten_to_text(&input), "hello"); + } + other => panic!("expected Run, got {other:?}"), + } + } + + #[tokio::test] + async fn send_run_and_confirm_reports_already_running() { + let tmp = TempDir::new().unwrap(); + let socket = tmp.path().join("pod.sock"); + let listener = UnixListener::bind(&socket).unwrap(); + let received = serve_initial_events_then_run_ack( + listener, + vec![snapshot(Vec::new())], + Event::Error { + code: ErrorCode::AlreadyRunning, + message: "busy".into(), + }, + ); + + let err = send_run_and_confirm(&socket, "hello".into()) + .await + .expect_err("expected AlreadyRunning"); + assert!(matches!(err, SendRunError::AlreadyRunning)); + assert!(matches!(received.await.unwrap(), Some(Method::Run { .. }))); + } + #[tokio::test] async fn connect_and_send_drains_initial_alert_and_snapshot_before_method() { let tmp = TempDir::new().unwrap(); diff --git a/crates/pod/src/spawn/tool.rs b/crates/pod/src/spawn/tool.rs index bf29f2b2..b7e48292 100644 --- a/crates/pod/src/spawn/tool.rs +++ b/crates/pod/src/spawn/tool.rs @@ -17,8 +17,6 @@ use manifest::{ ModelManifest, Permission, PodManifestConfig, PodMetaConfig, ScopeConfig, ScopeRule, SharedScope, WorkerManifestConfig, }; -use protocol::Method; -use protocol::stream::JsonLineWriter; use serde::Deserialize; use session_store::PodScopeSnapshot; use tokio::net::UnixStream; @@ -28,6 +26,7 @@ use tokio::time::sleep; use crate::ipc::event; use crate::runtime::dir::SpawnedPodRecord; use crate::runtime::pod_registry::{self, LockFileGuard, ScopeLockError}; +use crate::spawn::comm_tools::{SendRunError, send_run_and_confirm}; use crate::spawn::registry::SpawnedPodRegistry; use protocol::PodEvent; @@ -258,8 +257,6 @@ impl Tool for SpawnPodTool { }); } - send_run(&predicted_socket, &input.task).await?; - let record = SpawnedPodRecord { pod_name: input.name.clone(), socket_path: predicted_socket.clone(), @@ -284,6 +281,10 @@ impl Tool for SpawnPodTool { }, ); + send_run_and_confirm(&predicted_socket, input.task.clone()) + .await + .map_err(|err| spawn_delivery_error(&input.name, err))?; + Ok(ToolOutput { summary: format!( "spawned pod `{}` listening on {}", @@ -458,23 +459,15 @@ async fn wait_for_socket(path: &Path, timeout: Duration) -> Result<(), ToolError } } -async fn send_run(socket: &Path, task: &str) -> Result<(), ToolError> { - let stream = UnixStream::connect(socket) - .await - .map_err(|e| ToolError::ExecutionFailed(format!("connect {}: {e}", socket.display())))?; - let (_reader, writer) = stream.into_split(); - let mut w = JsonLineWriter::new(writer); - w.write(&Method::Run { - input: vec![protocol::Segment::text(task)], - }) - .await - .map_err(|e| ToolError::ExecutionFailed(format!("send Method::Run: {e}")))?; - // Drop the writer to close the socket's write half. The flush - // inside `JsonLineWriter::write` has already pushed the bytes - // across, so the child will see a complete method line followed by - // EOF. - drop(w); - Ok(()) +fn spawn_delivery_error(pod_name: &str, err: SendRunError) -> ToolError { + match err { + SendRunError::AlreadyRunning => ToolError::ExecutionFailed(format!( + "spawned pod `{pod_name}` rejected its initial task as already running; the pod remains registered and can be inspected or stopped" + )), + SendRunError::Io(msg) => ToolError::ExecutionFailed(format!( + "spawned pod `{pod_name}` did not confirm initial task delivery: {msg}; the pod remains registered and can be inspected or stopped" + )), + } } fn pod_registry_err_to_tool(e: ScopeLockError) -> ToolError { diff --git a/crates/pod/tests/spawn_pod_test.rs b/crates/pod/tests/spawn_pod_test.rs index 91f51578..0327015a 100644 --- a/crates/pod/tests/spawn_pod_test.rs +++ b/crates/pod/tests/spawn_pod_test.rs @@ -16,8 +16,8 @@ use pod::runtime::dir::{RuntimeDir, SpawnedPodRecord}; use pod::runtime::pod_registry::{self, LockFileGuard}; use pod::spawn::registry::SpawnedPodRegistry; use pod::spawn::tool::spawn_pod_tool; -use protocol::Method; -use protocol::stream::JsonLineReader; +use protocol::stream::{JsonLineReader, JsonLineWriter}; +use protocol::{Event, Method}; use serde_json::json; use std::sync::Arc; use tempfile::TempDir; @@ -97,16 +97,22 @@ async fn bind_mock_pod_socket(runtime_base: &Path, pod_name: &str) -> (PathBuf, } /// Launch a tokio task that accepts connections until one carries a -/// `Method` line, then returns it. `wait_for_socket` inside the tool -/// makes a probe connection that carries no data, so the task must -/// tolerate an empty connection and keep listening. +/// `Method` line, then acknowledges it and returns it. `wait_for_socket` +/// inside the tool makes a probe connection that carries no data, so the +/// task must tolerate an empty connection and keep listening. fn accept_one_method(listener: UnixListener) -> tokio::task::JoinHandle> { tokio::spawn(async move { loop { let (stream, _) = listener.accept().await.ok()?; - let (reader, _writer) = stream.into_split(); + let (reader, writer) = stream.into_split(); let mut r = JsonLineReader::new(reader); + let mut w = JsonLineWriter::new(writer); if let Ok(Some(method)) = r.next::().await { + w.write(&Event::UserMessage { + segments: vec![protocol::Segment::text("accepted")], + }) + .await + .ok()?; return Some(method); } }