feat: Podのステータスを厳密にし、同期漏れを防ぐ

This commit is contained in:
Keisuke Hirata 2026-05-04 12:55:11 +09:00
parent 20097e8296
commit 4eb73fa552
24 changed files with 641 additions and 311 deletions

View File

@ -11,6 +11,7 @@
- ストリーム途中失敗時の継続 → [tickets/llm-worker-stream-continuation.md](tickets/llm-worker-stream-continuation.md)
- ネイティブ GUI クライアント MVP → [tickets/native-gui-mvp.md](tickets/native-gui-mvp.md)
- TUI 拡充
- Run/notice 前後の Pod 状態同期と Ctrl 系操作の安定化 → [tickets/tui-pod-status-sync.md](tickets/tui-pod-status-sync.md)
- Run 中の入力キューイング → [tickets/tui-input-queue.md](tickets/tui-input-queue.md)
- ユーザーマニフェストのモデル設定 wizard → [tickets/tui-user-model-setup.md](tickets/tui-user-model-setup.md)
- spawn 失敗時に Pod の stderr が TUI に表示されない → [tickets/tui-spawn-error-surface.md](tickets/tui-spawn-error-surface.md)

View File

@ -698,7 +698,10 @@ mod tests {
std::fs::create_dir(&sub).unwrap();
let shared = SharedScope::new(Scope::writable(dir.path()).unwrap());
let target = sub.join("a.txt");
assert_eq!(shared.load().permission_at(&target), Some(Permission::Write));
assert_eq!(
shared.load().permission_at(&target),
Some(Permission::Write)
);
shared
.update(|cur| {
cur.with_added_deny_rules([ScopeRule {

View File

@ -203,7 +203,13 @@ pub fn render_tidy_hints(tidy: &TidyHints) -> String {
"**Sources overflow** — consider trimming to the most recent entries (git log keeps the rest):\n",
);
for s in &tidy.sources_overflow {
let _ = writeln!(&mut out, "- {} `{}` ({} sources)", s.kind.as_str(), s.slug, s.count);
let _ = writeln!(
&mut out,
"- {} `{}` ({} sources)",
s.kind.as_str(),
s.slug,
s.count
);
}
out.push('\n');
}
@ -276,12 +282,9 @@ mod tests {
.unwrap();
let staging = crate::consolidate::staging::list_staging_entries(&layout);
let tidy = TidyHints {
replaced_decisions: [(
"old".to_string(),
Some("new".to_string()),
)]
.into_iter()
.collect(),
replaced_decisions: [("old".to_string(), Some("new".to_string()))]
.into_iter()
.collect(),
sources_overflow: vec![SourcesOverflow {
kind: RecordKind::Decision,
slug: "dec".into(),

View File

@ -295,8 +295,7 @@ mod tests {
fn release_is_resilient_to_missing_consumed_entries() {
let (_dir, layout) = make_layout();
let phantom = uuid::Uuid::now_v7();
let lock =
StagingLock::acquire(&layout, std::process::id(), "pod", vec![phantom]).unwrap();
let lock = StagingLock::acquire(&layout, std::process::id(), "pod", vec![phantom]).unwrap();
let lock_path = lock.path().to_path_buf();
// No file at <staging>/<phantom>.json — release must not panic.
lock.release_with_cleanup(&layout);

View File

@ -74,10 +74,7 @@ pub fn collect_tidy_hints(layout: &WorkspaceLayout) -> TidyHints {
for (slug, content) in &decisions {
let fm = parse_yaml::<DecisionFrontmatter>(content);
if let Some(fm) = fm.as_ref() {
if matches!(
fm.status,
crate::schema::DecisionStatus::Replaced
) {
if matches!(fm.status, crate::schema::DecisionStatus::Replaced) {
hints
.replaced_decisions
.insert(slug.clone(), fm.replaced_by.as_ref().map(|s| s.to_string()));
@ -113,9 +110,9 @@ pub fn collect_tidy_hints(layout: &WorkspaceLayout) -> TidyHints {
}
}
}
hints
.sources_overflow
.sort_by(|a, b| (a.kind.as_str(), a.slug.as_str()).cmp(&(b.kind.as_str(), b.slug.as_str())));
hints.sources_overflow.sort_by(|a, b| {
(a.kind.as_str(), a.slug.as_str()).cmp(&(b.kind.as_str(), b.slug.as_str()))
});
let decision_slugs: Vec<&str> = decisions.keys().map(|s| s.as_str()).collect();
let request_slugs: Vec<&str> = requests.keys().map(|s| s.as_str()).collect();
@ -139,10 +136,7 @@ pub fn collect_tidy_hints(layout: &WorkspaceLayout) -> TidyHints {
/// `<root>/.insomnia/memory/<kind>/*.md` (Knowledge は
/// `<root>/.insomnia/knowledge/*.md`) を slug ごとに `(slug, full content)`
/// 化して返す。
fn read_kind_records(
layout: &WorkspaceLayout,
kind: RecordKind,
) -> BTreeMap<String, String> {
fn read_kind_records(layout: &WorkspaceLayout, kind: RecordKind) -> BTreeMap<String, String> {
let dir = match kind {
RecordKind::Decision => layout.decisions_dir(),
RecordKind::Request => layout.requests_dir(),

View File

@ -49,33 +49,29 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let metrics = self.metrics_tracker_handle();
let usage_tracker = self.usage_tracker_handle();
let observer: PruneObserver = Box::new(move |eval| {
match &eval.decision {
PruneDecision::Fired { .. } => {
let correlation_id = uuid::Uuid::now_v7().to_string();
let mut metric = Metric::now("prune.fire")
.with_value(eval.estimated_savings as f64)
.with_correlation_id(&correlation_id)
.with_dimension("candidate_count", eval.candidate_count.to_string());
if let Some(border) = eval.border_turn {
metric = metric.with_dimension("border_turn", border.to_string());
}
metrics.push(metric);
usage_tracker.note_correlation_id(correlation_id);
}
PruneDecision::SkippedNoCandidates => {
metrics.push(
Metric::now("prune.skip").with_dimension("reason", "no_candidates"),
);
}
PruneDecision::SkippedBelowMinSavings => {
metrics.push(
Metric::now("prune.skip")
.with_dimension("reason", "below_min_savings")
.with_dimension("candidate_count", eval.candidate_count.to_string())
.with_value(eval.estimated_savings as f64),
);
let observer: PruneObserver = Box::new(move |eval| match &eval.decision {
PruneDecision::Fired { .. } => {
let correlation_id = uuid::Uuid::now_v7().to_string();
let mut metric = Metric::now("prune.fire")
.with_value(eval.estimated_savings as f64)
.with_correlation_id(&correlation_id)
.with_dimension("candidate_count", eval.candidate_count.to_string());
if let Some(border) = eval.border_turn {
metric = metric.with_dimension("border_turn", border.to_string());
}
metrics.push(metric);
usage_tracker.note_correlation_id(correlation_id);
}
PruneDecision::SkippedNoCandidates => {
metrics.push(Metric::now("prune.skip").with_dimension("reason", "no_candidates"));
}
PruneDecision::SkippedBelowMinSavings => {
metrics.push(
Metric::now("prune.skip")
.with_dimension("reason", "below_min_savings")
.with_dimension("candidate_count", eval.candidate_count.to_string())
.with_value(eval.estimated_savings as f64),
);
}
});

View File

@ -12,13 +12,15 @@ use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::server::SocketServer;
use crate::pod::{Pod, PodError, PodRunResult};
use crate::runtime::dir::RuntimeDir;
use crate::shared_state::{PodSharedState, PodStatus};
use crate::shared_state::PodSharedState;
use crate::spawn::comm_tools::{
list_pods_tool, read_pod_output_tool, send_to_pod_tool, stop_pod_tool,
};
use crate::spawn::registry::SpawnedPodRegistry;
use crate::spawn::tool::spawn_pod_tool;
use protocol::{AlertLevel, AlertSource, ErrorCode, Event, Method, RunResult, TurnResult};
use protocol::{
AlertLevel, AlertSource, ErrorCode, Event, Method, PodStatus, RunResult, TurnResult,
};
fn is_system_message_item(item: &Item) -> bool {
matches!(
@ -63,6 +65,75 @@ impl PodHandle {
}
}
async fn set_controller_status(
shared_state: &Arc<PodSharedState>,
runtime_dir: &RuntimeDir,
event_tx: &broadcast::Sender<Event>,
status: PodStatus,
) {
shared_state.set_status(status);
let _ = runtime_dir.write_status(shared_state).await;
let _ = event_tx.send(Event::Status { status });
}
async fn run_post_run_jobs<C, St>(pod: &mut Pod<C, St>, alerter: &Alerter)
where
C: LlmClient,
St: Store,
{
if let Err(e) = pod.try_post_run_extract().await {
tracing::warn!(error = %e, "Post-run memory extract error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory extract error: {e}"),
);
}
if let Err(e) = pod.try_post_run_consolidate().await {
tracing::warn!(error = %e, "Post-run memory consolidate error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory consolidate error: {e}"),
);
}
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Compactor,
format!("post-run compaction error: {e}"),
);
}
}
async fn finish_controller_run<C, St>(
pod: &mut Pod<C, St>,
shared_state: &Arc<PodSharedState>,
runtime_dir: &RuntimeDir,
event_tx: &broadcast::Sender<Event>,
alerter: &Alerter,
new_status: PodStatus,
) where
C: LlmClient,
St: Store,
{
if new_status == PodStatus::Busy {
run_post_run_jobs(pod, alerter).await;
}
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_user_segments(pod.user_segments().to_vec());
let final_status = if new_status == PodStatus::Busy {
PodStatus::Idle
} else {
new_status
};
set_controller_status(shared_state, runtime_dir, event_tx, final_status).await;
let _ = runtime_dir.write_history(shared_state).await;
}
// ---------------------------------------------------------------------------
// PodController — actor that owns a Pod
// ---------------------------------------------------------------------------
@ -414,8 +485,13 @@ impl PodController {
let _ = event_tx.send(Event::UserMessage {
segments: input.clone(),
});
shared_state.set_status(PodStatus::Running);
let _ = runtime_dir.write_status(&shared_state).await;
set_controller_status(
&shared_state,
&runtime_dir,
&event_tx,
PodStatus::Running,
)
.await;
let run_future = async {
if was_paused {
@ -430,6 +506,7 @@ impl PodController {
&event_tx,
&cancel_tx,
&shared_state,
&runtime_dir,
&notify_buffer,
self_parent_socket.as_ref(),
&spawner_name,
@ -437,39 +514,15 @@ impl PodController {
)
.await;
if new_status == PodStatus::Idle {
if let Err(e) = pod.try_post_run_extract().await {
tracing::warn!(error = %e, "Post-run memory extract error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory extract error: {e}"),
);
}
if let Err(e) = pod.try_post_run_consolidate().await {
tracing::warn!(error = %e, "Post-run memory consolidate error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory consolidate error: {e}"),
);
}
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Compactor,
format!("post-run compaction error: {e}"),
);
}
}
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_user_segments(pod.user_segments().to_vec());
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
let _ = runtime_dir.write_history(&shared_state).await;
finish_controller_run(
&mut pod,
&shared_state,
&runtime_dir,
&event_tx,
&alerter,
new_status,
)
.await;
if shutdown {
let _ = event_tx.send(Event::Shutdown);
@ -482,17 +535,23 @@ impl PodController {
message: message.clone(),
});
pod.push_notify(message);
if shared_state.get_status() != PodStatus::Idle {
// RUNNING / Paused: the buffer push is the
// entire operation; the in-flight turn (or
// next Resume) will drain the buffer at its
// next pre_llm_request.
let status = shared_state.get_status();
if status != PodStatus::Idle {
// RUNNING / Paused / Busy: the buffer push is the
// entire operation; an in-flight turn (or the
// next Resume/Run after Busy) will drain the buffer
// at its next pre_llm_request.
continue;
}
// IDLE: auto-start a turn so the LLM sees the
// buffered notification(s) without a human Run.
shared_state.set_status(PodStatus::Running);
let _ = runtime_dir.write_status(&shared_state).await;
set_controller_status(
&shared_state,
&runtime_dir,
&event_tx,
PodStatus::Running,
)
.await;
let (new_status, shutdown) = run_with_cancel_support(
pod.run_for_notification(),
@ -500,6 +559,7 @@ impl PodController {
&event_tx,
&cancel_tx,
&shared_state,
&runtime_dir,
&notify_buffer,
self_parent_socket.as_ref(),
&spawner_name,
@ -507,39 +567,15 @@ impl PodController {
)
.await;
if new_status == PodStatus::Idle {
if let Err(e) = pod.try_post_run_extract().await {
tracing::warn!(error = %e, "Post-run memory extract error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory extract error: {e}"),
);
}
if let Err(e) = pod.try_post_run_consolidate().await {
tracing::warn!(error = %e, "Post-run memory consolidate error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory consolidate error: {e}"),
);
}
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Compactor,
format!("post-run compaction error: {e}"),
);
}
}
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_user_segments(pod.user_segments().to_vec());
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
let _ = runtime_dir.write_history(&shared_state).await;
finish_controller_run(
&mut pod,
&shared_state,
&runtime_dir,
&event_tx,
&alerter,
new_status,
)
.await;
if shutdown {
let _ = event_tx.send(Event::Shutdown);
@ -555,8 +591,13 @@ impl PodController {
});
continue;
}
shared_state.set_status(PodStatus::Running);
let _ = runtime_dir.write_status(&shared_state).await;
set_controller_status(
&shared_state,
&runtime_dir,
&event_tx,
PodStatus::Running,
)
.await;
let (new_status, shutdown) = run_with_cancel_support(
pod.resume(),
@ -564,6 +605,7 @@ impl PodController {
&event_tx,
&cancel_tx,
&shared_state,
&runtime_dir,
&notify_buffer,
self_parent_socket.as_ref(),
&spawner_name,
@ -571,39 +613,15 @@ impl PodController {
)
.await;
if new_status == PodStatus::Idle {
if let Err(e) = pod.try_post_run_extract().await {
tracing::warn!(error = %e, "Post-run memory extract error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory extract error: {e}"),
);
}
if let Err(e) = pod.try_post_run_consolidate().await {
tracing::warn!(error = %e, "Post-run memory consolidate error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory consolidate error: {e}"),
);
}
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Compactor,
format!("post-run compaction error: {e}"),
);
}
}
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_user_segments(pod.user_segments().to_vec());
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
let _ = runtime_dir.write_history(&shared_state).await;
finish_controller_run(
&mut pod,
&shared_state,
&runtime_dir,
&event_tx,
&alerter,
new_status,
)
.await;
if shutdown {
let _ = event_tx.send(Event::Shutdown);
@ -619,11 +637,12 @@ impl PodController {
}
Method::Pause => {
// Already paused → idempotent no-op. Otherwise
// the Pod is Idle (Running turns go through
// `run_with_cancel_support`, not this outer
// match), so there is nothing to pause.
if shared_state.get_status() != PodStatus::Paused {
// Already paused or post-run busy → idempotent no-op.
// Otherwise the Pod is Idle (Running turns go through
// `run_with_cancel_support`, not this outer match), so
// there is nothing to pause.
let status = shared_state.get_status();
if !matches!(status, PodStatus::Paused | PodStatus::Busy) {
let _ = event_tx.send(Event::Error {
code: ErrorCode::NotRunning,
message: "Pod is not running".into(),
@ -666,8 +685,13 @@ impl PodController {
// notification is not stranded. Matches the
// `Method::Notify` idle path.
if shared_state.get_status() == PodStatus::Idle {
shared_state.set_status(PodStatus::Running);
let _ = runtime_dir.write_status(&shared_state).await;
set_controller_status(
&shared_state,
&runtime_dir,
&event_tx,
PodStatus::Running,
)
.await;
let (new_status, shutdown) = run_with_cancel_support(
pod.run_for_notification(),
@ -675,6 +699,7 @@ impl PodController {
&event_tx,
&cancel_tx,
&shared_state,
&runtime_dir,
&notify_buffer,
self_parent_socket.as_ref(),
&spawner_name,
@ -682,39 +707,15 @@ impl PodController {
)
.await;
if new_status == PodStatus::Idle {
if let Err(e) = pod.try_post_run_extract().await {
tracing::warn!(error = %e, "Post-run memory extract error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory extract error: {e}"),
);
}
if let Err(e) = pod.try_post_run_consolidate().await {
tracing::warn!(error = %e, "Post-run memory consolidate error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory consolidate error: {e}"),
);
}
if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Compactor,
format!("post-run compaction error: {e}"),
);
}
}
let items = pod.worker().history().to_vec();
shared_state.update_history(items);
shared_state.set_user_segments(pod.user_segments().to_vec());
shared_state.set_status(new_status);
let _ = runtime_dir.write_status(&shared_state).await;
let _ = runtime_dir.write_history(&shared_state).await;
finish_controller_run(
&mut pod,
&shared_state,
&runtime_dir,
&event_tx,
&alerter,
new_status,
)
.await;
if shutdown {
let _ = event_tx.send(Event::Shutdown);
@ -766,6 +767,7 @@ async fn run_with_cancel_support<F>(
event_tx: &broadcast::Sender<Event>,
cancel_tx: &mpsc::Sender<()>,
shared_state: &Arc<PodSharedState>,
runtime_dir: &RuntimeDir,
notify_buffer: &NotifyBuffer,
parent_socket: Option<&std::path::PathBuf>,
self_name: &str,
@ -784,11 +786,18 @@ where
return match result {
Ok(r) => {
let (status, run_result) = match r {
PodRunResult::Finished => (PodStatus::Idle, RunResult::Finished),
PodRunResult::Finished => (PodStatus::Busy, RunResult::Finished),
PodRunResult::Paused => (PodStatus::Paused, RunResult::Paused),
PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached),
PodRunResult::LimitReached => (PodStatus::Busy, RunResult::LimitReached),
};
let _ = event_tx.send(Event::RunEnd { result: run_result });
if status == PodStatus::Busy {
shared_state.set_status(PodStatus::Busy);
let _ = runtime_dir.write_status(shared_state).await;
let _ = event_tx.send(Event::Status {
status: PodStatus::Busy,
});
}
if matches!(run_result, RunResult::Finished) {
crate::ipc::event::fire_and_forget(
parent_socket.cloned(),
@ -822,7 +831,7 @@ where
message,
},
);
(PodStatus::Idle, shutdown_requested)
(PodStatus::Busy, shutdown_requested)
}
};
}

View File

@ -17,9 +17,9 @@ use llm_worker::interceptor::{
Interceptor, PostToolAction, PreRequestAction, PreToolAction, PromptAction, ToolCallInfo,
ToolResultInfo, TurnEndAction,
};
use tracing::warn;
use llm_worker::tool::ToolOutput;
use tracing::info;
use tracing::warn;
use crate::compact::state::CompactState;
use crate::hook::{

View File

@ -159,10 +159,12 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) {
})
.collect();
let greeting = handle.shared_state.greeting.clone();
let status = handle.shared_state.get_status();
if writer
.write(&Event::History {
items: values,
greeting,
status,
})
.await
.is_err()

View File

@ -26,7 +26,7 @@ pub use pod::{Pod, PodError, PodRunResult, apply_worker_manifest};
pub use prompt::catalog::{CatalogError, PodPrompt, PromptCatalog};
pub use prompt::loader::PromptLoader;
pub use prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTemplate};
pub use protocol::{ErrorCode, Event, Method, TurnResult};
pub use protocol::{ErrorCode, Event, Method, PodStatus, TurnResult};
pub use provider::{ProviderError, build_client};
pub use runtime::dir::RuntimeDir;
pub use shared_state::{PodSharedState, PodStatus};
pub use shared_state::PodSharedState;

View File

@ -131,7 +131,8 @@ pub fn default_base() -> Result<PathBuf, io::Error> {
#[cfg(test)]
mod tests {
use super::*;
use crate::shared_state::{PodSharedState, PodStatus};
use crate::shared_state::PodSharedState;
use protocol::PodStatus;
fn test_state() -> PodSharedState {
PodSharedState::new(

View File

@ -1,8 +1,8 @@
use std::sync::{OnceLock, RwLock};
use llm_worker::llm_client::types::Item;
use protocol::Segment;
use serde::{Deserialize, Serialize};
use protocol::{PodStatus, Segment};
use serde_json::json;
use session_store::SessionId;
use crate::fs_view::PodFsView;
@ -39,14 +39,6 @@ pub struct PodSharedState {
workflows: OnceLock<Vec<WorkflowCandidate>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PodStatus {
Idle,
Running,
Paused,
}
impl PodSharedState {
pub fn new(
pod_name: String,
@ -138,7 +130,7 @@ impl PodSharedState {
/// Serialize status as JSON.
pub fn status_json(&self) -> String {
let status = self.get_status();
serde_json::json!({
json!({
"state": status,
"session_id": self.session_id.to_string(),
"pod_name": self.pod_name,

View File

@ -206,10 +206,7 @@ async fn no_thresholds_is_a_noop() {
.expect("phase 2 disabled when both thresholds are None");
// No staging entries removed.
assert_eq!(
memory::consolidate::list_staging_entries(&layout).len(),
5
);
assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 5);
}
#[tokio::test]
@ -256,10 +253,7 @@ async fn below_threshold_skips_and_does_not_take_lock() {
pod.try_post_run_consolidate().await.unwrap();
// Staging untouched.
assert_eq!(
memory::consolidate::list_staging_entries(&layout).len(),
1
);
assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 1);
// Lock file must not exist.
let lock_path = layout.staging_dir().join(".consolidation.lock");
assert!(!lock_path.exists(), "lock file should not be created");
@ -285,10 +279,7 @@ async fn fires_on_threshold_and_cleans_up_consumed_entries() {
);
// Lock removed too.
let lock_path = layout.staging_dir().join(".consolidation.lock");
assert!(
!lock_path.exists(),
"lock file must be removed on success"
);
assert!(!lock_path.exists(), "lock file must be removed on success");
}
#[tokio::test]
@ -300,7 +291,12 @@ async fn in_flight_guard_skips_reentry_without_clearing() {
write_n_staging(&layout, 2);
let client = MockClient::new(vec![]);
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client.clone()).await;
let mut pod = make_pod_with(
FILES_THRESHOLD_TOML,
pwd.path().to_path_buf(),
client.clone(),
)
.await;
// Pre-set the in-flight flag as if another concurrent caller had
// entered run_consolidate_once. The CAS at the top of
@ -334,7 +330,9 @@ async fn in_flight_guard_skips_reentry_without_clearing() {
let mut pod2 = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client2).await;
pod2.try_post_run_consolidate().await.unwrap();
assert!(
!pod2.consolidation_in_flight_handle().load(Ordering::Acquire),
!pod2
.consolidation_in_flight_handle()
.load(Ordering::Acquire),
"in-flight flag must be cleared after a normal run"
);
}
@ -356,7 +354,12 @@ async fn coalesce_loop_terminates_with_one_iteration_when_snapshot_drains_stagin
// run_consolidate_once after Completed, the second sub-worker run
// would exhaust the mock and surface as an error.
let client = MockClient::new(vec![done("ok")]);
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client.clone()).await;
let mut pod = make_pod_with(
FILES_THRESHOLD_TOML,
pwd.path().to_path_buf(),
client.clone(),
)
.await;
pod.try_post_run_consolidate().await.unwrap();
assert_eq!(
@ -393,8 +396,5 @@ async fn live_lock_held_by_other_pod_skips() {
.expect("InUse lock must surface as graceful skip");
// Staging untouched: lock holder owns the snapshot, not us.
assert_eq!(
memory::consolidate::list_staging_entries(&layout).len(),
3
);
assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 3);
}

View File

@ -10,7 +10,7 @@ use llm_worker::llm_client::{ClientError, LlmClient, Request};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use session_store::FsStore;
use pod::{Event, Method, Pod, PodController, PodManifest, PodStatus};
use pod::{Event, Method, Pod, PodController, PodHandle, PodManifest, PodStatus};
// ---------------------------------------------------------------------------
// Mock LLM Client
@ -147,8 +147,6 @@ async fn make_pod_with_pwd(client: MockClient) -> (Pod<MockClient, FsStore>, std
(pod, pwd)
}
use pod::PodHandle;
async fn spawn_controller(pod: Pod<MockClient, FsStore>) -> PodHandle {
let tmp = tempfile::tempdir().unwrap();
let runtime_base = tmp.path().to_owned();
@ -157,9 +155,132 @@ async fn spawn_controller(pod: Pod<MockClient, FsStore>) -> PodHandle {
handle
}
async fn wait_for_status(handle: &PodHandle, status: PodStatus) {
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
if handle.shared_state.get_status() == status {
return;
}
assert!(
tokio::time::Instant::now() < deadline,
"timed out waiting for status {status:?}; current={:?}",
handle.shared_state.get_status()
);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[tokio::test]
async fn run_end_enters_busy_until_post_run_finishes_and_broadcasts_status() {
let client = MockClient::new(simple_text_events());
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
let mut rx = handle.subscribe();
handle.send(Method::run_text("Hello")).await.unwrap();
let mut saw_run_end = false;
let mut saw_busy_status = false;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
tokio::select! {
event = rx.recv() => {
match event {
Ok(Event::RunEnd { result: protocol::RunResult::Finished }) => {
saw_run_end = true;
}
Ok(Event::Status {
status: PodStatus::Busy,
}) if saw_run_end => {
saw_busy_status = true;
break;
}
Ok(_) => {}
Err(_) => break,
}
}
_ = tokio::time::sleep_until(deadline) => break,
}
}
assert!(saw_run_end, "expected RunEnd::Finished");
assert!(
saw_busy_status,
"expected busy status immediately after RunEnd"
);
wait_for_status(&handle, PodStatus::Idle).await;
}
#[tokio::test]
async fn attach_history_includes_current_status() {
let client = MockClient::sequential(vec![MockResponse::Hang(simple_text_events())]);
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
handle.send(Method::run_text("Hello")).await.unwrap();
wait_for_status(&handle, PodStatus::Running).await;
let stream = tokio::net::UnixStream::connect(handle.runtime_dir.socket_path())
.await
.unwrap();
let (reader, writer) = stream.into_split();
let mut reader = protocol::stream::JsonLineReader::new(reader);
let mut writer = protocol::stream::JsonLineWriter::new(writer);
writer.write(&Method::GetHistory).await.unwrap();
let event = reader.next::<Event>().await.unwrap().unwrap();
match event {
Event::History { status, .. } => assert_eq!(status, PodStatus::Running),
other => panic!("expected History, got {other:?}"),
}
}
#[tokio::test]
async fn pause_while_busy_is_idempotent_not_not_running() {
let client = MockClient::new(simple_text_events());
let pod = make_pod(client).await;
let handle = spawn_controller(pod).await;
let mut rx = handle.subscribe();
handle.send(Method::run_text("Hello")).await.unwrap();
let mut saw_busy = false;
let mut saw_idle = false;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
tokio::select! {
event = rx.recv() => {
match event {
Ok(Event::RunEnd { .. }) => {
handle.send(Method::Pause).await.unwrap();
}
Ok(Event::Status { status: PodStatus::Busy }) => {
saw_busy = true;
}
Ok(Event::Status { status: PodStatus::Idle }) if saw_busy => {
saw_idle = true;
break;
}
Ok(Event::Error {
code: protocol::ErrorCode::NotRunning,
..
}) if saw_busy && !saw_idle => {
panic!("Pause while Busy should be an idempotent no-op");
}
Ok(_) => {}
Err(_) => break,
}
}
_ = tokio::time::sleep_until(deadline) => break,
}
}
assert!(saw_busy, "expected Busy status");
assert!(saw_idle, "expected final Idle status");
assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);
}
#[tokio::test]
async fn shared_state_starts_idle() {
@ -565,10 +686,10 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
// request context for that turn).
let requests = client_for_assert.captured_requests();
assert_eq!(requests.len(), 1, "one LLM call expected");
let notify_in_request = requests[0]
.items
.iter()
.any(|i| i.as_text().is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished")));
let notify_in_request = requests[0].items.iter().any(|i| {
i.as_text()
.is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished"))
});
assert!(
notify_in_request,
"injected system message missing from request, got items: {:?}",
@ -583,13 +704,17 @@ async fn notify_while_idle_auto_starts_turn_and_injects_system_message() {
// (and therefore eventually into history.json), per
// tickets/notify-history-persist.md.
let history = handle.shared_state.history();
let notify_in_history = history
.iter()
.any(|i| i.as_text().is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished")));
let notify_in_history = history.iter().any(|i| {
i.as_text()
.is_some_and(|t| t.contains("[Notification]") && t.contains("turn finished"))
});
assert!(
notify_in_history,
"notify must be committed to worker.history, got items: {:?}",
history.iter().filter_map(|i| i.as_text()).collect::<Vec<_>>()
history
.iter()
.filter_map(|i| i.as_text())
.collect::<Vec<_>>()
);
}
@ -671,7 +796,10 @@ async fn pod_event_turn_ended_while_idle_auto_starts_turn_and_injects_system_mes
assert!(
event_in_history,
"PodEvent must be committed to worker.history, got items: {:?}",
history.iter().filter_map(|i| i.as_text()).collect::<Vec<_>>()
history
.iter()
.filter_map(|i| i.as_text())
.collect::<Vec<_>>()
);
}

View File

@ -174,6 +174,7 @@ fn serve_history(listener: UnixListener, items: Vec<Item>) -> JoinHandle<()> {
scope_summary: String::new(),
tools: Vec::new(),
},
status: protocol::PodStatus::Idle,
};
let _ = writer.write(&event).await;
}

View File

@ -22,9 +22,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use futures::Stream;
use llm_worker::Worker;
use llm_worker::llm_client::event::{
Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent,
};
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent, UsageEvent};
use llm_worker::llm_client::{ClientError, LlmClient, Request};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use session_metrics::{DOMAIN, Metric, metrics_from_extensions};
@ -169,7 +167,11 @@ async fn make_pod(
manifest_toml: String,
client: MockClient,
tool_name: &'static str,
) -> (Pod<MockClient, FsStore>, tempfile::TempDir, tempfile::TempDir) {
) -> (
Pod<MockClient, FsStore>,
tempfile::TempDir,
tempfile::TempDir,
) {
let manifest = PodManifest::from_toml(&manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap();
@ -199,8 +201,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() {
text_response_with_cache("ok", 0, 200),
text_response_with_cache("done", 1234, 50),
]);
let (mut pod, _store_tmp, _pwd_tmp) =
make_pod(manifest_toml(1, 1), client, "big_tool").await;
let (mut pod, _store_tmp, _pwd_tmp) = make_pod(manifest_toml(1, 1), client, "big_tool").await;
let session_id = pod.session_id();
// Cloning the store handle to read the session log back after the
// runs complete — the Pod retains its own copy.
@ -253,10 +254,7 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() {
fire.dimensions.contains_key("border_turn"),
"fire missing border_turn: {fire:?}"
);
assert!(
fire.value.is_some(),
"fire missing estimated_savings value"
);
assert!(fire.value.is_some(), "fire missing estimated_savings value");
let fire_id = fire
.correlation_id
.as_ref()
@ -272,7 +270,9 @@ async fn prune_metrics_emit_skip_then_fire_with_post_request_join() {
assert_eq!(post.correlation_id.as_ref(), Some(fire_id));
assert_eq!(post.value, Some(1234.0));
assert_eq!(
post.dimensions.get("cache_write_tokens").map(String::as_str),
post.dimensions
.get("cache_write_tokens")
.map(String::as_str),
Some("50")
);
assert!(post.dimensions.contains_key("history_len"));
@ -457,7 +457,10 @@ permission = "write"
let state = session_store::restore(&store, session_id).await.unwrap();
let metrics = metrics_from_extensions(&state.extensions);
assert!(metrics.is_empty(), "no metrics should be recorded: {metrics:?}");
assert!(
metrics.is_empty(),
"no metrics should be recorded: {metrics:?}"
);
// And no extension entries at all in the metrics domain.
assert!(state.extensions.iter().all(|(d, _)| d != DOMAIN));

View File

@ -314,6 +314,17 @@ pub enum Event {
History {
items: Vec<serde_json::Value>,
greeting: Greeting,
/// Current Pod controller status at the moment the history snapshot
/// was taken. This lets late-attaching clients render and route
/// controls from the real controller state instead of inferring from
/// replayed history.
#[serde(default)]
status: PodStatus,
},
/// Current Pod controller status. Broadcast on every controller-level
/// transition and included in `History` snapshots for late attach.
Status {
status: PodStatus,
},
/// Reply to `Method::ListCompletions`. Delivered only to the
/// requesting socket (not broadcast). `entries` is empty when no
@ -422,6 +433,19 @@ pub struct Greeting {
// Supporting types
// ---------------------------------------------------------------------------
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum PodStatus {
#[default]
Idle,
Running,
Paused,
/// The worker turn has ended, but the controller is still performing
/// post-run jobs (memory extract / consolidate / compact) and cannot
/// accept a new turn immediately.
Busy,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TurnResult {
@ -724,6 +748,7 @@ mod tests {
scope_summary: "Writable:\n - /tmp".into(),
tools: vec!["Read".into()],
},
status: PodStatus::Paused,
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
@ -732,6 +757,36 @@ mod tests {
assert_eq!(parsed["data"]["items"][0]["role"], "user");
assert_eq!(parsed["data"]["greeting"]["pod_name"], "test");
assert_eq!(parsed["data"]["greeting"]["tools"][0], "Read");
assert_eq!(parsed["data"]["status"], "paused");
}
#[test]
fn event_status_format() {
let event = Event::Status {
status: PodStatus::Busy,
};
let json = serde_json::to_string(&event).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["event"], "status");
assert_eq!(parsed["data"]["status"], "busy");
let decoded: Event = serde_json::from_str(&json).unwrap();
assert!(matches!(
decoded,
Event::Status {
status: PodStatus::Busy
}
));
}
#[test]
fn event_history_legacy_without_status_defaults_to_idle() {
let json = r#"{"event":"history","data":{"items":[],"greeting":{"pod_name":"test","cwd":"/tmp","provider":"anthropic","model":"claude","scope_summary":"","tools":[]}}}"#;
let decoded: Event = serde_json::from_str(json).unwrap();
match decoded {
Event::History { status, .. } => assert_eq!(status, PodStatus::Idle),
other => panic!("expected History, got {other:?}"),
}
}
#[test]

View File

@ -89,9 +89,7 @@ pub async fn record_metric(
/// `Metric` 列に fold する。
///
/// schema 変更で deserialize できない payload は無視する(後方互換)。
pub fn metrics_from_extensions(
extensions: &[(String, serde_json::Value)],
) -> Vec<Metric> {
pub fn metrics_from_extensions(extensions: &[(String, serde_json::Value)]) -> Vec<Metric> {
extensions
.iter()
.filter(|(domain, _)| domain == DOMAIN)

View File

@ -385,8 +385,8 @@ pub fn render_snapshot(tasks: &[TaskEntry]) -> String {
let snapshot = TaskSnapshot {
tasks: tasks.to_vec(),
};
let json = serde_json::to_string_pretty(&snapshot)
.unwrap_or_else(|_| String::from("{\"tasks\":[]}"));
let json =
serde_json::to_string_pretty(&snapshot).unwrap_or_else(|_| String::from("{\"tasks\":[]}"));
format!("{}\n\n```json\n{}\n```\n", snapshot_overview(tasks), json)
}
@ -560,7 +560,8 @@ mod tests {
fn replay_history_uses_compact_snapshot_and_continues_updates() {
let pre = TaskStore::new();
pre.create("kept".into(), "from compact".into());
pre.update(1, Some(TaskStatus::Inprogress), None, None).unwrap();
pre.update(1, Some(TaskStatus::Inprogress), None, None)
.unwrap();
let history = vec![
Item::system_message(wrap_snapshot_system_message(&pre.snapshot_text())),
Item::tool_call("u1", "TaskUpdate", r#"{"taskid":1,"status":"completed"}"#),
@ -587,13 +588,23 @@ mod tests {
// pre-compact `TaskCreate`s do not surface as duplicates.
let pre = TaskStore::new();
pre.create("A".into(), "A-desc".into());
pre.update(1, Some(TaskStatus::Completed), None, None).unwrap();
pre.update(1, Some(TaskStatus::Completed), None, None)
.unwrap();
pre.create("B".into(), "B-desc".into());
pre.update(2, Some(TaskStatus::Inprogress), None, None).unwrap();
pre.update(2, Some(TaskStatus::Inprogress), None, None)
.unwrap();
let history = vec![
Item::tool_call("c1", "TaskCreate", r#"{"subject":"A","description":"A-desc"}"#),
Item::tool_call(
"c1",
"TaskCreate",
r#"{"subject":"A","description":"A-desc"}"#,
),
Item::tool_call("u1", "TaskUpdate", r#"{"taskid":1,"status":"completed"}"#),
Item::tool_call("c2", "TaskCreate", r#"{"subject":"B","description":"B-desc"}"#),
Item::tool_call(
"c2",
"TaskCreate",
r#"{"subject":"B","description":"B-desc"}"#,
),
Item::tool_call("u2", "TaskUpdate", r#"{"taskid":2,"status":"inprogress"}"#),
Item::system_message(wrap_snapshot_system_message(&pre.snapshot_text())),
Item::tool_call("compact-tasklist", "TaskList", "{}"),
@ -625,7 +636,8 @@ mod tests {
"subject with\nembedded newline\n- bullet".into(),
"desc:\n status: not-actually-a-field\n ```code fence```".into(),
);
pre.update(1, Some(TaskStatus::Inprogress), None, None).unwrap();
pre.update(1, Some(TaskStatus::Inprogress), None, None)
.unwrap();
let history = vec![Item::system_message(wrap_snapshot_system_message(
&pre.snapshot_text(),

View File

@ -1,7 +1,8 @@
use std::time::Instant;
use protocol::{
AlertLevel, AlertSource, CompletionEntry, CompletionKind, Event, Method, RunResult, Segment,
AlertLevel, AlertSource, CompletionEntry, CompletionKind, Event, Method, PodStatus, RunResult,
Segment,
};
use crate::block::{
@ -41,11 +42,16 @@ impl CompletionState {
pub struct App {
pub pod_name: String,
pub connected: bool,
/// Last controller status reported by the Pod. Drives the status line
/// and Ctrl-key routing; do not infer this solely from replayed history.
pub pod_status: PodStatus,
/// True while the Pod is in `PodStatus::Running`.
pub running: bool,
/// True while the Pod is in `PodStatus::Paused`. Set on
/// `RunEnd::Paused` and cleared when a new turn starts (either via
/// `Resume` or a fresh `Run`).
/// True while the Pod is in `PodStatus::Paused`.
pub paused: bool,
/// True after worker `RunEnd` while controller post-run work is still
/// blocking the next method.
pub busy: bool,
pub run_requests: usize,
/// Sum of `input_tokens - cache_read_input_tokens` across the
/// current turn's LLM requests — i.e. the net tokens this turn
@ -80,8 +86,10 @@ impl App {
Self {
pod_name,
connected: false,
pod_status: PodStatus::Idle,
running: false,
paused: false,
busy: false,
run_requests: 0,
run_upload_tokens: 0,
run_output_tokens: 0,
@ -99,6 +107,16 @@ impl App {
}
}
pub fn set_pod_status(&mut self, status: PodStatus) {
self.pod_status = status;
self.running = status == PodStatus::Running;
self.paused = status == PodStatus::Paused;
self.busy = status == PodStatus::Busy;
if self.running || self.busy {
self.quit_confirm = None;
}
}
/// Re-evaluate the completion popup against the current input.
/// Returns a `Method::ListCompletions` to send when the
/// `(kind, prefix_start, prefix)` triple changed; otherwise `None`.
@ -278,6 +296,10 @@ impl App {
}
pub fn submit_input(&mut self) -> Option<Method> {
if self.busy {
self.push_error("Pod is finishing post-run work; wait for idle before submitting.");
return None;
}
let segments = self.input.submit_segments();
if segments_are_blank(&segments) {
// Empty Enter only does something meaningful when the Pod
@ -450,8 +472,7 @@ impl App {
self.assistant_streaming = false;
}
Event::TurnStart { .. } => {
self.running = true;
self.paused = false;
self.set_pod_status(PodStatus::Running);
self.run_requests += 1;
self.current_tool = None;
self.assistant_streaming = false;
@ -617,8 +638,10 @@ impl App {
upload_tokens: self.run_upload_tokens,
output_tokens: self.run_output_tokens,
});
self.running = false;
self.paused = matches!(result, RunResult::Paused);
self.set_pod_status(match result {
RunResult::Paused => PodStatus::Paused,
RunResult::Finished | RunResult::LimitReached => PodStatus::Busy,
});
self.run_requests = 0;
self.run_upload_tokens = 0;
self.run_output_tokens = 0;
@ -643,8 +666,16 @@ impl App {
message: alert.message,
});
}
Event::History { items, greeting } => {
Event::History {
items,
greeting,
status,
} => {
self.restore_history(&items, greeting);
self.set_pod_status(status);
}
Event::Status { status } => {
self.set_pod_status(status);
}
Event::Completions { kind, entries } => {
// Apply only if the popup is still on the same
@ -1216,8 +1247,11 @@ mod completion_flow_tests {
"text": "[File: src/main.rs]\nfn main() {}",
}],
})],
status: PodStatus::Running,
});
assert!(matches!(app.pod_status, PodStatus::Running));
assert!(app.running);
assert!(matches!(
app.blocks.get(1),
Some(Block::SystemMessage { text }) if text == "[File: src/main.rs]\nfn main() {}"

View File

@ -35,6 +35,10 @@ impl PodClient {
self.writer.write(method).await
}
pub fn try_next_event(&mut self) -> Option<Event> {
self.event_rx.try_recv().ok()
}
pub async fn next_event(&mut self) -> Option<Event> {
self.event_rx.recv().await
}

View File

@ -21,7 +21,7 @@ use crossterm::execute;
use crossterm::terminal::{
EnterAlternateScreen, LeaveAlternateScreen, disable_raw_mode, enable_raw_mode,
};
use protocol::Method;
use protocol::{Method, PodStatus};
use ratatui::Terminal;
use ratatui::backend::CrosstermBackend;
use session_store::SessionId;
@ -283,29 +283,44 @@ async fn run_loop(
break;
}
// Drain any already-buffered Pod events in a bounded batch before
// polling the terminal. This keeps status fresh without letting a
// busy event stream starve Ctrl-C / Ctrl-X input.
for _ in 0..32 {
match client.try_next_event() {
Some(ev) => app.handle_pod_event(ev),
None => break,
}
}
// Always give the terminal queue a non-blocking pass each frame.
// The awaited select below only waits after this pass found nothing.
let mut handled_term_event = false;
while event::poll(std::time::Duration::ZERO)? {
handled_term_event = true;
handle_terminal_event(app, &mut client, event::read()?).await?;
if app.quit {
break;
}
}
if app.quit {
break;
}
if handled_term_event {
terminal.draw(|f| ui::draw(f, app))?;
continue;
}
tokio::select! {
_ = tokio::task::spawn_blocking(|| event::poll(std::time::Duration::from_millis(50))) => {
while event::poll(std::time::Duration::ZERO)? {
match event::read()? {
TermEvent::Key(key) => {
if let Some(method) = handle_key(app, key) {
client.send(&method).await?;
}
}
TermEvent::Mouse(mouse) => {
handle_mouse(app, mouse);
}
TermEvent::Paste(s) => {
app.insert_paste(s);
}
TermEvent::Resize(_, _) => {
// No-op: next draw repaints in full.
}
_ => {}
}
if app.quit {
break;
}
term_event = tokio::task::spawn_blocking(|| {
if event::poll(std::time::Duration::from_millis(50))? {
event::read().map(Some)
} else {
Ok(None)
}
}) => {
if let Some(term_event) = term_event?? {
handle_terminal_event(app, &mut client, term_event).await?;
}
}
event = client.next_event(), if app.connected => {
@ -325,6 +340,31 @@ async fn run_loop(
Ok(())
}
async fn handle_terminal_event(
app: &mut App,
client: &mut PodClient,
event: TermEvent,
) -> Result<(), Box<dyn std::error::Error>> {
match event {
TermEvent::Key(key) => {
if let Some(method) = handle_key(app, key) {
client.send(&method).await?;
}
}
TermEvent::Mouse(mouse) => {
handle_mouse(app, mouse);
}
TermEvent::Paste(s) => {
app.insert_paste(s);
}
TermEvent::Resize(_, _) => {
// No-op: next draw repaints in full.
}
_ => {}
}
Ok(())
}
fn run_disconnected(_app: &mut App) -> Result<(), Box<dyn std::error::Error>> {
loop {
if event::poll(std::time::Duration::from_millis(100))?
@ -392,10 +432,13 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
Some(app.refresh_completion())
}
KeyCode::Char('c') if ctrl => Some(handle_pause_or_quit(app)),
KeyCode::Char('x') if ctrl => Some(if app.running {
Some(Method::Cancel)
} else {
Some(Method::Shutdown)
KeyCode::Char('x') if ctrl => Some(match app.pod_status {
PodStatus::Running => Some(Method::Cancel),
PodStatus::Paused | PodStatus::Idle => Some(Method::Shutdown),
PodStatus::Busy => {
app.push_error("Pod is finishing post-run work; wait for idle or press Ctrl-C twice to exit the TUI.");
None
}
}),
KeyCode::Char('d') if ctrl => {
app.quit = true;
@ -534,9 +577,9 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
const CONFIRM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
/// Running → send `Method::Pause`.
/// Idle / Paused → 2-tap to quit the TUI (the Pod keeps running).
/// Idle / Paused / Busy → 2-tap to quit the TUI (the Pod keeps running).
fn handle_pause_or_quit(app: &mut App) -> Option<Method> {
if app.running {
if app.pod_status == PodStatus::Running {
return Some(Method::Pause);
}
if let Some(t) = app.quit_confirm

View File

@ -877,6 +877,18 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) {
" — Enter to resume, type to start new turn",
Style::default().fg(Color::DarkGray),
));
} else if app.busy {
spans.push(Span::raw(" | "));
spans.push(Span::styled(
"busy",
Style::default()
.fg(Color::Yellow)
.add_modifier(Modifier::BOLD),
));
spans.push(Span::styled(
" — finishing post-run work",
Style::default().fg(Color::DarkGray),
));
} else {
spans.push(Span::styled(" idle", Style::default().fg(Color::DarkGray)));
}

View File

@ -0,0 +1,40 @@
# TUI: Pod 状態同期と Ctrl 系操作の安定化
## 背景
TUI 利用中、notice / alert / compact 表示が出た前後で `Ctrl-C` / `Ctrl-X` が効かない、または意図と違う動作をしているように見えるケースがある。
調査上、notice 表示そのものがキー入力を壊している実装は見当たらない。一方で、TUI が持つ `running` / `paused` などのローカル状態と、Pod controller の実状態がズレる経路が複数ある。
代表例:
- `Event::RunEnd` 後、TUI は idle 扱いになるが、Pod controller 側では memory extract / consolidate / post-run compact などの post-run 処理がまだ続いている場合がある。
- この区間で notice / alert / compact event が表示されると、ユーザーからは「notice が出て idle に見えるのに Ctrl 操作が効かない」ように見える。
- TUI attach 時は `GetHistory` で履歴を復元するが、Pod の現在 statusRunning / Paused / Idleは復元されない。
- 実際には Pod が Running / Paused でも、TUI 側は初期値の idle として扱い、`Ctrl-C` / `Ctrl-X` の分岐が実状態と合わない可能性がある。
- TUI の terminal event polling は Pod event が頻繁な時に key input が遅延・取りこぼしに見えやすい構造になっている疑いがある。
## 要件
- TUI が表示・操作に使う Pod 状態は、接続直後および実行中を通じて Pod controller の実状態と一致する。
- attach / resume 直後でも Running / Paused / Idle が正しく表示される。
- `Ctrl-C` / `Ctrl-X` の分岐が TUI の古い推測状態に依存して誤動作しない。
- `RunEnd` と post-run 処理中の扱いを整理する。
- Worker turn が終わった状態と、Pod controller が次の method を即時処理できる状態を混同しない。
- post-run compact / memory 系処理中に notice が出ても、TUI 上の status と操作可能性が矛盾しない。
- TUI の input polling を見直し、Pod event / notice が多い状況でも key input が starvation しない。
- `Ctrl-C` / `Ctrl-X` の UX を明確化する。
- Running / Paused / Idle / post-run busy それぞれで、Pause / Cancel / Shutdown / TUI exit のどれを送るかを明示する。
## 完了条件
- Running / Paused な Pod に後から TUI attach しても、status 表示と `Ctrl-C` / `Ctrl-X` の挙動が実状態に合っている。
- `RunEnd` 後の post-run 処理中に alert / compact notice が発生しても、TUI が idle と誤表示して操作不能に見える状態にならない。
- Pod event / notice が連続しても、`Ctrl-C` / `Ctrl-X` が遅延・取りこぼしに見えない。
- 上記の状態遷移について、少なくともユニットテストまたは再現手順で確認できる。
## 範囲外
- TUI の入力キューイング(`tickets/tui-input-queue.md`)。本チケットは状態同期と Ctrl 系操作の安定化に限定する。
- native GUI 側の状態管理。
- notice / alert の文言や見た目の全面 redesign。