From 90b1a1fccba88a9de77d5bcf9404732ccd8efd62 Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 23 Jun 2026 22:37:40 +0900 Subject: [PATCH 1/2] tui: cancel paused turns with ctrl-x --- crates/pod/src/controller.rs | 36 ++++++-- crates/pod/src/pod.rs | 22 +++++ crates/pod/tests/controller_test.rs | 128 ++++++++++++++++++++++++++++ crates/tui/src/console/mod.rs | 28 +++++- crates/tui/src/ui.rs | 2 +- 5 files changed, 207 insertions(+), 9 deletions(-) diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index c0b64ea1..c9115c6a 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -900,12 +900,36 @@ async fn controller_loop( pending = Some(PendingRun::Resume); } - Method::Cancel => { - let _ = event_tx.send(Event::Error { - code: ErrorCode::NotRunning, - message: "Pod is not running".into(), - }); - } + Method::Cancel => match shared_state.get_status() { + PodStatus::Paused => match pod.cancel_paused_turn() { + Ok(()) => { + set_controller_status( + &shared_state, + &runtime_dir, + &event_tx, + PodStatus::Idle, + ) + .await; + } + Err(error) => { + let _ = event_tx.send(Event::Error { + code: worker_error_code(&error), + message: error.to_string(), + }); + } + }, + PodStatus::Idle => { + let _ = event_tx.send(Event::Error { + code: ErrorCode::NotRunning, + message: "Pod is not running".into(), + }); + } + PodStatus::Running => { + // Running turns receive Cancel through drive_turn; this is + // only reachable across a defensive race window. + let _ = cancel_tx.try_send(()); + } + }, Method::Pause => { // Already paused → idempotent no-op. Otherwise the diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 048f5677..9c57db03 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1863,6 +1863,28 @@ impl Pod { Ok(()) } + /// Abandon a paused/interrupted turn without resuming it. + /// + /// This uses the same explicit interrupt preparation as the next fresh + /// `run` would have used, then clears the worker's interrupted marker so + /// future input is treated as a normal new turn instead of a resume. + /// The `RunCompleted` marker is a state-reset record for session replay; + /// no provider stream is resumed or mutated here. + pub fn cancel_paused_turn(&mut self) -> Result<(), PodError> { + if !self.worker().last_run_interrupted() { + return Ok(()); + } + + self.apply_interrupt_prep()?; + self.worker_mut().set_last_run_interrupted(false); + self.commit_entry(LogEntry::RunCompleted { + ts: segment_log::now_millis(), + result: WorkerResult::Finished, + interrupted: false, + })?; + Ok(()) + } + /// Validate explicit workflow invocations without reading dependency /// bodies. Called from `Pod::run` entry so an invalid slug aborts /// the turn before any session-log commit or interrupt-prep side diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 6e340be6..29397d15 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -1946,6 +1946,134 @@ async fn paused_then_run_closes_orphan_tool_use_for_next_request() { ); } +#[tokio::test] +async fn paused_cancel_abandons_resume_and_next_input_is_fresh_run() { + let tool_name = "HangyTool"; + let first = MockResponse::Complete(vec![ + LlmEvent::tool_use_start(0, "call_cancelled", tool_name), + LlmEvent::tool_input_delta(0, "{}"), + LlmEvent::tool_use_stop(0), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ]); + let second = MockResponse::Complete(vec![ + LlmEvent::text_block_start(0), + LlmEvent::text_delta(0, "fresh output"), + LlmEvent::text_block_stop(0, None), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ]); + let client = MockClient::sequential(vec![first, second]); + let client_for_assert = client.clone(); + let mut pod = make_pod(client).await; + pod.worker_mut() + .register_tool(hanging_tool_definition(tool_name)); + let handle = spawn_controller(pod).await; + let mut rx = handle.subscribe(); + + handle.send(Method::run_text("first")).await.unwrap(); + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::ToolCallDone { .. } + )) + .await, + "tool_call_done should arrive before pause" + ); + + handle.send(Method::Pause).await.unwrap(); + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::RunEnd { + result: protocol::RunResult::Paused + } + )) + .await, + "expected RunEnd::Paused" + ); + wait_for_status(&handle, PodStatus::Paused).await; + + handle.send(Method::Cancel).await.unwrap(); + wait_for_status(&handle, PodStatus::Idle).await; + assert_eq!( + client_for_assert.captured_requests().len(), + 1, + "paused cancel must not resume or start another LLM request" + ); + + handle.send(Method::Resume).await.unwrap(); + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::Error { + code: pod::ErrorCode::NotPaused, + .. + } + )) + .await, + "resume after paused cancel should be rejected as not paused" + ); + assert_eq!( + client_for_assert.captured_requests().len(), + 1, + "rejected resume must not call the LLM" + ); + + handle + .send(Method::run_text("fresh request")) + .await + .unwrap(); + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::RunEnd { + result: protocol::RunResult::Finished + } + )) + .await, + "expected RunEnd::Finished for fresh run" + ); + + let requests = client_for_assert.captured_requests(); + assert_eq!( + requests.len(), + 2, + "fresh input should start exactly one new LLM request" + ); + let items = &requests[1].items; + assert!( + items.iter().any(|item| matches!( + item, + llm_worker::Item::ToolResult { call_id, summary, .. } + if call_id == "call_cancelled" && summary == "[Interrupted by user]" + )), + "paused cancel should close orphan tool_use before future requests: {items:?}" + ); + assert!( + items.iter().any(|item| matches!( + item, + llm_worker::Item::Message { + role: llm_worker::Role::System, + .. + } if item_text_contains(item, "interrupted by the user") + )), + "paused cancel should record an explicit interruption note: {items:?}" + ); + assert!( + items.iter().any(|item| matches!( + item, + llm_worker::Item::Message { + role: llm_worker::Role::User, + .. + } if item_text_contains(item, "fresh request") + )), + "fresh user input should be part of the next normal run: {items:?}" + ); +} + fn item_text_contains(item: &Item, needle: &str) -> bool { item.as_text().unwrap_or_default().contains(needle) } diff --git a/crates/tui/src/console/mod.rs b/crates/tui/src/console/mod.rs index 5bfbe6c2..6ea66dbe 100644 --- a/crates/tui/src/console/mod.rs +++ b/crates/tui/src/console/mod.rs @@ -938,11 +938,11 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option { } KeyCode::Char('c') if ctrl => Some(handle_pause_or_quit(app)), KeyCode::Char('x') if ctrl => Some(match app.pod_status { - PodStatus::Running => { + PodStatus::Running | PodStatus::Paused => { app.clear_queued_inputs(); Some(Method::Cancel) } - PodStatus::Paused | PodStatus::Idle => Some(Method::Shutdown), + PodStatus::Idle => Some(Method::Shutdown), }), KeyCode::Char('d') if ctrl => { app.quit = true; @@ -1518,6 +1518,30 @@ mod tests { assert_eq!(app.queued_input_count(), 0); } + #[test] + fn ctrl_x_cancels_paused_turn_without_shutdown() { + let mut app = App::new("agent".to_string()); + app.set_pod_status(PodStatus::Paused); + + let cancel = handle_key( + &mut app, + KeyEvent::new(KeyCode::Char('x'), KeyModifiers::CONTROL), + ); + assert!(matches!(cancel, Some(Method::Cancel))); + } + + #[test] + fn ctrl_x_shutdown_while_idle_is_unchanged() { + let mut app = App::new("agent".to_string()); + app.set_pod_status(PodStatus::Idle); + + let shutdown = handle_key( + &mut app, + KeyEvent::new(KeyCode::Char('x'), KeyModifiers::CONTROL), + ); + assert!(matches!(shutdown, Some(Method::Shutdown))); + } + #[test] fn word_navigation_keys_edit_composer() { let mut app = App::new("agent".to_string()); diff --git a/crates/tui/src/ui.rs b/crates/tui/src/ui.rs index 4f05744a..a59df3ad 100644 --- a/crates/tui/src/ui.rs +++ b/crates/tui/src/ui.rs @@ -1446,7 +1446,7 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) { .add_modifier(Modifier::BOLD), )); spans.push(Span::styled( - " — Enter to resume, type to start new turn", + " — Enter to resume, Ctrl-X to cancel, type to start new turn", Style::default().fg(Color::DarkGray), )); } else { From 8c8fb01426a34fa12e17733681acbdd5490aad0c Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 23 Jun 2026 22:48:21 +0900 Subject: [PATCH 2/2] fix: log paused cancel lifecycle explicitly --- crates/pod/src/pod.rs | 8 ++--- crates/pod/src/segment_log_sink.rs | 4 +-- crates/pod/tests/controller_test.rs | 18 ++++++++++ crates/session-store/src/segment_log.rs | 44 +++++++++++++++++++++++++ 4 files changed, 67 insertions(+), 7 deletions(-) diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 9c57db03..5994580d 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1868,8 +1868,8 @@ impl Pod { /// This uses the same explicit interrupt preparation as the next fresh /// `run` would have used, then clears the worker's interrupted marker so /// future input is treated as a normal new turn instead of a resume. - /// The `RunCompleted` marker is a state-reset record for session replay; - /// no provider stream is resumed or mutated here. + /// The explicit `PausedTurnAbandoned` marker preserves durable lifecycle + /// semantics without claiming another `run` / `resume` completed. pub fn cancel_paused_turn(&mut self) -> Result<(), PodError> { if !self.worker().last_run_interrupted() { return Ok(()); @@ -1877,10 +1877,8 @@ impl Pod { self.apply_interrupt_prep()?; self.worker_mut().set_last_run_interrupted(false); - self.commit_entry(LogEntry::RunCompleted { + self.commit_entry(LogEntry::PausedTurnAbandoned { ts: segment_log::now_millis(), - result: WorkerResult::Finished, - interrupted: false, })?; Ok(()) } diff --git a/crates/pod/src/segment_log_sink.rs b/crates/pod/src/segment_log_sink.rs index cc4feff7..906d5005 100644 --- a/crates/pod/src/segment_log_sink.rs +++ b/crates/pod/src/segment_log_sink.rs @@ -94,8 +94,8 @@ impl SegmentLogSink { /// - `LogEntry::SystemItem` → `Event::SystemItem`. /// - `LogEntry::Invoke` → `Event::InvokeStart`. /// Everything else (AssistantItem, ToolResult, TurnEnd, - /// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is - /// reflected in the mirror so reconnect snapshots stay accurate, + /// RunCompleted, RunErrored, PausedTurnAbandoned, LlmUsage, Extension, + /// ConfigChanged) is reflected in the mirror so reconnect snapshots stay accurate, /// but is not sent live — the streaming events (TextDelta / /// ToolCallStart / ToolResult / TurnEnd / etc.) already provide /// that data, and re-broadcasting it as a typed entry would just diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index 29397d15..d86c3717 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -1998,6 +1998,24 @@ async fn paused_cancel_abandons_resume_and_next_input_is_fresh_run() { handle.send(Method::Cancel).await.unwrap(); wait_for_status(&handle, PodStatus::Idle).await; + let (entries_after_cancel, _rx_after_cancel) = handle.sink.subscribe_with_snapshot(); + assert!( + entries_after_cancel + .iter() + .any(|entry| matches!(entry, LogEntry::PausedTurnAbandoned { .. })), + "paused cancel should have an explicit lifecycle log entry: {entries_after_cancel:?}" + ); + assert!( + !entries_after_cancel.iter().any(|entry| matches!( + entry, + LogEntry::RunCompleted { + result: llm_worker::WorkerResult::Finished, + interrupted: false, + .. + } + )), + "paused cancel must not be logged as a normal finished run: {entries_after_cancel:?}" + ); assert_eq!( client_for_assert.captured_requests().len(), 1, diff --git a/crates/session-store/src/segment_log.rs b/crates/session-store/src/segment_log.rs index ded51de8..9e5cef92 100644 --- a/crates/session-store/src/segment_log.rs +++ b/crates/session-store/src/segment_log.rs @@ -29,6 +29,7 @@ use crate::system_item::SystemItem; /// run completion); the fork-point seq for `at_turn_index` is the /// preceding `Invoke` entry, not the TurnEnd. /// - `RunCompleted` / `RunErrored` — marks end of a `run()` or `resume()` call +/// - `PausedTurnAbandoned` — explicit abandon/cancel of a paused interrupted turn /// - `ConfigChanged` — `RequestConfig` mutation #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] @@ -114,6 +115,11 @@ pub enum LogEntry { message: String, }, + /// A paused interrupted turn was explicitly abandoned without calling + /// `run()` or `resume()` again. Replay clears the interrupted marker so + /// the restored Pod is idle and future user input starts a normal new turn. + PausedTurnAbandoned { ts: u64 }, + /// `RequestConfig` changed. ConfigChanged { ts: u64, config: RequestConfig }, @@ -257,6 +263,9 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState { LogEntry::RunErrored { interrupted, .. } => { state.last_run_interrupted = *interrupted; } + LogEntry::PausedTurnAbandoned { .. } => { + state.last_run_interrupted = false; + } LogEntry::ConfigChanged { config, .. } => { state.config = config.clone(); } @@ -569,6 +578,41 @@ mod tests { assert_eq!(state.turn_count, 1); } + #[test] + fn replay_paused_turn_abandoned_clears_interrupted_marker() { + let state = collect_state(&[ + LogEntry::SegmentStart { + ts: 0, + session_id: uuid::Uuid::nil(), + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + }, + LogEntry::RunCompleted { + ts: 100, + interrupted: true, + result: WorkerResult::Paused, + }, + LogEntry::PausedTurnAbandoned { ts: 200 }, + ]); + assert!(!state.last_run_interrupted); + } + + #[test] + fn paused_turn_abandoned_entry_round_trip_via_json() { + let entry = LogEntry::PausedTurnAbandoned { ts: 12345 }; + let json = serde_json::to_string(&entry).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["kind"], "paused_turn_abandoned"); + let decoded: LogEntry = serde_json::from_str(&json).unwrap(); + match decoded { + LogEntry::PausedTurnAbandoned { ts } => assert_eq!(ts, 12345), + other => panic!("expected PausedTurnAbandoned, got {other:?}"), + } + } + #[test] fn replay_extension_collects_domain_payload_pairs() { let state = collect_state(&[