プロトコル経由のshutdow経路

This commit is contained in:
Keisuke Hirata 2026-04-16 13:49:53 +09:00
parent 710220c920
commit aa138e6583
11 changed files with 183 additions and 27 deletions

View File

@ -5,6 +5,7 @@
- [ ] Compact の改善(要約品質 + 挙動詳細) → [tickets/compact-improvements.md](tickets/compact-improvements.md)
- [ ] Protocol の設計 → [tickets/protocol-design.md](tickets/protocol-design.md)
- [ ] パーミッション: パターンベースのツール実行制御 → [tickets/permission-extension-point.md](tickets/permission-extension-point.md)
- [ ] Pod オーケストレーション: LLM によるマルチエージェント分業 → [tickets/pod-orchestration.md](tickets/pod-orchestration.md)
- [ ] ネイティブ GUI クライアント MVP → [tickets/native-gui-mvp.md](tickets/native-gui-mvp.md)
- [ ] TUI 拡充
- [ ] Pod の明示的 shutdown → [tickets/tui-pod-shutdown.md](tickets/tui-pod-shutdown.md)

View File

@ -43,7 +43,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let pod = pod::Pod::from_manifest_toml(&toml, store).await?;
let runtime_tmp = tempfile::tempdir()?;
let handle = PodController::spawn(pod, runtime_tmp.path()).await?;
let (handle, _shutdown_rx) = PodController::spawn(pod, runtime_tmp.path()).await?;
// Check initial status via shared state
println!("[shared_state] {}", handle.shared_state.status_json());

View File

@ -4,7 +4,7 @@ use std::sync::Arc;
use llm_worker::WorkerError;
use llm_worker::llm_client::client::LlmClient;
use session_store::Store;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::{broadcast, mpsc, oneshot};
use crate::notifier::Notifier;
use crate::pod::{Pod, PodError, PodRunResult};
@ -50,17 +50,20 @@ impl PodHandle {
// PodController — actor that owns a Pod
// ---------------------------------------------------------------------------
pub type ShutdownReceiver = oneshot::Receiver<()>;
pub struct PodController;
impl PodController {
pub async fn spawn<C, St>(
mut pod: Pod<C, St>,
runtime_base: &Path,
) -> Result<PodHandle, std::io::Error>
) -> Result<(PodHandle, ShutdownReceiver), std::io::Error>
where
C: LlmClient + 'static,
St: Store + 'static,
{
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
let (method_tx, mut method_rx) = mpsc::channel::<Method>(32);
let (event_tx, _) = broadcast::channel::<Event>(256);
let notifier = Notifier::new(event_tx.clone());
@ -225,7 +228,7 @@ impl PodController {
shared_state.set_status(PodStatus::Running);
let _ = runtime_dir.write_status(&shared_state).await;
let new_status = run_with_cancel_support(
let (new_status, shutdown) = run_with_cancel_support(
pod.run(&input),
&mut method_rx,
&event_tx,
@ -234,7 +237,6 @@ impl PodController {
)
.await;
// Proactive post-run compaction (best-effort).
if new_status == PodStatus::Idle {
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
@ -251,6 +253,11 @@ impl PodController {
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
let _ = runtime_dir.write_history(&shared_state).await;
if shutdown {
let _ = event_tx.send(Event::Shutdown);
break;
}
}
Method::Resume => {
@ -264,7 +271,7 @@ impl PodController {
shared_state.set_status(PodStatus::Running);
let _ = runtime_dir.write_status(&shared_state).await;
let new_status = run_with_cancel_support(
let (new_status, shutdown) = run_with_cancel_support(
pod.resume(),
&mut method_rx,
&event_tx,
@ -273,7 +280,6 @@ impl PodController {
)
.await;
// Proactive post-run compaction (best-effort).
if new_status == PodStatus::Idle {
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
@ -290,6 +296,11 @@ impl PodController {
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
let _ = runtime_dir.write_history(&shared_state).await;
if shutdown {
let _ = event_tx.send(Event::Shutdown);
break;
}
}
Method::Cancel => {
@ -299,30 +310,39 @@ impl PodController {
});
}
Method::Shutdown => {
let _ = event_tx.send(Event::Shutdown);
break;
}
// GetHistory is handled at the socket layer (direct response).
// If it somehow reaches the controller, ignore it.
Method::GetHistory => {}
}
}
let _ = shutdown_tx.send(());
});
Ok(handle)
Ok((handle, shutdown_rx))
}
}
/// Runs a Pod future while concurrently processing incoming methods.
/// Only `Cancel` is handled during execution; `Run` and `Resume` get errors.
///
/// Returns `(final_status, shutdown_requested)`.
async fn run_with_cancel_support<F>(
pod_future: F,
method_rx: &mut mpsc::Receiver<Method>,
event_tx: &broadcast::Sender<Event>,
cancel_tx: &mpsc::Sender<()>,
shared_state: &Arc<PodSharedState>,
) -> PodStatus
) -> (PodStatus, bool)
where
F: std::future::Future<Output = Result<PodRunResult, PodError>>,
{
tokio::pin!(pod_future);
let mut shutdown_requested = false;
loop {
tokio::select! {
@ -335,7 +355,7 @@ where
PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached),
};
let _ = event_tx.send(Event::RunEnd { result: run_result });
status
(status, shutdown_requested)
}
Err(e) => {
let code = worker_error_code(&e);
@ -343,7 +363,7 @@ where
code,
message: e.to_string(),
});
PodStatus::Idle
(PodStatus::Idle, shutdown_requested)
}
};
}
@ -352,19 +372,21 @@ where
Some(Method::Cancel) => {
let _ = cancel_tx.try_send(());
}
Some(Method::Shutdown) => {
shutdown_requested = true;
let _ = cancel_tx.try_send(());
}
Some(Method::Run { .. } | Method::Resume) => {
let _ = event_tx.send(Event::Error {
code: ErrorCode::AlreadyRunning,
message: "Pod is already executing a turn".into(),
});
}
Some(Method::GetHistory) => {
// Handled at socket layer; ignore here.
}
Some(Method::GetHistory) => {}
None => {
let _ = cancel_tx.try_send(());
shared_state.set_status(PodStatus::Idle);
return PodStatus::Idle;
return (PodStatus::Idle, false);
}
}
}

View File

@ -19,7 +19,7 @@ mod usage_tracker;
pub use token_counter::{EstimateSource, SplitPoint, TokenEstimate};
pub use controller::{PodController, PodHandle};
pub use controller::{PodController, PodHandle, ShutdownReceiver};
pub use factory::{FactoryError, PodFactory};
pub use notifier::Notifier;
pub use hook::{Hook, HookEventKind, HookRegistryBuilder};

View File

@ -157,8 +157,8 @@ async fn main() -> ExitCode {
return ExitCode::FAILURE;
}
};
let handle = match PodController::spawn(pod, &runtime_base).await {
Ok(h) => h,
let (handle, shutdown_rx) = match PodController::spawn(pod, &runtime_base).await {
Ok(pair) => pair,
Err(e) => {
eprintln!("error: failed to start pod controller: {e}");
return ExitCode::FAILURE;
@ -170,13 +170,12 @@ async fn main() -> ExitCode {
handle.runtime_dir.socket_path()
);
// Wait for shutdown signal
match tokio::signal::ctrl_c().await {
Ok(()) => {
eprintln!("pod: {pod_name} shutting down");
tokio::select! {
_ = tokio::signal::ctrl_c() => {
eprintln!("pod: {pod_name} shutting down (signal)");
}
Err(e) => {
eprintln!("error: failed to listen for signal: {e}");
_ = shutdown_rx => {
eprintln!("pod: {pod_name} shutting down (client request)");
}
}

View File

@ -111,9 +111,9 @@ use pod::PodHandle;
async fn spawn_controller(pod: Pod<MockClient, FsStore>) -> PodHandle {
let tmp = tempfile::tempdir().unwrap();
let runtime_base = tmp.path().to_owned();
// Leak tempdir so it survives the test
std::mem::forget(tmp);
PodController::spawn(pod, &runtime_base).await.unwrap()
let (handle, _shutdown_rx) = PodController::spawn(pod, &runtime_base).await.unwrap();
handle
}
// ---------------------------------------------------------------------------

View File

@ -12,6 +12,7 @@ pub enum Method {
Run { input: String },
Resume,
Cancel,
Shutdown,
GetHistory,
}
@ -69,6 +70,7 @@ pub enum Event {
greeting: Greeting,
},
Notification(Notification),
Shutdown,
}
/// User-facing notification emitted from the Pod layer.

View File

@ -12,6 +12,7 @@ pub struct App {
pub input: String,
pub cursor: usize,
pub quit: bool,
pub shutdown_confirm: Option<std::time::Instant>,
/// Lines waiting to be flushed to terminal via insert_before.
pub output_queue: Vec<OutputItem>,
/// Partial streaming text not yet terminated by newline.
@ -55,6 +56,7 @@ impl App {
input: String::new(),
cursor: 0,
quit: false,
shutdown_confirm: None,
output_queue: Vec::new(),
pending_text: String::new(),
}
@ -193,6 +195,9 @@ impl App {
self.output_queue.insert(1, OutputItem::Blank);
}
}
Event::Shutdown => {
self.quit = true;
}
}
}

View File

@ -168,6 +168,9 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
}
KeyCode::Char('r') if key.modifiers.contains(KeyModifiers::CONTROL) => Some(Method::Resume),
KeyCode::Char('x') if key.modifiers.contains(KeyModifiers::CONTROL) => Some(Method::Cancel),
KeyCode::Char('d') if key.modifiers.contains(KeyModifiers::CONTROL) => {
return handle_shutdown(app);
}
KeyCode::Enter => app.submit_input(),
KeyCode::Backspace => {
app.delete_char_before();
@ -200,3 +203,23 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
_ => None,
}
}
const SHUTDOWN_CONFIRM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
fn handle_shutdown(app: &mut App) -> Option<Method> {
if !app.running {
return Some(Method::Shutdown);
}
if let Some(t) = app.shutdown_confirm {
if t.elapsed() < SHUTDOWN_CONFIRM_TIMEOUT {
app.shutdown_confirm = None;
return Some(Method::Shutdown);
}
}
app.shutdown_confirm = Some(std::time::Instant::now());
app.output_queue.push(app::OutputItem::Padded(
app::MessageKind::Error,
"Turn is running. Press Ctrl-D again to cancel and shut down.".into(),
));
None
}

View File

@ -1,5 +1,10 @@
# TUI: Pod を明示的に終了させる操作
## レビュー状態
初回レビュー実施済み。[tui-pod-shutdown.review.md](tui-pod-shutdown.review.md) を参照。
要件達成、アーキテクチャは「通常後処理を経由してからの終了」を保証。指摘1件shutdown 中の進行表示が無い — 実害なしで不問)。**受け入れ可**。
## 背景
現状、TUI から Pod を終了させる手段は Ctrl-C / プロセス終了に頼っている。これだと:

View File

@ -0,0 +1,99 @@
# レビュー: TUI: Pod を明示的に終了させる操作
対象差分: `crates/protocol/src/lib.rs`, `crates/pod/src/{controller,lib,main}.rs`, `crates/pod/tests/controller_test.rs`, `crates/pod/examples/pod_protocol.rs`, `crates/tui/src/{app,main}.rs`staged、未コミット
## 要件達成状況
| 要件 | 状態 |
|---|---|
| TUI 内のキーバインドで Pod の終了を開始できる | ✅ `Ctrl-D``Method::Shutdown` を送信 |
| 実行中のターンがあれば確認を挟む | ✅ `handle_shutdown`: running 中は「Press Ctrl-D again」警告を表示、3秒以内の再押下で確定 |
| 実行中のターンがなければ即座に終了 | ✅ `!app.running` なら即 `Method::Shutdown` |
| 既存のキャンセル機構で中断する | ✅ `run_with_cancel_support` 内で `Method::Shutdown` 受信時に `cancel_tx.try_send(())` + `shutdown_requested = true` |
| キャンセル完了後 session-store にフラッシュ | ✅ `run_with_cancel_support``(status, shutdown)` を返した後、controller loop 内で `write_status` / `write_history` が走ってから `break` |
| shutdown 完了後 TUI が終了する | ✅ `Event::Shutdown``app.quit = true`、main loop が break |
| Pod プロセス自体も正常終了する | ✅ controller が `shutdown_tx.send(())``main.rs``shutdown_rx` が発火 → `drop(handle)``ExitCode::SUCCESS` |
| shutdown 中の進行状況が画面で分かる | 🟡 「Press Ctrl-D again to cancel and shut down.」メッセージは出るが、shutdown 後の「shutting down...」表示は無い(`Event::Shutdown` 受信時に `app.quit = true` だけで即終了)|
## アーキテクチャ
### Protocol 拡張
- `Method::Shutdown``Event::Shutdown` が protocol に追加。最小限の拡張で意味が明確
- `Method::Shutdown` は Run/Resume/Cancel と同格の method バリアント。socket 層での特殊扱い不要
### Controller 内の shutdown フロー
```
Idle 状態で Shutdown 受信:
→ Event::Shutdown を broadcast → controller loop break → shutdown_tx.send(())
Running 状態で Shutdown 受信 (run_with_cancel_support 内):
→ cancel_tx.try_send(()) で実行中ターンをキャンセル
→ shutdown_requested = true をフラグ
→ pod_future が Cancelled/Error で完了
→ (PodStatus::Idle, shutdown=true) を返す
→ controller loop が compaction → write_status → write_history の通常後処理
→ shutdown フラグを見て Event::Shutdown → break → shutdown_tx.send(())
```
**重要**: shutdown が来ても**通常の後処理compaction, persistを必ず経由**してから抜ける。session の整合性が壊れない。この設計は正しい。
### main.rs の二択 select
```rust
tokio::select! {
_ = tokio::signal::ctrl_c() => { ... }
_ = shutdown_rx => { ... }
}
```
Ctrl-Csignalと client 要求の Shutdown を同格に扱う。どちらが先に来ても `drop(handle)` → 正常終了。
### TUI の確認 UI
`shutdown_confirm: Option<Instant>` フィールドで「最後に Ctrl-D を押した時刻」を保持。3秒以内の再押下で確定。Timeout 後は再度リセット。シンプルで十分。
## 指摘事項
### 1. 🟡 shutdown 中の進行表示が無い
チケット要件:
> shutdown 中は進行状況が画面上で分かる「shutting down...」等の表示)。
実装:
- `Event::Shutdown` を受けた瞬間に `app.quit = true` → main loop が即 break → terminal restore
- ユーザーが「shutting down」を読む暇がない
実際には shutdown はほぼ瞬時(キャンセル完了 + persist + Event::Shutdown が数十 msなので視認できるタイミングがそもそも無い。要件の「shutting down...」は「長い shutdown を想定した表示」だが、実装上 shutdown は速いので**実用上問題にならない**。
**判断**: 実害なし。将来 shutdown が重くなった場合(大量ターンの persist 等)に表示を足すのは容易(`Event::Shutdown` 受信時に output_queue に push してから quit を遅延させればよい)。**不問**。
### 2. 🟢 `shutdown_confirm``Instant` ベース
wall clock ではなく `std::time::Instant` を使っている。NTP ジャンプに影響されない。正しい選択。
### 3. 🟢 Resume パスでも shutdown 対応
`Method::Resume` のハンドラも `run_with_cancel_support` を通っており、Resume 中に `Shutdown` が来ても同じフローで処理される。漏れなし。
### 4. 🟢 examples / tests の追随
- `pod_protocol.rs`: `PodController::spawn` の戻り値を `(handle, _shutdown_rx)` に分解
- `controller_test.rs`: 同上
- 既存テストが壊れていない
### 5. 🟢 `Ctrl-D` のキーバインド選択
`Ctrl-D` は shell の EOF 慣例に合っており、「この Pod との対話を終える」という意味が自然。`q` や `:quit` は通常入力との衝突リスクがあるので `Ctrl-D` は妥当。
## テスト
- `controller_test.rs`: 戻り値の型変更に追随。shutdown 固有のテストShutdown method を送って controller が break するか、persist 後に shutdown_rx が発火するか等)は**未追加**
- `handle_shutdown` のユニットテストrunning 時の confirm フロー、timeout 後のリセット等)は**未追加**
テストが薄い点はあるが、shutdown フローは controller loop 内の分岐であり、integration test で網羅すると MockClient の応答タイミング制御が必要になって重くなる。最小実装としては許容。
## 結論
**受け入れ可**。要件はほぼ達成、アーキテクチャは「通常の後処理を必ず経由してから終了」という最重要の不変条件を守っている。指摘1shutdown 中の進行表示)は実害なしで不問。テストの薄さは将来必要になったら足す程度。