fix: preserve terminal turn failures

This commit is contained in:
Keisuke Hirata 2026-05-30 09:02:11 +09:00
parent e37c151f0e
commit a47f2c4689
No known key found for this signature in database
4 changed files with 107 additions and 12 deletions

View File

@ -262,13 +262,17 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
fn drain_cancel_queue(&mut self) {
use tokio::sync::mpsc::error::TryRecvError;
loop {
match self.cancel_rx.try_recv() {
Ok(()) => continue,
Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
}
}
while self.cancel_rx.try_recv().is_ok() {}
}
/// Discard pending cancellation notifications while the worker is idle.
///
/// Cancellation is a running-turn control signal. Callers that own a higher
/// level run state can use this before starting a new turn so an old idle
/// signal does not poison the next request, while cancellation queued after
/// the run has been accepted remains observable by the turn loop.
pub fn clear_pending_cancel(&mut self) {
self.drain_cancel_queue();
}
fn try_cancelled(&mut self) -> bool {
@ -1058,7 +1062,6 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
/// Internal turn execution logic
async fn run_turn_loop(&mut self) -> Result<WorkerResult, WorkerError> {
self.reset_interruption_state();
self.drain_cancel_queue();
let tool_definitions = self.build_tool_definitions();
info!(
@ -1363,9 +1366,27 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
"elapsed_ms": stream_started.elapsed().as_millis() as u64,
}),
);
match wait_for_first_stream_event(stream, DEFAULT_FIRST_STREAM_EVENT_TIMEOUT)
.await
{
let first_event_result = tokio::select! {
first_event = wait_for_first_stream_event(stream, DEFAULT_FIRST_STREAM_EVENT_TIMEOUT) => first_event,
cancel = self.cancel_rx.recv() => {
if cancel.is_some() {
info!("Cancelled before first stream event");
}
self.emit_lifecycle_trace(
turn,
llm_call,
"stream_first_event_cancelled",
json!({
"attempt": attempt,
"elapsed_ms": stream_started.elapsed().as_millis() as u64,
}),
);
self.timeline.abort_current_block();
self.last_run_interrupted = true;
return Err(WorkerError::Cancelled);
}
};
match first_event_result {
Ok(FirstStreamEvent::Ready(stream)) => return Ok(stream),
Ok(FirstStreamEvent::Empty(stream)) => return Ok(stream),
Err(err) => {
@ -1502,6 +1523,17 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
self.emit_stream_event(turn, llm_call, &event);
self.timeline.dispatch(&event);
if let Event::Error(err) = &event {
self.timeline.abort_current_block();
self.timeline.flush_usage();
self.last_run_interrupted = true;
return Err(WorkerError::Client(ClientError::Api {
status: None,
code: err.code.clone(),
message: err.message.clone(),
retry_after: None,
}));
}
}
None => break,
}

View File

@ -617,6 +617,11 @@ async fn controller_loop<C, St>(
// here so the status flip → drive_turn → finish sequence lives
// in one place, regardless of which Method caused it.
if let Some(run) = pending.take() {
// Cancellation is meaningful only for an accepted running turn. Clear
// idle/stale signals before the status flip; any Cancel/Pause received
// after this point is delivered to the turn and must not be discarded by
// the Worker at run start.
pod.worker_mut().clear_pending_cancel();
set_controller_status(&shared_state, &runtime_dir, &event_tx, PodStatus::Running).await;
let parent_originated = run.is_parent_originated();
let (new_status, shutdown) = match run {

View File

@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use llm_worker::Worker;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::event::{ErrorEvent, Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::types::Item;
use llm_worker::llm_client::{ClientError, LlmClient, Request};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
@ -254,6 +254,52 @@ async fn run_end_returns_to_idle_without_busy_status() {
assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);
}
#[tokio::test]
async fn provider_stream_error_records_run_errored() {
let client = MockClient::new(vec![LlmEvent::Error(ErrorEvent {
code: Some("context_length_exceeded".into()),
message: "request too large".into(),
})]);
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
let mut rx = handle.subscribe();
handle.send(Method::run_text("ping")).await.unwrap();
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
e,
Event::Error {
code: protocol::ErrorCode::ProviderError,
message,
} if message.contains("context_length_exceeded")
))
.await,
"provider stream error should be surfaced as a live provider error"
);
wait_for_status(&handle, PodStatus::Idle).await;
let (entries, _rx) = handle.sink.subscribe_with_snapshot();
assert!(
entries.iter().any(|entry| matches!(
entry,
LogEntry::RunErrored { message, .. }
if message.contains("context_length_exceeded")
)),
"provider stream error should be persisted as RunErrored"
);
assert!(
!entries.iter().any(|entry| matches!(
entry,
LogEntry::RunCompleted {
result: llm_worker::WorkerResult::Finished,
..
}
)),
"provider stream error must not be recorded as a finished run"
);
}
/// Mid-turn re-attach: a client connecting while the worker is still
/// running observes the in-flight `UserInput` entry in the connect-time
/// `Event::Snapshot`. This is the load-bearing property of the new

View File

@ -17,3 +17,15 @@ Artifacts:
- `artifacts/review-r2.md`
---
<!-- event: fix author: insomnia at: 2026-05-30T00:08:00Z -->
## Parent-side validation fix
After merging the approved implementation, post-merge validation failed on `cargo test -p pod --test controller_test empty_turn_pause_rolls_back_and_snapshot_does_not_restore_input`.
The parent took over the stopped/failed handoff and fixed the adjacent turn-control regression directly on main: cancellation received immediately after the controller accepts a run was being lost before the worker reached its first stream event wait, so empty turns could hang instead of rolling back. The fix preserves idle stale-cancel cleanup at the controller boundary and makes first-event waiting cancellation-aware.
While investigating the child Pod's `context_length_exceeded` ping failure, the parent also fixed provider terminal stream errors so `Event::Error` is not only a live TUI event: terminal provider errors now fail the worker turn and persist `RunErrored` instead of allowing an empty `RunCompleted::Finished`.
---