fix: confirm initial SpawnPod run delivery
This commit is contained in:
parent
3658242bbc
commit
2b4bdda89c
|
|
@ -548,10 +548,20 @@ async fn probe_socket(socket_path: &Path) -> LiveInfo {
|
|||
Ok(Ok(stream)) => {
|
||||
let (r, _w) = stream.into_split();
|
||||
let mut reader = JsonLineReader::new(r);
|
||||
let status = match tokio::time::timeout(PROBE_TIMEOUT, reader.next::<Event>()).await {
|
||||
Ok(Ok(Some(Event::Snapshot { status, .. }))) => Some(status),
|
||||
_ => None,
|
||||
};
|
||||
let mut status = None;
|
||||
loop {
|
||||
match tokio::time::timeout(PROBE_TIMEOUT, reader.next::<Event>()).await {
|
||||
Ok(Ok(Some(Event::Snapshot {
|
||||
status: snapshot_status,
|
||||
..
|
||||
}))) => {
|
||||
status = Some(snapshot_status);
|
||||
break;
|
||||
}
|
||||
Ok(Ok(Some(Event::Alert(_)))) => continue,
|
||||
Ok(Ok(Some(_))) | Ok(Ok(None)) | Ok(Err(_)) | Err(_) => break,
|
||||
}
|
||||
}
|
||||
LiveInfo {
|
||||
socket_path: socket_path.to_path_buf(),
|
||||
reachable: true,
|
||||
|
|
@ -755,8 +765,8 @@ mod tests {
|
|||
use std::sync::Mutex;
|
||||
|
||||
use manifest::{Permission, ScopeRule};
|
||||
use protocol::Greeting;
|
||||
use protocol::stream::JsonLineWriter;
|
||||
use protocol::{Alert, AlertLevel, AlertSource, Greeting};
|
||||
use session_store::{
|
||||
FsStore, PodSpawnedChild, PodSpawnedScopeRule, new_segment_id, new_session_id,
|
||||
};
|
||||
|
|
@ -931,6 +941,48 @@ mod tests {
|
|||
live_listener.abort();
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "current_thread")]
|
||||
async fn probe_socket_reads_status_after_replayed_alert() {
|
||||
let root = TempDir::new().unwrap();
|
||||
let socket = root.path().join("pod.sock");
|
||||
let listener = UnixListener::bind(&socket).unwrap();
|
||||
let handle = tokio::spawn(async move {
|
||||
let (stream, _) = listener.accept().await.unwrap();
|
||||
let mut writer = JsonLineWriter::new(stream);
|
||||
writer
|
||||
.write(&Event::Alert(Alert {
|
||||
level: AlertLevel::Warn,
|
||||
source: AlertSource::Pod,
|
||||
message: "replayed alert".into(),
|
||||
timestamp_ms: 0,
|
||||
}))
|
||||
.await
|
||||
.unwrap();
|
||||
writer
|
||||
.write(&Event::Snapshot {
|
||||
entries: Vec::new(),
|
||||
greeting: Greeting {
|
||||
pod_name: "alerted".into(),
|
||||
cwd: "/tmp".into(),
|
||||
provider: "test".into(),
|
||||
model: "test".into(),
|
||||
scope_summary: String::new(),
|
||||
tools: Vec::new(),
|
||||
context_window: 0,
|
||||
context_tokens: 0,
|
||||
},
|
||||
status: PodStatus::Paused,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let info = probe_socket(&socket).await;
|
||||
assert!(info.reachable);
|
||||
assert!(matches!(info.status, Some(PodStatus::Paused)));
|
||||
handle.await.unwrap();
|
||||
}
|
||||
|
||||
fn child(name: &str, socket_path: &Path) -> PodSpawnedChild {
|
||||
PodSpawnedChild {
|
||||
pod_name: name.to_string(),
|
||||
|
|
|
|||
|
|
@ -79,6 +79,10 @@ impl Tool for SendToPodTool {
|
|||
"pod `{}` is already running a turn; wait for it to finish and retry",
|
||||
input.name
|
||||
)),
|
||||
SendRunError::Rejected { code, message } => ToolError::ExecutionFailed(format!(
|
||||
"pod `{}` rejected the run with {code:?}: {message}",
|
||||
input.name
|
||||
)),
|
||||
SendRunError::Io(msg) => {
|
||||
ToolError::ExecutionFailed(format!("send to `{}`: {msg}", input.name))
|
||||
}
|
||||
|
|
@ -370,16 +374,20 @@ pub(crate) enum SendRunError {
|
|||
/// Target Pod responded with `Error { AlreadyRunning }` — the
|
||||
/// caller can retry once the current turn ends.
|
||||
AlreadyRunning,
|
||||
/// Any other failure (connect / write / read / unexpected EOF).
|
||||
/// Target Pod explicitly rejected the run after delivery reached the
|
||||
/// controller.
|
||||
Rejected { code: ErrorCode, message: String },
|
||||
/// Transport, protocol, timeout, or unexpected EOF before acceptance
|
||||
/// evidence was observed.
|
||||
Io(String),
|
||||
}
|
||||
|
||||
/// Write `Method::Run` to the target and read back events until we see
|
||||
/// evidence that the controller accepted the run (`UserMessage`,
|
||||
/// `TurnStart`, or a user-send `InvokeStart`) or rejected it with
|
||||
/// `Error { AlreadyRunning }`. Any connect-time Snapshot or replayed alerts
|
||||
/// that precede the response are skipped. Times out per-read so a stuck Pod
|
||||
/// doesn't hang the tool.
|
||||
/// `TurnStart`, or a user-send `InvokeStart`) or rejected it. The connect-time
|
||||
/// event prelude is drained before sending the method so large Snapshots and
|
||||
/// large Run payloads cannot block each other on the same socket. Times out
|
||||
/// per operation so a stuck Pod doesn't hang the tool.
|
||||
pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result<(), SendRunError> {
|
||||
let stream = tokio::time::timeout(SOCKET_OP_TIMEOUT, UnixStream::connect(socket))
|
||||
.await
|
||||
|
|
@ -388,6 +396,31 @@ pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result
|
|||
let (r, w) = stream.into_split();
|
||||
let mut writer = JsonLineWriter::new(w);
|
||||
let mut reader = JsonLineReader::new(r);
|
||||
|
||||
loop {
|
||||
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
|
||||
.await
|
||||
.map_err(|_| SendRunError::Io("read initial Snapshot timed out".into()))?
|
||||
.map_err(|e| SendRunError::Io(format!("read initial Snapshot: {e}")))?;
|
||||
match event {
|
||||
Some(Event::Snapshot { .. }) => break,
|
||||
Some(Event::Alert(_)) => continue,
|
||||
Some(Event::Error {
|
||||
code: ErrorCode::AlreadyRunning,
|
||||
..
|
||||
}) => return Err(SendRunError::AlreadyRunning),
|
||||
Some(Event::Error { code, message }) => {
|
||||
return Err(SendRunError::Rejected { code, message });
|
||||
}
|
||||
Some(_) => continue,
|
||||
None => {
|
||||
return Err(SendRunError::Io(
|
||||
"connection closed before initial Snapshot".into(),
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::timeout(
|
||||
SOCKET_OP_TIMEOUT,
|
||||
writer.write(&Method::Run {
|
||||
|
|
@ -400,26 +433,23 @@ pub(crate) async fn send_run_and_confirm(socket: &Path, input: String) -> Result
|
|||
loop {
|
||||
let event = tokio::time::timeout(SOCKET_OP_TIMEOUT, reader.next::<Event>())
|
||||
.await
|
||||
.map_err(|_| SendRunError::Io("read timed out".into()))?
|
||||
.map_err(|e| SendRunError::Io(format!("read: {e}")))?;
|
||||
.map_err(|_| SendRunError::Io("read response timed out".into()))?
|
||||
.map_err(|e| SendRunError::Io(format!("read response: {e}")))?;
|
||||
match event {
|
||||
Some(Event::Error {
|
||||
code: ErrorCode::AlreadyRunning,
|
||||
..
|
||||
}) => return Err(SendRunError::AlreadyRunning),
|
||||
Some(Event::Error { code, message }) => {
|
||||
return Err(SendRunError::Io(format!(
|
||||
"pod returned {code:?}: {message}"
|
||||
)));
|
||||
return Err(SendRunError::Rejected { code, message });
|
||||
}
|
||||
Some(Event::InvokeStart {
|
||||
kind: InvokeKind::UserSend,
|
||||
})
|
||||
| Some(Event::UserMessage { .. })
|
||||
| Some(Event::TurnStart { .. }) => return Ok(()),
|
||||
// Alerts, Snapshot, and other pre-turn events can precede the
|
||||
// controller's response; keep reading until the Run is accepted
|
||||
// or rejected.
|
||||
// Other post-Snapshot events can race with the controller's
|
||||
// response; keep reading until the Run is accepted or rejected.
|
||||
Some(_) => continue,
|
||||
None => return Err(SendRunError::Io("connection closed before response".into())),
|
||||
}
|
||||
|
|
@ -618,6 +648,47 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_run_and_confirm_drains_alert_and_large_snapshot_before_large_run() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
let socket = tmp.path().join("pod.sock");
|
||||
let listener = UnixListener::bind(&socket).unwrap();
|
||||
let large_snapshot_payload = "s".repeat(2 * 1024 * 1024);
|
||||
let large_run_payload = "r".repeat(2 * 1024 * 1024);
|
||||
let received = serve_initial_events_then_run_ack(
|
||||
listener,
|
||||
vec![
|
||||
Event::Alert(Alert {
|
||||
level: AlertLevel::Warn,
|
||||
source: AlertSource::Pod,
|
||||
message: "replayed alert".into(),
|
||||
timestamp_ms: 0,
|
||||
}),
|
||||
snapshot(vec![
|
||||
serde_json::json!({ "payload": large_snapshot_payload }),
|
||||
]),
|
||||
],
|
||||
Event::InvokeStart {
|
||||
kind: InvokeKind::UserSend,
|
||||
},
|
||||
);
|
||||
|
||||
send_run_and_confirm(&socket, large_run_payload.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let method = received.await.unwrap().expect("expected method");
|
||||
match method {
|
||||
Method::Run { input } => {
|
||||
assert_eq!(
|
||||
protocol::Segment::flatten_to_text(&input),
|
||||
large_run_payload
|
||||
);
|
||||
}
|
||||
other => panic!("expected Run, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn send_run_and_confirm_reports_already_running() {
|
||||
let tmp = TempDir::new().unwrap();
|
||||
|
|
|
|||
|
|
@ -464,6 +464,9 @@ fn spawn_delivery_error(pod_name: &str, err: SendRunError) -> ToolError {
|
|||
SendRunError::AlreadyRunning => ToolError::ExecutionFailed(format!(
|
||||
"spawned pod `{pod_name}` rejected its initial task as already running; the pod remains registered and can be inspected or stopped"
|
||||
)),
|
||||
SendRunError::Rejected { code, message } => ToolError::ExecutionFailed(format!(
|
||||
"spawned pod `{pod_name}` rejected its initial task with {code:?}: {message}; the pod remains registered and can be inspected or stopped"
|
||||
)),
|
||||
SendRunError::Io(msg) => ToolError::ExecutionFailed(format!(
|
||||
"spawned pod `{pod_name}` did not confirm initial task delivery: {msg}; the pod remains registered and can be inspected or stopped"
|
||||
)),
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user