From fdd2f16df067c5d7fcbfb85de3b9ab47999a9d2e Mon Sep 17 00:00:00 2001 From: Hare Date: Sat, 23 May 2026 04:57:03 +0900 Subject: [PATCH] fix: drain snapshots before pod callbacks --- crates/pod/src/controller.rs | 21 +++- crates/pod/src/spawn/comm_tools.rs | 129 +++++++++++++++++++++++- crates/pod/tests/pod_comm_tools_test.rs | 44 ++++++-- crates/pod/tests/pod_events_test.rs | 27 ++++- 4 files changed, 201 insertions(+), 20 deletions(-) diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index c6864274..2c033719 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -953,7 +953,7 @@ fn worker_error_code(e: &PodError) -> ErrorCode { mod tests { use super::*; use protocol::PodEvent; - use protocol::stream::JsonLineReader; + use protocol::stream::{JsonLineReader, JsonLineWriter}; use std::time::Duration; use tempfile::TempDir; use tokio::net::UnixListener; @@ -1031,7 +1031,24 @@ mod tests { async fn recv_pod_event(listener: UnixListener, timeout: Duration) -> Option { let accept = async { let (stream, _) = listener.accept().await.ok()?; - let mut reader = JsonLineReader::new(stream); + let (r, w) = stream.into_split(); + let mut writer = JsonLineWriter::new(w); + writer + .write(&Event::Snapshot { + entries: Vec::new(), + greeting: protocol::Greeting { + pod_name: "parent".into(), + cwd: "/tmp".into(), + provider: "test".into(), + model: "test".into(), + scope_summary: String::new(), + tools: Vec::new(), + }, + status: PodStatus::Idle, + }) + .await + .ok()?; + let mut reader = JsonLineReader::new(r); match reader.next::().await { Ok(Some(Method::PodEvent(e))) => Some(e), _ => None, diff --git a/crates/pod/src/spawn/comm_tools.rs b/crates/pod/src/spawn/comm_tools.rs index dc70d111..ace7b282 100644 --- a/crates/pod/src/spawn/comm_tools.rs +++ b/crates/pod/src/spawn/comm_tools.rs @@ -321,21 +321,52 @@ fn unknown_pod_err(name: &str) -> ToolError { ToolError::InvalidArgument(format!("no spawned pod named `{name}`")) } -/// Connect with a timeout, write one `Method` line, flush, and close. -/// Any socket error maps to an `io::Error`; the caller decides whether -/// to surface it to the LLM or treat it as "pod stopped". +/// Connect with a timeout, drain the server's connect-time snapshot, +/// write one `Method` line, flush, and close. +/// +/// The Pod socket protocol sends replayed alerts and an initial +/// `Event::Snapshot` before it starts reading client methods. Send-only +/// callers must consume that prefix; otherwise a large snapshot can block +/// the server's writer before it reaches the method-read branch. Any +/// socket error maps to an `io::Error`; the caller decides whether to +/// surface it to the LLM or treat it as "pod stopped". pub(crate) async fn connect_and_send(socket: &Path, method: &Method) -> std::io::Result<()> { let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket)) .await .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??; - let (_r, w) = stream.into_split(); + let (r, w) = stream.into_split(); + let mut reader = JsonLineReader::new(r); let mut writer = JsonLineWriter::new(w); + + drain_initial_snapshot(&mut reader).await?; + tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(method)) .await .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "write timed out"))??; Ok(()) } +async fn drain_initial_snapshot(reader: &mut JsonLineReader) -> std::io::Result<()> +where + R: tokio::io::AsyncBufRead + Unpin, +{ + loop { + let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::()) + .await + .map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "read timed out"))??; + match event { + Some(Event::Snapshot { .. }) => return Ok(()), + Some(_) => continue, + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "pod closed connection before Snapshot event", + )); + } + } + } +} + /// Failure modes distinguished by `SendToPod`. enum SendRunError { /// Target Pod responded with `Error { AlreadyRunning }` — the @@ -498,3 +529,93 @@ fn release_scope(pod_name: &str) { }; let _ = pod_registry::release_pod(&mut guard, pod_name); } + +#[cfg(test)] +mod tests { + use super::*; + + use protocol::{Alert, AlertLevel, AlertSource, Greeting, PodEvent, PodStatus}; + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio::task::JoinHandle; + + fn snapshot(entries: Vec) -> Event { + Event::Snapshot { + entries, + greeting: Greeting { + pod_name: "server".into(), + cwd: "/tmp".into(), + provider: "test".into(), + model: "test".into(), + scope_summary: String::new(), + tools: Vec::new(), + }, + status: PodStatus::Idle, + } + } + + fn serve_initial_events_then_method( + listener: UnixListener, + events: Vec, + ) -> JoinHandle> { + tokio::spawn(async move { + let (stream, _) = listener.accept().await.ok()?; + let (r, w) = stream.into_split(); + let mut reader = JsonLineReader::new(r); + let mut writer = JsonLineWriter::new(w); + for event in events { + writer.write(&event).await.ok()?; + } + reader.next::().await.ok().flatten() + }) + } + + #[tokio::test] + async fn connect_and_send_drains_initial_alert_and_snapshot_before_method() { + let tmp = TempDir::new().unwrap(); + let socket = tmp.path().join("pod.sock"); + let listener = UnixListener::bind(&socket).unwrap(); + let received = serve_initial_events_then_method( + listener, + vec![ + Event::Alert(Alert { + level: AlertLevel::Warn, + source: AlertSource::Pod, + message: "replayed alert".into(), + timestamp_ms: 0, + }), + snapshot(Vec::new()), + ], + ); + + connect_and_send(&socket, &Method::Shutdown).await.unwrap(); + + let method = received.await.unwrap().expect("expected method"); + assert!(matches!(method, Method::Shutdown)); + } + + #[tokio::test] + async fn connect_and_send_delivers_method_after_large_initial_snapshot() { + let tmp = TempDir::new().unwrap(); + let socket = tmp.path().join("pod.sock"); + let listener = UnixListener::bind(&socket).unwrap(); + let large_payload = "x".repeat(2 * 1024 * 1024); + let received = serve_initial_events_then_method( + listener, + vec![snapshot(vec![ + serde_json::json!({ "payload": large_payload }), + ])], + ); + let expected = Method::PodEvent(PodEvent::TurnEnded { + pod_name: "child".into(), + }); + + connect_and_send(&socket, &expected).await.unwrap(); + + let method = received.await.unwrap().expect("expected method"); + match method { + Method::PodEvent(PodEvent::TurnEnded { pod_name }) => assert_eq!(pod_name, "child"), + other => panic!("expected TurnEnded PodEvent, got {other:?}"), + } + } +} diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index ef41ff6d..f00f0a74 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -2,11 +2,11 @@ //! `ReadPodOutput`, `StopPod`, `ListPods`). //! //! The real child Pod binary is not started. Instead each test stands -//! up a mock `UnixListener` that speaks the socket protocol directly -//! (accepting `Method::Run` / `Method::GetHistory` / `Method::Shutdown` -//! and responding with `Event::History` when asked). This keeps the -//! tests fast and independent of the LLM layer — the tools are exercised -//! for their wire behaviour alone. +//! up a mock `UnixListener` that speaks the socket protocol directly: +//! it emits the connect-time `Event::Snapshot`, accepts methods such as +//! `Method::Run` / `Method::Shutdown`, and responds with the relevant +//! events when needed. This keeps the tests fast and independent of the +//! LLM layer — the tools are exercised for their wire behaviour alone. use std::path::{Path, PathBuf}; use std::sync::{Arc, LazyLock, Mutex}; @@ -115,20 +115,40 @@ async fn bind_mock_socket(dir: &Path, name: &str) -> (PathBuf, UnixListener) { (socket, listener) } -/// Accept one connection and read exactly one `Method` line from it. +/// Minimal connect-time snapshot used by mock socket servers. +fn empty_snapshot() -> Event { + Event::Snapshot { + entries: Vec::new(), + greeting: Greeting { + pod_name: "child".into(), + cwd: "/tmp".into(), + provider: "anthropic".into(), + model: "x".into(), + scope_summary: String::new(), + tools: Vec::new(), + }, + status: protocol::PodStatus::Idle, + } +} + +/// Accept one connection, send the protocol's connect-time snapshot, +/// and read exactly one `Method` line from it. /// The reader half is kept open; caller awaits the returned handle. fn accept_one_method(listener: UnixListener) -> JoinHandle> { tokio::spawn(async move { let (stream, _) = listener.accept().await.ok()?; - let (r, _w) = stream.into_split(); + let (r, w) = stream.into_split(); let mut reader = JsonLineReader::new(r); + let mut writer = JsonLineWriter::new(w); + writer.write(&empty_snapshot()).await.ok()?; reader.next::().await.ok().flatten() }) } -/// Accept one connection, read one `Method`, then write `response` -/// back. Used by `SendToPod` tests to mock the real controller's -/// `TurnStart` acknowledgement (or its `AlreadyRunning` rejection). +/// Accept one connection, send the protocol's connect-time snapshot, +/// read one `Method`, then write `response` back. Used by `SendToPod` +/// tests to mock the real controller's `TurnStart` acknowledgement (or +/// its `AlreadyRunning` rejection). fn accept_method_and_respond( listener: UnixListener, response: Event, @@ -138,6 +158,7 @@ fn accept_method_and_respond( let (r, w) = stream.into_split(); let mut reader = JsonLineReader::new(r); let mut writer = JsonLineWriter::new(w); + writer.write(&empty_snapshot()).await.ok()?; let method = reader.next::().await.ok().flatten(); if method.is_some() { let _ = writer.write(&response).await; @@ -195,6 +216,9 @@ fn serve_pod_methods(listener: UnixListener) -> mpsc::Receiver { let (r, w) = stream.into_split(); let mut reader = JsonLineReader::new(r); let mut writer = JsonLineWriter::new(w); + if writer.write(&empty_snapshot()).await.is_err() { + continue; + } let Some(method) = reader.next::().await.ok().flatten() else { continue; }; diff --git a/crates/pod/tests/pod_events_test.rs b/crates/pod/tests/pod_events_test.rs index 00c2526c..f305ef9a 100644 --- a/crates/pod/tests/pod_events_test.rs +++ b/crates/pod/tests/pod_events_test.rs @@ -13,8 +13,8 @@ use pod::ipc::event::{apply_event_side_effects, fire_and_forget, render_event}; use pod::runtime::dir::{RuntimeDir, SpawnedPodRecord}; use pod::runtime::pod_registry::{self, LockFileGuard}; use pod::spawn::registry::SpawnedPodRegistry; -use protocol::stream::JsonLineReader; -use protocol::{Method, Permission, PodEvent, ScopeRule}; +use protocol::stream::{JsonLineReader, JsonLineWriter}; +use protocol::{Event, Greeting, Method, Permission, PodEvent, PodStatus, ScopeRule}; use tempfile::TempDir; use tokio::net::UnixListener; @@ -76,11 +76,30 @@ fn clear_runtime_dir() { } } -/// Accept a single connection, read one `Method`, and return it. +/// Minimal connect-time snapshot used by mock parent sockets. +fn empty_snapshot() -> Event { + Event::Snapshot { + entries: Vec::new(), + greeting: Greeting { + pod_name: "parent".into(), + cwd: "/tmp".into(), + provider: "test".into(), + model: "test".into(), + scope_summary: String::new(), + tools: Vec::new(), + }, + status: PodStatus::Idle, + } +} + +/// Accept a single connection, send the protocol's connect-time snapshot, +/// read one `Method`, and return it. fn accept_one_method(listener: UnixListener) -> tokio::task::JoinHandle> { tokio::spawn(async move { let (stream, _) = listener.accept().await.ok()?; - let (reader, _writer) = stream.into_split(); + let (reader, writer) = stream.into_split(); + let mut w = JsonLineWriter::new(writer); + w.write(&empty_snapshot()).await.ok()?; let mut r = JsonLineReader::new(reader); r.next::().await.ok().flatten() })