merge: pod-event-callback-delivery
This commit is contained in:
commit
baaec0c77f
|
|
@ -953,7 +953,7 @@ fn worker_error_code(e: &PodError) -> ErrorCode {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use protocol::PodEvent;
|
||||
use protocol::stream::JsonLineReader;
|
||||
use protocol::stream::{JsonLineReader, JsonLineWriter};
|
||||
use std::time::Duration;
|
||||
use tempfile::TempDir;
|
||||
use tokio::net::UnixListener;
|
||||
|
|
@ -1031,7 +1031,24 @@ mod tests {
|
|||
async fn recv_pod_event(listener: UnixListener, timeout: Duration) -> Option<PodEvent> {
|
||||
let accept = async {
|
||||
let (stream, _) = listener.accept().await.ok()?;
|
||||
let mut reader = JsonLineReader::new(stream);
|
||||
let (r, w) = stream.into_split();
|
||||
let mut writer = JsonLineWriter::new(w);
|
||||
writer
|
||||
.write(&Event::Snapshot {
|
||||
entries: Vec::new(),
|
||||
greeting: protocol::Greeting {
|
||||
pod_name: "parent".into(),
|
||||
cwd: "/tmp".into(),
|
||||
provider: "test".into(),
|
||||
model: "test".into(),
|
||||
scope_summary: String::new(),
|
||||
tools: Vec::new(),
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
})
|
||||
.await
|
||||
.ok()?;
|
||||
let mut reader = JsonLineReader::new(r);
|
||||
match reader.next::<Method>().await {
|
||||
Ok(Some(Method::PodEvent(e))) => Some(e),
|
||||
_ => None,
|
||||
|
|
|
|||
|
|
@ -321,21 +321,52 @@ fn unknown_pod_err(name: &str) -> ToolError {
|
|||
ToolError::InvalidArgument(format!("no spawned pod named `{name}`"))
|
||||
}
|
||||
|
||||
/// Connect with a timeout, write one `Method` line, flush, and close.
|
||||
/// Any socket error maps to an `io::Error`; the caller decides whether
|
||||
/// to surface it to the LLM or treat it as "pod stopped".
|
||||
/// Connect with a timeout, drain the server's connect-time snapshot,
|
||||
/// write one `Method` line, flush, and close.
|
||||
///
|
||||
/// The Pod socket protocol sends replayed alerts and an initial
|
||||
/// `Event::Snapshot` before it starts reading client methods. Send-only
|
||||
/// callers must consume that prefix; otherwise a large snapshot can block
|
||||
/// the server's writer before it reaches the method-read branch. Any
|
||||
/// socket error maps to an `io::Error`; the caller decides whether to
|
||||
/// surface it to the LLM or treat it as "pod stopped".
|
||||
pub(crate) async fn connect_and_send(socket: &Path, method: &Method) -> std::io::Result<()> {
|
||||
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
||||
.await
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))??;
|
||||
let (_r, w) = stream.into_split();
|
||||
let (r, w) = stream.into_split();
|
||||
let mut reader = JsonLineReader::new(r);
|
||||
let mut writer = JsonLineWriter::new(w);
|
||||
|
||||
drain_initial_snapshot(&mut reader).await?;
|
||||
|
||||
tokio::time::timeout(SOCKET_OP_TIMEOUT, writer.write(method))
|
||||
.await
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "write timed out"))??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn drain_initial_snapshot<R>(reader: &mut JsonLineReader<R>) -> std::io::Result<()>
|
||||
where
|
||||
R: tokio::io::AsyncBufRead + Unpin,
|
||||
{
|
||||
loop {
|
||||
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
|
||||
.await
|
||||
.map_err(|_| std::io::Error::new(std::io::ErrorKind::TimedOut, "read timed out"))??;
|
||||
match event {
|
||||
Some(Event::Snapshot { .. }) => return Ok(()),
|
||||
Some(_) => continue,
|
||||
None => {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"pod closed connection before Snapshot event",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Failure modes distinguished by `SendToPod`.
|
||||
enum SendRunError {
|
||||
/// Target Pod responded with `Error { AlreadyRunning }` — the
|
||||
|
|
@ -498,3 +529,93 @@ fn release_scope(pod_name: &str) {
|
|||
};
|
||||
let _ = pod_registry::release_pod(&mut guard, pod_name);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use protocol::{Alert, AlertLevel, AlertSource, Greeting, PodEvent, PodStatus};
|
||||
use tempfile::TempDir;
|
||||
use tokio::net::UnixListener;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
fn snapshot(entries: Vec<serde_json::Value>) -> Event {
|
||||
Event::Snapshot {
|
||||
entries,
|
||||
greeting: Greeting {
|
||||
pod_name: "server".into(),
|
||||
cwd: "/tmp".into(),
|
||||
provider: "test".into(),
|
||||
model: "test".into(),
|
||||
scope_summary: String::new(),
|
||||
tools: Vec::new(),
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
}
|
||||
}
|
||||
|
||||
fn serve_initial_events_then_method(
|
||||
listener: UnixListener,
|
||||
events: Vec<Event>,
|
||||
) -> JoinHandle<Option<Method>> {
|
||||
tokio::spawn(async move {
|
||||
let (stream, _) = listener.accept().await.ok()?;
|
||||
let (r, w) = stream.into_split();
|
||||
let mut reader = JsonLineReader::new(r);
|
||||
let mut writer = JsonLineWriter::new(w);
|
||||
for event in events {
|
||||
writer.write(&event).await.ok()?;
|
||||
}
|
||||
reader.next::<Method>().await.ok().flatten()
|
||||
})
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_and_send_drains_initial_alert_and_snapshot_before_method() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let socket = tmp.path().join("pod.sock");
|
||||
let listener = UnixListener::bind(&socket).unwrap();
|
||||
let received = serve_initial_events_then_method(
|
||||
listener,
|
||||
vec![
|
||||
Event::Alert(Alert {
|
||||
level: AlertLevel::Warn,
|
||||
source: AlertSource::Pod,
|
||||
message: "replayed alert".into(),
|
||||
timestamp_ms: 0,
|
||||
}),
|
||||
snapshot(Vec::new()),
|
||||
],
|
||||
);
|
||||
|
||||
connect_and_send(&socket, &Method::Shutdown).await.unwrap();
|
||||
|
||||
let method = received.await.unwrap().expect("expected method");
|
||||
assert!(matches!(method, Method::Shutdown));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn connect_and_send_delivers_method_after_large_initial_snapshot() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let socket = tmp.path().join("pod.sock");
|
||||
let listener = UnixListener::bind(&socket).unwrap();
|
||||
let large_payload = "x".repeat(2 * 1024 * 1024);
|
||||
let received = serve_initial_events_then_method(
|
||||
listener,
|
||||
vec![snapshot(vec![
|
||||
serde_json::json!({ "payload": large_payload }),
|
||||
])],
|
||||
);
|
||||
let expected = Method::PodEvent(PodEvent::TurnEnded {
|
||||
pod_name: "child".into(),
|
||||
});
|
||||
|
||||
connect_and_send(&socket, &expected).await.unwrap();
|
||||
|
||||
let method = received.await.unwrap().expect("expected method");
|
||||
match method {
|
||||
Method::PodEvent(PodEvent::TurnEnded { pod_name }) => assert_eq!(pod_name, "child"),
|
||||
other => panic!("expected TurnEnded PodEvent, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,11 +2,11 @@
|
|||
//! `ReadPodOutput`, `StopPod`, `ListPods`).
|
||||
//!
|
||||
//! The real child Pod binary is not started. Instead each test stands
|
||||
//! up a mock `UnixListener` that speaks the socket protocol directly
|
||||
//! (accepting `Method::Run` / `Method::GetHistory` / `Method::Shutdown`
|
||||
//! and responding with `Event::History` when asked). This keeps the
|
||||
//! tests fast and independent of the LLM layer — the tools are exercised
|
||||
//! for their wire behaviour alone.
|
||||
//! up a mock `UnixListener` that speaks the socket protocol directly:
|
||||
//! it emits the connect-time `Event::Snapshot`, accepts methods such as
|
||||
//! `Method::Run` / `Method::Shutdown`, and responds with the relevant
|
||||
//! events when needed. This keeps the tests fast and independent of the
|
||||
//! LLM layer — the tools are exercised for their wire behaviour alone.
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, LazyLock, Mutex};
|
||||
|
|
@ -115,20 +115,40 @@ async fn bind_mock_socket(dir: &Path, name: &str) -> (PathBuf, UnixListener) {
|
|||
(socket, listener)
|
||||
}
|
||||
|
||||
/// Accept one connection and read exactly one `Method` line from it.
|
||||
/// Minimal connect-time snapshot used by mock socket servers.
|
||||
fn empty_snapshot() -> Event {
|
||||
Event::Snapshot {
|
||||
entries: Vec::new(),
|
||||
greeting: Greeting {
|
||||
pod_name: "child".into(),
|
||||
cwd: "/tmp".into(),
|
||||
provider: "anthropic".into(),
|
||||
model: "x".into(),
|
||||
scope_summary: String::new(),
|
||||
tools: Vec::new(),
|
||||
},
|
||||
status: protocol::PodStatus::Idle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Accept one connection, send the protocol's connect-time snapshot,
|
||||
/// and read exactly one `Method` line from it.
|
||||
/// The reader half is kept open; caller awaits the returned handle.
|
||||
fn accept_one_method(listener: UnixListener) -> JoinHandle<Option<Method>> {
|
||||
tokio::spawn(async move {
|
||||
let (stream, _) = listener.accept().await.ok()?;
|
||||
let (r, _w) = stream.into_split();
|
||||
let (r, w) = stream.into_split();
|
||||
let mut reader = JsonLineReader::new(r);
|
||||
let mut writer = JsonLineWriter::new(w);
|
||||
writer.write(&empty_snapshot()).await.ok()?;
|
||||
reader.next::<Method>().await.ok().flatten()
|
||||
})
|
||||
}
|
||||
|
||||
/// Accept one connection, read one `Method`, then write `response`
|
||||
/// back. Used by `SendToPod` tests to mock the real controller's
|
||||
/// `TurnStart` acknowledgement (or its `AlreadyRunning` rejection).
|
||||
/// Accept one connection, send the protocol's connect-time snapshot,
|
||||
/// read one `Method`, then write `response` back. Used by `SendToPod`
|
||||
/// tests to mock the real controller's `TurnStart` acknowledgement (or
|
||||
/// its `AlreadyRunning` rejection).
|
||||
fn accept_method_and_respond(
|
||||
listener: UnixListener,
|
||||
response: Event,
|
||||
|
|
@ -138,6 +158,7 @@ fn accept_method_and_respond(
|
|||
let (r, w) = stream.into_split();
|
||||
let mut reader = JsonLineReader::new(r);
|
||||
let mut writer = JsonLineWriter::new(w);
|
||||
writer.write(&empty_snapshot()).await.ok()?;
|
||||
let method = reader.next::<Method>().await.ok().flatten();
|
||||
if method.is_some() {
|
||||
let _ = writer.write(&response).await;
|
||||
|
|
@ -195,6 +216,9 @@ fn serve_pod_methods(listener: UnixListener) -> mpsc::Receiver<Method> {
|
|||
let (r, w) = stream.into_split();
|
||||
let mut reader = JsonLineReader::new(r);
|
||||
let mut writer = JsonLineWriter::new(w);
|
||||
if writer.write(&empty_snapshot()).await.is_err() {
|
||||
continue;
|
||||
}
|
||||
let Some(method) = reader.next::<Method>().await.ok().flatten() else {
|
||||
continue;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -13,8 +13,8 @@ use pod::ipc::event::{apply_event_side_effects, fire_and_forget, render_event};
|
|||
use pod::runtime::dir::{RuntimeDir, SpawnedPodRecord};
|
||||
use pod::runtime::pod_registry::{self, LockFileGuard};
|
||||
use pod::spawn::registry::SpawnedPodRegistry;
|
||||
use protocol::stream::JsonLineReader;
|
||||
use protocol::{Method, Permission, PodEvent, ScopeRule};
|
||||
use protocol::stream::{JsonLineReader, JsonLineWriter};
|
||||
use protocol::{Event, Greeting, Method, Permission, PodEvent, PodStatus, ScopeRule};
|
||||
use tempfile::TempDir;
|
||||
use tokio::net::UnixListener;
|
||||
|
||||
|
|
@ -76,11 +76,30 @@ fn clear_runtime_dir() {
|
|||
}
|
||||
}
|
||||
|
||||
/// Accept a single connection, read one `Method`, and return it.
|
||||
/// Minimal connect-time snapshot used by mock parent sockets.
|
||||
fn empty_snapshot() -> Event {
|
||||
Event::Snapshot {
|
||||
entries: Vec::new(),
|
||||
greeting: Greeting {
|
||||
pod_name: "parent".into(),
|
||||
cwd: "/tmp".into(),
|
||||
provider: "test".into(),
|
||||
model: "test".into(),
|
||||
scope_summary: String::new(),
|
||||
tools: Vec::new(),
|
||||
},
|
||||
status: PodStatus::Idle,
|
||||
}
|
||||
}
|
||||
|
||||
/// Accept a single connection, send the protocol's connect-time snapshot,
|
||||
/// read one `Method`, and return it.
|
||||
fn accept_one_method(listener: UnixListener) -> tokio::task::JoinHandle<Option<Method>> {
|
||||
tokio::spawn(async move {
|
||||
let (stream, _) = listener.accept().await.ok()?;
|
||||
let (reader, _writer) = stream.into_split();
|
||||
let (reader, writer) = stream.into_split();
|
||||
let mut w = JsonLineWriter::new(writer);
|
||||
w.write(&empty_snapshot()).await.ok()?;
|
||||
let mut r = JsonLineReader::new(reader);
|
||||
r.next::<Method>().await.ok().flatten()
|
||||
})
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user