merge: 00001KVSMJJNV paused ctrl-x cancel
This commit is contained in:
commit
76c800542c
|
|
@ -900,12 +900,36 @@ async fn controller_loop<C, St>(
|
||||||
pending = Some(PendingRun::Resume);
|
pending = Some(PendingRun::Resume);
|
||||||
}
|
}
|
||||||
|
|
||||||
Method::Cancel => {
|
Method::Cancel => match shared_state.get_status() {
|
||||||
|
PodStatus::Paused => match pod.cancel_paused_turn() {
|
||||||
|
Ok(()) => {
|
||||||
|
set_controller_status(
|
||||||
|
&shared_state,
|
||||||
|
&runtime_dir,
|
||||||
|
&event_tx,
|
||||||
|
PodStatus::Idle,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
let _ = event_tx.send(Event::Error {
|
||||||
|
code: worker_error_code(&error),
|
||||||
|
message: error.to_string(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
},
|
||||||
|
PodStatus::Idle => {
|
||||||
let _ = event_tx.send(Event::Error {
|
let _ = event_tx.send(Event::Error {
|
||||||
code: ErrorCode::NotRunning,
|
code: ErrorCode::NotRunning,
|
||||||
message: "Pod is not running".into(),
|
message: "Pod is not running".into(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
PodStatus::Running => {
|
||||||
|
// Running turns receive Cancel through drive_turn; this is
|
||||||
|
// only reachable across a defensive race window.
|
||||||
|
let _ = cancel_tx.try_send(());
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
Method::Pause => {
|
Method::Pause => {
|
||||||
// Already paused → idempotent no-op. Otherwise the
|
// Already paused → idempotent no-op. Otherwise the
|
||||||
|
|
|
||||||
|
|
@ -1863,6 +1863,26 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Abandon a paused/interrupted turn without resuming it.
|
||||||
|
///
|
||||||
|
/// This uses the same explicit interrupt preparation as the next fresh
|
||||||
|
/// `run` would have used, then clears the worker's interrupted marker so
|
||||||
|
/// future input is treated as a normal new turn instead of a resume.
|
||||||
|
/// The explicit `PausedTurnAbandoned` marker preserves durable lifecycle
|
||||||
|
/// semantics without claiming another `run` / `resume` completed.
|
||||||
|
pub fn cancel_paused_turn(&mut self) -> Result<(), PodError> {
|
||||||
|
if !self.worker().last_run_interrupted() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.apply_interrupt_prep()?;
|
||||||
|
self.worker_mut().set_last_run_interrupted(false);
|
||||||
|
self.commit_entry(LogEntry::PausedTurnAbandoned {
|
||||||
|
ts: segment_log::now_millis(),
|
||||||
|
})?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Validate explicit workflow invocations without reading dependency
|
/// Validate explicit workflow invocations without reading dependency
|
||||||
/// bodies. Called from `Pod::run` entry so an invalid slug aborts
|
/// bodies. Called from `Pod::run` entry so an invalid slug aborts
|
||||||
/// the turn before any session-log commit or interrupt-prep side
|
/// the turn before any session-log commit or interrupt-prep side
|
||||||
|
|
|
||||||
|
|
@ -94,8 +94,8 @@ impl SegmentLogSink {
|
||||||
/// - `LogEntry::SystemItem` → `Event::SystemItem`.
|
/// - `LogEntry::SystemItem` → `Event::SystemItem`.
|
||||||
/// - `LogEntry::Invoke` → `Event::InvokeStart`.
|
/// - `LogEntry::Invoke` → `Event::InvokeStart`.
|
||||||
/// Everything else (AssistantItem, ToolResult, TurnEnd,
|
/// Everything else (AssistantItem, ToolResult, TurnEnd,
|
||||||
/// RunCompleted, RunErrored, LlmUsage, Extension, ConfigChanged) is
|
/// RunCompleted, RunErrored, PausedTurnAbandoned, LlmUsage, Extension,
|
||||||
/// reflected in the mirror so reconnect snapshots stay accurate,
|
/// ConfigChanged) is reflected in the mirror so reconnect snapshots stay accurate,
|
||||||
/// but is not sent live — the streaming events (TextDelta /
|
/// but is not sent live — the streaming events (TextDelta /
|
||||||
/// ToolCallStart / ToolResult / TurnEnd / etc.) already provide
|
/// ToolCallStart / ToolResult / TurnEnd / etc.) already provide
|
||||||
/// that data, and re-broadcasting it as a typed entry would just
|
/// that data, and re-broadcasting it as a typed entry would just
|
||||||
|
|
|
||||||
|
|
@ -1946,6 +1946,152 @@ async fn paused_then_run_closes_orphan_tool_use_for_next_request() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn paused_cancel_abandons_resume_and_next_input_is_fresh_run() {
|
||||||
|
let tool_name = "HangyTool";
|
||||||
|
let first = MockResponse::Complete(vec![
|
||||||
|
LlmEvent::tool_use_start(0, "call_cancelled", tool_name),
|
||||||
|
LlmEvent::tool_input_delta(0, "{}"),
|
||||||
|
LlmEvent::tool_use_stop(0),
|
||||||
|
LlmEvent::Status(StatusEvent {
|
||||||
|
status: ResponseStatus::Completed,
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
let second = MockResponse::Complete(vec![
|
||||||
|
LlmEvent::text_block_start(0),
|
||||||
|
LlmEvent::text_delta(0, "fresh output"),
|
||||||
|
LlmEvent::text_block_stop(0, None),
|
||||||
|
LlmEvent::Status(StatusEvent {
|
||||||
|
status: ResponseStatus::Completed,
|
||||||
|
}),
|
||||||
|
]);
|
||||||
|
let client = MockClient::sequential(vec![first, second]);
|
||||||
|
let client_for_assert = client.clone();
|
||||||
|
let mut pod = make_pod(client).await;
|
||||||
|
pod.worker_mut()
|
||||||
|
.register_tool(hanging_tool_definition(tool_name));
|
||||||
|
let handle = spawn_controller(pod).await;
|
||||||
|
let mut rx = handle.subscribe();
|
||||||
|
|
||||||
|
handle.send(Method::run_text("first")).await.unwrap();
|
||||||
|
assert!(
|
||||||
|
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
|
||||||
|
e,
|
||||||
|
Event::ToolCallDone { .. }
|
||||||
|
))
|
||||||
|
.await,
|
||||||
|
"tool_call_done should arrive before pause"
|
||||||
|
);
|
||||||
|
|
||||||
|
handle.send(Method::Pause).await.unwrap();
|
||||||
|
assert!(
|
||||||
|
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
|
||||||
|
e,
|
||||||
|
Event::RunEnd {
|
||||||
|
result: protocol::RunResult::Paused
|
||||||
|
}
|
||||||
|
))
|
||||||
|
.await,
|
||||||
|
"expected RunEnd::Paused"
|
||||||
|
);
|
||||||
|
wait_for_status(&handle, PodStatus::Paused).await;
|
||||||
|
|
||||||
|
handle.send(Method::Cancel).await.unwrap();
|
||||||
|
wait_for_status(&handle, PodStatus::Idle).await;
|
||||||
|
let (entries_after_cancel, _rx_after_cancel) = handle.sink.subscribe_with_snapshot();
|
||||||
|
assert!(
|
||||||
|
entries_after_cancel
|
||||||
|
.iter()
|
||||||
|
.any(|entry| matches!(entry, LogEntry::PausedTurnAbandoned { .. })),
|
||||||
|
"paused cancel should have an explicit lifecycle log entry: {entries_after_cancel:?}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
!entries_after_cancel.iter().any(|entry| matches!(
|
||||||
|
entry,
|
||||||
|
LogEntry::RunCompleted {
|
||||||
|
result: llm_worker::WorkerResult::Finished,
|
||||||
|
interrupted: false,
|
||||||
|
..
|
||||||
|
}
|
||||||
|
)),
|
||||||
|
"paused cancel must not be logged as a normal finished run: {entries_after_cancel:?}"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
client_for_assert.captured_requests().len(),
|
||||||
|
1,
|
||||||
|
"paused cancel must not resume or start another LLM request"
|
||||||
|
);
|
||||||
|
|
||||||
|
handle.send(Method::Resume).await.unwrap();
|
||||||
|
assert!(
|
||||||
|
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
|
||||||
|
e,
|
||||||
|
Event::Error {
|
||||||
|
code: pod::ErrorCode::NotPaused,
|
||||||
|
..
|
||||||
|
}
|
||||||
|
))
|
||||||
|
.await,
|
||||||
|
"resume after paused cancel should be rejected as not paused"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
client_for_assert.captured_requests().len(),
|
||||||
|
1,
|
||||||
|
"rejected resume must not call the LLM"
|
||||||
|
);
|
||||||
|
|
||||||
|
handle
|
||||||
|
.send(Method::run_text("fresh request"))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert!(
|
||||||
|
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
|
||||||
|
e,
|
||||||
|
Event::RunEnd {
|
||||||
|
result: protocol::RunResult::Finished
|
||||||
|
}
|
||||||
|
))
|
||||||
|
.await,
|
||||||
|
"expected RunEnd::Finished for fresh run"
|
||||||
|
);
|
||||||
|
|
||||||
|
let requests = client_for_assert.captured_requests();
|
||||||
|
assert_eq!(
|
||||||
|
requests.len(),
|
||||||
|
2,
|
||||||
|
"fresh input should start exactly one new LLM request"
|
||||||
|
);
|
||||||
|
let items = &requests[1].items;
|
||||||
|
assert!(
|
||||||
|
items.iter().any(|item| matches!(
|
||||||
|
item,
|
||||||
|
llm_worker::Item::ToolResult { call_id, summary, .. }
|
||||||
|
if call_id == "call_cancelled" && summary == "[Interrupted by user]"
|
||||||
|
)),
|
||||||
|
"paused cancel should close orphan tool_use before future requests: {items:?}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
items.iter().any(|item| matches!(
|
||||||
|
item,
|
||||||
|
llm_worker::Item::Message {
|
||||||
|
role: llm_worker::Role::System,
|
||||||
|
..
|
||||||
|
} if item_text_contains(item, "interrupted by the user")
|
||||||
|
)),
|
||||||
|
"paused cancel should record an explicit interruption note: {items:?}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
items.iter().any(|item| matches!(
|
||||||
|
item,
|
||||||
|
llm_worker::Item::Message {
|
||||||
|
role: llm_worker::Role::User,
|
||||||
|
..
|
||||||
|
} if item_text_contains(item, "fresh request")
|
||||||
|
)),
|
||||||
|
"fresh user input should be part of the next normal run: {items:?}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
fn item_text_contains(item: &Item, needle: &str) -> bool {
|
fn item_text_contains(item: &Item, needle: &str) -> bool {
|
||||||
item.as_text().unwrap_or_default().contains(needle)
|
item.as_text().unwrap_or_default().contains(needle)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,7 @@ use crate::system_item::SystemItem;
|
||||||
/// run completion); the fork-point seq for `at_turn_index` is the
|
/// run completion); the fork-point seq for `at_turn_index` is the
|
||||||
/// preceding `Invoke` entry, not the TurnEnd.
|
/// preceding `Invoke` entry, not the TurnEnd.
|
||||||
/// - `RunCompleted` / `RunErrored` — marks end of a `run()` or `resume()` call
|
/// - `RunCompleted` / `RunErrored` — marks end of a `run()` or `resume()` call
|
||||||
|
/// - `PausedTurnAbandoned` — explicit abandon/cancel of a paused interrupted turn
|
||||||
/// - `ConfigChanged` — `RequestConfig` mutation
|
/// - `ConfigChanged` — `RequestConfig` mutation
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
#[serde(tag = "kind", rename_all = "snake_case")]
|
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||||
|
|
@ -114,6 +115,11 @@ pub enum LogEntry {
|
||||||
message: String,
|
message: String,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// A paused interrupted turn was explicitly abandoned without calling
|
||||||
|
/// `run()` or `resume()` again. Replay clears the interrupted marker so
|
||||||
|
/// the restored Pod is idle and future user input starts a normal new turn.
|
||||||
|
PausedTurnAbandoned { ts: u64 },
|
||||||
|
|
||||||
/// `RequestConfig` changed.
|
/// `RequestConfig` changed.
|
||||||
ConfigChanged { ts: u64, config: RequestConfig },
|
ConfigChanged { ts: u64, config: RequestConfig },
|
||||||
|
|
||||||
|
|
@ -257,6 +263,9 @@ pub fn collect_state(entries: &[LogEntry]) -> RestoredState {
|
||||||
LogEntry::RunErrored { interrupted, .. } => {
|
LogEntry::RunErrored { interrupted, .. } => {
|
||||||
state.last_run_interrupted = *interrupted;
|
state.last_run_interrupted = *interrupted;
|
||||||
}
|
}
|
||||||
|
LogEntry::PausedTurnAbandoned { .. } => {
|
||||||
|
state.last_run_interrupted = false;
|
||||||
|
}
|
||||||
LogEntry::ConfigChanged { config, .. } => {
|
LogEntry::ConfigChanged { config, .. } => {
|
||||||
state.config = config.clone();
|
state.config = config.clone();
|
||||||
}
|
}
|
||||||
|
|
@ -569,6 +578,41 @@ mod tests {
|
||||||
assert_eq!(state.turn_count, 1);
|
assert_eq!(state.turn_count, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn replay_paused_turn_abandoned_clears_interrupted_marker() {
|
||||||
|
let state = collect_state(&[
|
||||||
|
LogEntry::SegmentStart {
|
||||||
|
ts: 0,
|
||||||
|
session_id: uuid::Uuid::nil(),
|
||||||
|
system_prompt: None,
|
||||||
|
config: RequestConfig::default(),
|
||||||
|
history: vec![],
|
||||||
|
forked_from: None,
|
||||||
|
compacted_from: None,
|
||||||
|
},
|
||||||
|
LogEntry::RunCompleted {
|
||||||
|
ts: 100,
|
||||||
|
interrupted: true,
|
||||||
|
result: WorkerResult::Paused,
|
||||||
|
},
|
||||||
|
LogEntry::PausedTurnAbandoned { ts: 200 },
|
||||||
|
]);
|
||||||
|
assert!(!state.last_run_interrupted);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn paused_turn_abandoned_entry_round_trip_via_json() {
|
||||||
|
let entry = LogEntry::PausedTurnAbandoned { ts: 12345 };
|
||||||
|
let json = serde_json::to_string(&entry).unwrap();
|
||||||
|
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||||
|
assert_eq!(parsed["kind"], "paused_turn_abandoned");
|
||||||
|
let decoded: LogEntry = serde_json::from_str(&json).unwrap();
|
||||||
|
match decoded {
|
||||||
|
LogEntry::PausedTurnAbandoned { ts } => assert_eq!(ts, 12345),
|
||||||
|
other => panic!("expected PausedTurnAbandoned, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn replay_extension_collects_domain_payload_pairs() {
|
fn replay_extension_collects_domain_payload_pairs() {
|
||||||
let state = collect_state(&[
|
let state = collect_state(&[
|
||||||
|
|
|
||||||
|
|
@ -938,11 +938,11 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
|
||||||
}
|
}
|
||||||
KeyCode::Char('c') if ctrl => Some(handle_pause_or_quit(app)),
|
KeyCode::Char('c') if ctrl => Some(handle_pause_or_quit(app)),
|
||||||
KeyCode::Char('x') if ctrl => Some(match app.pod_status {
|
KeyCode::Char('x') if ctrl => Some(match app.pod_status {
|
||||||
PodStatus::Running => {
|
PodStatus::Running | PodStatus::Paused => {
|
||||||
app.clear_queued_inputs();
|
app.clear_queued_inputs();
|
||||||
Some(Method::Cancel)
|
Some(Method::Cancel)
|
||||||
}
|
}
|
||||||
PodStatus::Paused | PodStatus::Idle => Some(Method::Shutdown),
|
PodStatus::Idle => Some(Method::Shutdown),
|
||||||
}),
|
}),
|
||||||
KeyCode::Char('d') if ctrl => {
|
KeyCode::Char('d') if ctrl => {
|
||||||
app.quit = true;
|
app.quit = true;
|
||||||
|
|
@ -1518,6 +1518,30 @@ mod tests {
|
||||||
assert_eq!(app.queued_input_count(), 0);
|
assert_eq!(app.queued_input_count(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ctrl_x_cancels_paused_turn_without_shutdown() {
|
||||||
|
let mut app = App::new("agent".to_string());
|
||||||
|
app.set_pod_status(PodStatus::Paused);
|
||||||
|
|
||||||
|
let cancel = handle_key(
|
||||||
|
&mut app,
|
||||||
|
KeyEvent::new(KeyCode::Char('x'), KeyModifiers::CONTROL),
|
||||||
|
);
|
||||||
|
assert!(matches!(cancel, Some(Method::Cancel)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn ctrl_x_shutdown_while_idle_is_unchanged() {
|
||||||
|
let mut app = App::new("agent".to_string());
|
||||||
|
app.set_pod_status(PodStatus::Idle);
|
||||||
|
|
||||||
|
let shutdown = handle_key(
|
||||||
|
&mut app,
|
||||||
|
KeyEvent::new(KeyCode::Char('x'), KeyModifiers::CONTROL),
|
||||||
|
);
|
||||||
|
assert!(matches!(shutdown, Some(Method::Shutdown)));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn word_navigation_keys_edit_composer() {
|
fn word_navigation_keys_edit_composer() {
|
||||||
let mut app = App::new("agent".to_string());
|
let mut app = App::new("agent".to_string());
|
||||||
|
|
|
||||||
|
|
@ -1446,7 +1446,7 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) {
|
||||||
.add_modifier(Modifier::BOLD),
|
.add_modifier(Modifier::BOLD),
|
||||||
));
|
));
|
||||||
spans.push(Span::styled(
|
spans.push(Span::styled(
|
||||||
" — Enter to resume, type to start new turn",
|
" — Enter to resume, Ctrl-X to cancel, type to start new turn",
|
||||||
Style::default().fg(Color::DarkGray),
|
Style::default().fg(Color::DarkGray),
|
||||||
));
|
));
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user