diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 9c57db03..5994580d 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1868,8 +1868,8 @@ impl Pod { /// This uses the same explicit interrupt preparation as the next fresh /// `run` would have used, then clears the worker's interrupted marker so /// future input is treated as a normal new turn instead of a resume. - /// The `RunCompleted` marker is a state-reset record for session replay; - /// no provider stream is resumed or mutated here. + /// The explicit `PausedTurnAbandoned` marker preserves durable lifecycle + /// semantics without claiming another `run` / `resume` completed. pub fn cancel_paused_turn(&mut self) -> Result<(), PodError> { if !self.worker().last_run_interrupted() { return Ok(()); @@ -1877,10 +1877,8 @@ impl Pod { self.apply_interrupt_prep()?; self.worker_mut().set_last_run_interrupted(false); - self.commit_entry(LogEntry::RunCompleted { + self.commit_entry(LogEntry::PausedTurnAbandoned { ts: segment_log::now_millis(), - result: WorkerResult::Finished, - interrupted: false, })?; Ok(()) } diff --git a/crates/pod/src/segment_log_sink.rs b/crates/pod/src/segment_log_sink.rs index cc4feff7..906d5005 100644 --- a/crates/pod/src/segment_log_sink.rs +++ b/crates/pod/src/segment_log_sink.rs @@ -94,8 +94,8 @@ impl SegmentLogSink { /// - `LogEntry::SystemItem` → `Event::SystemItem`. /// - `LogEntry::Invoke` → `Event::InvokeStart`. /// Everything else (AssistantItem, ToolResult, TurnEnd, - /// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is - /// reflected in the mirror so reconnect snapshots stay accurate, + /// RunCompleted, RunErrored, PausedTurnAbandoned, LlmUsage, Extension, + /// ConfigChanged) is reflected in the mirror so reconnect snapshots stay accurate, /// but is not sent live — the streaming events (TextDelta / /// ToolCallStart / ToolResult / TurnEnd / etc.) already provide /// that data, and re-broadcasting it as a typed entry would just diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 29397d15..d86c3717 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -1998,6 +1998,24 @@ async fn paused_cancel_abandons_resume_and_next_input_is_fresh_run() { handle.send(Method::Cancel).await.unwrap(); wait_for_status(&handle, PodStatus::Idle).await; + let (entries_after_cancel, _rx_after_cancel) = handle.sink.subscribe_with_snapshot(); + assert!( + entries_after_cancel + .iter() + .any(|entry| matches!(entry, LogEntry::PausedTurnAbandoned { .. })), + "paused cancel should have an explicit lifecycle log entry: {entries_after_cancel:?}" + ); + assert!( + !entries_after_cancel.iter().any(|entry| matches!( + entry, + LogEntry::RunCompleted { + result: llm_worker::WorkerResult::Finished, + interrupted: false, + .. + } + )), + "paused cancel must not be logged as a normal finished run: {entries_after_cancel:?}" + ); assert_eq!( client_for_assert.captured_requests().len(), 1, diff --git a/crates/session-store/src/segment_log.rs b/crates/session-store/src/segment_log.rs index ded51de8..9e5cef92 100644 --- a/crates/session-store/src/segment_log.rs +++ b/crates/session-store/src/segment_log.rs @@ -29,6 +29,7 @@ use crate::system_item::SystemItem; /// run completion); the fork-point seq for `at_turn_index` is the /// preceding `Invoke` entry, not the TurnEnd. /// - `RunCompleted` / `RunErrored` — marks end of a `run()` or `resume()` call +/// - `PausedTurnAbandoned` — explicit abandon/cancel of a paused interrupted turn /// - `ConfigChanged` — `RequestConfig` mutation #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] @@ -114,6 +115,11 @@ pub enum LogEntry { message: String, }, + /// A paused interrupted turn was explicitly abandoned without calling + /// `run()` or `resume()` again. Replay clears the interrupted marker so + /// the restored Pod is idle and future user input starts a normal new turn. + PausedTurnAbandoned { ts: u64 }, + /// `RequestConfig` changed. ConfigChanged { ts: u64, config: RequestConfig }, @@ -257,6 +263,9 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState { LogEntry::RunErrored { interrupted, .. } => { state.last_run_interrupted = *interrupted; } + LogEntry::PausedTurnAbandoned { .. } => { + state.last_run_interrupted = false; + } LogEntry::ConfigChanged { config, .. } => { state.config = config.clone(); } @@ -569,6 +578,41 @@ mod tests { assert_eq!(state.turn_count, 1); } + #[test] + fn replay_paused_turn_abandoned_clears_interrupted_marker() { + let state = collect_state(&[ + LogEntry::SegmentStart { + ts: 0, + session_id: uuid::Uuid::nil(), + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + }, + LogEntry::RunCompleted { + ts: 100, + interrupted: true, + result: WorkerResult::Paused, + }, + LogEntry::PausedTurnAbandoned { ts: 200 }, + ]); + assert!(!state.last_run_interrupted); + } + + #[test] + fn paused_turn_abandoned_entry_round_trip_via_json() { + let entry = LogEntry::PausedTurnAbandoned { ts: 12345 }; + let json = serde_json::to_string(&entry).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["kind"], "paused_turn_abandoned"); + let decoded: LogEntry = serde_json::from_str(&json).unwrap(); + match decoded { + LogEntry::PausedTurnAbandoned { ts } => assert_eq!(ts, 12345), + other => panic!("expected PausedTurnAbandoned, got {other:?}"), + } + } + #[test] fn replay_extension_collects_domain_payload_pairs() { let state = collect_state(&[