TUIからPauseする実装

This commit is contained in:
Keisuke Hirata 2026-04-19 14:27:53 +09:00
parent 605e78468c
commit 223d06c77e
10 changed files with 823 additions and 25 deletions

View File

@ -256,18 +256,27 @@ impl PodController {
match method {
Method::Run { input } => {
if shared_state.get_status() != PodStatus::Idle {
let status_before = shared_state.get_status();
if status_before == PodStatus::Running {
let _ = event_tx.send(Event::Error {
code: ErrorCode::AlreadyRunning,
message: "Pod is already executing a turn".into(),
});
continue;
}
let was_paused = status_before == PodStatus::Paused;
shared_state.set_status(PodStatus::Running);
let _ = runtime_dir.write_status(&shared_state).await;
let run_future = async {
if was_paused {
pod.interrupt_and_run(input).await
} else {
pod.run(input).await
}
};
let (new_status, shutdown) = run_with_cancel_support(
pod.run(&input),
run_future,
&mut method_rx,
&event_tx,
&cancel_tx,
@ -406,6 +415,19 @@ impl PodController {
});
}
Method::Pause => {
// Already paused → idempotent no-op. Otherwise
// the Pod is Idle (Running turns go through
// `run_with_cancel_support`, not this outer
// match), so there is nothing to pause.
if shared_state.get_status() != PodStatus::Paused {
let _ = event_tx.send(Event::Error {
code: ErrorCode::NotRunning,
message: "Pod is not running".into(),
});
}
}
Method::Shutdown => {
let _ = event_tx.send(Event::Shutdown);
break;
@ -529,6 +551,7 @@ where
{
tokio::pin!(pod_future);
let mut shutdown_requested = false;
let mut pause_requested = false;
loop {
tokio::select! {
@ -551,6 +574,15 @@ where
}
(status, shutdown_requested)
}
Err(PodError::Worker(WorkerError::Cancelled)) if pause_requested => {
// User-initiated Pause. Report the transition to
// clients as a normal Paused run-end, and
// intentionally skip `PodEvent::Errored` upward:
// that channel is reserved for worker runtime
// failures, not deliberate interruptions.
let _ = event_tx.send(Event::RunEnd { result: RunResult::Paused });
(PodStatus::Paused, shutdown_requested)
}
Err(e) => {
let code = worker_error_code(&e);
let message = e.to_string();
@ -574,6 +606,10 @@ where
Some(Method::Cancel) => {
let _ = cancel_tx.try_send(());
}
Some(Method::Pause) => {
pause_requested = true;
let _ = cancel_tx.try_send(());
}
Some(Method::Shutdown) => {
shutdown_requested = true;
let _ = cancel_tx.try_send(());

View File

@ -0,0 +1,121 @@
//! Transition from `Paused` to a fresh turn via user input.
//!
//! The previously in-flight turn is treated as finished. Any orphan
//! `Item::ToolCall` (tool_use emitted by the LLM but whose tool did not
//! run to completion before the pause) is closed with a synthetic
//! `Item::ToolResult` so the next request is wire-valid under providers
//! that require every `tool_use` to be followed by a matching
//! `tool_result` (Anthropic). A short system note is then inserted so
//! the LLM understands the prior work was cut short, and finally the
//! user's new input is appended via `worker.run(input)`.
use llm_worker::Item;
use llm_worker::llm_client::client::LlmClient;
use session_store::Store;
use crate::pod::{Pod, PodError, PodRunResult};
const INTERRUPT_TOOL_RESULT_SUMMARY: &str = "[Interrupted by user]";
const INTERRUPT_SYSTEM_NOTE: &str =
"[The previous turn was interrupted by the user. The user's next request follows.]";
impl<C: LlmClient, St: Store> Pod<C, St> {
    /// Abandon the paused turn and begin a fresh one driven by `input`.
    ///
    /// The controller calls this when a `Method::Run` is received while the
    /// Pod is `Paused`. Before delegating to `run`, the history is patched:
    ///
    /// 1. every `Item::ToolCall` that has no matching `Item::ToolResult`
    ///    receives a synthetic result (`INTERRUPT_TOOL_RESULT_SUMMARY`);
    /// 2. a system message (`INTERRUPT_SYSTEM_NOTE`) is appended.
    ///
    /// The module-level docs explain why the synthetic results are needed
    /// for wire compatibility.
    ///
    /// # Errors
    ///
    /// Propagates whatever `run` returns.
    pub async fn interrupt_and_run(
        &mut self,
        input: impl Into<String>,
    ) -> Result<PodRunResult, PodError> {
        let synthetic_results = orphan_tool_result_closures(self.worker().history());
        let has_orphans = !synthetic_results.is_empty();

        // Only touch the history when there is something to close.
        if has_orphans {
            self.worker_mut().extend_history(synthetic_results);
        }

        let interruption_note = Item::system_message(INTERRUPT_SYSTEM_NOTE);
        self.worker_mut().push_item(interruption_note);

        self.run(input).await
    }
}
/// Produce one synthetic `Item::ToolResult` per `Item::ToolCall` in
/// `history` whose `call_id` never appears on an `Item::ToolResult`.
///
/// The returned items follow the order in which the unanswered calls occur
/// in `history`. A result counts as an answer regardless of its position
/// relative to the call.
fn orphan_tool_result_closures(history: &[Item]) -> Vec<Item> {
    // First pass: every call id that already has a result.
    let answered: std::collections::HashSet<&str> = history
        .iter()
        .filter_map(|entry| match entry {
            Item::ToolResult { call_id, .. } => Some(call_id.as_str()),
            _ => None,
        })
        .collect();

    // Second pass: close each call that is not in the answered set.
    history
        .iter()
        .filter_map(|entry| match entry {
            Item::ToolCall { call_id, .. } if !answered.contains(call_id.as_str()) => Some(
                Item::tool_result(call_id.clone(), INTERRUPT_TOOL_RESULT_SUMMARY),
            ),
            _ => None,
        })
        .collect()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain messages contain no tool calls, so nothing needs closing.
    #[test]
    fn no_orphans_returns_empty() {
        let history = [
            Item::user_message("hi"),
            Item::assistant_message("hello"),
        ];
        let closures = orphan_tool_result_closures(&history);
        assert!(closures.is_empty());
    }

    /// A call followed by its own result is already answered.
    #[test]
    fn paired_call_and_result_is_not_orphan() {
        let history = [
            Item::tool_call("c1", "Read", "{}"),
            Item::tool_result("c1", "ok"),
        ];
        let closures = orphan_tool_result_closures(&history);
        assert!(closures.is_empty());
    }

    /// A lone call yields exactly one synthetic result carrying the
    /// interruption summary and the same call id.
    #[test]
    fn unanswered_call_becomes_closure() {
        let history = [Item::tool_call("c1", "Read", "{}")];
        let closures = orphan_tool_result_closures(&history);
        assert_eq!(closures.len(), 1);

        let only = &closures[0];
        match only {
            Item::ToolResult {
                call_id, summary, ..
            } => {
                assert_eq!(call_id, "c1");
                assert_eq!(summary, INTERRUPT_TOOL_RESULT_SUMMARY);
            }
            other => panic!("expected ToolResult, got {other:?}"),
        }
    }

    /// Several unanswered calls are closed in history order; the answered
    /// one (`c1`) is left alone.
    #[test]
    fn multiple_orphans_are_closed_in_order() {
        let history = [
            Item::tool_call("c1", "Read", "{}"),
            Item::tool_call("c2", "Write", "{}"),
            Item::tool_result("c1", "ok"),
            Item::tool_call("c3", "Grep", "{}"),
        ];
        let closures = orphan_tool_result_closures(&history);

        let mut closed_ids: Vec<&str> = Vec::new();
        for entry in &closures {
            match entry {
                Item::ToolResult { call_id, .. } => closed_ids.push(call_id.as_str()),
                _ => unreachable!(),
            }
        }
        assert_eq!(closed_ids, vec!["c2", "c3"]);
    }
}

View File

@ -14,6 +14,7 @@ mod agents_md;
mod compact_state;
mod compact_worker;
mod factory;
mod interrupt_and_run;
mod notification_buffer;
mod pod;
mod pod_interceptor;

View File

@ -3,10 +3,11 @@ use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use futures::Stream;
use futures::{Stream, StreamExt};
use llm_worker::Worker;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::{ClientError, LlmClient, Request};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use session_store::FsStore;
use pod::{Event, Method, Pod, PodController, PodManifest, PodStatus};
@ -15,17 +16,34 @@ use pod::{Event, Method, Pod, PodController, PodManifest, PodStatus};
// Mock LLM Client
// ---------------------------------------------------------------------------
/// One scripted mock response, i.e. the script for a single call to the
/// mock client's `stream()`.
///
/// Both variants carry the events to emit; they differ only in what the
/// stream does once those events have been delivered.
#[derive(Clone)]
enum MockResponse {
/// Emit the events and let the stream terminate naturally.
Complete(Vec<LlmEvent>),
/// Emit the events and then pend forever so the Worker blocks on
/// `stream.next()` — used to exercise the Cancel/Pause path while a
/// turn is actively in flight.
Hang(Vec<LlmEvent>),
}
#[derive(Clone)]
struct MockClient {
responses: Arc<Vec<Vec<LlmEvent>>>,
responses: Arc<Vec<MockResponse>>,
call_count: Arc<AtomicUsize>,
captured: Arc<Mutex<Vec<Request>>>,
}
impl MockClient {
fn new(events: Vec<LlmEvent>) -> Self {
Self::sequential(vec![MockResponse::Complete(events)])
}
/// Script multiple sequential responses. The Nth call to `stream()`
/// returns the Nth entry.
fn sequential(responses: Vec<MockResponse>) -> Self {
Self {
responses: Arc::new(vec![events]),
responses: Arc::new(responses),
call_count: Arc::new(AtomicUsize::new(0)),
captured: Arc::new(Mutex::new(Vec::new())),
}
@ -56,9 +74,18 @@ impl LlmClient for MockClient {
message: "No more responses".into(),
});
}
let events = self.responses[count].clone();
let stream = futures::stream::iter(events.into_iter().map(Ok));
Ok(Box::pin(stream))
let response = self.responses[count].clone();
let (events, hang) = match response {
MockResponse::Complete(e) => (e, false),
MockResponse::Hang(e) => (e, true),
};
let iter = futures::stream::iter(events.into_iter().map(Ok));
if hang {
let pending = futures::stream::pending::<Result<LlmEvent, ClientError>>();
Ok(Box::pin(iter.chain(pending)))
} else {
Ok(Box::pin(iter))
}
}
}
@ -539,3 +566,328 @@ async fn socket_invalid_method_returns_error() {
assert!(saw_error, "should see error for invalid method");
}
// ---------------------------------------------------------------------------
// Pause / Resume / Paused→Run
// ---------------------------------------------------------------------------
/// Test-only tool whose `execute` never completes.
///
/// It parks a turn after the `ToolCall` has been committed to history but
/// before any `ToolResult` is produced, so that a `Method::Pause` issued at
/// that point leaves an orphan `tool_use` behind.
struct HangingTool;

#[async_trait]
impl Tool for HangingTool {
    /// Suspends forever; the declared return value is never produced.
    async fn execute(&self, _input: &str) -> Result<ToolOutput, ToolError> {
        std::future::pending::<Result<ToolOutput, ToolError>>().await
    }
}
/// Build a `ToolDefinition` that registers a [`HangingTool`] under `name`.
///
/// Each invocation of the returned factory yields fresh metadata (name,
/// description, and a bare `{"type": "object"}` input schema) together with
/// a new `HangingTool` instance.
fn hanging_tool_definition(name: &'static str) -> ToolDefinition {
    Arc::new(move || {
        let meta = ToolMeta::new(name)
            .description("test-only tool that pends forever")
            .input_schema(serde_json::json!({"type": "object"}));
        let tool: Arc<dyn Tool> = Arc::new(HangingTool);
        (meta, tool)
    })
}
/// Consume events from `rx` until `done` accepts one or `timeout` elapses.
///
/// Returns `true` as soon as `done` returns `true` for a received event.
/// Returns `false` if the deadline passes first or the channel is closed.
///
/// A `Lagged` receive error only means this receiver fell behind and some
/// older events were dropped; the channel is still usable. It is therefore
/// skipped rather than treated as a failure, so a momentarily slow receiver
/// does not abort the wait. If the awaited event was among the dropped
/// ones, the function still ends with `false` once the deadline is reached.
async fn drain_until<F: FnMut(&Event) -> bool>(
    rx: &mut tokio::sync::broadcast::Receiver<Event>,
    timeout: std::time::Duration,
    mut done: F,
) -> bool {
    // One absolute deadline shared by all iterations, so the total wait
    // never exceeds `timeout` no matter how many events arrive.
    let deadline = tokio::time::Instant::now() + timeout;
    loop {
        tokio::select! {
            received = rx.recv() => {
                match received {
                    Ok(event) => {
                        if done(&event) {
                            return true;
                        }
                    }
                    // Recoverable: keep reading from the oldest retained event.
                    Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
                    // All senders are gone; nothing more will ever arrive.
                    Err(tokio::sync::broadcast::error::RecvError::Closed) => return false,
                }
            }
            _ = tokio::time::sleep_until(deadline) => return false,
        }
    }
}
/// Pause mid-stream, then Resume.
///
/// The test asserts that the shared status is `Paused` after the pause and
/// `Idle` after the resumed turn finishes, and that the final history holds
/// exactly one user message followed by the assistant reply produced by the
/// resume call, with no tool call left behind.
#[tokio::test]
async fn pause_then_resume_transitions_and_preserves_history_consistency() {
    // Response 1: hang after opening a text block (no stop / completed),
    // so the Worker is parked inside the stream read and `cancel_rx`
    // races it cleanly on Method::Pause.
    let hang = MockResponse::Hang(vec![
        LlmEvent::text_block_start(0),
        LlmEvent::text_delta(0, "partial..."),
    ]);
    // Response 2: a clean assistant reply delivered on Resume.
    let ok = MockResponse::Complete(vec![
        LlmEvent::text_block_start(0),
        LlmEvent::text_delta(0, "resumed output"),
        LlmEvent::text_block_stop(0, None),
        LlmEvent::Status(StatusEvent {
            status: ResponseStatus::Completed,
        }),
    ]);

    let client = MockClient::sequential(vec![hang, ok]);
    let pod = make_pod(client).await;
    let handle = spawn_controller(pod).await;
    let mut rx = handle.subscribe();

    handle
        .send(Method::Run {
            input: "hello".into(),
        })
        .await
        .unwrap();

    // Wait for the partial text_delta to confirm the first stream is
    // live before we pause.
    assert!(
        drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
            e,
            Event::TextDelta { .. }
        ))
        .await,
        "text_delta should arrive before pause"
    );

    handle.send(Method::Pause).await.unwrap();

    // The controller emits RunEnd { Paused } when the
    // WorkerError::Cancelled is translated under pause_requested.
    assert!(
        drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
            e,
            Event::RunEnd {
                result: protocol::RunResult::Paused
            }
        ))
        .await,
        "expected RunEnd::Paused after Pause"
    );

    // Short grace period before reading the shared status after RunEnd.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    assert_eq!(handle.shared_state.get_status(), PodStatus::Paused);

    handle.send(Method::Resume).await.unwrap();

    assert!(
        drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
            e,
            Event::RunEnd {
                result: protocol::RunResult::Finished
            }
        ))
        .await,
        "expected RunEnd::Finished after Resume"
    );

    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);

    // History consistency: exactly [user "hello", assistant
    // "resumed output"]. No artifacts from the aborted stream
    // (partial text is not committed), no orphan tool_use.
    let history_json = handle.shared_state.history_json();
    let items: Vec<serde_json::Value> = serde_json::from_str(&history_json).unwrap();
    let roles: Vec<&str> = items
        .iter()
        .filter_map(|i| i["role"].as_str())
        .collect();
    assert_eq!(
        roles,
        vec!["user", "assistant"],
        "history = user + assistant only; got {items:?}"
    );

    // First text part of the assistant message; empty string if none.
    let assistant_text = items[1]["content"]
        .as_array()
        .and_then(|parts| parts.iter().find_map(|p| p["text"].as_str()))
        .unwrap_or("");
    assert_eq!(assistant_text, "resumed output");

    let has_tool_call = items
        .iter()
        .any(|i| i["type"].as_str() == Some("tool_call"));
    assert!(!has_tool_call, "no orphan tool_call in history");
}
/// Paused with an orphan `tool_use` in history + a fresh `Method::Run`
/// must produce a wire-valid next LLM request: the orphan is closed
/// with a synthetic `tool_result`, a system note is inserted, and the
/// new user input is appended.
///
/// The assertions run against the second request captured by the mock
/// client, first for presence of the three items and then for their
/// relative order.
#[tokio::test]
async fn paused_then_run_closes_orphan_tool_use_for_next_request() {
// Response 1: emit a tool_use block (complete with stop) targeting
// our hanging tool. The Worker commits the ToolCall to history,
// then parks inside `execute_tools` waiting on the tool — which is
// where Method::Pause catches it.
let tool_name = "HangyTool";
let first = MockResponse::Complete(vec![
LlmEvent::tool_use_start(0, "call_orphan", tool_name),
LlmEvent::tool_input_delta(0, "{}"),
LlmEvent::tool_use_stop(0),
LlmEvent::Status(StatusEvent {
status: ResponseStatus::Completed,
}),
]);
// Response 2: ordinary completion after the Paused→Run transition.
let second = MockResponse::Complete(vec![
LlmEvent::text_block_start(0),
LlmEvent::text_delta(0, "ok"),
LlmEvent::text_block_stop(0, None),
LlmEvent::Status(StatusEvent {
status: ResponseStatus::Completed,
}),
]);
let client = MockClient::sequential(vec![first, second]);
// Keep a clone so the captured requests can be inspected after the
// original client has been moved into the Pod.
let client_for_assert = client.clone();
let mut pod = make_pod(client).await;
pod.worker_mut()
.register_tool(hanging_tool_definition(tool_name));
let handle = spawn_controller(pod).await;
let mut rx = handle.subscribe();
handle
.send(Method::Run {
input: "first".into(),
})
.await
.unwrap();
// Wait for ToolCallDone — the ToolCall is committed to history
// right before the Worker enters tool execution and pends.
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
e,
Event::ToolCallDone { .. }
))
.await,
"tool_call_done should arrive before pause"
);
handle.send(Method::Pause).await.unwrap();
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
e,
Event::RunEnd {
result: protocol::RunResult::Paused
}
))
.await,
"expected RunEnd::Paused"
);
// Short grace period before reading the shared status after RunEnd.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
assert_eq!(handle.shared_state.get_status(), PodStatus::Paused);
// New user input while Paused → controller routes to
// `Pod::interrupt_and_run`, which closes the orphan + injects a
// system note before the fresh user message.
handle
.send(Method::Run {
input: "new request".into(),
})
.await
.unwrap();
assert!(
drain_until(&mut rx, std::time::Duration::from_secs(2), |e| matches!(
e,
Event::RunEnd {
result: protocol::RunResult::Finished
}
))
.await,
"expected RunEnd::Finished after Paused→Run"
);
// The second LLM request carries the closure chain. Walk its items
// and assert the invariants — order matters for wire correctness.
let requests = client_for_assert.captured_requests();
assert_eq!(requests.len(), 2, "two LLM calls expected");
let items = &requests[1].items;
// Presence pass: scan every item. The ToolResult for `call_orphan`
// must carry the synthetic summary, and both the interruption note
// and the new user message must appear somewhere in the request.
// (Relative order is checked separately below.)
let mut saw_synthetic_tool_result = false;
let mut saw_interruption_note = false;
let mut saw_new_user = false;
for item in items {
match item {
llm_worker::Item::ToolResult {
call_id, summary, ..
} if call_id == "call_orphan" => {
assert_eq!(summary, "[Interrupted by user]");
saw_synthetic_tool_result = true;
}
llm_worker::Item::Message { role, content, .. }
if *role == llm_worker::Role::System =>
{
let text: String = content.iter().map(|p| p.as_text()).collect();
if text.contains("interrupted by the user") {
saw_interruption_note = true;
}
}
llm_worker::Item::Message { role, content, .. }
if *role == llm_worker::Role::User =>
{
let text: String = content.iter().map(|p| p.as_text()).collect();
if text.contains("new request") {
saw_new_user = true;
}
}
_ => {}
}
}
assert!(
saw_synthetic_tool_result,
"synthetic tool_result for orphan missing in 2nd request items: {items:?}"
);
assert!(
saw_interruption_note,
"system interruption note missing in 2nd request items: {items:?}"
);
assert!(
saw_new_user,
"new user message missing in 2nd request items: {items:?}"
);
// Also confirm the closure chain is ordered: tool_result for the
// orphan precedes the system note, which precedes the new user
// message.
// `idx` returns the position of the first item matching the predicate;
// the unwrap cannot fail here because presence was asserted above.
let idx = |pred: &dyn Fn(&llm_worker::Item) -> bool| {
items.iter().position(pred).unwrap()
};
let tool_result_idx = idx(&|i| matches!(i, llm_worker::Item::ToolResult { call_id, .. } if call_id == "call_orphan"));
let sys_idx = idx(&|i| match i {
llm_worker::Item::Message {
role: llm_worker::Role::System,
content,
..
} => content
.iter()
.map(|p| p.as_text())
.collect::<String>()
.contains("interrupted by the user"),
_ => false,
});
let user_idx = idx(&|i| match i {
llm_worker::Item::Message {
role: llm_worker::Role::User,
content,
..
} => content
.iter()
.map(|p| p.as_text())
.collect::<String>()
.contains("new request"),
_ => false,
});
assert!(tool_result_idx < sys_idx, "tool_result must precede system note");
assert!(sys_idx < user_idx, "system note must precede new user message");
}

View File

@ -20,6 +20,13 @@ pub enum Method {
PodEvent(PodEvent),
Resume,
Cancel,
/// Stop the in-flight turn and transition to `Paused`.
///
/// Unlike `Cancel` (which discards and returns to `Idle`), a paused
/// Pod can resume the interrupted work via `Resume`, or start a
/// fresh turn via `Run` (orphan `tool_use` items are closed with a
/// synthetic tool result before the new user message is appended).
Pause,
Shutdown,
GetHistory,
}
@ -265,6 +272,15 @@ mod tests {
assert!(matches!(method, Method::Resume));
}
/// `Method::Pause` deserializes from `{"method":"pause"}` and serializes
/// back to exactly the same JSON text.
#[test]
fn method_pause_roundtrip() {
    let wire = r#"{"method":"pause"}"#;

    let decoded: Method = serde_json::from_str(wire).unwrap();
    assert!(matches!(decoded, Method::Pause));

    let encoded = serde_json::to_string(&decoded).unwrap();
    assert_eq!(encoded, wire);
}
#[test]
fn event_text_delta_format() {
let event = Event::TextDelta {

View File

@ -1,9 +1,13 @@
use protocol::{Event, Greeting, Method, NotificationLevel, NotificationSource};
use protocol::{Event, Greeting, Method, NotificationLevel, NotificationSource, RunResult};
pub struct App {
pub pod_name: String,
pub connected: bool,
pub running: bool,
/// True while the Pod is in `PodStatus::Paused`. Set on
/// `RunEnd::Paused` and cleared when a new turn starts (either via
/// `Resume` or a fresh `Run`).
pub paused: bool,
pub run_requests: usize,
pub run_input_tokens: u64,
pub run_output_tokens: u64,
@ -13,6 +17,10 @@ pub struct App {
pub cursor: usize,
pub quit: bool,
pub shutdown_confirm: Option<std::time::Instant>,
/// 2-tap guard for `Ctrl-C` when the Pod is not running. First press
/// records the instant; a second press within the timeout exits the
/// TUI (the Pod itself stays alive).
pub quit_confirm: Option<std::time::Instant>,
/// Lines waiting to be flushed to terminal via insert_before.
pub output_queue: Vec<OutputItem>,
/// Partial streaming text not yet terminated by newline.
@ -48,6 +56,7 @@ impl App {
pod_name,
connected: false,
running: false,
paused: false,
run_requests: 0,
run_input_tokens: 0,
run_output_tokens: 0,
@ -57,6 +66,7 @@ impl App {
cursor: 0,
quit: false,
shutdown_confirm: None,
quit_confirm: None,
output_queue: Vec::new(),
pending_text: String::new(),
}
@ -65,6 +75,11 @@ impl App {
pub fn submit_input(&mut self) -> Option<Method> {
let text = self.input.trim().to_owned();
if text.is_empty() {
// Empty Enter only does something meaningful when the Pod
// is paused: resume the interrupted turn. Otherwise no-op.
if self.paused {
return Some(Method::Resume);
}
return None;
}
self.turn_index += 1;
@ -83,6 +98,7 @@ impl App {
match event {
Event::TurnStart { .. } => {
self.running = true;
self.paused = false;
self.run_requests += 1;
self.current_tool = None;
}
@ -154,7 +170,7 @@ impl App {
format!("[{code:?}] {message}"),
));
}
Event::RunEnd { .. } => {
Event::RunEnd { result } => {
self.output_queue.push(OutputItem::PaddedRight(
MessageKind::TurnStats,
format!(
@ -166,6 +182,7 @@ impl App {
));
self.output_queue.push(OutputItem::Blank);
self.running = false;
self.paused = matches!(result, RunResult::Paused);
self.run_requests = 0;
self.run_input_tokens = 0;
self.run_output_tokens = 0;

View File

@ -141,14 +141,14 @@ async fn run_loop(
Ok(())
}
fn run_disconnected(app: &mut App) -> Result<(), Box<dyn std::error::Error>> {
fn run_disconnected(_app: &mut App) -> Result<(), Box<dyn std::error::Error>> {
loop {
if event::poll(std::time::Duration::from_millis(100))? {
if let TermEvent::Key(key) = event::read()? {
match key.code {
KeyCode::Esc => break,
KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) => break,
_ => {}
if let KeyCode::Char('c') = key.code {
if key.modifiers.contains(KeyModifiers::CONTROL) {
break;
}
}
}
}
@ -158,16 +158,20 @@ fn run_disconnected(app: &mut App) -> Result<(), Box<dyn std::error::Error>> {
fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
match key.code {
KeyCode::Esc => {
app.quit = true;
None
}
KeyCode::Char('c') if key.modifiers.contains(KeyModifiers::CONTROL) => {
app.quit = true;
None
handle_pause_or_quit(app)
}
KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => {
if app.running {
Some(Method::Cancel)
} else {
app.output_queue.push(app::OutputItem::Padded(
app::MessageKind::Error,
"Nothing to cancel (Pod is not running).".into(),
));
None
}
}
KeyCode::Char('r') if key.modifiers.contains(KeyModifiers::CONTROL) => Some(Method::Resume),
KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => Some(Method::Cancel),
KeyCode::Char('d') if key.modifiers.contains(KeyModifiers::CONTROL) => {
return handle_shutdown(app);
}
@ -204,14 +208,14 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
}
}
const SHUTDOWN_CONFIRM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
const CONFIRM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
fn handle_shutdown(app: &mut App) -> Option<Method> {
if !app.running {
return Some(Method::Shutdown);
}
if let Some(t) = app.shutdown_confirm {
if t.elapsed() < SHUTDOWN_CONFIRM_TIMEOUT {
if t.elapsed() < CONFIRM_TIMEOUT {
app.shutdown_confirm = None;
return Some(Method::Shutdown);
}
@ -223,3 +227,24 @@ fn handle_shutdown(app: &mut App) -> Option<Method> {
));
None
}
/// Handle `Ctrl-C`.
///
/// * Running → send `Method::Pause`.
/// * Idle / Paused → two-tap guard before quitting the TUI (the Pod keeps
///   running): the first press records the instant and queues a hint; a
///   second press within `CONFIRM_TIMEOUT` sets `app.quit`.
///
/// Returns the method to forward to the Pod, or `None` when the key press
/// was handled locally.
fn handle_pause_or_quit(app: &mut App) -> Option<Method> {
    if app.running {
        // A first tap recorded earlier (while idle/paused) must not survive
        // this press: otherwise the next Ctrl-C after the pause could quit
        // immediately, without the user having seen a fresh warning.
        app.quit_confirm = None;
        return Some(Method::Pause);
    }

    // Consume any pending first tap; it is either honoured now or stale.
    if let Some(first_tap) = app.quit_confirm.take() {
        if first_tap.elapsed() < CONFIRM_TIMEOUT {
            app.quit = true;
            return None;
        }
    }

    // First tap (or an expired one): arm the guard and tell the user.
    // NOTE: the "3 s" in the hint mirrors `CONFIRM_TIMEOUT`; keep them in sync.
    app.quit_confirm = Some(std::time::Instant::now());
    app.output_queue.push(app::OutputItem::Padded(
        app::MessageKind::Error,
        "Press Ctrl-C again within 3 s to exit the TUI (the Pod keeps running).".into(),
    ));
    None
}

View File

@ -170,6 +170,18 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) {
};
spans.push(Span::raw(" | "));
spans.push(Span::styled(status, Style::default().fg(Color::Yellow)));
} else if app.paused {
spans.push(Span::raw(" | "));
spans.push(Span::styled(
"paused",
Style::default()
.fg(Color::Cyan)
.add_modifier(Modifier::BOLD),
));
spans.push(Span::styled(
" — Enter to resume, type to start new turn",
Style::default().fg(Color::DarkGray),
));
} else {
spans.push(Span::styled(" idle", Style::default().fg(Color::DarkGray)));
}

184
tickets/user-pause.md Normal file
View File

@ -0,0 +1,184 @@
# ユーザ起点の Pause と Resume
## 背景
現状 `PodStatus::Paused` に到達する経路は interceptor フック(`PreToolAction::Pause` / `TurnEndAction::Pause`)のみで、**ユーザが TUI から Paused 状態に落とす手段がない**。結果として `Method::Resume` と TUI の `Ctrl-R` は実装されているのに発火機会がなく死に筋になっている。
`Method::Cancel`(現 TUI `Ctrl-X`)はハード中止で `PodStatus::Idle` に落ちる — turn を破棄したい時には使えるが、続きから再開する手段がない。
ユーザが「今の LLM リクエストを止めて、続きは後で再開したい」「止めて、別のことを先にやりたい」という操作が可能な primitive と UI を整える。
## 依存
- `Method::Resume` / `PodStatus::Paused`: 既に実装済み、変更不要
- `cancel_tx` による worker 割り込み: 既に実装済み
- `Method::PodEvent` / `Method::Notify` の既存プロトコル拡張パターンを踏襲
## 設計
### 新しい primitive
```rust
pub enum Method {
Run { input: String },
Notify { message: String },
PodEvent(PodEvent),
Resume,
Cancel,
Pause, // 新: 実行中の turn を止めて Paused 状態に落とす
Shutdown,
GetHistory,
}
```
**Cancel / Pause / Shutdown のセマンティクス分離**:
| Method | 実行中に受けた時 | 結果状態 | 用途 |
|---|---|---|---|
| `Cancel` | cancel_tx 発火、turn を破棄 | `Idle` | 中止して捨てる |
| `Pause` | cancel_tx 発火、turn を中断 | `Paused` | 止めるけど続きは後で |
| `Shutdown` | cancel_tx 発火、controller loop 終了 | プロセス終了 | Pod を落とす |
### Controller 側の変更
`run_with_cancel_support` に `pause_requested` フラグを導入:
```rust
async fn run_with_cancel_support<F>(...) -> (PodStatus, bool) {
let mut shutdown_requested = false;
let mut pause_requested = false;
loop {
tokio::select! {
result = &mut pod_future => {
return match result {
Ok(r) => { ... }
Err(PodError::Worker(WorkerError::Cancelled)) if pause_requested => {
// Pause 時は upward PodEvent::Errored を抑止
(PodStatus::Paused, shutdown_requested)
}
Err(e) => { ... PodEvent::Errored 発火 → (PodStatus::Idle, ...) }
};
}
method = method_rx.recv() => {
match method {
Some(Method::Cancel) => { cancel_tx.try_send(()); }
Some(Method::Pause) => {
pause_requested = true;
cancel_tx.try_send(());
}
Some(Method::Shutdown) => { ... }
...
}
}
}
}
}
```
worker の割り込みは `cancel_tx` 一本で共通、Cancel と Pause の区別は pause フラグの有無で controller 側が判定する。
**Pause 時の upward PodEvent 抑止**: `PodEvent::Errored` は worker エラー限定の報告なので、ユーザ起点 Pause では発火させない。親 Pod への「子が Paused になった」通知は本チケットでは実装しない(必要なら別チケットで `PodEvent::Paused` variant を追加)。
**自動 turn 中の Pause**: `Method::Notify` の IDLE 自動起動、`Method::PodEvent` の auto-kick も同じ `run_with_cancel_support` を通るので Pause は同じ挙動で効く。
### 割り込みタイミングと history の整合性
`cancel_tx` 発火時の history 状態:
- **LLM ストリーム中**: block collector の Stop が発火していないので partial text / tool_call は history に入らない → history は前ラウンド末で consistent
- **Tool 実行中**: tool 自体は cancellation を知らず完了を待つ。完了後の次チェックポイントで Cancelled → history には tool_result が書かれる、orphan なし
- **tool_call 確定後 tool 実行前**: orphan `tool_use` が history に残りうる
- **ラウンド間待機**: consistent、orphan なし
Resume と Pause→Run で扱いが分かれる。
### Resume の挙動
`worker.resume()` は既存実装のままで全ケース対応:
- Partial text で止まっていた場合: history は前ラウンド末なので、同じリクエストを再送 → LLM が新しく生成
- Orphan `tool_use` が残っている場合: `run_turn_loop` が `get_pending_tool_calls()` で拾って tool を実行
- 綺麗な境界で止まっていた場合: そのまま次ラウンドへ
Resume の仕様は変更なし。
### Paused → Run(新しい turn として開始)
ユーザが Paused 中に新しい入力を投げたら、**「現 turn は終わった」として新しい turn を開始**する。orphan `tool_use` がある場合は history に下記を順に追加してから `worker.run(input)` を呼ぶ:
1. 各 orphan `tool_use` に対して synthetic `tool_result`(summary: `"[Interrupted by user]"`、content なし)を inject — wire 互換性のため
2. `Item::system_message`(内容: `"[The previous turn was interrupted by the user. The user's next request follows.]"`)を inject — LLM に文脈を伝える
3. 新しい user message(`Run` の input)を append
4. `run_turn_loop` に入る
実装する API の目安:`Pod::interrupt_and_run(input: String)` みたいな名前で上記 4 ステップを提供。Controller が Paused → Run 遷移時に呼ぶ。
注: 実機テストで Anthropic が orphan `tool_use`(次 user turn に `tool_result` が含まれない形)を許容するようなら step 1 は削除できる。まずは安全側に倒して両方入れる。
### Idle / Paused 時の Pause / Cancel
- `Pause` when `Idle`: `Event::Error { NotRunning }` を返す(`Cancel` と同じ)
- `Pause` when `Paused`: no-op(既に Paused)
- `Cancel` when `Idle` / `Paused`: `NotRunning` エラー(既存の Cancel 挙動と揃える)
### TUI のキー割り当て
現状:`Ctrl-X` = Cancel、`Ctrl-R` = Resume、`Ctrl-D` = Shutdown(running なら 2 連打確認)、`Esc` = TUI 終了、`Ctrl-C` = TUI 終了。
変更後:
| キー | Running 時 | Idle / Paused 時 |
|---|---|---|
| `Ctrl-X` | `Method::Cancel`(破棄 → Idle) | no-op(エラー表示) |
| `Ctrl-C` | `Method::Pause`(→ Paused) | 1 回目 warn / 3 秒以内の 2 回目で TUI 終了(Pod は残る) |
| `Ctrl-D` | 1 回目 warn / 3 秒以内の 2 回目で `Method::Shutdown` | `Method::Shutdown`(Pod を落とす) |
| `Enter` (入力あり) | —(入力は TUI 側でバッファ) | Idle なら `Method::Run`。Paused なら同じく `Method::Run`(Controller が前 turn を打ち切って新 turn 開始) |
| `Enter` (空) | — | Paused なら `Method::Resume`。Idle なら no-op |
`Ctrl-R` と `Esc` は廃止。
Ctrl-C の 2 段階 UX は Ctrl-D と対称:running 中の 1 回目は安全な中断(Pause)、非 running 時の 1 回目は確認 warn、どちらも連打で最終アクション。
### Paused 状態の TUI 表示
現状 `App` は `running: bool` のみで Paused を区別していない。`ui.rs:draw_status` も「running / idle」の 2 状態のみ。
追加:
- `App` に `paused: bool`(または `status: enum { Idle, Running, Paused }`)
- `Event::RunEnd { result: RunResult::Paused }` で `paused = true`、`running = false`
- `Event::TurnStart` または新しい Run / Resume 送信時に clear
- `draw_status` に paused 分岐を追加、hint は `Enter to resume, type to start new turn`
## 影響範囲
- `crates/protocol/src/lib.rs`: `Method::Pause` variant 追加、serde round-trip テスト
- `crates/pod/src/controller.rs`: `run_with_cancel_support` に `pause_requested` 追加、Paused → Run の interrupt 処理
- `crates/pod/src/pod.rs`: `interrupt_and_run(input)` 相当の API(orphan 閉じ + system note inject + run)
- `crates/tui/src/app.rs`: `paused` フラグ、`submit_input` の Paused 分岐(空なら Resume)
- `crates/tui/src/main.rs`: `handle_key` のキー割り当て変更(Ctrl-X / Ctrl-C / Ctrl-D)、Ctrl-C の 2 連打 Quit、`Ctrl-R` / `Esc` 削除
- `crates/tui/src/ui.rs`: `draw_status` の paused 分岐
- `docs/tui-keybindings.md`: 既存ファイルを更新
- 既存の interceptor-driven Pause(`PreToolAction::Pause` / `TurnEndAction::Pause`)は挙動不変
## 完了条件
- `Method::Pause` が protocol crate に追加され、serde round-trip テストが通る
- 実行中 Pod に `Method::Pause` を送ると `PodStatus::Paused` に落ち、`Method::Resume` で続きから再開できる
- TUI で `Ctrl-C` を押すと Pause される(LLM ストリーム中・tool 実行中どちらからでも、Notify / PodEvent 起点の自動 turn 中でも)
- Pause 時に親 Pod への `PodEvent::Errored` が飛ばない(upward 通知抑止)
- Paused 中に空 Enter → Resume、入力あり Enter → 新 turn として Run(orphan tool_use は synthetic tool_result + system note で閉じる)
- Paused → Run 後、LLM への送信が wire 上正しい(orphan tool_use が解消されている)ことを統合テストで確認
- Pause → Resume の history consistency を統合テストで確認
- `Ctrl-X` = Cancel(破棄 → Idle)、`Ctrl-C` = Pause / 2 連打で TUI 終了、`Ctrl-D` = Shutdown(running 中は 2 連打)
- `Ctrl-R` / `Esc` キーが無効化されている
- TUI status line に Paused 状態と Enter ヒントが表示される
- 既存の Cancel / Shutdown / interceptor-driven Pause の挙動が回帰しない
## 範囲外
- 複数クライアントが同時に Pod を触る時の race。最後の勝ち or 冪等挙動で十分
- Tool 実行の cancellation 対応(長時間 tool を interrupt する仕組み)
- 親 Pod への `PodEvent::Paused` 通知
- Anthropic が orphan `tool_use` を本当に 400 で弾くかの実機検証。まず synthetic tool_result を入れて安全側で実装、許容されるなら後で削る

View File

@ -0,0 +1,34 @@
# user-pause レビュー
## 結論
主要ロジック(protocol / controller / Pod / TUI)は仕様通りで、ビルドと既存テストはすべて green。**ただしチケット完了条件で明示されている統合テスト 3 ケースが未実装のため、現状では完了条件未達**。
## 良い点
- `protocol` の `Method::Pause` variant 追加と serde round-trip テスト
- `controller.rs::run_with_cancel_support` の `pause_requested` フラグ設計と `PodEvent::Errored` の upward 抑止が仕様通り
- `Method::Pause` の Idle / Paused 区別(NotRunning エラー / no-op)
- `interrupt_and_run.rs` の責務分離: orphan 検出を純粋関数 `orphan_tool_result_closures` に切り出し、ユニットテスト 4 ケースで境界条件を網羅
- TUI の Ctrl-C 2 連打 UX と Ctrl-D との対称性。`paused` フラグを `Event::TurnStart` でクリアする経路は Resume / Run 両方でカバーされる
- `docs/tui-keybindings.md` が人間向け解説として丁寧。Cancel と Pause の使い分け、`Ctrl-R` / `Esc` 廃止経緯まで言及
## 指摘事項
### 1. 統合テスト不足(completion blocker)
チケット完了条件のうち、以下 3 項目に対応する統合テストが `crates/pod/tests/controller_test.rs` に追加されていない:
- 実行中 Pod に `Method::Pause` を送ると `PodStatus::Paused` に落ち、`Method::Resume` で続きから再開できる
- Paused → Run 後、LLM への送信が wire 上正しい(orphan `tool_use` が解消されている)
- Pause → Resume の history consistency
`interrupt_and_run.rs` のユニットテストは関数単位の検証であり、controller を通したエンドツーエンドの確認とは別物。既存の `resume_without_pause_returns_error` と同形式で 3 ケース追加すること。
### 2. ライフサイクル運用
`tickets/user-pause.md`, `crates/pod/src/interrupt_and_run.rs` 等が untracked / unstaged。CLAUDE.md のチケット運用(作成 commit → 詳細化 commit → レビュー commit)に沿うなら、実装と分けて commit すべき。
## 判定
統合テスト 3 ケースを追加した時点で完了条件達成。それまでは未完了。