From 0be30052c1984978e913f3e26afe1121b8aa96db Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 4 May 2026 12:55:11 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Pod=E3=81=AE=E3=82=B9=E3=83=86=E3=83=BC?= =?UTF-8?q?=E3=82=BF=E3=82=B9=E3=82=92=E5=8E=B3=E5=AF=86=E3=81=AB=E3=81=97?= =?UTF-8?q?=E3=80=81=E5=90=8C=E6=9C=9F=E6=BC=8F=E3=82=8C=E3=82=92=E9=98=B2?= =?UTF-8?q?=E3=81=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO.md | 1 + crates/manifest/src/scope.rs | 5 +- crates/memory/src/consolidate/input.rs | 17 +- crates/memory/src/consolidate/lock.rs | 3 +- crates/memory/src/consolidate/tidy.rs | 16 +- crates/pod/src/compact/prune.rs | 48 ++-- crates/pod/src/controller.rs | 319 ++++++++++++----------- crates/pod/src/ipc/interceptor.rs | 2 +- crates/pod/src/ipc/server.rs | 2 + crates/pod/src/lib.rs | 4 +- crates/pod/src/runtime/dir.rs | 3 +- crates/pod/src/shared_state.rs | 14 +- crates/pod/tests/consolidation_test.rs | 38 +-- crates/pod/tests/controller_test.rs | 156 ++++++++++- crates/pod/tests/pod_comm_tools_test.rs | 1 + crates/pod/tests/session_metrics_test.rs | 27 +- crates/protocol/src/lib.rs | 55 ++++ crates/session-metrics/src/lib.rs | 4 +- crates/tools/src/task.rs | 28 +- crates/tui/src/app.rs | 52 +++- crates/tui/src/client.rs | 4 + crates/tui/src/main.rs | 101 ++++--- crates/tui/src/ui.rs | 12 + tickets/tui-pod-status-sync.md | 40 +++ 24 files changed, 641 insertions(+), 311 deletions(-) create mode 100644 tickets/tui-pod-status-sync.md diff --git a/TODO.md b/TODO.md index d75ddc1a..767ce8ee 100644 --- a/TODO.md +++ b/TODO.md @@ -11,6 +11,7 @@ - ストリーム途中失敗時の継続 → [tickets/llm-worker-stream-continuation.md](tickets/llm-worker-stream-continuation.md) - ネイティブ GUI クライアント MVP → [tickets/native-gui-mvp.md](tickets/native-gui-mvp.md) - TUI 拡充 + - Run/notice 前後の Pod 状態同期と Ctrl 系操作の安定化 → [tickets/tui-pod-status-sync.md](tickets/tui-pod-status-sync.md) - Run 中の入力キューイング → [tickets/tui-input-queue.md](tickets/tui-input-queue.md) - ユーザーマニフェストのモデル設定 wizard → [tickets/tui-user-model-setup.md](tickets/tui-user-model-setup.md) - spawn 失敗時に Pod の stderr が TUI に表示されない → [tickets/tui-spawn-error-surface.md](tickets/tui-spawn-error-surface.md) diff --git a/crates/manifest/src/scope.rs b/crates/manifest/src/scope.rs index 5f1b829d..8cc64187 100644 --- a/crates/manifest/src/scope.rs +++ b/crates/manifest/src/scope.rs @@ -698,7 +698,10 @@ mod tests { std::fs::create_dir(&sub).unwrap(); let shared = SharedScope::new(Scope::writable(dir.path()).unwrap()); let target = sub.join("a.txt"); - assert_eq!(shared.load().permission_at(&target), Some(Permission::Write)); + assert_eq!( + shared.load().permission_at(&target), + Some(Permission::Write) + ); shared .update(|cur| { cur.with_added_deny_rules([ScopeRule { diff --git a/crates/memory/src/consolidate/input.rs b/crates/memory/src/consolidate/input.rs index 6b4df1dc..a2b4343e 100644 --- a/crates/memory/src/consolidate/input.rs +++ b/crates/memory/src/consolidate/input.rs @@ -203,7 +203,13 @@ pub fn render_tidy_hints(tidy: &TidyHints) -> String { "**Sources overflow** — consider trimming to the most recent entries (git log keeps the rest):\n", ); for s in &tidy.sources_overflow { - let _ = writeln!(&mut out, "- {} `{}` ({} sources)", s.kind.as_str(), s.slug, s.count); + let _ = writeln!( + &mut out, + "- {} `{}` ({} sources)", + s.kind.as_str(), + s.slug, + s.count + ); } out.push('\n'); } @@ -276,12 +282,9 @@ mod tests { .unwrap(); let staging = crate::consolidate::staging::list_staging_entries(&layout); let tidy = TidyHints { - replaced_decisions: [( - "old".to_string(), - Some("new".to_string()), - )] - .into_iter() - .collect(), + replaced_decisions: [("old".to_string(), Some("new".to_string()))] + .into_iter() + .collect(), sources_overflow: vec![SourcesOverflow { kind: RecordKind::Decision, slug: "dec".into(), diff --git a/crates/memory/src/consolidate/lock.rs b/crates/memory/src/consolidate/lock.rs index 772387e1..292d6331 100644 --- a/crates/memory/src/consolidate/lock.rs +++ b/crates/memory/src/consolidate/lock.rs @@ -295,8 +295,7 @@ mod tests { fn release_is_resilient_to_missing_consumed_entries() { let (_dir, layout) = make_layout(); let phantom = uuid::Uuid::now_v7(); - let lock = - StagingLock::acquire(&layout, std::process::id(), "pod", vec![phantom]).unwrap(); + let lock = StagingLock::acquire(&layout, std::process::id(), "pod", vec![phantom]).unwrap(); let lock_path = lock.path().to_path_buf(); // No file at /.json — release must not panic. lock.release_with_cleanup(&layout); diff --git a/crates/memory/src/consolidate/tidy.rs b/crates/memory/src/consolidate/tidy.rs index 34b52bcd..0794397a 100644 --- a/crates/memory/src/consolidate/tidy.rs +++ b/crates/memory/src/consolidate/tidy.rs @@ -74,10 +74,7 @@ pub fn collect_tidy_hints(layout: &WorkspaceLayout) -> TidyHints { for (slug, content) in &decisions { let fm = parse_yaml::(content); if let Some(fm) = fm.as_ref() { - if matches!( - fm.status, - crate::schema::DecisionStatus::Replaced - ) { + if matches!(fm.status, crate::schema::DecisionStatus::Replaced) { hints .replaced_decisions .insert(slug.clone(), fm.replaced_by.as_ref().map(|s| s.to_string())); @@ -113,9 +110,9 @@ pub fn collect_tidy_hints(layout: &WorkspaceLayout) -> TidyHints { } } } - hints - .sources_overflow - .sort_by(|a, b| (a.kind.as_str(), a.slug.as_str()).cmp(&(b.kind.as_str(), b.slug.as_str()))); + hints.sources_overflow.sort_by(|a, b| { + (a.kind.as_str(), a.slug.as_str()).cmp(&(b.kind.as_str(), b.slug.as_str())) + }); let decision_slugs: Vec<&str> = decisions.keys().map(|s| s.as_str()).collect(); let request_slugs: Vec<&str> = requests.keys().map(|s| s.as_str()).collect(); @@ -139,10 +136,7 @@ pub fn collect_tidy_hints(layout: &WorkspaceLayout) -> TidyHints { /// `/.insomnia/memory//*.md` (Knowledge は /// `/.insomnia/knowledge/*.md`) を slug ごとに `(slug, full content)` /// 化して返す。 -fn read_kind_records( - layout: &WorkspaceLayout, - kind: RecordKind, -) -> BTreeMap { +fn read_kind_records(layout: &WorkspaceLayout, kind: RecordKind) -> BTreeMap { let dir = match kind { RecordKind::Decision => layout.decisions_dir(), RecordKind::Request => layout.requests_dir(), diff --git a/crates/pod/src/compact/prune.rs b/crates/pod/src/compact/prune.rs index 3b238340..2c62b50b 100644 --- a/crates/pod/src/compact/prune.rs +++ b/crates/pod/src/compact/prune.rs @@ -49,33 +49,29 @@ impl Pod { let metrics = self.metrics_tracker_handle(); let usage_tracker = self.usage_tracker_handle(); - let observer: PruneObserver = Box::new(move |eval| { - match &eval.decision { - PruneDecision::Fired { .. } => { - let correlation_id = uuid::Uuid::now_v7().to_string(); - let mut metric = Metric::now("prune.fire") - .with_value(eval.estimated_savings as f64) - .with_correlation_id(&correlation_id) - .with_dimension("candidate_count", eval.candidate_count.to_string()); - if let Some(border) = eval.border_turn { - metric = metric.with_dimension("border_turn", border.to_string()); - } - metrics.push(metric); - usage_tracker.note_correlation_id(correlation_id); - } - PruneDecision::SkippedNoCandidates => { - metrics.push( - Metric::now("prune.skip").with_dimension("reason", "no_candidates"), - ); - } - PruneDecision::SkippedBelowMinSavings => { - metrics.push( - Metric::now("prune.skip") - .with_dimension("reason", "below_min_savings") - .with_dimension("candidate_count", eval.candidate_count.to_string()) - .with_value(eval.estimated_savings as f64), - ); + let observer: PruneObserver = Box::new(move |eval| match &eval.decision { + PruneDecision::Fired { .. } => { + let correlation_id = uuid::Uuid::now_v7().to_string(); + let mut metric = Metric::now("prune.fire") + .with_value(eval.estimated_savings as f64) + .with_correlation_id(&correlation_id) + .with_dimension("candidate_count", eval.candidate_count.to_string()); + if let Some(border) = eval.border_turn { + metric = metric.with_dimension("border_turn", border.to_string()); } + metrics.push(metric); + usage_tracker.note_correlation_id(correlation_id); + } + PruneDecision::SkippedNoCandidates => { + metrics.push(Metric::now("prune.skip").with_dimension("reason", "no_candidates")); + } + PruneDecision::SkippedBelowMinSavings => { + metrics.push( + Metric::now("prune.skip") + .with_dimension("reason", "below_min_savings") + .with_dimension("candidate_count", eval.candidate_count.to_string()) + .with_value(eval.estimated_savings as f64), + ); } }); diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 2bd3cdc5..82874823 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -12,13 +12,15 @@ use crate::ipc::notify_buffer::NotifyBuffer; use crate::ipc::server::SocketServer; use crate::pod::{Pod, PodError, PodRunResult}; use crate::runtime::dir::RuntimeDir; -use crate::shared_state::{PodSharedState, PodStatus}; +use crate::shared_state::PodSharedState; use crate::spawn::comm_tools::{ list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool, }; use crate::spawn::registry::SpawnedPodRegistry; use crate::spawn::tool::spawn_pod_tool; -use protocol::{AlertLevel, AlertSource, ErrorCode, Event, Method, RunResult, TurnResult}; +use protocol::{ + AlertLevel, AlertSource, ErrorCode, Event, Method, PodStatus, RunResult, TurnResult, +}; fn is_system_message_item(item: &Item) -> bool { matches!( @@ -63,6 +65,75 @@ impl PodHandle { } } +async fn set_controller_status( + shared_state: &Arc, + runtime_dir: &RuntimeDir, + event_tx: &broadcast::Sender, + status: PodStatus, +) { + shared_state.set_status(status); + let _ = runtime_dir.write_status(shared_state).await; + let _ = event_tx.send(Event::Status { status }); +} + +async fn run_post_run_jobs(pod: &mut Pod, alerter: &Alerter) +where + C: LlmClient, + St: Store, +{ + if let Err(e) = pod.try_post_run_extract().await { + tracing::warn!(error = %e, "Post-run memory extract error"); + alerter.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("post-run memory extract error: {e}"), + ); + } + if let Err(e) = pod.try_post_run_consolidate().await { + tracing::warn!(error = %e, "Post-run memory consolidate error"); + alerter.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("post-run memory consolidate error: {e}"), + ); + } + if let Err(e) = pod.try_post_run_compact().await { + tracing::warn!(error = %e, "Post-run compaction error"); + alerter.alert( + AlertLevel::Warn, + AlertSource::Compactor, + format!("post-run compaction error: {e}"), + ); + } +} + +async fn finish_controller_run( + pod: &mut Pod, + shared_state: &Arc, + runtime_dir: &RuntimeDir, + event_tx: &broadcast::Sender, + alerter: &Alerter, + new_status: PodStatus, +) where + C: LlmClient, + St: Store, +{ + if new_status == PodStatus::Busy { + run_post_run_jobs(pod, alerter).await; + } + + let items = pod.worker().history().to_vec(); + shared_state.update_history(items); + shared_state.set_user_segments(pod.user_segments().to_vec()); + let final_status = if new_status == PodStatus::Busy { + PodStatus::Idle + } else { + new_status + }; + set_controller_status(shared_state, runtime_dir, event_tx, final_status).await; + let _ = runtime_dir.write_history(shared_state).await; +} + // --------------------------------------------------------------------------- // PodController — actor that owns a Pod // --------------------------------------------------------------------------- @@ -414,8 +485,13 @@ impl PodController { let _ = event_tx.send(Event::UserMessage { segments: input.clone(), }); - shared_state.set_status(PodStatus::Running); - let _ = runtime_dir.write_status(&shared_state).await; + set_controller_status( + &shared_state, + &runtime_dir, + &event_tx, + PodStatus::Running, + ) + .await; let run_future = async { if was_paused { @@ -430,6 +506,7 @@ impl PodController { &event_tx, &cancel_tx, &shared_state, + &runtime_dir, ¬ify_buffer, self_parent_socket.as_ref(), &spawner_name, @@ -437,39 +514,15 @@ impl PodController { ) .await; - if new_status == PodStatus::Idle { - if let Err(e) = pod.try_post_run_extract().await { - tracing::warn!(error = %e, "Post-run memory extract error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!("post-run memory extract error: {e}"), - ); - } - if let Err(e) = pod.try_post_run_consolidate().await { - tracing::warn!(error = %e, "Post-run memory consolidate error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!("post-run memory consolidate error: {e}"), - ); - } - if let Err(e) = pod.try_post_run_compact().await { - tracing::warn!(error = %e, "Post-run compaction error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Compactor, - format!("post-run compaction error: {e}"), - ); - } - } - - let items = pod.worker().history().to_vec(); - shared_state.update_history(items); - shared_state.set_user_segments(pod.user_segments().to_vec()); - shared_state.set_status(new_status); - let _ = runtime_dir.write_status(&shared_state).await; - let _ = runtime_dir.write_history(&shared_state).await; + finish_controller_run( + &mut pod, + &shared_state, + &runtime_dir, + &event_tx, + &alerter, + new_status, + ) + .await; if shutdown { let _ = event_tx.send(Event::Shutdown); @@ -482,17 +535,23 @@ impl PodController { message: message.clone(), }); pod.push_notify(message); - if shared_state.get_status() != PodStatus::Idle { - // RUNNING / Paused: the buffer push is the - // entire operation; the in-flight turn (or - // next Resume) will drain the buffer at its - // next pre_llm_request. + let status = shared_state.get_status(); + if status != PodStatus::Idle { + // RUNNING / Paused / Busy: the buffer push is the + // entire operation; an in-flight turn (or the + // next Resume/Run after Busy) will drain the buffer + // at its next pre_llm_request. continue; } // IDLE: auto-start a turn so the LLM sees the // buffered notification(s) without a human Run. - shared_state.set_status(PodStatus::Running); - let _ = runtime_dir.write_status(&shared_state).await; + set_controller_status( + &shared_state, + &runtime_dir, + &event_tx, + PodStatus::Running, + ) + .await; let (new_status, shutdown) = run_with_cancel_support( pod.run_for_notification(), @@ -500,6 +559,7 @@ impl PodController { &event_tx, &cancel_tx, &shared_state, + &runtime_dir, ¬ify_buffer, self_parent_socket.as_ref(), &spawner_name, @@ -507,39 +567,15 @@ impl PodController { ) .await; - if new_status == PodStatus::Idle { - if let Err(e) = pod.try_post_run_extract().await { - tracing::warn!(error = %e, "Post-run memory extract error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!("post-run memory extract error: {e}"), - ); - } - if let Err(e) = pod.try_post_run_consolidate().await { - tracing::warn!(error = %e, "Post-run memory consolidate error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!("post-run memory consolidate error: {e}"), - ); - } - if let Err(e) = pod.try_post_run_compact().await { - tracing::warn!(error = %e, "Post-run compaction error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Compactor, - format!("post-run compaction error: {e}"), - ); - } - } - - let items = pod.worker().history().to_vec(); - shared_state.update_history(items); - shared_state.set_user_segments(pod.user_segments().to_vec()); - shared_state.set_status(new_status); - let _ = runtime_dir.write_status(&shared_state).await; - let _ = runtime_dir.write_history(&shared_state).await; + finish_controller_run( + &mut pod, + &shared_state, + &runtime_dir, + &event_tx, + &alerter, + new_status, + ) + .await; if shutdown { let _ = event_tx.send(Event::Shutdown); @@ -555,8 +591,13 @@ impl PodController { }); continue; } - shared_state.set_status(PodStatus::Running); - let _ = runtime_dir.write_status(&shared_state).await; + set_controller_status( + &shared_state, + &runtime_dir, + &event_tx, + PodStatus::Running, + ) + .await; let (new_status, shutdown) = run_with_cancel_support( pod.resume(), @@ -564,6 +605,7 @@ impl PodController { &event_tx, &cancel_tx, &shared_state, + &runtime_dir, ¬ify_buffer, self_parent_socket.as_ref(), &spawner_name, @@ -571,39 +613,15 @@ impl PodController { ) .await; - if new_status == PodStatus::Idle { - if let Err(e) = pod.try_post_run_extract().await { - tracing::warn!(error = %e, "Post-run memory extract error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!("post-run memory extract error: {e}"), - ); - } - if let Err(e) = pod.try_post_run_consolidate().await { - tracing::warn!(error = %e, "Post-run memory consolidate error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!("post-run memory consolidate error: {e}"), - ); - } - if let Err(e) = pod.try_post_run_compact().await { - tracing::warn!(error = %e, "Post-run compaction error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Compactor, - format!("post-run compaction error: {e}"), - ); - } - } - - let items = pod.worker().history().to_vec(); - shared_state.update_history(items); - shared_state.set_user_segments(pod.user_segments().to_vec()); - shared_state.set_status(new_status); - let _ = runtime_dir.write_status(&shared_state).await; - let _ = runtime_dir.write_history(&shared_state).await; + finish_controller_run( + &mut pod, + &shared_state, + &runtime_dir, + &event_tx, + &alerter, + new_status, + ) + .await; if shutdown { let _ = event_tx.send(Event::Shutdown); @@ -619,11 +637,12 @@ impl PodController { } Method::Pause => { - // Already paused → idempotent no-op. Otherwise - // the Pod is Idle (Running turns go through - // `run_with_cancel_support`, not this outer - // match), so there is nothing to pause. - if shared_state.get_status() != PodStatus::Paused { + // Already paused or post-run busy → idempotent no-op. + // Otherwise the Pod is Idle (Running turns go through + // `run_with_cancel_support`, not this outer match), so + // there is nothing to pause. + let status = shared_state.get_status(); + if !matches!(status, PodStatus::Paused | PodStatus::Busy) { let _ = event_tx.send(Event::Error { code: ErrorCode::NotRunning, message: "Pod is not running".into(), @@ -666,8 +685,13 @@ impl PodController { // notification is not stranded. Matches the // `Method::Notify` idle path. if shared_state.get_status() == PodStatus::Idle { - shared_state.set_status(PodStatus::Running); - let _ = runtime_dir.write_status(&shared_state).await; + set_controller_status( + &shared_state, + &runtime_dir, + &event_tx, + PodStatus::Running, + ) + .await; let (new_status, shutdown) = run_with_cancel_support( pod.run_for_notification(), @@ -675,6 +699,7 @@ impl PodController { &event_tx, &cancel_tx, &shared_state, + &runtime_dir, ¬ify_buffer, self_parent_socket.as_ref(), &spawner_name, @@ -682,39 +707,15 @@ impl PodController { ) .await; - if new_status == PodStatus::Idle { - if let Err(e) = pod.try_post_run_extract().await { - tracing::warn!(error = %e, "Post-run memory extract error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!("post-run memory extract error: {e}"), - ); - } - if let Err(e) = pod.try_post_run_consolidate().await { - tracing::warn!(error = %e, "Post-run memory consolidate error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!("post-run memory consolidate error: {e}"), - ); - } - if let Err(e) = pod.try_post_run_compact().await { - tracing::warn!(error = %e, "Post-run compaction error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Compactor, - format!("post-run compaction error: {e}"), - ); - } - } - - let items = pod.worker().history().to_vec(); - shared_state.update_history(items); - shared_state.set_user_segments(pod.user_segments().to_vec()); - shared_state.set_status(new_status); - let _ = runtime_dir.write_status(&shared_state).await; - let _ = runtime_dir.write_history(&shared_state).await; + finish_controller_run( + &mut pod, + &shared_state, + &runtime_dir, + &event_tx, + &alerter, + new_status, + ) + .await; if shutdown { let _ = event_tx.send(Event::Shutdown); @@ -766,6 +767,7 @@ async fn run_with_cancel_support( event_tx: &broadcast::Sender, cancel_tx: &mpsc::Sender<()>, shared_state: &Arc, + runtime_dir: &RuntimeDir, notify_buffer: &NotifyBuffer, parent_socket: Option<&std::path::PathBuf>, self_name: &str, @@ -784,11 +786,18 @@ where return match result { Ok(r) => { let (status, run_result) = match r { - PodRunResult::Finished => (PodStatus::Idle, RunResult::Finished), + PodRunResult::Finished => (PodStatus::Busy, RunResult::Finished), PodRunResult::Paused => (PodStatus::Paused, RunResult::Paused), - PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached), + PodRunResult::LimitReached => (PodStatus::Busy, RunResult::LimitReached), }; let _ = event_tx.send(Event::RunEnd { result: run_result }); + if status == PodStatus::Busy { + shared_state.set_status(PodStatus::Busy); + let _ = runtime_dir.write_status(shared_state).await; + let _ = event_tx.send(Event::Status { + status: PodStatus::Busy, + }); + } if matches!(run_result, RunResult::Finished) { crate::ipc::event::fire_and_forget( parent_socket.cloned(), @@ -822,7 +831,7 @@ where message, }, ); - (PodStatus::Idle, shutdown_requested) + (PodStatus::Busy, shutdown_requested) } }; } diff --git a/crates/pod/src/ipc/interceptor.rs b/crates/pod/src/ipc/interceptor.rs index dfdeb4dc..39a70fb4 100644 --- a/crates/pod/src/ipc/interceptor.rs +++ b/crates/pod/src/ipc/interceptor.rs @@ -17,9 +17,9 @@ use llm_worker::interceptor::{ Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo, ToolResultInfo, TurnEndAction, }; -use tracing::warn; use llm_worker::tool::ToolOutput; use tracing::info; +use tracing::warn; use crate::compact::state::CompactState; use crate::hook::{ diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index a414a653..ffea034d 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -159,10 +159,12 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { }) .collect(); let greeting = handle.shared_state.greeting.clone(); + let status = handle.shared_state.get_status(); if writer .write(&Event::History { items: values, greeting, + status, }) .await .is_err() diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 8b8bcdfd..a6786a34 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -26,7 +26,7 @@ pub use pod::{Pod, PodError, PodRunResult, apply_worker_manifest}; pub use prompt::catalog::{CatalogError, PodPrompt, PromptCatalog}; pub use prompt::loader::PromptLoader; pub use prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTemplate}; -pub use protocol::{ErrorCode, Event, Method, TurnResult}; +pub use protocol::{ErrorCode, Event, Method, PodStatus, TurnResult}; pub use provider::{ProviderError, build_client}; pub use runtime::dir::RuntimeDir; -pub use shared_state::{PodSharedState, PodStatus}; +pub use shared_state::PodSharedState; diff --git a/crates/pod/src/runtime/dir.rs b/crates/pod/src/runtime/dir.rs index 8ecbd9f1..8a6c6ece 100644 --- a/crates/pod/src/runtime/dir.rs +++ b/crates/pod/src/runtime/dir.rs @@ -131,7 +131,8 @@ pub fn default_base() -> Result { #[cfg(test)] mod tests { use super::*; - use crate::shared_state::{PodSharedState, PodStatus}; + use crate::shared_state::PodSharedState; + use protocol::PodStatus; fn test_state() -> PodSharedState { PodSharedState::new( diff --git a/crates/pod/src/shared_state.rs b/crates/pod/src/shared_state.rs index 2ae0e5ab..04dd8200 100644 --- a/crates/pod/src/shared_state.rs +++ b/crates/pod/src/shared_state.rs @@ -1,8 +1,8 @@ use std::sync::{OnceLock, RwLock}; use llm_worker::llm_client::types::Item; -use protocol::Segment; -use serde::{Deserialize, Serialize}; +use protocol::{PodStatus, Segment}; +use serde_json::json; use session_store::SessionId; use crate::fs_view::PodFsView; @@ -39,14 +39,6 @@ pub struct PodSharedState { workflows: OnceLock>, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -pub enum PodStatus { - Idle, - Running, - Paused, -} - impl PodSharedState { pub fn new( pod_name: String, @@ -138,7 +130,7 @@ impl PodSharedState { /// Serialize status as JSON. pub fn status_json(&self) -> String { let status = self.get_status(); - serde_json::json!({ + json!({ "state": status, "session_id": self.session_id.to_string(), "pod_name": self.pod_name, diff --git a/crates/pod/tests/consolidation_test.rs b/crates/pod/tests/consolidation_test.rs index 7badcfda..d9e02545 100644 --- a/crates/pod/tests/consolidation_test.rs +++ b/crates/pod/tests/consolidation_test.rs @@ -206,10 +206,7 @@ async fn no_thresholds_is_a_noop() { .expect("phase 2 disabled when both thresholds are None"); // No staging entries removed. - assert_eq!( - memory::consolidate::list_staging_entries(&layout).len(), - 5 - ); + assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 5); } #[tokio::test] @@ -256,10 +253,7 @@ async fn below_threshold_skips_and_does_not_take_lock() { pod.try_post_run_consolidate().await.unwrap(); // Staging untouched. - assert_eq!( - memory::consolidate::list_staging_entries(&layout).len(), - 1 - ); + assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 1); // Lock file must not exist. let lock_path = layout.staging_dir().join(".consolidation.lock"); assert!(!lock_path.exists(), "lock file should not be created"); @@ -285,10 +279,7 @@ async fn fires_on_threshold_and_cleans_up_consumed_entries() { ); // Lock removed too. let lock_path = layout.staging_dir().join(".consolidation.lock"); - assert!( - !lock_path.exists(), - "lock file must be removed on success" - ); + assert!(!lock_path.exists(), "lock file must be removed on success"); } #[tokio::test] @@ -300,7 +291,12 @@ async fn in_flight_guard_skips_reentry_without_clearing() { write_n_staging(&layout, 2); let client = MockClient::new(vec![]); - let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client.clone()).await; + let mut pod = make_pod_with( + FILES_THRESHOLD_TOML, + pwd.path().to_path_buf(), + client.clone(), + ) + .await; // Pre-set the in-flight flag as if another concurrent caller had // entered run_consolidate_once. The CAS at the top of @@ -334,7 +330,9 @@ async fn in_flight_guard_skips_reentry_without_clearing() { let mut pod2 = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client2).await; pod2.try_post_run_consolidate().await.unwrap(); assert!( - !pod2.consolidation_in_flight_handle().load(Ordering::Acquire), + !pod2 + .consolidation_in_flight_handle() + .load(Ordering::Acquire), "in-flight flag must be cleared after a normal run" ); } @@ -356,7 +354,12 @@ async fn coalesce_loop_terminates_with_one_iteration_when_snapshot_drains_stagin // run_consolidate_once after Completed, the second sub-worker run // would exhaust the mock and surface as an error. let client = MockClient::new(vec![done("ok")]); - let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client.clone()).await; + let mut pod = make_pod_with( + FILES_THRESHOLD_TOML, + pwd.path().to_path_buf(), + client.clone(), + ) + .await; pod.try_post_run_consolidate().await.unwrap(); assert_eq!( @@ -393,8 +396,5 @@ async fn live_lock_held_by_other_pod_skips() { .expect("InUse lock must surface as graceful skip"); // Staging untouched: lock holder owns the snapshot, not us. - assert_eq!( - memory::consolidate::list_staging_entries(&layout).len(), - 3 - ); + assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 3); } diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index d8b391e6..c559caee 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -10,7 +10,7 @@ use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use session_store::FsStore; -use pod::{Event, Method, Pod, PodController, PodManifest, PodStatus}; +use pod::{Event, Method, Pod, PodController, PodHandle, PodManifest, PodStatus}; // --------------------------------------------------------------------------- // Mock LLM Client @@ -147,8 +147,6 @@ async fn make_pod_with_pwd(client: MockClient) -> (Pod, std (pod, pwd) } -use pod::PodHandle; - async fn spawn_controller(pod: Pod) -> PodHandle { let tmp = tempfile::tempdir().unwrap(); let runtime_base = tmp.path().to_owned(); @@ -157,9 +155,132 @@ async fn spawn_controller(pod: Pod) -> PodHandle { handle } +async fn wait_for_status(handle: &PodHandle, status: PodStatus) { + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + if handle.shared_state.get_status() == status { + return; + } + assert!( + tokio::time::Instant::now() < deadline, + "timed out waiting for status {status:?}; current={:?}", + handle.shared_state.get_status() + ); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } +} + // --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- + +#[tokio::test] +async fn run_end_enters_busy_until_post_run_finishes_and_broadcasts_status() { + let client = MockClient::new(simple_text_events()); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + let mut rx = handle.subscribe(); + + handle.send(Method::run_text("Hello")).await.unwrap(); + + let mut saw_run_end = false; + let mut saw_busy_status = false; + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + tokio::select! { + event = rx.recv() => { + match event { + Ok(Event::RunEnd { result: protocol::RunResult::Finished }) => { + saw_run_end = true; + } + Ok(Event::Status { + status: PodStatus::Busy, + }) if saw_run_end => { + saw_busy_status = true; + break; + } + Ok(_) => {} + Err(_) => break, + } + } + _ = tokio::time::sleep_until(deadline) => break, + } + } + + assert!(saw_run_end, "expected RunEnd::Finished"); + assert!( + saw_busy_status, + "expected busy status immediately after RunEnd" + ); + wait_for_status(&handle, PodStatus::Idle).await; +} + +#[tokio::test] +async fn attach_history_includes_current_status() { + let client = MockClient::sequential(vec![MockResponse::Hang(simple_text_events())]); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + + handle.send(Method::run_text("Hello")).await.unwrap(); + wait_for_status(&handle, PodStatus::Running).await; + + let stream = tokio::net::UnixStream::connect(handle.runtime_dir.socket_path()) + .await + .unwrap(); + let (reader, writer) = stream.into_split(); + let mut reader = protocol::stream::JsonLineReader::new(reader); + let mut writer = protocol::stream::JsonLineWriter::new(writer); + writer.write(&Method::GetHistory).await.unwrap(); + + let event = reader.next::().await.unwrap().unwrap(); + match event { + Event::History { status, .. } => assert_eq!(status, PodStatus::Running), + other => panic!("expected History, got {other:?}"), + } +} + +#[tokio::test] +async fn pause_while_busy_is_idempotent_not_not_running() { + let client = MockClient::new(simple_text_events()); + let pod = make_pod(client).await; + let handle = spawn_controller(pod).await; + let mut rx = handle.subscribe(); + + handle.send(Method::run_text("Hello")).await.unwrap(); + + let mut saw_busy = false; + let mut saw_idle = false; + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); + loop { + tokio::select! { + event = rx.recv() => { + match event { + Ok(Event::RunEnd { .. }) => { + handle.send(Method::Pause).await.unwrap(); + } + Ok(Event::Status { status: PodStatus::Busy }) => { + saw_busy = true; + } + Ok(Event::Status { status: PodStatus::Idle }) if saw_busy => { + saw_idle = true; + break; + } + Ok(Event::Error { + code: protocol::ErrorCode::NotRunning, + .. + }) if saw_busy && !saw_idle => { + panic!("Pause while Busy should be an idempotent no-op"); + } + Ok(_) => {} + Err(_) => break, + } + } + _ = tokio::time::sleep_until(deadline) => break, + } + } + + assert!(saw_busy, "expected Busy status"); + assert!(saw_idle, "expected final Idle status"); + assert_eq!(handle.shared_state.get_status(), PodStatus::Idle); +} #[tokio::test] async fn shared_state_starts_idle() { @@ -565,10 +686,10 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() { // request context for that turn). let requests = client_for_assert.captured_requests(); assert_eq!(requests.len(), 1, "one LLM call expected"); - let notify_in_request = requests[0] - .items - .iter() - .any(|i| i.as_text().is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished"))); + let notify_in_request = requests[0].items.iter().any(|i| { + i.as_text() + .is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished")) + }); assert!( notify_in_request, "injected system message missing from request, got items: {:?}", @@ -583,13 +704,17 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() { // (and therefore eventually into history.json), per // tickets/notify-history-persist.md. let history = handle.shared_state.history(); - let notify_in_history = history - .iter() - .any(|i| i.as_text().is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished"))); + let notify_in_history = history.iter().any(|i| { + i.as_text() + .is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished")) + }); assert!( notify_in_history, "notify must be committed to worker.history, got items: {:?}", - history.iter().filter_map(|i| i.as_text()).collect::>() + history + .iter() + .filter_map(|i| i.as_text()) + .collect::>() ); } @@ -671,7 +796,10 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes assert!( event_in_history, "PodEvent must be committed to worker.history, got items: {:?}", - history.iter().filter_map(|i| i.as_text()).collect::>() + history + .iter() + .filter_map(|i| i.as_text()) + .collect::>() ); } diff --git a/crates/pod/tests/pod_comm_tools_test.rs b/crates/pod/tests/pod_comm_tools_test.rs index 7453dc5d..a72d276d 100644 --- a/crates/pod/tests/pod_comm_tools_test.rs +++ b/crates/pod/tests/pod_comm_tools_test.rs @@ -174,6 +174,7 @@ fn serve_history(listener: UnixListener, items: Vec) -> JoinHandle<()> { scope_summary: String::new(), tools: Vec::new(), }, + status: protocol::PodStatus::Idle, }; let _ = writer.write(&event).await; } diff --git a/crates/pod/tests/session_metrics_test.rs b/crates/pod/tests/session_metrics_test.rs index 7f0c18d9..c396385a 100644 --- a/crates/pod/tests/session_metrics_test.rs +++ b/crates/pod/tests/session_metrics_test.rs @@ -22,9 +22,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use async_trait::async_trait; use futures::Stream; use llm_worker::Worker; -use llm_worker::llm_client::event::{ - Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent, -}; +use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent}; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use session_metrics::{DOMAIN, Metric, metrics_from_extensions}; @@ -169,7 +167,11 @@ async fn make_pod( manifest_toml: String, client: MockClient, tool_name: &'static str, -) -> (Pod, tempfile::TempDir, tempfile::TempDir) { +) -> ( + Pod, + tempfile::TempDir, + tempfile::TempDir, +) { let manifest = PodManifest::from_toml(&manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); let store = FsStore::new(store_tmp.path()).await.unwrap(); @@ -199,8 +201,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() { text_response_with_cache("ok", 0, 200), text_response_with_cache("done", 1234, 50), ]); - let (mut pod, _store_tmp, _pwd_tmp) = - make_pod(manifest_toml(1, 1), client, "big_tool").await; + let (mut pod, _store_tmp, _pwd_tmp) = make_pod(manifest_toml(1, 1), client, "big_tool").await; let session_id = pod.session_id(); // Cloning the store handle to read the session log back after the // runs complete — the Pod retains its own copy. @@ -253,10 +254,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() { fire.dimensions.contains_key("border_turn"), "fire missing border_turn: {fire:?}" ); - assert!( - fire.value.is_some(), - "fire missing estimated_savings value" - ); + assert!(fire.value.is_some(), "fire missing estimated_savings value"); let fire_id = fire .correlation_id .as_ref() @@ -272,7 +270,9 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() { assert_eq!(post.correlation_id.as_ref(), Some(fire_id)); assert_eq!(post.value, Some(1234.0)); assert_eq!( - post.dimensions.get("cache_write_tokens").map(String::as_str), + post.dimensions + .get("cache_write_tokens") + .map(String::as_str), Some("50") ); assert!(post.dimensions.contains_key("history_len")); @@ -457,7 +457,10 @@ permission = "write" let state = session_store::restore(&store, session_id).await.unwrap(); let metrics = metrics_from_extensions(&state.extensions); - assert!(metrics.is_empty(), "no metrics should be recorded: {metrics:?}"); + assert!( + metrics.is_empty(), + "no metrics should be recorded: {metrics:?}" + ); // And no extension entries at all in the metrics domain. assert!(state.extensions.iter().all(|(d, _)| d != DOMAIN)); diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 0b96142b..f8736d3e 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -314,6 +314,17 @@ pub enum Event { History { items: Vec, greeting: Greeting, + /// Current Pod controller status at the moment the history snapshot + /// was taken. This lets late-attaching clients render and route + /// controls from the real controller state instead of inferring from + /// replayed history. + #[serde(default)] + status: PodStatus, + }, + /// Current Pod controller status. Broadcast on every controller-level + /// transition and included in `History` snapshots for late attach. + Status { + status: PodStatus, }, /// Reply to `Method::ListCompletions`. Delivered only to the /// requesting socket (not broadcast). `entries` is empty when no @@ -422,6 +433,19 @@ pub struct Greeting { // Supporting types // --------------------------------------------------------------------------- +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "snake_case")] +pub enum PodStatus { + #[default] + Idle, + Running, + Paused, + /// The worker turn has ended, but the controller is still performing + /// post-run jobs (memory extract / consolidate / compact) and cannot + /// accept a new turn immediately. + Busy, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum TurnResult { @@ -724,6 +748,7 @@ mod tests { scope_summary: "Writable:\n - /tmp".into(), tools: vec!["Read".into()], }, + status: PodStatus::Paused, }; let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); @@ -732,6 +757,36 @@ mod tests { assert_eq!(parsed["data"]["items"][0]["role"], "user"); assert_eq!(parsed["data"]["greeting"]["pod_name"], "test"); assert_eq!(parsed["data"]["greeting"]["tools"][0], "Read"); + assert_eq!(parsed["data"]["status"], "paused"); + } + + #[test] + fn event_status_format() { + let event = Event::Status { + status: PodStatus::Busy, + }; + let json = serde_json::to_string(&event).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["event"], "status"); + assert_eq!(parsed["data"]["status"], "busy"); + + let decoded: Event = serde_json::from_str(&json).unwrap(); + assert!(matches!( + decoded, + Event::Status { + status: PodStatus::Busy + } + )); + } + + #[test] + fn event_history_legacy_without_status_defaults_to_idle() { + let json = r#"{"event":"history","data":{"items":[],"greeting":{"pod_name":"test","cwd":"/tmp","provider":"anthropic","model":"claude","scope_summary":"","tools":[]}}}"#; + let decoded: Event = serde_json::from_str(json).unwrap(); + match decoded { + Event::History { status, .. } => assert_eq!(status, PodStatus::Idle), + other => panic!("expected History, got {other:?}"), + } } #[test] diff --git a/crates/session-metrics/src/lib.rs b/crates/session-metrics/src/lib.rs index 73037a73..3b209125 100644 --- a/crates/session-metrics/src/lib.rs +++ b/crates/session-metrics/src/lib.rs @@ -89,9 +89,7 @@ pub async fn record_metric( /// `Metric` 列に fold する。 /// /// schema 変更で deserialize できない payload は無視する(後方互換)。 -pub fn metrics_from_extensions( - extensions: &[(String, serde_json::Value)], -) -> Vec { +pub fn metrics_from_extensions(extensions: &[(String, serde_json::Value)]) -> Vec { extensions .iter() .filter(|(domain, _)| domain == DOMAIN) diff --git a/crates/tools/src/task.rs b/crates/tools/src/task.rs index 7e16c716..e410b537 100644 --- a/crates/tools/src/task.rs +++ b/crates/tools/src/task.rs @@ -385,8 +385,8 @@ pub fn render_snapshot(tasks: &[TaskEntry]) -> String { let snapshot = TaskSnapshot { tasks: tasks.to_vec(), }; - let json = serde_json::to_string_pretty(&snapshot) - .unwrap_or_else(|_| String::from("{\"tasks\":[]}")); + let json = + serde_json::to_string_pretty(&snapshot).unwrap_or_else(|_| String::from("{\"tasks\":[]}")); format!("{}\n\n```json\n{}\n```\n", snapshot_overview(tasks), json) } @@ -560,7 +560,8 @@ mod tests { fn replay_history_uses_compact_snapshot_and_continues_updates() { let pre = TaskStore::new(); pre.create("kept".into(), "from compact".into()); - pre.update(1, Some(TaskStatus::Inprogress), None, None).unwrap(); + pre.update(1, Some(TaskStatus::Inprogress), None, None) + .unwrap(); let history = vec![ Item::system_message(wrap_snapshot_system_message(&pre.snapshot_text())), Item::tool_call("u1", "TaskUpdate", r#"{"taskid":1,"status":"completed"}"#), @@ -587,13 +588,23 @@ mod tests { // pre-compact `TaskCreate`s do not surface as duplicates. let pre = TaskStore::new(); pre.create("A".into(), "A-desc".into()); - pre.update(1, Some(TaskStatus::Completed), None, None).unwrap(); + pre.update(1, Some(TaskStatus::Completed), None, None) + .unwrap(); pre.create("B".into(), "B-desc".into()); - pre.update(2, Some(TaskStatus::Inprogress), None, None).unwrap(); + pre.update(2, Some(TaskStatus::Inprogress), None, None) + .unwrap(); let history = vec![ - Item::tool_call("c1", "TaskCreate", r#"{"subject":"A","description":"A-desc"}"#), + Item::tool_call( + "c1", + "TaskCreate", + r#"{"subject":"A","description":"A-desc"}"#, + ), Item::tool_call("u1", "TaskUpdate", r#"{"taskid":1,"status":"completed"}"#), - Item::tool_call("c2", "TaskCreate", r#"{"subject":"B","description":"B-desc"}"#), + Item::tool_call( + "c2", + "TaskCreate", + r#"{"subject":"B","description":"B-desc"}"#, + ), Item::tool_call("u2", "TaskUpdate", r#"{"taskid":2,"status":"inprogress"}"#), Item::system_message(wrap_snapshot_system_message(&pre.snapshot_text())), Item::tool_call("compact-tasklist", "TaskList", "{}"), @@ -625,7 +636,8 @@ mod tests { "subject with\nembedded newline\n- bullet".into(), "desc:\n status: not-actually-a-field\n ```code fence```".into(), ); - pre.update(1, Some(TaskStatus::Inprogress), None, None).unwrap(); + pre.update(1, Some(TaskStatus::Inprogress), None, None) + .unwrap(); let history = vec![Item::system_message(wrap_snapshot_system_message( &pre.snapshot_text(), diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 384d1f77..a7663b54 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -1,7 +1,8 @@ use std::time::Instant; use protocol::{ - AlertLevel, AlertSource, CompletionEntry, CompletionKind, Event, Method, RunResult, Segment, + AlertLevel, AlertSource, CompletionEntry, CompletionKind, Event, Method, PodStatus, RunResult, + Segment, }; use crate::block::{ @@ -41,11 +42,16 @@ impl CompletionState { pub struct App { pub pod_name: String, pub connected: bool, + /// Last controller status reported by the Pod. Drives the status line + /// and Ctrl-key routing; do not infer this solely from replayed history. + pub pod_status: PodStatus, + /// True while the Pod is in `PodStatus::Running`. pub running: bool, - /// True while the Pod is in `PodStatus::Paused`. Set on - /// `RunEnd::Paused` and cleared when a new turn starts (either via - /// `Resume` or a fresh `Run`). + /// True while the Pod is in `PodStatus::Paused`. pub paused: bool, + /// True after worker `RunEnd` while controller post-run work is still + /// blocking the next method. + pub busy: bool, pub run_requests: usize, /// Sum of `input_tokens - cache_read_input_tokens` across the /// current turn's LLM requests — i.e. the net tokens this turn @@ -80,8 +86,10 @@ impl App { Self { pod_name, connected: false, + pod_status: PodStatus::Idle, running: false, paused: false, + busy: false, run_requests: 0, run_upload_tokens: 0, run_output_tokens: 0, @@ -99,6 +107,16 @@ impl App { } } + pub fn set_pod_status(&mut self, status: PodStatus) { + self.pod_status = status; + self.running = status == PodStatus::Running; + self.paused = status == PodStatus::Paused; + self.busy = status == PodStatus::Busy; + if self.running || self.busy { + self.quit_confirm = None; + } + } + /// Re-evaluate the completion popup against the current input. /// Returns a `Method::ListCompletions` to send when the /// `(kind, prefix_start, prefix)` triple changed; otherwise `None`. @@ -278,6 +296,10 @@ impl App { } pub fn submit_input(&mut self) -> Option { + if self.busy { + self.push_error("Pod is finishing post-run work; wait for idle before submitting."); + return None; + } let segments = self.input.submit_segments(); if segments_are_blank(&segments) { // Empty Enter only does something meaningful when the Pod @@ -450,8 +472,7 @@ impl App { self.assistant_streaming = false; } Event::TurnStart { .. } => { - self.running = true; - self.paused = false; + self.set_pod_status(PodStatus::Running); self.run_requests += 1; self.current_tool = None; self.assistant_streaming = false; @@ -617,8 +638,10 @@ impl App { upload_tokens: self.run_upload_tokens, output_tokens: self.run_output_tokens, }); - self.running = false; - self.paused = matches!(result, RunResult::Paused); + self.set_pod_status(match result { + RunResult::Paused => PodStatus::Paused, + RunResult::Finished | RunResult::LimitReached => PodStatus::Busy, + }); self.run_requests = 0; self.run_upload_tokens = 0; self.run_output_tokens = 0; @@ -643,8 +666,16 @@ impl App { message: alert.message, }); } - Event::History { items, greeting } => { + Event::History { + items, + greeting, + status, + } => { self.restore_history(&items, greeting); + self.set_pod_status(status); + } + Event::Status { status } => { + self.set_pod_status(status); } Event::Completions { kind, entries } => { // Apply only if the popup is still on the same @@ -1216,8 +1247,11 @@ mod completion_flow_tests { "text": "[File: src/main.rs]\nfn main() {}", }], })], + status: PodStatus::Running, }); + assert!(matches!(app.pod_status, PodStatus::Running)); + assert!(app.running); assert!(matches!( app.blocks.get(1), Some(Block::SystemMessage { text }) if text == "[File: src/main.rs]\nfn main() {}" diff --git a/crates/tui/src/client.rs b/crates/tui/src/client.rs index 04d44093..0988727f 100644 --- a/crates/tui/src/client.rs +++ b/crates/tui/src/client.rs @@ -35,6 +35,10 @@ impl PodClient { self.writer.write(method).await } + pub fn try_next_event(&mut self) -> Option { + self.event_rx.try_recv().ok() + } + pub async fn next_event(&mut self) -> Option { self.event_rx.recv().await } diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index b754b083..2aa53b3a 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -21,7 +21,7 @@ use crossterm::execute; use crossterm::terminal::{ EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode, }; -use protocol::Method; +use protocol::{Method, PodStatus}; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use session_store::SessionId; @@ -283,29 +283,44 @@ async fn run_loop( break; } + // Drain any already-buffered Pod events in a bounded batch before + // polling the terminal. This keeps status fresh without letting a + // busy event stream starve Ctrl-C / Ctrl-X input. + for _ in 0..32 { + match client.try_next_event() { + Some(ev) => app.handle_pod_event(ev), + None => break, + } + } + + // Always give the terminal queue a non-blocking pass each frame. + // The awaited select below only waits after this pass found nothing. + let mut handled_term_event = false; + while event::poll(std::time::Duration::ZERO)? { + handled_term_event = true; + handle_terminal_event(app, &mut client, event::read()?).await?; + if app.quit { + break; + } + } + if app.quit { + break; + } + if handled_term_event { + terminal.draw(|f| ui::draw(f, app))?; + continue; + } + tokio::select! { - _ = tokio::task::spawn_blocking(|| event::poll(std::time::Duration::from_millis(50))) => { - while event::poll(std::time::Duration::ZERO)? { - match event::read()? { - TermEvent::Key(key) => { - if let Some(method) = handle_key(app, key) { - client.send(&method).await?; - } - } - TermEvent::Mouse(mouse) => { - handle_mouse(app, mouse); - } - TermEvent::Paste(s) => { - app.insert_paste(s); - } - TermEvent::Resize(_, _) => { - // No-op: next draw repaints in full. - } - _ => {} - } - if app.quit { - break; - } + term_event = tokio::task::spawn_blocking(|| { + if event::poll(std::time::Duration::from_millis(50))? { + event::read().map(Some) + } else { + Ok(None) + } + }) => { + if let Some(term_event) = term_event?? { + handle_terminal_event(app, &mut client, term_event).await?; } } event = client.next_event(), if app.connected => { @@ -325,6 +340,31 @@ async fn run_loop( Ok(()) } +async fn handle_terminal_event( + app: &mut App, + client: &mut PodClient, + event: TermEvent, +) -> Result<(), Box> { + match event { + TermEvent::Key(key) => { + if let Some(method) = handle_key(app, key) { + client.send(&method).await?; + } + } + TermEvent::Mouse(mouse) => { + handle_mouse(app, mouse); + } + TermEvent::Paste(s) => { + app.insert_paste(s); + } + TermEvent::Resize(_, _) => { + // No-op: next draw repaints in full. + } + _ => {} + } + Ok(()) +} + fn run_disconnected(_app: &mut App) -> Result<(), Box> { loop { if event::poll(std::time::Duration::from_millis(100))? @@ -392,10 +432,13 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option { Some(app.refresh_completion()) } KeyCode::Char('c') if ctrl => Some(handle_pause_or_quit(app)), - KeyCode::Char('x') if ctrl => Some(if app.running { - Some(Method::Cancel) - } else { - Some(Method::Shutdown) + KeyCode::Char('x') if ctrl => Some(match app.pod_status { + PodStatus::Running => Some(Method::Cancel), + PodStatus::Paused | PodStatus::Idle => Some(Method::Shutdown), + PodStatus::Busy => { + app.push_error("Pod is finishing post-run work; wait for idle or press Ctrl-C twice to exit the TUI."); + None + } }), KeyCode::Char('d') if ctrl => { app.quit = true; @@ -534,9 +577,9 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option { const CONFIRM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3); /// Running → send `Method::Pause`. -/// Idle / Paused → 2-tap to quit the TUI (the Pod keeps running). +/// Idle / Paused / Busy → 2-tap to quit the TUI (the Pod keeps running). fn handle_pause_or_quit(app: &mut App) -> Option { - if app.running { + if app.pod_status == PodStatus::Running { return Some(Method::Pause); } if let Some(t) = app.quit_confirm diff --git a/crates/tui/src/ui.rs b/crates/tui/src/ui.rs index 406552db..440e7cbb 100644 --- a/crates/tui/src/ui.rs +++ b/crates/tui/src/ui.rs @@ -877,6 +877,18 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) { " — Enter to resume, type to start new turn", Style::default().fg(Color::DarkGray), )); + } else if app.busy { + spans.push(Span::raw(" | ")); + spans.push(Span::styled( + "busy", + Style::default() + .fg(Color::Yellow) + .add_modifier(Modifier::BOLD), + )); + spans.push(Span::styled( + " — finishing post-run work", + Style::default().fg(Color::DarkGray), + )); } else { spans.push(Span::styled(" idle", Style::default().fg(Color::DarkGray))); } diff --git a/tickets/tui-pod-status-sync.md b/tickets/tui-pod-status-sync.md new file mode 100644 index 00000000..9d57c08b --- /dev/null +++ b/tickets/tui-pod-status-sync.md @@ -0,0 +1,40 @@ +# TUI: Pod 状態同期と Ctrl 系操作の安定化 + +## 背景 + +TUI 利用中、notice / alert / compact 表示が出た前後で `Ctrl-C` / `Ctrl-X` が効かない、または意図と違う動作をしているように見えるケースがある。 + +調査上、notice 表示そのものがキー入力を壊している実装は見当たらない。一方で、TUI が持つ `running` / `paused` などのローカル状態と、Pod controller の実状態がズレる経路が複数ある。 + +代表例: + +- `Event::RunEnd` 後、TUI は idle 扱いになるが、Pod controller 側では memory extract / consolidate / post-run compact などの post-run 処理がまだ続いている場合がある。 + - この区間で notice / alert / compact event が表示されると、ユーザーからは「notice が出て idle に見えるのに Ctrl 操作が効かない」ように見える。 +- TUI attach 時は `GetHistory` で履歴を復元するが、Pod の現在 status(Running / Paused / Idle)は復元されない。 + - 実際には Pod が Running / Paused でも、TUI 側は初期値の idle として扱い、`Ctrl-C` / `Ctrl-X` の分岐が実状態と合わない可能性がある。 +- TUI の terminal event polling は Pod event が頻繁な時に key input が遅延・取りこぼしに見えやすい構造になっている疑いがある。 + +## 要件 + +- TUI が表示・操作に使う Pod 状態は、接続直後および実行中を通じて Pod controller の実状態と一致する。 + - attach / resume 直後でも Running / Paused / Idle が正しく表示される。 + - `Ctrl-C` / `Ctrl-X` の分岐が TUI の古い推測状態に依存して誤動作しない。 +- `RunEnd` と post-run 処理中の扱いを整理する。 + - Worker turn が終わった状態と、Pod controller が次の method を即時処理できる状態を混同しない。 + - post-run compact / memory 系処理中に notice が出ても、TUI 上の status と操作可能性が矛盾しない。 +- TUI の input polling を見直し、Pod event / notice が多い状況でも key input が starvation しない。 +- `Ctrl-C` / `Ctrl-X` の UX を明確化する。 + - Running / Paused / Idle / post-run busy それぞれで、Pause / Cancel / Shutdown / TUI exit のどれを送るかを明示する。 + +## 完了条件 + +- Running / Paused な Pod に後から TUI attach しても、status 表示と `Ctrl-C` / `Ctrl-X` の挙動が実状態に合っている。 +- `RunEnd` 後の post-run 処理中に alert / compact notice が発生しても、TUI が idle と誤表示して操作不能に見える状態にならない。 +- Pod event / notice が連続しても、`Ctrl-C` / `Ctrl-X` が遅延・取りこぼしに見えない。 +- 上記の状態遷移について、少なくともユニットテストまたは再現手順で確認できる。 + +## 範囲外 + +- TUI の入力キューイング(`tickets/tui-input-queue.md`)。本チケットは状態同期と Ctrl 系操作の安定化に限定する。 +- native GUI 側の状態管理。 +- notice / alert の文言や見た目の全面 redesign。