From 223d06c77e76a174b2adb6763e498a77dd9d0b2e Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 19 Apr 2026 14:27:53 +0900 Subject: [PATCH] =?UTF-8?q?TUI=E3=81=8B=E3=82=89Pause=E3=81=99=E3=82=8B?= =?UTF-8?q?=E5=AE=9F=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/pod/src/controller.rs | 40 ++- crates/pod/src/interrupt_and_run.rs | 121 +++++++++ crates/pod/src/lib.rs | 1 + crates/pod/tests/controller_test.rs | 364 +++++++++++++++++++++++++++- crates/protocol/src/lib.rs | 16 ++ crates/tui/src/app.rs | 21 +- crates/tui/src/main.rs | 55 +++-- crates/tui/src/ui.rs | 12 + tickets/user-pause.md | 184 ++++++++++++++ tickets/user-pause.review.md | 34 +++ 10 files changed, 823 insertions(+), 25 deletions(-) create mode 100644 crates/pod/src/interrupt_and_run.rs create mode 100644 tickets/user-pause.md create mode 100644 tickets/user-pause.review.md diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index a5b7d6f1..3b4fd3e5 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -256,18 +256,27 @@ impl PodController { match method { Method::Run { input } => { - if shared_state.get_status() != PodStatus::Idle { + let status_before = shared_state.get_status(); + if status_before == PodStatus::Running { let _ = event_tx.send(Event::Error { code: ErrorCode::AlreadyRunning, message: "Pod is already executing a turn".into(), }); continue; } + let was_paused = status_before == PodStatus::Paused; shared_state.set_status(PodStatus::Running); let _ = runtime_dir.write_status(&shared_state).await; + let run_future = async { + if was_paused { + pod.interrupt_and_run(input).await + } else { + pod.run(input).await + } + }; let (new_status, shutdown) = run_with_cancel_support( - pod.run(&input), + run_future, &mut method_rx, &event_tx, &cancel_tx, @@ -406,6 +415,19 @@ impl PodController { }); } + Method::Pause => { + // Already paused → idempotent no-op. Otherwise + // the Pod is Idle (Running turns go through + // `run_with_cancel_support`, not this outer + // match), so there is nothing to pause. + if shared_state.get_status() != PodStatus::Paused { + let _ = event_tx.send(Event::Error { + code: ErrorCode::NotRunning, + message: "Pod is not running".into(), + }); + } + } + Method::Shutdown => { let _ = event_tx.send(Event::Shutdown); break; @@ -529,6 +551,7 @@ where { tokio::pin!(pod_future); let mut shutdown_requested = false; + let mut pause_requested = false; loop { tokio::select! { @@ -551,6 +574,15 @@ where } (status, shutdown_requested) } + Err(PodError::Worker(WorkerError::Cancelled)) if pause_requested => { + // User-initiated Pause. Report the transition to + // clients as a normal Paused run-end, and + // intentionally skip `PodEvent::Errored` upward: + // that channel is reserved for worker runtime + // failures, not deliberate interruptions. + let _ = event_tx.send(Event::RunEnd { result: RunResult::Paused }); + (PodStatus::Paused, shutdown_requested) + } Err(e) => { let code = worker_error_code(&e); let message = e.to_string(); @@ -574,6 +606,10 @@ where Some(Method::Cancel) => { let _ = cancel_tx.try_send(()); } + Some(Method::Pause) => { + pause_requested = true; + let _ = cancel_tx.try_send(()); + } Some(Method::Shutdown) => { shutdown_requested = true; let _ = cancel_tx.try_send(()); diff --git a/crates/pod/src/interrupt_and_run.rs b/crates/pod/src/interrupt_and_run.rs new file mode 100644 index 00000000..36b3776a --- /dev/null +++ b/crates/pod/src/interrupt_and_run.rs @@ -0,0 +1,121 @@ +//! Transition from `Paused` to a fresh turn via user input. +//! +//! The previously in-flight turn is treated as finished. Any orphan +//! `Item::ToolCall` (tool_use emitted by the LLM but whose tool did not +//! run to completion before the pause) is closed with a synthetic +//! `Item::ToolResult` so the next request is wire-valid under providers +//! that require every `tool_use` to be followed by a matching +//! `tool_result` (Anthropic). A short system note is then inserted so +//! the LLM understands the prior work was cut short, and finally the +//! user's new input is appended via `worker.run(input)`. + +use llm_worker::Item; +use llm_worker::llm_client::client::LlmClient; +use session_store::Store; + +use crate::pod::{Pod, PodError, PodRunResult}; + +const INTERRUPT_TOOL_RESULT_SUMMARY: &str = "[Interrupted by user]"; +const INTERRUPT_SYSTEM_NOTE: &str = + "[The previous turn was interrupted by the user. The user's next request follows.]"; + +impl Pod { + /// Close out the current (paused) turn and start a new one with `input`. + /// + /// Invoked by the controller when a `Method::Run` arrives while the + /// Pod is `Paused`. See module docs for the wire-compatibility + /// rationale around synthetic tool results. + pub async fn interrupt_and_run( + &mut self, + input: impl Into, + ) -> Result { + let closures: Vec = orphan_tool_result_closures(self.worker().history()); + if !closures.is_empty() { + self.worker_mut().extend_history(closures); + } + self.worker_mut() + .push_item(Item::system_message(INTERRUPT_SYSTEM_NOTE)); + self.run(input).await + } +} + +/// Build synthetic `Item::ToolResult` items for every unanswered +/// `Item::ToolCall` in `history`, preserving order. +fn orphan_tool_result_closures(history: &[Item]) -> Vec { + let mut answered: std::collections::HashSet<&str> = std::collections::HashSet::new(); + for item in history { + if let Item::ToolResult { call_id, .. } = item { + answered.insert(call_id.as_str()); + } + } + let mut out = Vec::new(); + for item in history { + if let Item::ToolCall { call_id, .. } = item { + if !answered.contains(call_id.as_str()) { + out.push(Item::tool_result( + call_id.clone(), + INTERRUPT_TOOL_RESULT_SUMMARY, + )); + } + } + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn no_orphans_returns_empty() { + let history = vec![ + Item::user_message("hi"), + Item::assistant_message("hello"), + ]; + assert!(orphan_tool_result_closures(&history).is_empty()); + } + + #[test] + fn paired_call_and_result_is_not_orphan() { + let history = vec![ + Item::tool_call("c1", "Read", "{}"), + Item::tool_result("c1", "ok"), + ]; + assert!(orphan_tool_result_closures(&history).is_empty()); + } + + #[test] + fn unanswered_call_becomes_closure() { + let history = vec![Item::tool_call("c1", "Read", "{}")]; + let out = orphan_tool_result_closures(&history); + assert_eq!(out.len(), 1); + match &out[0] { + Item::ToolResult { + call_id, summary, .. + } => { + assert_eq!(call_id, "c1"); + assert_eq!(summary, INTERRUPT_TOOL_RESULT_SUMMARY); + } + other => panic!("expected ToolResult, got {other:?}"), + } + } + + #[test] + fn multiple_orphans_are_closed_in_order() { + let history = vec![ + Item::tool_call("c1", "Read", "{}"), + Item::tool_call("c2", "Write", "{}"), + Item::tool_result("c1", "ok"), + Item::tool_call("c3", "Grep", "{}"), + ]; + let out = orphan_tool_result_closures(&history); + let ids: Vec<&str> = out + .iter() + .map(|i| match i { + Item::ToolResult { call_id, .. } => call_id.as_str(), + _ => unreachable!(), + }) + .collect(); + assert_eq!(ids, vec!["c2", "c3"]); + } +} diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 46e89e24..32370b93 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -14,6 +14,7 @@ mod agents_md; mod compact_state; mod compact_worker; mod factory; +mod interrupt_and_run; mod notification_buffer; mod pod; mod pod_interceptor; diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index e337be80..f694cb45 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -3,10 +3,11 @@ use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use async_trait::async_trait; -use futures::Stream; +use futures::{Stream, StreamExt}; use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::{ClientError, LlmClient, Request}; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use session_store::FsStore; use pod::{Event, Method, Pod, PodController, PodManifest, PodStatus}; @@ -15,17 +16,34 @@ use pod::{Event, Method, Pod, PodController, PodManifest, PodStatus}; // Mock LLM Client // --------------------------------------------------------------------------- +/// One scripted mock response. +#[derive(Clone)] +enum MockResponse { + /// Emit the events and let the stream terminate naturally. + Complete(Vec), + /// Emit the events and then pend forever so the Worker blocks on + /// `stream.next()` — used to exercise the Cancel/Pause path while a + /// turn is actively in flight. + Hang(Vec), +} + #[derive(Clone)] struct MockClient { - responses: Arc>>, + responses: Arc>, call_count: Arc, captured: Arc>>, } impl MockClient { fn new(events: Vec) -> Self { + Self::sequential(vec![MockResponse::Complete(events)]) + } + + /// Script multiple sequential responses. The Nth call to `stream()` + /// returns the Nth entry. + fn sequential(responses: Vec) -> Self { Self { - responses: Arc::new(vec![events]), + responses: Arc::new(responses), call_count: Arc::new(AtomicUsize::new(0)), captured: Arc::new(Mutex::new(Vec::new())), } @@ -56,9 +74,18 @@ impl LlmClient for MockClient { message: "No more responses".into(), }); } - let events = self.responses[count].clone(); - let stream = futures::stream::iter(events.into_iter().map(Ok)); - Ok(Box::pin(stream)) + let response = self.responses[count].clone(); + let (events, hang) = match response { + MockResponse::Complete(e) => (e, false), + MockResponse::Hang(e) => (e, true), + }; + let iter = futures::stream::iter(events.into_iter().map(Ok)); + if hang { + let pending = futures::stream::pending::>(); + Ok(Box::pin(iter.chain(pending))) + } else { + Ok(Box::pin(iter)) + } } } @@ -539,3 +566,328 @@ async fn socket_invalid_method_returns_error() { assert!(saw_error, "should see error for invalid method"); } + +// --------------------------------------------------------------------------- +// Pause / Resume / Paused→Run +// --------------------------------------------------------------------------- + +/// Tool that pends forever when called. Used to park a turn between +/// the ToolCall being committed to history and its ToolResult being +/// produced, so a `Method::Pause` leaves an orphan `tool_use` behind. +struct HangingTool; + +#[async_trait] +impl Tool for HangingTool { + async fn execute(&self, _input: &str) -> Result { + std::future::pending::<()>().await; + unreachable!() + } +} + +fn hanging_tool_definition(name: &'static str) -> ToolDefinition { + Arc::new(move || { + ( + ToolMeta::new(name) + .description("test-only tool that pends forever") + .input_schema(serde_json::json!({"type": "object"})), + Arc::new(HangingTool) as Arc, + ) + }) +} + +async fn drain_until bool>( + rx: &mut tokio::sync::broadcast::Receiver, + timeout: std::time::Duration, + mut done: F, +) -> bool { + let deadline = tokio::time::Instant::now() + timeout; + loop { + tokio::select! { + ev = rx.recv() => { + match ev { + Ok(e) => { if done(&e) { return true; } } + Err(_) => return false, + } + } + _ = tokio::time::sleep_until(deadline) => return false, + } + } +} + +/// Pause mid-stream, then Resume: status round-trips Running → +/// Paused → Running → Idle, and the final history contains exactly +/// one user turn plus the assistant reply produced by the resume call. +#[tokio::test] +async fn pause_then_resume_transitions_and_preserves_history_consistency() { + // Response 1: hang after opening a text block (no stop / completed), + // so the Worker is parked inside the stream read and `cancel_rx` + // races it cleanly on Method::Pause. + let hang = MockResponse::Hang(vec![ + LlmEvent::text_block_start(0), + LlmEvent::text_delta(0, "partial..."), + ]); + // Response 2: a clean assistant reply delivered on Resume. + let ok = MockResponse::Complete(vec![ + LlmEvent::text_block_start(0), + LlmEvent::text_delta(0, "resumed output"), + LlmEvent::text_block_stop(0, None), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ]); + let client = MockClient::sequential(vec![hang, ok]); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + let mut rx = handle.subscribe(); + + handle + .send(Method::Run { + input: "hello".into(), + }) + .await + .unwrap(); + + // Wait for the partial text_delta to confirm the first stream is + // live before we pause. + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::TextDelta { .. } + )) + .await, + "text_delta should arrive before pause" + ); + + handle.send(Method::Pause).await.unwrap(); + + // The controller emits RunEnd { Paused } when the + // WorkerError::Cancelled is translated under pause_requested. + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::RunEnd { + result: protocol::RunResult::Paused + } + )) + .await, + "expected RunEnd::Paused after Pause" + ); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!(handle.shared_state.get_status(), PodStatus::Paused); + + handle.send(Method::Resume).await.unwrap(); + + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::RunEnd { + result: protocol::RunResult::Finished + } + )) + .await, + "expected RunEnd::Finished after Resume" + ); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!(handle.shared_state.get_status(), PodStatus::Idle); + + // History consistency: exactly [user "hello", assistant + // "resumed output"]. No artifacts from the aborted stream + // (partial text is not committed), no orphan tool_use. + let history_json = handle.shared_state.history_json(); + let items: Vec = serde_json::from_str(&history_json).unwrap(); + let roles: Vec<&str> = items + .iter() + .filter_map(|i| i["role"].as_str()) + .collect(); + assert_eq!( + roles, + vec!["user", "assistant"], + "history = user + assistant only; got {items:?}" + ); + let assistant_text = items[1]["content"] + .as_array() + .and_then(|parts| parts.iter().filter_map(|p| p["text"].as_str()).next()) + .unwrap_or(""); + assert_eq!(assistant_text, "resumed output"); + let has_tool_call = items + .iter() + .any(|i| i["type"].as_str() == Some("tool_call")); + assert!(!has_tool_call, "no orphan tool_call in history"); +} + +/// Paused with an orphan `tool_use` in history + a fresh `Method::Run` +/// must produce a wire-valid next LLM request: the orphan is closed +/// with a synthetic `tool_result`, a system note is inserted, and the +/// new user input is appended. +#[tokio::test] +async fn paused_then_run_closes_orphan_tool_use_for_next_request() { + // Response 1: emit a tool_use block (complete with stop) targeting + // our hanging tool. The Worker commits the ToolCall to history, + // then parks inside `execute_tools` waiting on the tool — which is + // where Method::Pause catches it. + let tool_name = "HangyTool"; + let first = MockResponse::Complete(vec![ + LlmEvent::tool_use_start(0, "call_orphan", tool_name), + LlmEvent::tool_input_delta(0, "{}"), + LlmEvent::tool_use_stop(0), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ]); + // Response 2: ordinary completion after the Paused→Run transition. + let second = MockResponse::Complete(vec![ + LlmEvent::text_block_start(0), + LlmEvent::text_delta(0, "ok"), + LlmEvent::text_block_stop(0, None), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ]); + let client = MockClient::sequential(vec![first, second]); + let client_for_assert = client.clone(); + let mut pod = make_pod(client).await; + pod.worker_mut() + .register_tool(hanging_tool_definition(tool_name)); + let handle = spawn_controller(pod).await; + let mut rx = handle.subscribe(); + + handle + .send(Method::Run { + input: "first".into(), + }) + .await + .unwrap(); + + // Wait for ToolCallDone — the ToolCall is committed to history + // right before the Worker enters tool execution and pends. + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::ToolCallDone { .. } + )) + .await, + "tool_call_done should arrive before pause" + ); + + handle.send(Method::Pause).await.unwrap(); + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::RunEnd { + result: protocol::RunResult::Paused + } + )) + .await, + "expected RunEnd::Paused" + ); + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + assert_eq!(handle.shared_state.get_status(), PodStatus::Paused); + + // New user input while Paused → controller routes to + // `Pod::interrupt_and_run`, which closes the orphan + injects a + // system note before the fresh user message. + handle + .send(Method::Run { + input: "new request".into(), + }) + .await + .unwrap(); + assert!( + drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!( + e, + Event::RunEnd { + result: protocol::RunResult::Finished + } + )) + .await, + "expected RunEnd::Finished after Paused→Run" + ); + + // The second LLM request carries the closure chain. Walk its items + // and assert the invariants — order matters for wire correctness. + let requests = client_for_assert.captured_requests(); + assert_eq!(requests.len(), 2, "two LLM calls expected"); + let items = &requests[1].items; + + // Find the ToolCall and ensure the immediately-subsequent + // ToolResult (if any) carries the synthetic summary. + let mut saw_synthetic_tool_result = false; + let mut saw_interruption_note = false; + let mut saw_new_user = false; + for item in items { + match item { + llm_worker::Item::ToolResult { + call_id, summary, .. + } if call_id == "call_orphan" => { + assert_eq!(summary, "[Interrupted by user]"); + saw_synthetic_tool_result = true; + } + llm_worker::Item::Message { role, content, .. } + if *role == llm_worker::Role::System => + { + let text: String = content.iter().map(|p| p.as_text()).collect(); + if text.contains("interrupted by the user") { + saw_interruption_note = true; + } + } + llm_worker::Item::Message { role, content, .. } + if *role == llm_worker::Role::User => + { + let text: String = content.iter().map(|p| p.as_text()).collect(); + if text.contains("new request") { + saw_new_user = true; + } + } + _ => {} + } + } + assert!( + saw_synthetic_tool_result, + "synthetic tool_result for orphan missing in 2nd request items: {items:?}" + ); + assert!( + saw_interruption_note, + "system interruption note missing in 2nd request items: {items:?}" + ); + assert!( + saw_new_user, + "new user message missing in 2nd request items: {items:?}" + ); + + // Also confirm the closure chain is ordered: tool_result for the + // orphan precedes the system note, which precedes the new user + // message. + let idx = |pred: &dyn Fn(&llm_worker::Item) -> bool| { + items.iter().position(pred).unwrap() + }; + let tool_result_idx = idx(&|i| matches!(i, llm_worker::Item::ToolResult { call_id, .. } if call_id == "call_orphan")); + let sys_idx = idx(&|i| match i { + llm_worker::Item::Message { + role: llm_worker::Role::System, + content, + .. + } => content + .iter() + .map(|p| p.as_text()) + .collect::() + .contains("interrupted by the user"), + _ => false, + }); + let user_idx = idx(&|i| match i { + llm_worker::Item::Message { + role: llm_worker::Role::User, + content, + .. + } => content + .iter() + .map(|p| p.as_text()) + .collect::() + .contains("new request"), + _ => false, + }); + assert!(tool_result_idx < sys_idx, "tool_result must precede system note"); + assert!(sys_idx < user_idx, "system note must precede new user message"); +} + diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 84a15e08..651dbe9b 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -20,6 +20,13 @@ pub enum Method { PodEvent(PodEvent), Resume, Cancel, + /// Stop the in-flight turn and transition to `Paused`. + /// + /// Unlike `Cancel` (which discards and returns to `Idle`), a paused + /// Pod can resume the interrupted work via `Resume`, or start a + /// fresh turn via `Run` (orphan `tool_use` items are closed with a + /// synthetic tool result before the new user message is appended). + Pause, Shutdown, GetHistory, } @@ -265,6 +272,15 @@ mod tests { assert!(matches!(method, Method::Resume)); } + #[test] + fn method_pause_roundtrip() { + let json = r#"{"method":"pause"}"#; + let method: Method = serde_json::from_str(json).unwrap(); + assert!(matches!(method, Method::Pause)); + let serialized = serde_json::to_string(&method).unwrap(); + assert_eq!(serialized, json); + } + #[test] fn event_text_delta_format() { let event = Event::TextDelta { diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 31604e80..287ce2c9 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -1,9 +1,13 @@ -use protocol::{Event, Greeting, Method, NotificationLevel, NotificationSource}; +use protocol::{Event, Greeting, Method, NotificationLevel, NotificationSource, RunResult}; pub struct App { pub pod_name: String, pub connected: bool, pub running: bool, + /// True while the Pod is in `PodStatus::Paused`. Set on + /// `RunEnd::Paused` and cleared when a new turn starts (either via + /// `Resume` or a fresh `Run`). + pub paused: bool, pub run_requests: usize, pub run_input_tokens: u64, pub run_output_tokens: u64, @@ -13,6 +17,10 @@ pub struct App { pub cursor: usize, pub quit: bool, pub shutdown_confirm: Option, + /// 2-tap guard for `Ctrl-C` when the Pod is not running. First press + /// records the instant; a second press within the timeout exits the + /// TUI (the Pod itself stays alive). + pub quit_confirm: Option, /// Lines waiting to be flushed to terminal via insert_before. pub output_queue: Vec, /// Partial streaming text not yet terminated by newline. @@ -48,6 +56,7 @@ impl App { pod_name, connected: false, running: false, + paused: false, run_requests: 0, run_input_tokens: 0, run_output_tokens: 0, @@ -57,6 +66,7 @@ impl App { cursor: 0, quit: false, shutdown_confirm: None, + quit_confirm: None, output_queue: Vec::new(), pending_text: String::new(), } @@ -65,6 +75,11 @@ impl App { pub fn submit_input(&mut self) -> Option { let text = self.input.trim().to_owned(); if text.is_empty() { + // Empty Enter only does something meaningful when the Pod + // is paused: resume the interrupted turn. Otherwise no-op. + if self.paused { + return Some(Method::Resume); + } return None; } self.turn_index += 1; @@ -83,6 +98,7 @@ impl App { match event { Event::TurnStart { .. } => { self.running = true; + self.paused = false; self.run_requests += 1; self.current_tool = None; } @@ -154,7 +170,7 @@ impl App { format!("[{code:?}] {message}"), )); } - Event::RunEnd { .. } => { + Event::RunEnd { result } => { self.output_queue.push(OutputItem::PaddedRight( MessageKind::TurnStats, format!( @@ -166,6 +182,7 @@ impl App { )); self.output_queue.push(OutputItem::Blank); self.running = false; + self.paused = matches!(result, RunResult::Paused); self.run_requests = 0; self.run_input_tokens = 0; self.run_output_tokens = 0; diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index 1331e5ff..d96a0f78 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -141,14 +141,14 @@ async fn run_loop( Ok(()) } -fn run_disconnected(app: &mut App) -> Result<(), Box> { +fn run_disconnected(_app: &mut App) -> Result<(), Box> { loop { if event::poll(std::time::Duration::from_millis(100))? { if let TermEvent::Key(key) = event::read()? { - match key.code { - KeyCode::Esc => break, - KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) => break, - _ => {} + if let KeyCode::Char('c') = key.code { + if key.modifiers.contains(KeyModifiers::CONTROL) { + break; + } } } } @@ -158,16 +158,20 @@ fn run_disconnected(app: &mut App) -> Result<(), Box> { fn handle_key(app: &mut App, key: KeyEvent) -> Option { match key.code { - KeyCode::Esc => { - app.quit = true; - None - } KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) => { - app.quit = true; - None + handle_pause_or_quit(app) + } + KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => { + if app.running { + Some(Method::Cancel) + } else { + app.output_queue.push(app::OutputItem::Padded( + app::MessageKind::Error, + "Nothing to cancel (Pod is not running).".into(), + )); + None + } } - KeyCode::Char('r') if key.modifiers.contains(KeyModifiers::CONTROL) => Some(Method::Resume), - KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => Some(Method::Cancel), KeyCode::Char('d') if key.modifiers.contains(KeyModifiers::CONTROL) => { return handle_shutdown(app); } @@ -204,14 +208,14 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option { } } -const SHUTDOWN_CONFIRM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3); +const CONFIRM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3); fn handle_shutdown(app: &mut App) -> Option { if !app.running { return Some(Method::Shutdown); } if let Some(t) = app.shutdown_confirm { - if t.elapsed() < SHUTDOWN_CONFIRM_TIMEOUT { + if t.elapsed() < CONFIRM_TIMEOUT { app.shutdown_confirm = None; return Some(Method::Shutdown); } @@ -223,3 +227,24 @@ fn handle_shutdown(app: &mut App) -> Option { )); None } + +/// Running → send `Method::Pause`. +/// Idle / Paused → 2-tap to quit the TUI (the Pod keeps running). +fn handle_pause_or_quit(app: &mut App) -> Option { + if app.running { + return Some(Method::Pause); + } + if let Some(t) = app.quit_confirm { + if t.elapsed() < CONFIRM_TIMEOUT { + app.quit_confirm = None; + app.quit = true; + return None; + } + } + app.quit_confirm = Some(std::time::Instant::now()); + app.output_queue.push(app::OutputItem::Padded( + app::MessageKind::Error, + "Press Ctrl-C again within 3 s to exit the TUI (the Pod keeps running).".into(), + )); + None +} diff --git a/crates/tui/src/ui.rs b/crates/tui/src/ui.rs index cebf0855..75585e29 100644 --- a/crates/tui/src/ui.rs +++ b/crates/tui/src/ui.rs @@ -170,6 +170,18 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) { }; spans.push(Span::raw(" | ")); spans.push(Span::styled(status, Style::default().fg(Color::Yellow))); + } else if app.paused { + spans.push(Span::raw(" | ")); + spans.push(Span::styled( + "paused", + Style::default() + .fg(Color::Cyan) + .add_modifier(Modifier::BOLD), + )); + spans.push(Span::styled( + " — Enter to resume, type to start new turn", + Style::default().fg(Color::DarkGray), + )); } else { spans.push(Span::styled(" idle", Style::default().fg(Color::DarkGray))); } diff --git a/tickets/user-pause.md b/tickets/user-pause.md new file mode 100644 index 00000000..b9e8ebf5 --- /dev/null +++ b/tickets/user-pause.md @@ -0,0 +1,184 @@ +# ユーザ起点の Pause と Resume + +## 背景 + +現状 `PodStatus::Paused` に到達する経路は interceptor フック(`PreToolAction::Pause` / `TurnEndAction::Pause`)のみで、**ユーザが TUI から Paused 状態に落とす手段がない**。結果として `Method::Resume` と TUI の `Ctrl-R` は実装されているのに発火機会がなく死に筋になっている。 + +`Method::Cancel`(現 TUI `Ctrl-X`)はハード中止で `PodStatus::Idle` に落ちる — turn を破棄したい時には使えるが、続きから再開する手段がない。 + +ユーザが「今の LLM リクエストを止めて、続きは後で再開したい」「止めて、別のことを先にやりたい」という操作が可能な primitive と UI を整える。 + +## 依存 + +- `Method::Resume` / `PodStatus::Paused`: 既に実装済み、変更不要 +- `cancel_tx` による worker 割り込み: 既に実装済み +- `Method::PodEvent` / `Method::Notify` の既存プロトコル拡張パターンを踏襲 + +## 設計 + +### 新しい primitive + +```rust +pub enum Method { + Run { input: String }, + Notify { message: String }, + PodEvent(PodEvent), + Resume, + Cancel, + Pause, // 新: 実行中の turn を止めて Paused 状態に落とす + Shutdown, + GetHistory, +} +``` + +**Cancel / Pause / Shutdown のセマンティクス分離**: + +| Method | 実行中に受けた時 | 結果状態 | 用途 | +|---|---|---|---| +| `Cancel` | cancel_tx 発火、turn を破棄 | `Idle` | 中止して捨てる | +| `Pause` | cancel_tx 発火、turn を中断 | `Paused` | 止めるけど続きは後で | +| `Shutdown` | cancel_tx 発火、controller loop 終了 | プロセス終了 | Pod を落とす | + +### Controller 側の変更 + +`run_with_cancel_support` に `pause_requested` フラグを導入: + +```rust +async fn run_with_cancel_support(...) -> (PodStatus, bool) { + let mut shutdown_requested = false; + let mut pause_requested = false; + + loop { + tokio::select! { + result = &mut pod_future => { + return match result { + Ok(r) => { ... } + Err(PodError::Worker(WorkerError::Cancelled)) if pause_requested => { + // Pause 時は upward PodEvent::Errored を抑止 + (PodStatus::Paused, shutdown_requested) + } + Err(e) => { ... PodEvent::Errored 発火 → (PodStatus::Idle, ...) } + }; + } + method = method_rx.recv() => { + match method { + Some(Method::Cancel) => { cancel_tx.try_send(()); } + Some(Method::Pause) => { + pause_requested = true; + cancel_tx.try_send(()); + } + Some(Method::Shutdown) => { ... } + ... + } + } + } + } +} +``` + +worker の割り込みは `cancel_tx` 一本で共通、Cancel と Pause の区別は pause フラグの有無で controller 側が判定する。 + +**Pause 時の upward PodEvent 抑止**: `PodEvent::Errored` は worker エラー限定の報告なので、ユーザ起点 Pause では発火させない。親 Pod への「子が Paused になった」通知は本チケットでは実装しない(必要なら別チケットで `PodEvent::Paused` variant を追加)。 + +**自動 turn 中の Pause**: `Method::Notify` の IDLE 自動起動、`Method::PodEvent` の auto-kick も同じ `run_with_cancel_support` を通るので Pause は同じ挙動で効く。 + +### 割り込みタイミングと history の整合性 + +`cancel_tx` 発火時の history 状態: + +- **LLM ストリーム中**: block collector の Stop が発火していないので partial text / tool_call は history に入らない → history は前ラウンド末で consistent +- **Tool 実行中**: tool 自体は cancellation を知らず完了を待つ。完了後の次チェックポイントで Cancelled → history には tool_result が書かれる、orphan なし +- **tool_call 確定後 ~ tool 実行前**: orphan `tool_use` が history に残りうる +- **ラウンド間待機**: consistent、orphan なし + +Resume と Pause→Run で扱いが分かれる。 + +### Resume の挙動 + +`worker.resume()` は既存実装のままで全ケース対応: + +- Partial text で止まっていた場合: history は前ラウンド末なので、同じリクエストを再送 → LLM が新しく生成 +- Orphan `tool_use` が残っている場合: `run_turn_loop` が `get_pending_tool_calls()` で拾って tool を実行 +- 綺麗な境界で止まっていた場合: そのまま次ラウンドへ + +Resume の仕様は変更なし。 + +### Paused → Run(新しい turn として開始) + +ユーザが Paused 中に新しい入力を投げたら、**「現 turn は終わった」として新しい turn を開始**する。orphan `tool_use` がある場合は history に下記を順に追加してから `worker.run(input)` を呼ぶ: + +1. 各 orphan `tool_use` に対して synthetic `tool_result`(summary: `"[Interrupted by user]"`、content なし)を inject — wire 互換性のため +2. `Item::system_message`(内容: `"[The previous turn was interrupted by the user. The user's next request follows.]"`)を inject — LLM に文脈を伝える +3. 新しい user message(`Run` の input)を append +4. `run_turn_loop` に入る + +実装する API の目安:`Pod::interrupt_and_run(input: String)` みたいな名前で上記 4 ステップを提供。Controller が Paused → Run 遷移時に呼ぶ。 + +注: 実機テストで Anthropic が orphan `tool_use`(次 user turn に `tool_result` が含まれない形)を許容するようなら step 1 は削除できる。まずは安全側に倒して両方入れる。 + +### Idle / Paused 時の Pause / Cancel + +- `Pause` when `Idle`: `Event::Error { NotRunning }` を返す(`Cancel` と同じ) +- `Pause` when `Paused`: no-op(既に Paused) +- `Cancel` when `Idle` / `Paused`: `NotRunning` エラー(既存の Cancel 挙動と揃える) + +### TUI のキー割り当て + +現状:`Ctrl-X` = Cancel、`Ctrl-R` = Resume、`Ctrl-D` = Shutdown(running なら 2 連打確認)、`Esc` = TUI 終了、`Ctrl-C` = TUI 終了。 + +変更後: + +| キー | Running 時 | Idle / Paused 時 | +|---|---|---| +| `Ctrl-X` | `Method::Cancel`(破棄 → Idle) | no-op(エラー表示) | +| `Ctrl-C` | `Method::Pause`(→ Paused) | 1 回目 warn / 3 秒以内の 2 回目で TUI 終了(Pod は残る) | +| `Ctrl-D` | 1 回目 warn / 3 秒以内の 2 回目で `Method::Shutdown` | `Method::Shutdown`(Pod を落とす) | +| `Enter` (入力あり) | —(入力は TUI 側でバッファ) | Idle なら `Method::Run`。Paused なら同じく `Method::Run`(Controller が前 turn を打ち切って新 turn 開始) | +| `Enter` (空) | — | Paused なら `Method::Resume`。Idle なら no-op | + +`Ctrl-R` と `Esc` は廃止。 + +Ctrl-C の 2 段階 UX は Ctrl-D と対称:running 中の 1 回目は安全な中断(Pause)、非 running 時の 1 回目は確認 warn、どちらも連打で最終アクション。 + +### Paused 状態の TUI 表示 + +現状 `App` は `running: bool` のみで Paused を区別していない。`ui.rs:draw_status` も「running / idle」の 2 状態のみ。 + +追加: + +- `App` に `paused: bool`(または `status: enum { Idle, Running, Paused }`) +- `Event::RunEnd { result: RunResult::Paused }` で `paused = true`、`running = false` +- `Event::TurnStart` または新しい Run / Resume 送信時に clear +- `draw_status` に paused 分岐を追加、hint は `Enter to resume, type to start new turn` + +## 影響範囲 + +- `crates/protocol/src/lib.rs`: `Method::Pause` variant 追加、serde round-trip テスト +- `crates/pod/src/controller.rs`: `run_with_cancel_support` に `pause_requested` 追加、Paused → Run の interrupt 処理 +- `crates/pod/src/pod.rs`: `interrupt_and_run(input)` 相当の API(orphan 閉じ + system note inject + run) +- `crates/tui/src/app.rs`: `paused` フラグ、`submit_input` の Paused 分岐(空なら Resume) +- `crates/tui/src/main.rs`: `handle_key` のキー割り当て変更(Ctrl-X / Ctrl-C / Ctrl-D)、Ctrl-C の 2 連打 Quit、`Ctrl-R` / `Esc` 削除 +- `crates/tui/src/ui.rs`: `draw_status` の paused 分岐 +- `docs/tui-keybindings.md`: 既存ファイルを更新 +- 既存の interceptor-driven Pause(`PreToolAction::Pause` / `TurnEndAction::Pause`)は挙動不変 + +## 完了条件 + +- `Method::Pause` が protocol crate に追加され、serde round-trip テストが通る +- 実行中 Pod に `Method::Pause` を送ると `PodStatus::Paused` に落ち、`Method::Resume` で続きから再開できる +- TUI で `Ctrl-C` を押すと Pause される(LLM ストリーム中・tool 実行中どちらからでも、Notify / PodEvent 起点の自動 turn 中でも) +- Pause 時に親 Pod への `PodEvent::Errored` が飛ばない(upward 通知抑止) +- Paused 中に空 Enter → Resume、入力あり Enter → 新 turn として Run(orphan tool_use は synthetic tool_result + system note で閉じる) +- Paused → Run 後、LLM への送信が wire 上正しい(orphan tool_use が解消されている)ことを統合テストで確認 +- Pause → Resume の history consistency を統合テストで確認 +- `Ctrl-X` = Cancel(破棄 → Idle)、`Ctrl-C` = Pause / 2 連打で TUI 終了、`Ctrl-D` = Shutdown(running 中は 2 連打) +- `Ctrl-R` / `Esc` キーが無効化されている +- TUI status line に Paused 状態と Enter ヒントが表示される +- 既存の Cancel / Shutdown / interceptor-driven Pause の挙動が回帰しない + +## 範囲外 + +- 複数クライアントが同時に Pod を触る時の race。最後の勝ち or 冪等挙動で十分 +- Tool 実行の cancellation 対応(長時間 tool を interrupt する仕組み) +- 親 Pod への `PodEvent::Paused` 通知 +- Anthropic が orphan `tool_use` を本当に 400 で弾くかの実機検証。まず synthetic tool_result を入れて安全側で実装、許容されるなら後で削る diff --git a/tickets/user-pause.review.md b/tickets/user-pause.review.md new file mode 100644 index 00000000..1e8b5d40 --- /dev/null +++ b/tickets/user-pause.review.md @@ -0,0 +1,34 @@ +# user-pause レビュー + +## 結論 + +主要ロジック(protocol / controller / Pod / TUI)は仕様通りで、ビルドと既存テストはすべて green。**ただしチケット完了条件で明示されている統合テスト 3 ケースが未実装のため、現状では完了条件未達**。 + +## 良い点 + +- `protocol` の `Method::Pause` variant 追加と serde round-trip テスト +- `controller.rs::run_with_cancel_support` の `pause_requested` フラグ設計と `PodEvent::Errored` の upward 抑止が仕様通り +- `Method::Pause` の Idle / Paused 区別(NotRunning エラー / no-op) +- `interrupt_and_run.rs` の責務分離: orphan 検出を純粋関数 `orphan_tool_result_closures` に切り出し、ユニットテスト 4 ケースで境界条件を網羅 +- TUI の Ctrl-C 2 連打 UX と Ctrl-D との対称性。`paused` フラグを `Event::TurnStart` でクリアする経路は Resume / Run 両方でカバーされる +- `docs/tui-keybindings.md` が人間向け解説として丁寧。Cancel と Pause の使い分け、`Ctrl-R` / `Esc` 廃止経緯まで言及 + +## 指摘事項 + +### 1. 統合テスト不足(completion blocker) + +チケット完了条件のうち、以下 3 項目に対応する統合テストが `crates/pod/tests/controller_test.rs` に追加されていない: + +- 実行中 Pod に `Method::Pause` を送ると `PodStatus::Paused` に落ち、`Method::Resume` で続きから再開できる +- Paused → Run 後、LLM への送信が wire 上正しい(orphan `tool_use` が解消されている) +- Pause → Resume の history consistency + +`interrupt_and_run.rs` のユニットテストは関数単位の検証であり、controller を通したエンドツーエンドの確認とは別物。既存の `resume_without_pause_returns_error` と同形式で 3 ケース追加すること。 + +### 2. ライフサイクル運用 + +`tickets/user-pause.md`, `crates/pod/src/interrupt_and_run.rs` 等が untracked / unstaged。CLAUDE.md のチケット運用(作成 commit → 詳細化 commit → レビュー commit)に沿うなら、実装と分けて commit すべき。 + +## 判定 + +統合テスト 3 ケースを追加した時点で完了条件達成。それまでは未完了。