update: Paused→Run の interrupt 前処理を Pod::run に内包
This commit is contained in:
parent
ce6085b5f4
commit
4c8596db38
|
|
@ -94,7 +94,6 @@ async fn finish_controller_run<C, St>(
|
|||
/// `pod.run_for_notification()` drains the NotifyBuffer on its own.
|
||||
enum PendingRun {
|
||||
Run(Vec<Segment>),
|
||||
InterruptAndRun(Vec<Segment>),
|
||||
RunForNotification,
|
||||
Resume,
|
||||
}
|
||||
|
|
@ -108,7 +107,7 @@ impl PendingRun {
|
|||
/// notify buffer (Notify / inbound PodEvent) and stays silent.
|
||||
fn is_parent_originated(&self) -> bool {
|
||||
match self {
|
||||
PendingRun::Run(_) | PendingRun::InterruptAndRun(_) | PendingRun::Resume => true,
|
||||
PendingRun::Run(_) | PendingRun::Resume => true,
|
||||
PendingRun::RunForNotification => false,
|
||||
}
|
||||
}
|
||||
|
|
@ -536,21 +535,6 @@ async fn controller_loop<C, St>(
|
|||
)
|
||||
.await
|
||||
}
|
||||
PendingRun::InterruptAndRun(input) => {
|
||||
drive_turn(
|
||||
pod.interrupt_and_run(input),
|
||||
&mut method_rx,
|
||||
&event_tx,
|
||||
&cancel_tx,
|
||||
&shared_state,
|
||||
¬ify_buffer,
|
||||
self_parent_socket.as_ref(),
|
||||
&spawner_name,
|
||||
&spawned_registry,
|
||||
parent_originated,
|
||||
)
|
||||
.await
|
||||
}
|
||||
PendingRun::RunForNotification => {
|
||||
drive_turn(
|
||||
pod.run_for_notification(),
|
||||
|
|
@ -598,8 +582,7 @@ async fn controller_loop<C, St>(
|
|||
|
||||
match method {
|
||||
Method::Run { input } => {
|
||||
let status_before = shared_state.get_status();
|
||||
if status_before == PodStatus::Running {
|
||||
if shared_state.get_status() == PodStatus::Running {
|
||||
// Defensive: the inner select! inside drive_turn
|
||||
// already rejects `Run` while a turn is live, so
|
||||
// this branch is only reachable across a race window
|
||||
|
|
@ -616,17 +599,16 @@ async fn controller_loop<C, St>(
|
|||
// shared_state's `user_segments` is re-synced from
|
||||
// `pod` after the run completes, so we don't push
|
||||
// here. Workflow-invocation validation happens inside
|
||||
// `Pod::run` / `Pod::interrupt_and_run`; on failure the
|
||||
// turn errors out via `Event::Error { InvalidRequest }`
|
||||
// before any UserInput is committed.
|
||||
// `Pod::run`; on failure the turn errors out via
|
||||
// `Event::Error { InvalidRequest }` before any
|
||||
// UserInput is committed. Paused→Run cleanup (orphan
|
||||
// tool_result closure + interrupt system note) is
|
||||
// applied inside `Pod::run` itself when the worker's
|
||||
// `last_run_interrupted` flag is set.
|
||||
let _ = event_tx.send(Event::UserMessage {
|
||||
segments: input.clone(),
|
||||
});
|
||||
pending = Some(if status_before == PodStatus::Paused {
|
||||
PendingRun::InterruptAndRun(input)
|
||||
} else {
|
||||
PendingRun::Run(input)
|
||||
});
|
||||
pending = Some(PendingRun::Run(input));
|
||||
}
|
||||
|
||||
Method::Notify { message } => {
|
||||
|
|
@ -958,7 +940,6 @@ mod tests {
|
|||
#[test]
|
||||
fn pending_run_parent_origin_table() {
|
||||
assert!(PendingRun::Run(Vec::new()).is_parent_originated());
|
||||
assert!(PendingRun::InterruptAndRun(Vec::new()).is_parent_originated());
|
||||
assert!(PendingRun::Resume.is_parent_originated());
|
||||
assert!(!PendingRun::RunForNotification.is_parent_originated());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
//! Transition from `Paused` to a fresh turn via user input.
|
||||
//! Pre-run cleanup that fires when a Pod transitions out of `Paused`
|
||||
//! into a fresh turn via new user input.
|
||||
//!
|
||||
//! The previously in-flight turn is treated as finished. Any orphan
|
||||
//! `Item::ToolCall` (tool_use emitted by the LLM but whose tool did not
|
||||
|
|
@ -6,59 +7,17 @@
|
|||
//! `Item::ToolResult` so the next request is wire-valid under providers
|
||||
//! that require every `tool_use` to be followed by a matching
|
||||
//! `tool_result` (Anthropic). A short system note is then inserted so
|
||||
//! the LLM understands the prior work was cut short, and finally the
|
||||
//! user's new input is appended via `worker.run(input)`.
|
||||
//! the LLM understands the prior work was cut short. Both side effects
|
||||
//! happen at the front of `Pod::run` when
|
||||
//! `worker.last_run_interrupted()` is set; see `Pod::apply_interrupt_prep`.
|
||||
|
||||
use llm_worker::Item;
|
||||
use llm_worker::llm_client::client::LlmClient;
|
||||
use protocol::Segment;
|
||||
use session_store::Store;
|
||||
|
||||
use crate::pod::{Pod, PodError, PodRunResult};
|
||||
#[cfg(test)]
|
||||
use crate::prompt::catalog::PromptCatalog;
|
||||
|
||||
impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||
/// Close out the current (paused) turn and start a new one with `input`.
|
||||
///
|
||||
/// Invoked by the controller when a `Method::Run` arrives while the
|
||||
/// Pod is `Paused`. See module docs for the wire-compatibility
|
||||
/// rationale around synthetic tool results.
|
||||
pub async fn interrupt_and_run(
|
||||
&mut self,
|
||||
input: Vec<Segment>,
|
||||
) -> Result<PodRunResult, PodError> {
|
||||
// Validate before any side effects so a bad workflow slug does
|
||||
// not leave half-applied interrupt prep (orphan closure +
|
||||
// system note) in worker history. `Pod::run` validates again at
|
||||
// its own entry; the duplicate call is cheap (read-only) and
|
||||
// collapses naturally once `interrupt_and_run` folds into
|
||||
// `Pod::run` (see ticket pod-interrupt-prep-internalize).
|
||||
self.validate_workflow_invocations(&input)?;
|
||||
|
||||
let tool_result_summary = self
|
||||
.prompts()
|
||||
.interrupt_tool_result_summary()
|
||||
.map_err(PodError::from)?;
|
||||
let system_note = self
|
||||
.prompts()
|
||||
.interrupt_system_note()
|
||||
.map_err(PodError::from)?;
|
||||
|
||||
let closures: Vec<Item> =
|
||||
orphan_tool_result_closures(self.worker().history(), &tool_result_summary);
|
||||
if !closures.is_empty() {
|
||||
self.worker_mut().extend_history(closures);
|
||||
}
|
||||
self.worker_mut()
|
||||
.push_item(Item::system_message(system_note));
|
||||
self.run(input).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Build synthetic `Item::ToolResult` items for every unanswered
|
||||
/// `Item::ToolCall` in `history`, preserving order.
|
||||
fn orphan_tool_result_closures(history: &[Item], summary: &str) -> Vec<Item> {
|
||||
pub(crate) fn orphan_tool_result_closures(history: &[Item], summary: &str) -> Vec<Item> {
|
||||
let mut answered: std::collections::HashSet<&str> = std::collections::HashSet::new();
|
||||
for item in history {
|
||||
if let Item::ToolResult { call_id, .. } = item {
|
||||
|
|
@ -11,7 +11,7 @@ pub mod spawn;
|
|||
pub mod workflow;
|
||||
|
||||
mod factory;
|
||||
mod interrupt_and_run;
|
||||
mod interrupt_prep;
|
||||
mod permission;
|
||||
mod pod;
|
||||
|
||||
|
|
|
|||
|
|
@ -1138,10 +1138,24 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
pub async fn run(&mut self, input: Vec<Segment>) -> Result<PodRunResult, PodError> {
|
||||
// Validate workflow invocations up front so an invalid slug
|
||||
// never commits a UserInput entry, never triggers pre-run
|
||||
// compaction, and never half-applies interrupt prep when run
|
||||
// from `interrupt_and_run`. Read-only against `workflow_registry`.
|
||||
// compaction, and never half-applies interrupt prep when the
|
||||
// previous turn was interrupted. Read-only against
|
||||
// `workflow_registry`.
|
||||
self.validate_workflow_invocations(&input)?;
|
||||
|
||||
// Paused→Run transition: if the previous turn was cut short,
|
||||
// any `Item::ToolCall` whose tool never produced a matching
|
||||
// `ToolResult` is closed with a synthetic one, and a short
|
||||
// system note explaining the interruption is appended — so the
|
||||
// next request is wire-valid (Anthropic) and the LLM knows
|
||||
// prior work was abandoned. Driven by the worker's own
|
||||
// `last_run_interrupted` flag; `Pod::resume` reuses the prior
|
||||
// context via a different entry point and never triggers this
|
||||
// path.
|
||||
if self.worker.as_ref().unwrap().last_run_interrupted() {
|
||||
self.apply_interrupt_prep()?;
|
||||
}
|
||||
|
||||
self.prepare_for_run().await?;
|
||||
|
||||
// Persist the user input as typed segments before the worker
|
||||
|
|
@ -1399,11 +1413,40 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
Ok(out)
|
||||
}
|
||||
|
||||
/// Stage the post-interruption cleanup at the front of worker
|
||||
/// history: close every unanswered `Item::ToolCall` with a synthetic
|
||||
/// `Item::ToolResult` (Anthropic wire-validity), then append a
|
||||
/// system note so the LLM understands the prior turn was cut
|
||||
/// short. Called from `Pod::run` when the worker's
|
||||
/// `last_run_interrupted` flag is set (i.e. the Pod just transitioned
|
||||
/// out of Paused via a new user input).
|
||||
fn apply_interrupt_prep(&mut self) -> Result<(), PodError> {
|
||||
let tool_result_summary = self
|
||||
.prompts()
|
||||
.interrupt_tool_result_summary()
|
||||
.map_err(PodError::from)?;
|
||||
let system_note = self
|
||||
.prompts()
|
||||
.interrupt_system_note()
|
||||
.map_err(PodError::from)?;
|
||||
|
||||
let closures = crate::interrupt_prep::orphan_tool_result_closures(
|
||||
self.worker().history(),
|
||||
&tool_result_summary,
|
||||
);
|
||||
if !closures.is_empty() {
|
||||
self.worker_mut().extend_history(closures);
|
||||
}
|
||||
self.worker_mut()
|
||||
.push_item(llm_worker::Item::system_message(system_note));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validate explicit workflow invocations without reading dependency
|
||||
/// bodies. Called from `Pod::run` / `Pod::interrupt_and_run` entry so
|
||||
/// an invalid slug aborts the turn before any session-log commit or
|
||||
/// interrupt-prep side effects; `pub` so completion / preview paths
|
||||
/// can also dry-check inputs.
|
||||
/// bodies. Called from `Pod::run` entry so an invalid slug aborts
|
||||
/// the turn before any session-log commit or interrupt-prep side
|
||||
/// effects; `pub` so completion / preview paths can also dry-check
|
||||
/// inputs.
|
||||
pub fn validate_workflow_invocations(
|
||||
&self,
|
||||
segments: &[Segment],
|
||||
|
|
|
|||
|
|
@ -1367,9 +1367,10 @@ async fn paused_then_run_closes_orphan_tool_use_for_next_request() {
|
|||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
assert_eq!(handle.shared_state.get_status(), PodStatus::Paused);
|
||||
|
||||
// New user input while Paused → controller routes to
|
||||
// `Pod::interrupt_and_run`, which closes the orphan + injects a
|
||||
// system note before the fresh user message.
|
||||
// New user input while Paused → `Pod::run` observes
|
||||
// `last_run_interrupted` and runs its interrupt-prep step, which
|
||||
// closes the orphan + injects a system note before the fresh user
|
||||
// message.
|
||||
handle.send(Method::run_text("new request")).await.unwrap();
|
||||
assert!(
|
||||
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user