update: Paused→Run の interrupt 前処理を Pod::run に内包

This commit is contained in:
Keisuke Hirata 2026-05-15 05:48:17 +09:00
parent d710cac879
commit 282a857248
No known key found for this signature in database
5 changed files with 69 additions and 85 deletions

View File

@ -94,7 +94,6 @@ async fn finish_controller_run<C, St>(
/// `pod.run_for_notification()` drains the NotifyBuffer on its own.
enum PendingRun {
Run(Vec<Segment>),
InterruptAndRun(Vec<Segment>),
RunForNotification,
Resume,
}
@ -108,7 +107,7 @@ impl PendingRun {
/// notify buffer (Notify / inbound PodEvent) and stays silent.
fn is_parent_originated(&self) -> bool {
match self {
PendingRun::Run(_) | PendingRun::InterruptAndRun(_) | PendingRun::Resume => true,
PendingRun::Run(_) | PendingRun::Resume => true,
PendingRun::RunForNotification => false,
}
}
@ -536,21 +535,6 @@ async fn controller_loop<C, St>(
)
.await
}
PendingRun::InterruptAndRun(input) => {
drive_turn(
pod.interrupt_and_run(input),
&mut method_rx,
&event_tx,
&cancel_tx,
&shared_state,
&notify_buffer,
self_parent_socket.as_ref(),
&spawner_name,
&spawned_registry,
parent_originated,
)
.await
}
PendingRun::RunForNotification => {
drive_turn(
pod.run_for_notification(),
@ -598,8 +582,7 @@ async fn controller_loop<C, St>(
match method {
Method::Run { input } => {
let status_before = shared_state.get_status();
if status_before == PodStatus::Running {
if shared_state.get_status() == PodStatus::Running {
// Defensive: the inner select! inside drive_turn
// already rejects `Run` while a turn is live, so
// this branch is only reachable across a race window
@ -616,17 +599,16 @@ async fn controller_loop<C, St>(
// shared_state's `user_segments` is re-synced from
// `pod` after the run completes, so we don't push
// here. Workflow-invocation validation happens inside
// `Pod::run` / `Pod::interrupt_and_run`; on failure the
// turn errors out via `Event::Error { InvalidRequest }`
// before any UserInput is committed.
// `Pod::run`; on failure the turn errors out via
// `Event::Error { InvalidRequest }` before any
// UserInput is committed. Paused→Run cleanup (orphan
// tool_result closure + interrupt system note) is
// applied inside `Pod::run` itself when the worker's
// `last_run_interrupted` flag is set.
let _ = event_tx.send(Event::UserMessage {
segments: input.clone(),
});
pending = Some(if status_before == PodStatus::Paused {
PendingRun::InterruptAndRun(input)
} else {
PendingRun::Run(input)
});
pending = Some(PendingRun::Run(input));
}
Method::Notify { message } => {
@ -958,7 +940,6 @@ mod tests {
#[test]
fn pending_run_parent_origin_table() {
assert!(PendingRun::Run(Vec::new()).is_parent_originated());
assert!(PendingRun::InterruptAndRun(Vec::new()).is_parent_originated());
assert!(PendingRun::Resume.is_parent_originated());
assert!(!PendingRun::RunForNotification.is_parent_originated());
}

View File

@ -1,4 +1,5 @@
//! Transition from `Paused` to a fresh turn via user input.
//! Pre-run cleanup that fires when a Pod transitions out of `Paused`
//! into a fresh turn via new user input.
//!
//! The previously in-flight turn is treated as finished. Any orphan
//! `Item::ToolCall` (tool_use emitted by the LLM but whose tool did not
@ -6,59 +7,17 @@
//! `Item::ToolResult` so the next request is wire-valid under providers
//! that require every `tool_use` to be followed by a matching
//! `tool_result` (Anthropic). A short system note is then inserted so
//! the LLM understands the prior work was cut short, and finally the
//! user's new input is appended via `worker.run(input)`.
//! the LLM understands the prior work was cut short. Both side effects
//! happen at the front of `Pod::run` when
//! `worker.last_run_interrupted()` is set; see `Pod::apply_interrupt_prep`.
use llm_worker::Item;
use llm_worker::llm_client::client::LlmClient;
use protocol::Segment;
use session_store::Store;
use crate::pod::{Pod, PodError, PodRunResult};
#[cfg(test)]
use crate::prompt::catalog::PromptCatalog;
impl<C: LlmClient, St: Store> Pod<C, St> {
    /// Finish the paused turn and begin a new one driven by `input`.
    ///
    /// The controller routes a `Method::Run` here when it arrives while
    /// the Pod is `Paused`. Unanswered tool calls in worker history get
    /// synthetic tool results and a system note is pushed before the
    /// call is handed over to `Pod::run`; the module docs explain why
    /// the synthetic results are needed for wire compatibility.
    ///
    /// # Errors
    ///
    /// Returns a `PodError` when workflow-invocation validation fails,
    /// when either interrupt prompt cannot be obtained, or when the
    /// delegated `Pod::run` fails.
    pub async fn interrupt_and_run(
        &mut self,
        input: Vec<Segment>,
    ) -> Result<PodRunResult, PodError> {
        // Reject a bad workflow slug before anything is written, so no
        // half-applied interrupt prep (synthetic results + system note)
        // is left behind in worker history. `Pod::run` repeats this
        // read-only check at its own entry; the duplication goes away
        // once this method is folded into `Pod::run` (see ticket
        // pod-interrupt-prep-internalize).
        self.validate_workflow_invocations(&input)?;

        // Resolve both prompt texts before mutating history.
        let summary = self.prompts().interrupt_tool_result_summary()?;
        let note = self.prompts().interrupt_system_note()?;

        let synthetic_results: Vec<Item> =
            orphan_tool_result_closures(self.worker().history(), &summary);
        if !synthetic_results.is_empty() {
            self.worker_mut().extend_history(synthetic_results);
        }

        let note_item = Item::system_message(note);
        self.worker_mut().push_item(note_item);

        self.run(input).await
    }
}
/// Build synthetic `Item::ToolResult` items for every unanswered
/// `Item::ToolCall` in `history`, preserving order.
fn orphan_tool_result_closures(history: &[Item], summary: &str) -> Vec<Item> {
pub(crate) fn orphan_tool_result_closures(history: &[Item], summary: &str) -> Vec<Item> {
let mut answered: std::collections::HashSet<&str> = std::collections::HashSet::new();
for item in history {
if let Item::ToolResult { call_id, .. } = item {

View File

@ -11,7 +11,7 @@ pub mod spawn;
pub mod workflow;
mod factory;
mod interrupt_and_run;
mod interrupt_prep;
mod permission;
mod pod;

View File

@ -1138,10 +1138,24 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
pub async fn run(&mut self, input: Vec<Segment>) -> Result<PodRunResult, PodError> {
// Validate workflow invocations up front so an invalid slug
// never commits a UserInput entry, never triggers pre-run
// compaction, and never half-applies interrupt prep when run
// from `interrupt_and_run`. Read-only against `workflow_registry`.
// compaction, and never half-applies interrupt prep when the
// previous turn was interrupted. Read-only against
// `workflow_registry`.
self.validate_workflow_invocations(&input)?;
// Paused→Run transition: if the previous turn was cut short,
// any `Item::ToolCall` whose tool never produced a matching
// `ToolResult` is closed with a synthetic one, and a short
// system note explaining the interruption is appended — so the
// next request is wire-valid (Anthropic) and the LLM knows
// prior work was abandoned. Driven by the worker's own
// `last_run_interrupted` flag; `Pod::resume` reuses the prior
// context via a different entry point and never triggers this
// path.
if self.worker.as_ref().unwrap().last_run_interrupted() {
self.apply_interrupt_prep()?;
}
self.prepare_for_run().await?;
// Persist the user input as typed segments before the worker
@ -1399,11 +1413,40 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
Ok(out)
}
/// Prepare worker history for a fresh turn after an interrupted one.
///
/// Every `Item::ToolCall` that never received a matching
/// `Item::ToolResult` is given a synthetic result (providers such as
/// Anthropic require each `tool_use` to be followed by a
/// `tool_result`), and a system note is then pushed so the LLM knows
/// the previous turn was cut short. `Pod::run` calls this when the
/// worker reports `last_run_interrupted()`.
///
/// # Errors
///
/// Returns a `PodError` if either interrupt prompt cannot be obtained
/// via `self.prompts()`. Both lookups happen before history is
/// modified, so a failure here leaves history as it was.
fn apply_interrupt_prep(&mut self) -> Result<(), PodError> {
    // Resolve both texts up front: nothing is written to history unless
    // both prompt lookups succeed.
    let summary = self.prompts().interrupt_tool_result_summary()?;
    let note = self.prompts().interrupt_system_note()?;

    let synthetic_results =
        crate::interrupt_prep::orphan_tool_result_closures(self.worker().history(), &summary);
    if !synthetic_results.is_empty() {
        self.worker_mut().extend_history(synthetic_results);
    }

    // The note goes in after the synthetic results, matching the order
    // the next request is expected to see.
    let note_item = llm_worker::Item::system_message(note);
    self.worker_mut().push_item(note_item);
    Ok(())
}
/// Validate explicit workflow invocations without reading dependency
/// bodies. Called from `Pod::run` / `Pod::interrupt_and_run` entry so
/// an invalid slug aborts the turn before any session-log commit or
/// interrupt-prep side effects; `pub` so completion / preview paths
/// can also dry-check inputs.
/// bodies. Called from `Pod::run` entry so an invalid slug aborts
/// the turn before any session-log commit or interrupt-prep side
/// effects; `pub` so completion / preview paths can also dry-check
/// inputs.
pub fn validate_workflow_invocations(
&self,
segments: &[Segment],

View File

@ -1367,9 +1367,10 @@ async fn paused_then_run_closes_orphan_tool_use_for_next_request() {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(handle.shared_state.get_status(), PodStatus::Paused);
// New user input while Paused → controller routes to
// `Pod::interrupt_and_run`, which closes the orphan + injects a
// system note before the fresh user message.
// New user input while Paused → `Pod::run` observes
// `last_run_interrupted` and runs its interrupt-prep step, which
// closes the orphan + injects a system note before the fresh user
// message.
handle.send(Method::run_text("new request")).await.unwrap();
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(