fix: log paused cancel lifecycle explicitly
This commit is contained in:
parent
90b1a1fccb
commit
8c8fb01426
|
|
@ -1868,8 +1868,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// This uses the same explicit interrupt preparation as the next fresh
|
||||
/// `run` would have used, then clears the worker's interrupted marker so
|
||||
/// future input is treated as a normal new turn instead of a resume.
|
||||
/// The `RunCompleted` marker is a state-reset record for session replay;
|
||||
/// no provider stream is resumed or mutated here.
|
||||
/// The explicit `PausedTurnAbandoned` marker preserves durable lifecycle
|
||||
/// semantics without claiming another `run` / `resume` completed.
|
||||
pub fn cancel_paused_turn(&mut self) -> Result<(), PodError> {
|
||||
if !self.worker().last_run_interrupted() {
|
||||
return Ok(());
|
||||
|
|
@ -1877,10 +1877,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
|
||||
self.apply_interrupt_prep()?;
|
||||
self.worker_mut().set_last_run_interrupted(false);
|
||||
self.commit_entry(LogEntry::RunCompleted {
|
||||
self.commit_entry(LogEntry::PausedTurnAbandoned {
|
||||
ts: segment_log::now_millis(),
|
||||
result: WorkerResult::Finished,
|
||||
interrupted: false,
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -94,8 +94,8 @@ impl SegmentLogSink {
|
|||
/// - `LogEntry::SystemItem` → `Event::SystemItem`.
|
||||
/// - `LogEntry::Invoke` → `Event::InvokeStart`.
|
||||
/// Everything else (AssistantItem, ToolResult, TurnEnd,
|
||||
/// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is
|
||||
/// reflected in the mirror so reconnect snapshots stay accurate,
|
||||
/// RunCompleted, RunErrored, PausedTurnAbandoned, LlmUsage, Extension,
|
||||
/// ConfigChanged) is reflected in the mirror so reconnect snapshots stay accurate,
|
||||
/// but is not sent live — the streaming events (TextDelta /
|
||||
/// ToolCallStart / ToolResult / TurnEnd / etc.) already provide
|
||||
/// that data, and re-broadcasting it as a typed entry would just
|
||||
|
|
|
|||
|
|
@ -1998,6 +1998,24 @@ async fn paused_cancel_abandons_resume_and_next_input_is_fresh_run() {
|
|||
|
||||
handle.send(Method::Cancel).await.unwrap();
|
||||
wait_for_status(&handle, PodStatus::Idle).await;
|
||||
let (entries_after_cancel, _rx_after_cancel) = handle.sink.subscribe_with_snapshot();
|
||||
assert!(
|
||||
entries_after_cancel
|
||||
.iter()
|
||||
.any(|entry| matches!(entry, LogEntry::PausedTurnAbandoned { .. })),
|
||||
"paused cancel should have an explicit lifecycle log entry: {entries_after_cancel:?}"
|
||||
);
|
||||
assert!(
|
||||
!entries_after_cancel.iter().any(|entry| matches!(
|
||||
entry,
|
||||
LogEntry::RunCompleted {
|
||||
result: llm_worker::WorkerResult::Finished,
|
||||
interrupted: false,
|
||||
..
|
||||
}
|
||||
)),
|
||||
"paused cancel must not be logged as a normal finished run: {entries_after_cancel:?}"
|
||||
);
|
||||
assert_eq!(
|
||||
client_for_assert.captured_requests().len(),
|
||||
1,
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ use crate::system_item::SystemItem;
|
|||
/// run completion); the fork-point seq for `at_turn_index` is the
|
||||
/// preceding `Invoke` entry, not the TurnEnd.
|
||||
/// - `RunCompleted` / `RunErrored` — marks end of a `run()` or `resume()` call
|
||||
/// - `PausedTurnAbandoned` — explicit abandon/cancel of a paused interrupted turn
|
||||
/// - `ConfigChanged` — `RequestConfig` mutation
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||
|
|
@ -114,6 +115,11 @@ pub enum LogEntry {
|
|||
message: String,
|
||||
},
|
||||
|
||||
/// A paused interrupted turn was explicitly abandoned without calling
|
||||
/// `run()` or `resume()` again. Replay clears the interrupted marker so
|
||||
/// the restored Pod is idle and future user input starts a normal new turn.
|
||||
PausedTurnAbandoned { ts: u64 },
|
||||
|
||||
/// `RequestConfig` changed.
|
||||
ConfigChanged { ts: u64, config: RequestConfig },
|
||||
|
||||
|
|
@ -257,6 +263,9 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState {
|
|||
LogEntry::RunErrored { interrupted, .. } => {
|
||||
state.last_run_interrupted = *interrupted;
|
||||
}
|
||||
LogEntry::PausedTurnAbandoned { .. } => {
|
||||
state.last_run_interrupted = false;
|
||||
}
|
||||
LogEntry::ConfigChanged { config, .. } => {
|
||||
state.config = config.clone();
|
||||
}
|
||||
|
|
@ -569,6 +578,41 @@ mod tests {
|
|||
assert_eq!(state.turn_count, 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replay_paused_turn_abandoned_clears_interrupted_marker() {
|
||||
let state = collect_state(&[
|
||||
LogEntry::SegmentStart {
|
||||
ts: 0,
|
||||
session_id: uuid::Uuid::nil(),
|
||||
system_prompt: None,
|
||||
config: RequestConfig::default(),
|
||||
history: vec![],
|
||||
forked_from: None,
|
||||
compacted_from: None,
|
||||
},
|
||||
LogEntry::RunCompleted {
|
||||
ts: 100,
|
||||
interrupted: true,
|
||||
result: WorkerResult::Paused,
|
||||
},
|
||||
LogEntry::PausedTurnAbandoned { ts: 200 },
|
||||
]);
|
||||
assert!(!state.last_run_interrupted);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn paused_turn_abandoned_entry_round_trip_via_json() {
|
||||
let entry = LogEntry::PausedTurnAbandoned { ts: 12345 };
|
||||
let json = serde_json::to_string(&entry).unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(parsed["kind"], "paused_turn_abandoned");
|
||||
let decoded: LogEntry = serde_json::from_str(&json).unwrap();
|
||||
match decoded {
|
||||
LogEntry::PausedTurnAbandoned { ts } => assert_eq!(ts, 12345),
|
||||
other => panic!("expected PausedTurnAbandoned, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replay_extension_collects_domain_payload_pairs() {
|
||||
let state = collect_state(&[
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user