diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index 2728838a..2bcb5f77 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -262,13 +262,17 @@ impl Worker { } fn drain_cancel_queue(&mut self) { - use tokio::sync::mpsc::error::TryRecvError; - loop { - match self.cancel_rx.try_recv() { - Ok(()) => continue, - Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break, - } - } + while self.cancel_rx.try_recv().is_ok() {} + } + + /// Discard pending cancellation notifications while the worker is idle. + /// + /// Cancellation is a running-turn control signal. Callers that own a higher + /// level run state can use this before starting a new turn so an old idle + /// signal does not poison the next request, while cancellation queued after + /// the run has been accepted remains observable by the turn loop. + pub fn clear_pending_cancel(&mut self) { + self.drain_cancel_queue(); } fn try_cancelled(&mut self) -> bool { @@ -1058,7 +1062,6 @@ impl Worker { /// Internal turn execution logic async fn run_turn_loop(&mut self) -> Result { self.reset_interruption_state(); - self.drain_cancel_queue(); let tool_definitions = self.build_tool_definitions(); info!( @@ -1363,9 +1366,27 @@ impl Worker { "elapsed_ms": stream_started.elapsed().as_millis() as u64, }), ); - match wait_for_first_stream_event(stream, DEFAULT_FIRST_STREAM_EVENT_TIMEOUT) - .await - { + let first_event_result = tokio::select! { + first_event = wait_for_first_stream_event(stream, DEFAULT_FIRST_STREAM_EVENT_TIMEOUT) => first_event, + cancel = self.cancel_rx.recv() => { + if cancel.is_some() { + info!("Cancelled before first stream event"); + } + self.emit_lifecycle_trace( + turn, + llm_call, + "stream_first_event_cancelled", + json!({ + "attempt": attempt, + "elapsed_ms": stream_started.elapsed().as_millis() as u64, + }), + ); + self.timeline.abort_current_block(); + self.last_run_interrupted = true; + return Err(WorkerError::Cancelled); + } + }; + match first_event_result { Ok(FirstStreamEvent::Ready(stream)) => return Ok(stream), Ok(FirstStreamEvent::Empty(stream)) => return Ok(stream), Err(err) => { @@ -1502,6 +1523,17 @@ impl Worker { } self.emit_stream_event(turn, llm_call, &event); self.timeline.dispatch(&event); + if let Event::Error(err) = &event { + self.timeline.abort_current_block(); + self.timeline.flush_usage(); + self.last_run_interrupted = true; + return Err(WorkerError::Client(ClientError::Api { + status: None, + code: err.code.clone(), + message: err.message.clone(), + retry_after: None, + })); + } } None => break, } diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index b61c35e1..822fda94 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -617,6 +617,11 @@ async fn controller_loop( // here so the status flip → drive_turn → finish sequence lives // in one place, regardless of which Method caused it. if let Some(run) = pending.take() { + // Cancellation is meaningful only for an accepted running turn. Clear + // idle/stale signals before the status flip; any Cancel/Pause received + // after this point is delivered to the turn and must not be discarded by + // the Worker at run start. + pod.worker_mut().clear_pending_cancel(); set_controller_status(&shared_state, &runtime_dir, &event_tx, PodStatus::Running).await; let parent_originated = run.is_parent_originated(); let (new_status, shutdown) = match run { diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 41340376..b7933121 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use futures::{Stream, StreamExt}; use llm_worker::Worker; -use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; +use llm_worker::llm_client::event::{ErrorEvent, Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::types::Item; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; @@ -254,6 +254,52 @@ async fn run_end_returns_to_idle_without_busy_status() { assert_eq!(handle.shared_state.get_status(), PodStatus::Idle); } +#[tokio::test] +async fn provider_stream_error_records_run_errored() { + let client = MockClient::new(vec![LlmEvent::Error(ErrorEvent { + code: Some("context_length_exceeded".into()), + message: "request too large".into(), + })]); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + let mut rx = handle.subscribe(); + + handle.send(Method::run_text("ping")).await.unwrap(); + + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::Error { + code: protocol::ErrorCode::ProviderError, + message, + } if message.contains("context_length_exceeded") + )) + .await, + "provider stream error should be surfaced as a live provider error" + ); + wait_for_status(&handle, PodStatus::Idle).await; + + let (entries, _rx) = handle.sink.subscribe_with_snapshot(); + assert!( + entries.iter().any(|entry| matches!( + entry, + LogEntry::RunErrored { message, .. } + if message.contains("context_length_exceeded") + )), + "provider stream error should be persisted as RunErrored" + ); + assert!( + !entries.iter().any(|entry| matches!( + entry, + LogEntry::RunCompleted { + result: llm_worker::WorkerResult::Finished, + .. + } + )), + "provider stream error must not be recorded as a finished run" + ); +} + /// Mid-turn re-attach: a client connecting while the worker is still /// running observes the in-flight `UserInput` entry in the connect-time /// `Event::Snapshot`. This is the load-bearing property of the new diff --git a/work-items/open/20260529-205844-session-pod-state-boundary/thread.md b/work-items/open/20260529-205844-session-pod-state-boundary/thread.md index cfa351af..ff35e4b1 100644 --- a/work-items/open/20260529-205844-session-pod-state-boundary/thread.md +++ b/work-items/open/20260529-205844-session-pod-state-boundary/thread.md @@ -17,3 +17,15 @@ Artifacts: - `artifacts/review-r2.md` --- + + + +## Parent-side validation fix + +After merging the approved implementation, post-merge validation failed on `cargo test -p pod --test controller_test empty_turn_pause_rolls_back_and_snapshot_does_not_restore_input`. + +The parent took over the stopped/failed handoff and fixed the adjacent turn-control regression directly on main: cancellation received immediately after the controller accepts a run was being lost before the worker reached its first stream event wait, so empty turns could hang instead of rolling back. The fix preserves idle stale-cancel cleanup at the controller boundary and makes first-event waiting cancellation-aware. + +While investigating the child Pod's `context_length_exceeded` ping failure, the parent also fixed provider terminal stream errors so `Event::Error` is not only a live TUI event: terminal provider errors now fail the worker turn and persist `RunErrored` instead of allowing an empty `RunCompleted::Finished`. + +---