merge: spawnpod initial run confirmation

This commit is contained in:
Keisuke Hirata 2026-05-28 22:24:14 +09:00
commit 3cb1138e84
No known key found for this signature in database
3 changed files with 144 additions and 18 deletions

View File

@ -548,10 +548,20 @@ async fn probe_socket(socket_path: &Path) -> LiveInfo {
Ok(Ok(stream)) => {
let (r, _w) = stream.into_split();
let mut reader = JsonLineReader::new(r);
let status = match tokio::time::timeout(PROBE_TIMEOUT, reader.next::<Event>()).await {
Ok(Ok(Some(Event::Snapshot { status, .. }))) => Some(status),
_ => None,
};
let mut status = None;
loop {
match tokio::time::timeout(PROBE_TIMEOUT, reader.next::<Event>()).await {
Ok(Ok(Some(Event::Snapshot {
status: snapshot_status,
..
}))) => {
status = Some(snapshot_status);
break;
}
Ok(Ok(Some(Event::Alert(_)))) => continue,
Ok(Ok(Some(_))) | Ok(Ok(None)) | Ok(Err(_)) | Err(_) => break,
}
}
LiveInfo {
socket_path: socket_path.to_path_buf(),
reachable: true,
@ -755,8 +765,8 @@ mod tests {
use std::sync::Mutex;
use manifest::{Permission, ScopeRule};
use protocol::Greeting;
use protocol::stream::JsonLineWriter;
use protocol::{Alert, AlertLevel, AlertSource, Greeting};
use session_store::{
FsStore, PodSpawnedChild, PodSpawnedScopeRule, new_segment_id, new_session_id,
};
@ -931,6 +941,48 @@ mod tests {
live_listener.abort();
}
#[tokio::test(flavor = "current_thread")]
async fn probe_socket_reads_status_after_replayed_alert() {
let root = TempDir::new().unwrap();
let socket = root.path().join("pod.sock");
let listener = UnixListener::bind(&socket).unwrap();
let handle = tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
let mut writer = JsonLineWriter::new(stream);
writer
.write(&Event::Alert(Alert {
level: AlertLevel::Warn,
source: AlertSource::Pod,
message: "replayed alert".into(),
timestamp_ms: 0,
}))
.await
.unwrap();
writer
.write(&Event::Snapshot {
entries: Vec::new(),
greeting: Greeting {
pod_name: "alerted".into(),
cwd: "/tmp".into(),
provider: "test".into(),
model: "test".into(),
scope_summary: String::new(),
tools: Vec::new(),
context_window: 0,
context_tokens: 0,
},
status: PodStatus::Paused,
})
.await
.unwrap();
});
let info = probe_socket(&socket).await;
assert!(info.reachable);
assert!(matches!(info.status, Some(PodStatus::Paused)));
handle.await.unwrap();
}
fn child(name: &str, socket_path: &Path) -> PodSpawnedChild {
PodSpawnedChild {
pod_name: name.to_string(),

View File

@ -79,6 +79,10 @@ impl Tool for SendToPodTool {
"pod `{}` is already running a turn; wait for it to finish and retry",
input.name
)),
SendRunError::Rejected { code, message } => ToolError::ExecutionFailed(format!(
"pod `{}` rejected the run with {code:?}: {message}",
input.name
)),
SendRunError::Io(msg) => {
ToolError::ExecutionFailed(format!("send to `{}`: {msg}", input.name))
}
@ -370,16 +374,20 @@ pub(crate) enum SendRunError {
/// Target Pod responded with `Error { AlreadyRunning }` — the
/// caller can retry once the current turn ends.
AlreadyRunning,
/// Any other failure (connect / write / read / unexpected EOF).
/// Target Pod explicitly rejected the run after delivery reached the
/// controller.
Rejected { code: ErrorCode, message: String },
/// Transport, protocol, timeout, or unexpected EOF before acceptance
/// evidence was observed.
Io(String),
}
/// Write `Method::Run` to the target and read back events until we see
/// evidence that the controller accepted the run (`UserMessage`,
/// `TurnStart`, or a user-send `InvokeStart`) or rejected it with
/// `Error { AlreadyRunning }`. Any connect-time Snapshot or replayed alerts
/// that precede the response are skipped. Times out per-read so a stuck Pod
/// doesn't hang the tool.
/// `TurnStart`, or a user-send `InvokeStart`) or rejected it. The connect-time
/// event prelude is drained before sending the method so large Snapshots and
/// large Run payloads cannot block each other on the same socket. Times out
/// per operation so a stuck Pod doesn't hang the tool.
pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> {
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
.await
@ -388,6 +396,31 @@ pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result
let (r, w) = stream.into_split();
let mut writer = JsonLineWriter::new(w);
let mut reader = JsonLineReader::new(r);
loop {
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
.await
.map_err(|_| SendRunError::Io("read initial Snapshot timed out".into()))?
.map_err(|e| SendRunError::Io(format!("read initial Snapshot: {e}")))?;
match event {
Some(Event::Snapshot { .. }) => break,
Some(Event::Alert(_)) => continue,
Some(Event::Error {
code: ErrorCode::AlreadyRunning,
..
}) => return Err(SendRunError::AlreadyRunning),
Some(Event::Error { code, message }) => {
return Err(SendRunError::Rejected { code, message });
}
Some(_) => continue,
None => {
return Err(SendRunError::Io(
"connection closed before initial Snapshot".into(),
));
}
}
}
tokio::time::timeout(
SOCKET_OP_TIMEOUT,
writer.write(&Method::Run {
@ -400,26 +433,23 @@ pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result
loop {
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
.await
.map_err(|_| SendRunError::Io("read timed out".into()))?
.map_err(|e| SendRunError::Io(format!("read: {e}")))?;
.map_err(|_| SendRunError::Io("read response timed out".into()))?
.map_err(|e| SendRunError::Io(format!("read response: {e}")))?;
match event {
Some(Event::Error {
code: ErrorCode::AlreadyRunning,
..
}) => return Err(SendRunError::AlreadyRunning),
Some(Event::Error { code, message }) => {
return Err(SendRunError::Io(format!(
"pod returned {code:?}: {message}"
)));
return Err(SendRunError::Rejected { code, message });
}
Some(Event::InvokeStart {
kind: InvokeKind::UserSend,
})
| Some(Event::UserMessage { .. })
| Some(Event::TurnStart { .. }) => return Ok(()),
// Alerts, Snapshot, and other pre-turn events can precede the
// controller's response; keep reading until the Run is accepted
// or rejected.
// Other post-Snapshot events can race with the controller's
// response; keep reading until the Run is accepted or rejected.
Some(_) => continue,
None => return Err(SendRunError::Io("connection closed before response".into())),
}
@ -618,6 +648,47 @@ mod tests {
}
}
#[tokio::test]
async fn send_run_and_confirm_drains_alert_and_large_snapshot_before_large_run() {
let tmp = TempDir::new().unwrap();
let socket = tmp.path().join("pod.sock");
let listener = UnixListener::bind(&socket).unwrap();
let large_snapshot_payload = "s".repeat(2 * 1024 * 1024);
let large_run_payload = "r".repeat(2 * 1024 * 1024);
let received = serve_initial_events_then_run_ack(
listener,
vec![
Event::Alert(Alert {
level: AlertLevel::Warn,
source: AlertSource::Pod,
message: "replayed alert".into(),
timestamp_ms: 0,
}),
snapshot(vec![
serde_json::json!({ "payload": large_snapshot_payload }),
]),
],
Event::InvokeStart {
kind: InvokeKind::UserSend,
},
);
send_run_and_confirm(&socket, large_run_payload.clone())
.await
.unwrap();
let method = received.await.unwrap().expect("expected method");
match method {
Method::Run { input } => {
assert_eq!(
protocol::Segment::flatten_to_text(&input),
large_run_payload
);
}
other => panic!("expected Run, got {other:?}"),
}
}
#[tokio::test]
async fn send_run_and_confirm_reports_already_running() {
let tmp = TempDir::new().unwrap();

View File

@ -464,6 +464,9 @@ fn spawn_delivery_error(pod_name: &str, err: SendRunError) -> ToolError {
SendRunError::AlreadyRunning => ToolError::ExecutionFailed(format!(
"spawned pod `{pod_name}` rejected its initial task as already running; the pod remains registered and can be inspected or stopped"
)),
SendRunError::Rejected { code, message } => ToolError::ExecutionFailed(format!(
"spawned pod `{pod_name}` rejected its initial task with {code:?}: {message}; the pod remains registered and can be inspected or stopped"
)),
SendRunError::Io(msg) => ToolError::ExecutionFailed(format!(
"spawned pod `{pod_name}` did not confirm initial task delivery: {msg}; the pod remains registered and can be inspected or stopped"
)),