From 282a8572480d7ecc5c359d7ab2448eea339ac593 Mon Sep 17 00:00:00 2001 From: Hare Date: Fri, 15 May 2026 05:48:17 +0900 Subject: [PATCH] =?UTF-8?q?update:=20Paused=E2=86=92Run=20=E3=81=AE=20inte?= =?UTF-8?q?rrupt=20=E5=89=8D=E5=87=A6=E7=90=86=E3=82=92=20Pod::run=20?= =?UTF-8?q?=E3=81=AB=E5=86=85=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/pod/src/controller.rs | 37 +++---------- ...interrupt_and_run.rs => interrupt_prep.rs} | 53 ++---------------- crates/pod/src/lib.rs | 2 +- crates/pod/src/pod.rs | 55 +++++++++++++++++-- crates/pod/tests/controller_test.rs | 7 ++- 5 files changed, 69 insertions(+), 85 deletions(-) rename crates/pod/src/{interrupt_and_run.rs => interrupt_prep.rs} (63%) diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 7f821986..8c67f76f 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -94,7 +94,6 @@ async fn finish_controller_run( /// `pod.run_for_notification()` drains the NotifyBuffer on its own. enum PendingRun { Run(Vec), - InterruptAndRun(Vec), RunForNotification, Resume, } @@ -108,7 +107,7 @@ impl PendingRun { /// notify buffer (Notify / inbound PodEvent) and stays silent. fn is_parent_originated(&self) -> bool { match self { - PendingRun::Run(_) | PendingRun::InterruptAndRun(_) | PendingRun::Resume => true, + PendingRun::Run(_) | PendingRun::Resume => true, PendingRun::RunForNotification => false, } } @@ -536,21 +535,6 @@ async fn controller_loop( ) .await } - PendingRun::InterruptAndRun(input) => { - drive_turn( - pod.interrupt_and_run(input), - &mut method_rx, - &event_tx, - &cancel_tx, - &shared_state, - ¬ify_buffer, - self_parent_socket.as_ref(), - &spawner_name, - &spawned_registry, - parent_originated, - ) - .await - } PendingRun::RunForNotification => { drive_turn( pod.run_for_notification(), @@ -598,8 +582,7 @@ async fn controller_loop( match method { Method::Run { input } => { - let status_before = shared_state.get_status(); - if status_before == PodStatus::Running { + if shared_state.get_status() == PodStatus::Running { // Defensive: the inner select! inside drive_turn // already rejects `Run` while a turn is live, so // this branch is only reachable across a race window @@ -616,17 +599,16 @@ async fn controller_loop( // shared_state's `user_segments` is re-synced from // `pod` after the run completes, so we don't push // here. Workflow-invocation validation happens inside - // `Pod::run` / `Pod::interrupt_and_run`; on failure the - // turn errors out via `Event::Error { InvalidRequest }` - // before any UserInput is committed. + // `Pod::run`; on failure the turn errors out via + // `Event::Error { InvalidRequest }` before any + // UserInput is committed. Paused→Run cleanup (orphan + // tool_result closure + interrupt system note) is + // applied inside `Pod::run` itself when the worker's + // `last_run_interrupted` flag is set. let _ = event_tx.send(Event::UserMessage { segments: input.clone(), }); - pending = Some(if status_before == PodStatus::Paused { - PendingRun::InterruptAndRun(input) - } else { - PendingRun::Run(input) - }); + pending = Some(PendingRun::Run(input)); } Method::Notify { message } => { @@ -958,7 +940,6 @@ mod tests { #[test] fn pending_run_parent_origin_table() { assert!(PendingRun::Run(Vec::new()).is_parent_originated()); - assert!(PendingRun::InterruptAndRun(Vec::new()).is_parent_originated()); assert!(PendingRun::Resume.is_parent_originated()); assert!(!PendingRun::RunForNotification.is_parent_originated()); } diff --git a/crates/pod/src/interrupt_and_run.rs b/crates/pod/src/interrupt_prep.rs similarity index 63% rename from crates/pod/src/interrupt_and_run.rs rename to crates/pod/src/interrupt_prep.rs index 886f553c..226be8a4 100644 --- a/crates/pod/src/interrupt_and_run.rs +++ b/crates/pod/src/interrupt_prep.rs @@ -1,4 +1,5 @@ -//! Transition from `Paused` to a fresh turn via user input. +//! Pre-run cleanup that fires when a Pod transitions out of `Paused` +//! into a fresh turn via new user input. //! //! The previously in-flight turn is treated as finished. Any orphan //! `Item::ToolCall` (tool_use emitted by the LLM but whose tool did not @@ -6,59 +7,17 @@ //! `Item::ToolResult` so the next request is wire-valid under providers //! that require every `tool_use` to be followed by a matching //! `tool_result` (Anthropic). A short system note is then inserted so -//! the LLM understands the prior work was cut short, and finally the -//! user's new input is appended via `worker.run(input)`. +//! the LLM understands the prior work was cut short. Both side effects +//! happen at the front of `Pod::run` when +//! `worker.last_run_interrupted()` is set; see `Pod::apply_interrupt_prep`. use llm_worker::Item; -use llm_worker::llm_client::client::LlmClient; -use protocol::Segment; -use session_store::Store; - -use crate::pod::{Pod, PodError, PodRunResult}; #[cfg(test)] use crate::prompt::catalog::PromptCatalog; -impl Pod { - /// Close out the current (paused) turn and start a new one with `input`. - /// - /// Invoked by the controller when a `Method::Run` arrives while the - /// Pod is `Paused`. See module docs for the wire-compatibility - /// rationale around synthetic tool results. - pub async fn interrupt_and_run( - &mut self, - input: Vec, - ) -> Result { - // Validate before any side effects so a bad workflow slug does - // not leave half-applied interrupt prep (orphan closure + - // system note) in worker history. `Pod::run` validates again at - // its own entry; the duplicate call is cheap (read-only) and - // collapses naturally once `interrupt_and_run` folds into - // `Pod::run` (see ticket pod-interrupt-prep-internalize). - self.validate_workflow_invocations(&input)?; - - let tool_result_summary = self - .prompts() - .interrupt_tool_result_summary() - .map_err(PodError::from)?; - let system_note = self - .prompts() - .interrupt_system_note() - .map_err(PodError::from)?; - - let closures: Vec = - orphan_tool_result_closures(self.worker().history(), &tool_result_summary); - if !closures.is_empty() { - self.worker_mut().extend_history(closures); - } - self.worker_mut() - .push_item(Item::system_message(system_note)); - self.run(input).await - } -} - /// Build synthetic `Item::ToolResult` items for every unanswered /// `Item::ToolCall` in `history`, preserving order. -fn orphan_tool_result_closures(history: &[Item], summary: &str) -> Vec { +pub(crate) fn orphan_tool_result_closures(history: &[Item], summary: &str) -> Vec { let mut answered: std::collections::HashSet<&str> = std::collections::HashSet::new(); for item in history { if let Item::ToolResult { call_id, .. } = item { diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 24c379a3..b929796c 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -11,7 +11,7 @@ pub mod spawn; pub mod workflow; mod factory; -mod interrupt_and_run; +mod interrupt_prep; mod permission; mod pod; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 83c93f16..654ba43a 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1138,10 +1138,24 @@ impl Pod { pub async fn run(&mut self, input: Vec) -> Result { // Validate workflow invocations up front so an invalid slug // never commits a UserInput entry, never triggers pre-run - // compaction, and never half-applies interrupt prep when run - // from `interrupt_and_run`. Read-only against `workflow_registry`. + // compaction, and never half-applies interrupt prep when the + // previous turn was interrupted. Read-only against + // `workflow_registry`. self.validate_workflow_invocations(&input)?; + // Paused→Run transition: if the previous turn was cut short, + // any `Item::ToolCall` whose tool never produced a matching + // `ToolResult` is closed with a synthetic one, and a short + // system note explaining the interruption is appended — so the + // next request is wire-valid (Anthropic) and the LLM knows + // prior work was abandoned. Driven by the worker's own + // `last_run_interrupted` flag; `Pod::resume` reuses the prior + // context via a different entry point and never triggers this + // path. + if self.worker.as_ref().unwrap().last_run_interrupted() { + self.apply_interrupt_prep()?; + } + self.prepare_for_run().await?; // Persist the user input as typed segments before the worker @@ -1399,11 +1413,40 @@ impl Pod { Ok(out) } + /// Stage the post-interruption cleanup at the front of worker + /// history: close every unanswered `Item::ToolCall` with a synthetic + /// `Item::ToolResult` (Anthropic wire-validity), then append a + /// system note so the LLM understands the prior turn was cut + /// short. Called from `Pod::run` when the worker's + /// `last_run_interrupted` flag is set (i.e. the Pod just transitioned + /// out of Paused via a new user input). + fn apply_interrupt_prep(&mut self) -> Result<(), PodError> { + let tool_result_summary = self + .prompts() + .interrupt_tool_result_summary() + .map_err(PodError::from)?; + let system_note = self + .prompts() + .interrupt_system_note() + .map_err(PodError::from)?; + + let closures = crate::interrupt_prep::orphan_tool_result_closures( + self.worker().history(), + &tool_result_summary, + ); + if !closures.is_empty() { + self.worker_mut().extend_history(closures); + } + self.worker_mut() + .push_item(llm_worker::Item::system_message(system_note)); + Ok(()) + } + /// Validate explicit workflow invocations without reading dependency - /// bodies. Called from `Pod::run` / `Pod::interrupt_and_run` entry so - /// an invalid slug aborts the turn before any session-log commit or - /// interrupt-prep side effects; `pub` so completion / preview paths - /// can also dry-check inputs. + /// bodies. Called from `Pod::run` entry so an invalid slug aborts + /// the turn before any session-log commit or interrupt-prep side + /// effects; `pub` so completion / preview paths can also dry-check + /// inputs. pub fn validate_workflow_invocations( &self, segments: &[Segment], diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index a9cdbbb8..2d97c5c6 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -1367,9 +1367,10 @@ async fn paused_then_run_closes_orphan_tool_use_for_next_request() { tokio::time::sleep(std::time::Duration::from_millis(50)).await; assert_eq!(handle.shared_state.get_status(), PodStatus::Paused); - // New user input while Paused → controller routes to - // `Pod::interrupt_and_run`, which closes the orphan + injects a - // system note before the fresh user message. + // New user input while Paused → `Pod::run` observes + // `last_run_interrupted` and runs its interrupt-prep step, which + // closes the orphan + injects a system note before the fresh user + // message. handle.send(Method::run_text("new request")).await.unwrap(); assert!( drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(