fix: drain snapshots before pod callbacks

This commit is contained in:
Keisuke Hirata 2026-05-23 04:57:03 +09:00
parent b5219dc862
commit fdd2f16df0
4 changed files with 201 additions and 20 deletions

View File

@ -953,7 +953,7 @@ fn worker_error_code(e: &PodError) -> ErrorCode {
mod tests {
use super::*;
use protocol::PodEvent;
use protocol::stream::JsonLineReader;
use protocol::stream::{JsonLineReader, JsonLineWriter};
use std::time::Duration;
use tempfile::TempDir;
use tokio::net::UnixListener;
@ -1031,7 +1031,24 @@ mod tests {
async fn recv_pod_event(listener: UnixListener, timeout: Duration) -> Option<PodEvent> {
let accept = async {
let (stream, _) = listener.accept().await.ok()?;
let mut reader = JsonLineReader::new(stream);
let (r, w) = stream.into_split();
let mut writer = JsonLineWriter::new(w);
writer
.write(&Event::Snapshot {
entries: Vec::new(),
greeting: protocol::Greeting {
pod_name: "parent".into(),
cwd: "/tmp".into(),
provider: "test".into(),
model: "test".into(),
scope_summary: String::new(),
tools: Vec::new(),
},
status: PodStatus::Idle,
})
.await
.ok()?;
let mut reader = JsonLineReader::new(r);
match reader.next::<Method>().await {
Ok(Some(Method::PodEvent(e))) => Some(e),
_ => None,

View File

@ -321,21 +321,52 @@ fn unknown_pod_err(name: &str) -> ToolError {
ToolError::InvalidArgument(format!("no spawned pod named `{name}`"))
}
/// Connect with a timeout, write one `Method` line, flush, and close.
/// Any socket error maps to an `io::Error`; the caller decides whether
/// to surface it to the LLM or treat it as "pod stopped".
/// Connect with a timeout, drain the server's connect-time snapshot,
/// write one `Method` line, flush, and close.
///
/// The Pod socket protocol sends replayed alerts and an initial
/// `Event::Snapshot` before it starts reading client methods. Send-only
/// callers must consume that prefix; otherwise a large snapshot can block
/// the server's writer before it reaches the method-read branch. Any
/// socket error maps to an `io::Error`; the caller decides whether to
/// surface it to the LLM or treat it as "pod stopped".
pub(crate) async fn connect_and_send(socket: &Path, method: &Method) -> std::io::Result<()> {
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??;
let (_r, w) = stream.into_split();
let (r, w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
let mut writer = JsonLineWriter::new(w);
drain_initial_snapshot(&mut reader).await?;
tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(method))
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "write timed out"))??;
Ok(())
}
async fn drain_initial_snapshot<R>(reader: &mut JsonLineReader<R>) -> std::io::Result<()>
where
R: tokio::io::AsyncBufRead + Unpin,
{
loop {
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "read timed out"))??;
match event {
Some(Event::Snapshot { .. }) => return Ok(()),
Some(_) => continue,
None => {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"pod closed connection before Snapshot event",
));
}
}
}
}
/// Failure modes distinguished by `SendToPod`.
enum SendRunError {
/// Target Pod responded with `Error { AlreadyRunning }` — the
@ -498,3 +529,93 @@ fn release_scope(pod_name: &str) {
};
let _ = pod_registry::release_pod(&mut guard, pod_name);
}
#[cfg(test)]
mod tests {
use super::*;
use protocol::{Alert, AlertLevel, AlertSource, Greeting, PodEvent, PodStatus};
use tempfile::TempDir;
use tokio::net::UnixListener;
use tokio::task::JoinHandle;
fn snapshot(entries: Vec<serde_json::Value>) -> Event {
Event::Snapshot {
entries,
greeting: Greeting {
pod_name: "server".into(),
cwd: "/tmp".into(),
provider: "test".into(),
model: "test".into(),
scope_summary: String::new(),
tools: Vec::new(),
},
status: PodStatus::Idle,
}
}
fn serve_initial_events_then_method(
listener: UnixListener,
events: Vec<Event>,
) -> JoinHandle<Option<Method>> {
tokio::spawn(async move {
let (stream, _) = listener.accept().await.ok()?;
let (r, w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
let mut writer = JsonLineWriter::new(w);
for event in events {
writer.write(&event).await.ok()?;
}
reader.next::<Method>().await.ok().flatten()
})
}
#[tokio::test]
async fn connect_and_send_drains_initial_alert_and_snapshot_before_method() {
let tmp = TempDir::new().unwrap();
let socket = tmp.path().join("pod.sock");
let listener = UnixListener::bind(&socket).unwrap();
let received = serve_initial_events_then_method(
listener,
vec![
Event::Alert(Alert {
level: AlertLevel::Warn,
source: AlertSource::Pod,
message: "replayed alert".into(),
timestamp_ms: 0,
}),
snapshot(Vec::new()),
],
);
connect_and_send(&socket, &Method::Shutdown).await.unwrap();
let method = received.await.unwrap().expect("expected method");
assert!(matches!(method, Method::Shutdown));
}
#[tokio::test]
async fn connect_and_send_delivers_method_after_large_initial_snapshot() {
let tmp = TempDir::new().unwrap();
let socket = tmp.path().join("pod.sock");
let listener = UnixListener::bind(&socket).unwrap();
let large_payload = "x".repeat(2 * 1024 * 1024);
let received = serve_initial_events_then_method(
listener,
vec![snapshot(vec![
serde_json::json!({ "payload": large_payload }),
])],
);
let expected = Method::PodEvent(PodEvent::TurnEnded {
pod_name: "child".into(),
});
connect_and_send(&socket, &expected).await.unwrap();
let method = received.await.unwrap().expect("expected method");
match method {
Method::PodEvent(PodEvent::TurnEnded { pod_name }) => assert_eq!(pod_name, "child"),
other => panic!("expected TurnEnded PodEvent, got {other:?}"),
}
}
}

View File

@ -2,11 +2,11 @@
//! `ReadPodOutput`, `StopPod`, `ListPods`).
//!
//! The real child Pod binary is not started. Instead each test stands
//! up a mock `UnixListener` that speaks the socket protocol directly
//! (accepting `Method::Run` / `Method::GetHistory` / `Method::Shutdown`
//! and responding with `Event::History` when asked). This keeps the
//! tests fast and independent of the LLM layer — the tools are exercised
//! for their wire behaviour alone.
//! up a mock `UnixListener` that speaks the socket protocol directly:
//! it emits the connect-time `Event::Snapshot`, accepts methods such as
//! `Method::Run` / `Method::Shutdown`, and responds with the relevant
//! events when needed. This keeps the tests fast and independent of the
//! LLM layer — the tools are exercised for their wire behaviour alone.
use std::path::{Path, PathBuf};
use std::sync::{Arc, LazyLock, Mutex};
@ -115,20 +115,40 @@ async fn bind_mock_socket(dir: &Path, name: &str) -> (PathBuf, UnixListener) {
(socket, listener)
}
/// Accept one connection and read exactly one `Method` line from it.
/// Minimal connect-time snapshot used by mock socket servers.
fn empty_snapshot() -> Event {
Event::Snapshot {
entries: Vec::new(),
greeting: Greeting {
pod_name: "child".into(),
cwd: "/tmp".into(),
provider: "anthropic".into(),
model: "x".into(),
scope_summary: String::new(),
tools: Vec::new(),
},
status: protocol::PodStatus::Idle,
}
}
/// Accept one connection, send the protocol's connect-time snapshot,
/// and read exactly one `Method` line from it.
/// The reader half is kept open; caller awaits the returned handle.
fn accept_one_method(listener: UnixListener) -> JoinHandle<Option<Method>> {
tokio::spawn(async move {
let (stream, _) = listener.accept().await.ok()?;
let (r, _w) = stream.into_split();
let (r, w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
let mut writer = JsonLineWriter::new(w);
writer.write(&empty_snapshot()).await.ok()?;
reader.next::<Method>().await.ok().flatten()
})
}
/// Accept one connection, read one `Method`, then write `response`
/// back. Used by `SendToPod` tests to mock the real controller's
/// `TurnStart` acknowledgement (or its `AlreadyRunning` rejection).
/// Accept one connection, send the protocol's connect-time snapshot,
/// read one `Method`, then write `response` back. Used by `SendToPod`
/// tests to mock the real controller's `TurnStart` acknowledgement (or
/// its `AlreadyRunning` rejection).
fn accept_method_and_respond(
listener: UnixListener,
response: Event,
@ -138,6 +158,7 @@ fn accept_method_and_respond(
let (r, w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
let mut writer = JsonLineWriter::new(w);
writer.write(&empty_snapshot()).await.ok()?;
let method = reader.next::<Method>().await.ok().flatten();
if method.is_some() {
let _ = writer.write(&response).await;
@ -195,6 +216,9 @@ fn serve_pod_methods(listener: UnixListener) -> mpsc::Receiver<Method> {
let (r, w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
let mut writer = JsonLineWriter::new(w);
if writer.write(&empty_snapshot()).await.is_err() {
continue;
}
let Some(method) = reader.next::<Method>().await.ok().flatten() else {
continue;
};

View File

@ -13,8 +13,8 @@ use pod::ipc::event::{apply_event_side_effects, fire_and_forget, render_event};
use pod::runtime::dir::{RuntimeDir, SpawnedPodRecord};
use pod::runtime::pod_registry::{self, LockFileGuard};
use pod::spawn::registry::SpawnedPodRegistry;
use protocol::stream::JsonLineReader;
use protocol::{Method, Permission, PodEvent, ScopeRule};
use protocol::stream::{JsonLineReader, JsonLineWriter};
use protocol::{Event, Greeting, Method, Permission, PodEvent, PodStatus, ScopeRule};
use tempfile::TempDir;
use tokio::net::UnixListener;
@ -76,11 +76,30 @@ fn clear_runtime_dir() {
}
}
/// Accept a single connection, read one `Method`, and return it.
/// Minimal connect-time snapshot used by mock parent sockets.
fn empty_snapshot() -> Event {
Event::Snapshot {
entries: Vec::new(),
greeting: Greeting {
pod_name: "parent".into(),
cwd: "/tmp".into(),
provider: "test".into(),
model: "test".into(),
scope_summary: String::new(),
tools: Vec::new(),
},
status: PodStatus::Idle,
}
}
/// Accept a single connection, send the protocol's connect-time snapshot,
/// read one `Method`, and return it.
fn accept_one_method(listener: UnixListener) -> tokio::task::JoinHandle<Option<Method>> {
tokio::spawn(async move {
let (stream, _) = listener.accept().await.ok()?;
let (reader, _writer) = stream.into_split();
let (reader, writer) = stream.into_split();
let mut w = JsonLineWriter::new(writer);
w.write(&empty_snapshot()).await.ok()?;
let mut r = JsonLineReader::new(reader);
r.next::<Method>().await.ok().flatten()
})