From 954cf200e2f67994b0c1a666176356073f5bd206 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 4 May 2026 15:52:27 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20Pos=E5=87=A6=E7=90=86=E3=81=AE=E9=9D=9E?= =?UTF-8?q?=E5=90=8C=E6=9C=9F=E5=8C=96=E3=83=BBBusy=E7=8A=B6=E6=85=8B?= =?UTF-8?q?=E3=81=AE=E5=89=8A=E9=99=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/llm-worker/src/llm_client/client.rs | 6 + .../llm-worker/tests/transport_retry_test.rs | 4 +- crates/pod/src/compact/state.rs | 8 +- crates/pod/src/controller.rs | 83 +---- crates/pod/src/pod.rs | 344 +++++++++++++----- crates/pod/tests/compact_events_test.rs | 98 ++++- crates/pod/tests/controller_test.rs | 61 +--- crates/protocol/src/lib.rs | 10 +- crates/tools/src/task.rs | 4 +- crates/tui/src/app.rs | 13 +- crates/tui/src/main.rs | 6 +- crates/tui/src/ui.rs | 12 - docs/compaction.md | 6 +- tickets/pod-post-run-detach.md | 6 + tickets/pod-post-run-detach.review.md | 132 +++++++ 15 files changed, 537 insertions(+), 256 deletions(-) create mode 100644 tickets/pod-post-run-detach.review.md diff --git a/crates/llm-worker/src/llm_client/client.rs b/crates/llm-worker/src/llm_client/client.rs index 0d8e0cf8..ed06f461 100644 --- a/crates/llm-worker/src/llm_client/client.rs +++ b/crates/llm-worker/src/llm_client/client.rs @@ -74,6 +74,12 @@ pub trait LlmClient: Send + Sync { } } +impl Clone for Box { + fn clone(&self) -> Self { + self.clone_boxed() + } +} + /// `Box` に対する `LlmClient` の実装 /// /// これにより、動的ディスパッチを使用するクライアントも `Worker` で利用可能になる。 diff --git a/crates/llm-worker/tests/transport_retry_test.rs b/crates/llm-worker/tests/transport_retry_test.rs index 64108da1..b93d7f41 100644 --- a/crates/llm-worker/tests/transport_retry_test.rs +++ b/crates/llm-worker/tests/transport_retry_test.rs @@ -45,7 +45,9 @@ impl Scheme for DummyScheme { } fn parse_sse(&self, _: &str, _: &str, _: &mut ()) -> Result, ClientError> { if self.parse_fail { - Err(ClientError::Sse("simulated mid-stream parse failure".into())) + Err(ClientError::Sse( + "simulated mid-stream parse failure".into(), + )) } else { Ok(vec![]) } diff --git a/crates/pod/src/compact/state.rs b/crates/pod/src/compact/state.rs index 54f94369..78819fb3 100644 --- a/crates/pod/src/compact/state.rs +++ b/crates/pod/src/compact/state.rs @@ -4,8 +4,8 @@ //! flags shared between: //! - `PodInterceptor` (reads `request_threshold` — the *safety net* for //! between-requests yielding) -//! - `Pod::try_post_run_compact` (reads `post_run_threshold` — the -//! *proactive* check between turns) +//! - `Pod::try_pre_run_compact` (reads `post_run_threshold` — the +//! *proactive* check before the next turn starts) //! - `Pod::run()` / `resume()` (circuit breaker, thrash detection) //! //! Current occupancy (input-token count) is **not** stored here. The single @@ -19,8 +19,8 @@ const MAX_COMPACT_FAILURES: usize = 3; /// Shared mutable state for compaction decisions. pub(crate) struct CompactState { - /// Between-turns threshold (proactive). Checked by the Controller - /// after a run completes. `None` disables the post-run check. + /// Between-turns threshold (proactive). Checked before the next turn + /// starts. `None` disables the pre-run check. post_run_threshold: Option, /// Between-requests threshold (safety net). Checked inside a turn /// before each LLM request. `None` disables the request check. diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 5fd529c5..fc8b97b7 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -76,68 +76,22 @@ async fn set_controller_status( let _ = event_tx.send(Event::Status { status }); } -async fn run_post_run_jobs(pod: &mut Pod, alerter: &Alerter) -where - C: LlmClient, - St: Store, -{ - if let Err(e) = pod.try_post_run_extract().await { - tracing::warn!(error = %e, "Post-run memory extract error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!("post-run memory extract error: {e}"), - ); - } - if let Err(e) = pod.try_post_run_consolidate().await { - tracing::warn!(error = %e, "Post-run memory consolidate error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!("post-run memory consolidate error: {e}"), - ); - } - if let Err(e) = pod.try_post_run_compact().await { - tracing::warn!(error = %e, "Post-run compaction error"); - alerter.alert( - AlertLevel::Warn, - AlertSource::Compactor, - format!("post-run compaction error: {e}"), - ); - } -} - async fn finish_controller_run( pod: &mut Pod, shared_state: &Arc, runtime_dir: &RuntimeDir, event_tx: &broadcast::Sender, - alerter: &Alerter, new_status: PodStatus, ) where - C: LlmClient, - St: Store, + C: LlmClient + Clone + 'static, + St: Store + Clone + 'static, { - if new_status == PodStatus::Busy { - // Surface the post-run busy window before kicking off the jobs so - // TUI / external observers see Busy regardless of whether the - // worker turn ended via success or error. Both branches in - // `run_with_cancel_support` return `PodStatus::Busy` for this - // path; emitting here keeps the two unified. - set_controller_status(shared_state, runtime_dir, event_tx, PodStatus::Busy).await; - run_post_run_jobs(pod, alerter).await; - } - let items = pod.worker().history().to_vec(); shared_state.update_history(items); shared_state.set_user_segments(pod.user_segments().to_vec()); - let final_status = if new_status == PodStatus::Busy { - PodStatus::Idle - } else { - new_status - }; - set_controller_status(shared_state, runtime_dir, event_tx, final_status).await; + set_controller_status(shared_state, runtime_dir, event_tx, new_status).await; let _ = runtime_dir.write_history(shared_state).await; + pod.spawn_post_run_memory_jobs(); } // --------------------------------------------------------------------------- @@ -154,8 +108,8 @@ impl PodController { runtime_base: &Path, ) -> Result<(PodHandle, ShutdownReceiver), std::io::Error> where - C: LlmClient + 'static, - St: Store + 'static, + C: LlmClient + Clone + 'static, + St: Store + Clone + 'static, { let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>(); let (method_tx, mut method_rx) = mpsc::channel::(32); @@ -524,7 +478,6 @@ impl PodController { &shared_state, &runtime_dir, &event_tx, - &alerter, new_status, ) .await; @@ -542,9 +495,9 @@ impl PodController { pod.push_notify(message); let status = shared_state.get_status(); if status != PodStatus::Idle { - // RUNNING / Paused / Busy: the buffer push is the + // RUNNING / Paused: the buffer push is the // entire operation; an in-flight turn (or the - // next Resume/Run after Busy) will drain the buffer + // next Resume/Run) will drain the buffer // at its next pre_llm_request. continue; } @@ -576,7 +529,6 @@ impl PodController { &shared_state, &runtime_dir, &event_tx, - &alerter, new_status, ) .await; @@ -621,7 +573,6 @@ impl PodController { &shared_state, &runtime_dir, &event_tx, - &alerter, new_status, ) .await; @@ -640,12 +591,12 @@ impl PodController { } Method::Pause => { - // Already paused or post-run busy → idempotent no-op. - // Otherwise the Pod is Idle (Running turns go through + // Already paused → idempotent no-op. Otherwise the + // Pod is Idle (Running turns go through // `run_with_cancel_support`, not this outer match), so // there is nothing to pause. let status = shared_state.get_status(); - if !matches!(status, PodStatus::Paused | PodStatus::Busy) { + if status != PodStatus::Paused { let _ = event_tx.send(Event::Error { code: ErrorCode::NotRunning, message: "Pod is not running".into(), @@ -714,7 +665,6 @@ impl PodController { &shared_state, &runtime_dir, &event_tx, - &alerter, new_status, ) .await; @@ -728,6 +678,11 @@ impl PodController { } } + // Background memory jobs own extract/consolidate workers after a + // turn completes. Join them before the controller task exits so + // staging writes and consolidation cleanups are not abandoned. + pod.wait_for_memory_jobs().await; + // Report upward that this Pod is stopping before the // controller task exits. Awaited (not fire-and-forget): // after `shutdown_tx.send` the process may exit quickly, @@ -787,9 +742,9 @@ where return match result { Ok(r) => { let (status, run_result) = match r { - PodRunResult::Finished => (PodStatus::Busy, RunResult::Finished), + PodRunResult::Finished => (PodStatus::Idle, RunResult::Finished), PodRunResult::Paused => (PodStatus::Paused, RunResult::Paused), - PodRunResult::LimitReached => (PodStatus::Busy, RunResult::LimitReached), + PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached), }; let _ = event_tx.send(Event::RunEnd { result: run_result }); if matches!(run_result, RunResult::Finished) { @@ -825,7 +780,7 @@ where message, }, ); - (PodStatus::Busy, shutdown_requested) + (PodStatus::Idle, shutdown_requested) } }; } diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 65d9b295..f6d6143f 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1,6 +1,7 @@ use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; +use tokio::sync::Mutex as AsyncMutex; use llm_worker::Item; use llm_worker::llm_client::RequestConfig; @@ -35,6 +36,12 @@ use async_trait::async_trait; use llm_worker::interceptor::PreRequestAction; use protocol::{AlertLevel, AlertSource, Event, Segment}; use tokio::sync::broadcast; +use tokio::task::JoinHandle; + +struct SessionHead { + session_id: SessionId, + head_hash: Option, +} /// Pre-LLM-request hook that records `history.len()` at send time into a /// shared `UsageTracker`. The on_usage callback later pairs this with the @@ -61,7 +68,7 @@ pub struct Pod { worker: Option>, store: St, session_id: SessionId, - head_hash: Option, + session_head: Arc>, /// Absolute working directory of the Pod. pwd: PathBuf, /// Shared, atomically-swappable view of the Pod's resolved scope. @@ -172,7 +179,13 @@ pub struct Pod { /// run yet on this session — next extract starts from entry 0. /// Restored from `RestoredState.extensions` on `restore`, updated /// after each successful extract via `save_extension`. - extract_pointer: Mutex>, + extract_pointer: Arc>>, + /// Phase 1/2 memory job running outside the controller method loop. + /// The task owns the extract/consolidate worker execution and is joined + /// at shutdown. A single slot is enough: Phase 1/2 implementations loop + /// until thresholds fall below their trigger points, and concurrent + /// triggers are coalesced by skipping when this handle is still active. + memory_task: Option>, /// Typed user submissions in submit order. K-th entry corresponds to /// the K-th `Item::user_message` in `worker.history()` (modulo seed /// history loaded via `SessionStart.history`, whose original segments @@ -182,6 +195,84 @@ pub struct Pod { user_segments: Vec>, } +impl Pod { + pub async fn wait_for_memory_jobs(&mut self) { + if let Some(handle) = self.memory_task.take() + && let Err(e) = handle.await + { + tracing::warn!(error = %e, "Post-run memory task join failed"); + } + } +} + +impl Pod { + fn clone_for_memory_task(&self) -> Self { + // The cloned Pod's worker exists only as a snapshot for the memory + // task: `run_extract_once` reads `worker.history()`, and the + // extract/consolidate workers are built fresh inside their own + // methods using `worker.client()` as fallback when no override + // model is configured. system_prompt / request_config / cache_key + // are unused on this path, so we deliberately skip copying them. + let source_worker = self.worker.as_ref().expect("worker present"); + let mut worker = Worker::new(source_worker.client().clone()); + worker.set_history(source_worker.history().to_vec()); + Self { + manifest: self.manifest.clone(), + worker: Some(worker), + store: self.store.clone(), + session_id: self.session_id, + session_head: self.session_head.clone(), + pwd: self.pwd.clone(), + scope: self.scope.clone(), + hook_builder: HookRegistryBuilder::new(), + interceptor_installed: false, + compact_state: None, + usage_tracker: Arc::new(UsageTracker::new()), + metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), + usage_history: self.usage_history.clone(), + tracker: None, + task_store: self.task_store.clone(), + system_prompt_template: None, + alerter: self.alerter.clone(), + event_tx: self.event_tx.clone(), + pending_notifies: NotifyBuffer::new(), + pending_attachments: Arc::new(Mutex::new(Vec::new())), + scope_allocation: None, + callback_socket: None, + prompts: self.prompts.clone(), + workflow_registry: self.workflow_registry.clone(), + memory_layout: self.memory_layout.clone(), + inject_resident_knowledge: self.inject_resident_knowledge, + pending_scope_snapshot: self.pending_scope_snapshot.clone(), + extract_in_flight: self.extract_in_flight.clone(), + consolidation_in_flight: self.consolidation_in_flight.clone(), + extract_pointer: self.extract_pointer.clone(), + memory_task: None, + user_segments: self.user_segments.clone(), + } + } + + pub fn spawn_post_run_memory_jobs(&mut self) { + // Drop a finished prior handle so we can spawn a fresh task. + // If the prior task is still running, coalesce by skipping — + // Phase 1/2 implementations re-evaluate thresholds on completion. + self.cleanup_finished_memory_task(); + if self.memory_task.is_some() { + return; + } + + let mut pod = self.clone_for_memory_task(); + self.memory_task = Some(tokio::spawn(async move { + if let Err(e) = pod.try_post_run_extract().await { + tracing::warn!(error = %e, "Post-run memory extract task error"); + } + if let Err(e) = pod.try_post_run_consolidate().await { + tracing::warn!(error = %e, "Post-run memory consolidate task error"); + } + })); + } +} + impl Pod { /// Create a new Pod from a pre-built Worker and store. /// @@ -210,7 +301,10 @@ impl Pod { worker: Some(worker), store, session_id, - head_hash: None, + session_head: Arc::new(AsyncMutex::new(SessionHead { + session_id, + head_hash: None, + })), pwd, scope: SharedScope::new(scope), hook_builder: HookRegistryBuilder::new(), @@ -235,7 +329,8 @@ impl Pod { pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), - extract_pointer: Mutex::new(None), + extract_pointer: Arc::new(Mutex::new(None)), + memory_task: None, user_segments: Vec::new(), }; pod.apply_prune_from_manifest(); @@ -330,7 +425,8 @@ impl Pod { /// can restore the narrowed scope instead of reclaiming delegated /// writes. pub async fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> { - if self.head_hash.is_none() { + let mut head = self.session_head.lock().await; + if head.head_hash.is_none() { return Ok(()); } let snapshot = { @@ -340,7 +436,7 @@ impl Pod { deny: scope.deny_rules(), } }; - session_store::save_pod_scope(&self.store, self.session_id, &mut self.head_hash, &snapshot) + session_store::save_pod_scope(&self.store, head.session_id, &mut head.head_hash, &snapshot) .await } @@ -362,10 +458,11 @@ impl Pod { .expect("pending_scope_snapshot poisoned") .take(); if let Some(snapshot) = snapshot { + let mut head = self.session_head.lock().await; session_store::save_pod_scope( &self.store, - self.session_id, - &mut self.head_hash, + head.session_id, + &mut head.head_hash, &snapshot, ) .await?; @@ -531,10 +628,11 @@ impl Pod { /// (the entry is dropped) and a `Warn` alert + `tracing::warn!` are /// emitted so the failure isn't completely silent. async fn try_record_metric(&mut self, metric: &session_metrics::Metric) { + let mut head = self.session_head.lock().await; if let Err(err) = session_metrics::record_metric( &self.store, - self.session_id, - &mut self.head_hash, + head.session_id, + &mut head.head_hash, metric, ) .await @@ -803,6 +901,52 @@ impl Pod { self.run(vec![Segment::text(s)]).await } + /// Drop the prior memory_task handle if it has finished. Keep it if + /// still running so callers can decide whether to wait or coalesce. + fn cleanup_finished_memory_task(&mut self) { + if self.memory_task.as_ref().is_some_and(|h| h.is_finished()) { + self.memory_task = None; + } + } + + /// Wait for the in-flight memory task (if any) to finish. Used before + /// compact rewrites history (extract reads the same history). + async fn join_memory_task(&mut self) { + if let Some(handle) = self.memory_task.take() + && let Err(e) = handle.await + { + tracing::warn!(error = %e, "Memory task join failed"); + } + } + + /// Whether `try_pre_run_compact` would actually compact. The same + /// check is duplicated inside `try_pre_run_compact` itself for + /// defensive reasons; this is the gate for joining the memory task + /// before the compact runs. + fn should_pre_run_compact(&self) -> bool { + self.compact_state.as_ref().is_some_and(|s| { + !s.is_disabled() + && !s.just_compacted() + && s.exceeds_post_run(self.total_tokens().tokens) + }) + } + + /// Prelude shared by `run` / `run_for_notification` / `resume`. + /// Wires up worker hooks, ensures the session is materialized on the + /// store, and runs pre-run compact (joining any in-flight memory task + /// first so extract sees a stable history range). + async fn prepare_for_run(&mut self) -> Result<(), PodError> { + self.ensure_interceptor_installed(); + self.ensure_system_prompt_materialized()?; + self.cleanup_finished_memory_task(); + self.ensure_session_head().await?; + if self.should_pre_run_compact() { + self.join_memory_task().await; + } + self.try_pre_run_compact().await; + Ok(()) + } + /// Send user input and run until the LLM turn completes. /// /// `input` is a typed segment list (see [`protocol::Segment`]). The @@ -816,20 +960,22 @@ impl Pod { /// the Worker is aborted, history is compacted, and execution resumes /// automatically. pub async fn run(&mut self, input: Vec) -> Result { - self.ensure_interceptor_installed(); - self.ensure_system_prompt_materialized()?; - self.ensure_session_head().await?; + self.prepare_for_run().await?; // Persist the user input as typed segments before the worker // pushes its flattened copy into history. save_delta deliberately // skips the resulting `is_user_message()` item to avoid double-write. - session_store::save_user_input( - &self.store, - self.session_id, - &mut self.head_hash, - input.clone(), - ) - .await?; + { + let mut head = self.session_head.lock().await; + self.session_id = head.session_id; + session_store::save_user_input( + &self.store, + head.session_id, + &mut head.head_hash, + input.clone(), + ) + .await?; + } self.user_segments.push(input.clone()); // Resolve `@` refs and `/` workflow invocations to @@ -989,9 +1135,7 @@ impl Pod { /// Worker's resume path issues the LLM request without a new /// user turn. pub async fn run_for_notification(&mut self) -> Result { - self.ensure_interceptor_installed(); - self.ensure_system_prompt_materialized()?; - self.ensure_session_head().await?; + self.prepare_for_run().await?; let history_before = self.worker.as_ref().unwrap().history().len(); @@ -1005,9 +1149,7 @@ impl Pod { /// Resume from a paused state. pub async fn resume(&mut self) -> Result { - self.ensure_interceptor_installed(); - self.ensure_system_prompt_materialized()?; - self.ensure_session_head().await?; + self.prepare_for_run().await?; let history_before = self.worker.as_ref().unwrap().history().len(); @@ -1035,27 +1177,29 @@ impl Pod { config: w.request_config(), history: w.history(), }; - if self.head_hash.is_none() { + let mut head = self.session_head.lock().await; + if head.head_hash.is_none() { let hash = - session_store::create_session_with_id(&self.store, self.session_id, state).await?; - self.head_hash = Some(hash); + session_store::create_session_with_id(&self.store, head.session_id, state).await?; + head.head_hash = Some(hash); + drop(head); self.persist_scope_snapshot().await?; return Ok(()); } - let prev_session_id = self.session_id; - session_store::ensure_head_or_fork( - &self.store, - &mut self.session_id, - &mut self.head_hash, - state, - ) - .await?; + let prev_session_id = head.session_id; + let mut session_id = head.session_id; + let mut head_hash = head.head_hash.clone(); + session_store::ensure_head_or_fork(&self.store, &mut session_id, &mut head_hash, state) + .await?; + head.session_id = session_id; + head.head_hash = head_hash; + self.session_id = session_id; // ensure_head_or_fork mints a fresh session_id when it auto- // forks. Sync that to pods.json so a concurrent // restore_from_manifest can't see "no live writer" for the new // session and grab it. - if self.session_id != prev_session_id && self.scope_allocation.is_some() { - pod_registry::update_session(&self.manifest.pod.name, self.session_id)?; + if session_id != prev_session_id && self.scope_allocation.is_some() { + pod_registry::update_session(&self.manifest.pod.name, session_id)?; } Ok(()) } @@ -1142,17 +1286,21 @@ impl Pod { }) } - /// Attempt proactive compaction (called by Controller after run). + /// Attempt proactive compaction at the beginning of a controller Run. /// - /// Best-effort: failures are logged but do not propagate. - pub async fn try_post_run_compact(&mut self) -> Result<(), PodError> { + /// This used to run in the controller's post-run path. Keeping it here + /// preserves the ordering requirement that the next turn starts with a + /// compacted history, without introducing a separate Busy controller state. + /// Best-effort: failures are logged and surfaced, but do not abort the + /// user turn that triggered the check. + pub async fn try_pre_run_compact(&mut self) { let state = match self.compact_state.as_ref() { Some(s) if !s.is_disabled() && !s.just_compacted() => s.clone(), - _ => return Ok(()), + _ => return, }; let current_tokens = self.total_tokens().tokens; if !state.exceeds_post_run(current_tokens) { - return Ok(()); + return; } let retained = state.retained_tokens(); @@ -1161,24 +1309,22 @@ impl Pod { Ok(new_session_id) => { info!( new_session_id = %new_session_id, - "Proactive post-run compaction succeeded" + "Proactive pre-run compaction succeeded" ); self.send_event(Event::CompactDone { new_session_id }); state.record_compact_success(); - Ok(()) } Err(e) => { - warn!(error = %e, "Proactive post-run compaction failed"); + warn!(error = %e, "Proactive pre-run compaction failed"); self.send_event(Event::CompactFailed { error: e.to_string(), }); self.alert( AlertLevel::Warn, AlertSource::Compactor, - format!("post-run compaction failed: {e}"), + format!("pre-run compaction failed: {e}"), ); state.record_compact_failure(); - Ok(()) } } } @@ -1193,19 +1339,24 @@ impl Pod { // head_hash mutable). let w = self.worker.as_ref().unwrap(); let new_items = &w.history()[history_before..]; - session_store::save_delta(&self.store, self.session_id, &mut self.head_hash, new_items) + let mut head = self.session_head.lock().await; + self.session_id = head.session_id; + session_store::save_delta(&self.store, head.session_id, &mut head.head_hash, new_items) .await?; + drop(head); self.flush_pending_scope_snapshot().await?; let turn_count = self.worker.as_ref().unwrap().turn_count(); + let mut head = self.session_head.lock().await; session_store::save_turn_end( &self.store, - self.session_id, - &mut self.head_hash, + head.session_id, + &mut head.head_hash, turn_count, ) .await?; + drop(head); // Flush any sync-buffered metrics from this run first // (currently `prune.fire` / `prune.skip` from the prune observer). @@ -1238,10 +1389,11 @@ impl Pod { record, correlation_id, } = recorded; + let mut head = self.session_head.lock().await; session_store::save_usage( &self.store, - self.session_id, - &mut self.head_hash, + head.session_id, + &mut head.head_hash, record.history_len, record.input_total_tokens, record.cache_read_tokens, @@ -1249,6 +1401,7 @@ impl Pod { record.output_tokens, ) .await?; + drop(head); if let Some(id) = correlation_id { let metric = session_metrics::Metric::now("prune.post_request") .with_correlation_id(&id) @@ -1266,20 +1419,22 @@ impl Pod { let interrupted = self.worker.as_ref().unwrap().last_run_interrupted(); match result { Ok(r) => { + let mut head = self.session_head.lock().await; session_store::save_run_completed( &self.store, - self.session_id, - &mut self.head_hash, + head.session_id, + &mut head.head_hash, r.clone(), interrupted, ) .await?; } Err(e) => { + let mut head = self.session_head.lock().await; session_store::save_run_errored( &self.store, - self.session_id, - &mut self.head_hash, + head.session_id, + &mut head.head_hash, e.to_string(), interrupted, ) @@ -1511,8 +1666,9 @@ impl Pod { )); // Persist as a new compacted session. - let old_session_id = self.session_id; - let old_head_hash = self + let mut head = self.session_head.lock().await; + let old_session_id = head.session_id; + let old_head_hash = head .head_hash .clone() .expect("head_hash should be set after at least one entry"); @@ -1535,7 +1691,8 @@ impl Pod { // session — the new compacted session starts with no measurements // until its first LLM call. self.session_id = new_session_id; - self.head_hash = Some(new_head_hash); + head.session_id = new_session_id; + head.head_hash = Some(new_head_hash); // Keep pods.json pointing at the live session_id. Without this // a concurrent `restore_from_manifest(new_session_id)` would // see no live writer and grab the session this Pod just moved @@ -1545,6 +1702,7 @@ impl Pod { if self.scope_allocation.is_some() { pod_registry::update_session(&self.manifest.pod.name, new_session_id)?; } + drop(head); // Align user_segments with the post-compaction history. Items // before `retain_from` (now folded into the summary) lose their // segments; only the user_messages surviving in retained_items @@ -1641,10 +1799,10 @@ impl Pod { /// Phase 1 (memory.extract) post-run trigger. /// - /// Called by the Controller **before** [`try_post_run_compact`] so - /// the extract worker sees a stable session-log entry range - /// (compact rewrites history). Best-effort: failures are logged but - /// not propagated. + /// Called by the Controller before spawning the background memory task so + /// the extract worker sees a stable session-log entry range while compact + /// is deferred until the next turn starts. Best-effort: failures are + /// logged but not propagated. /// /// Behaviour follows `docs/plan/memory.md` §Phase 1 並走防止: /// in-flight 中の trigger は skip し、完了時点で閾値再評価する @@ -1798,11 +1956,12 @@ impl Pod { extract::ExtractedPayload::default() }); + let source_session_id = self.session_head.lock().await.session_id; let staging_id = if payload.is_empty() { String::new() } else { let source = memory::schema::SourceRef { - session_id: self.session_id.to_string(), + session_id: source_session_id.to_string(), range: [start_entry as u64, end_entry as u64], }; let (id, _) = extract::write_staging(&layout, source, payload) @@ -1817,14 +1976,18 @@ impl Pod { }; let payload_value = serde_json::to_value(&pointer_payload) .expect("ExtractPointerPayload is always JSON-serializable"); - session_store::save_extension( - &self.store, - self.session_id, - &mut self.head_hash, - extract::EXTRACT_DOMAIN, - payload_value, - ) - .await?; + { + let mut head = self.session_head.lock().await; + session_store::save_extension( + &self.store, + head.session_id, + &mut head.head_hash, + extract::EXTRACT_DOMAIN, + payload_value, + ) + .await?; + self.session_id = head.session_id; + } *self .extract_pointer @@ -1850,12 +2013,11 @@ impl Pod { Ok(worker.client().clone_boxed()) } - /// Phase 2 (memory.consolidation) post-run trigger. + /// Phase 2 (memory.consolidation) trigger. /// - /// Called by the Controller **after** [`try_post_run_extract`] and - /// **before** [`try_post_run_compact`]: extract feeds staging, compact - /// rewrites history. Phase 2 must consume staging before compact - /// reshapes the session. + /// Intended to run from a background memory task after Phase 1 may have + /// added staging entries. Compact is deferred until the next turn starts, + /// so consolidation no longer blocks the controller's post-run path. /// /// Behaviour follows `docs/plan/memory.md` §Phase 2 / §並走防止: /// the staging-side `StagingLock` enforces cross-process exclusion; @@ -2096,7 +2258,10 @@ impl Pod, St> { worker: Some(worker), store, session_id, - head_hash: None, + session_head: Arc::new(AsyncMutex::new(SessionHead { + session_id, + head_hash: None, + })), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), @@ -2121,7 +2286,8 @@ impl Pod, St> { pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), - extract_pointer: Mutex::new(None), + extract_pointer: Arc::new(Mutex::new(None)), + memory_task: None, user_segments: Vec::new(), }; pod.apply_prune_from_manifest(); @@ -2160,7 +2326,10 @@ impl Pod, St> { worker: Some(worker), store, session_id, - head_hash: None, + session_head: Arc::new(AsyncMutex::new(SessionHead { + session_id, + head_hash: None, + })), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), @@ -2185,7 +2354,8 @@ impl Pod, St> { pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), - extract_pointer: Mutex::new(None), + extract_pointer: Arc::new(Mutex::new(None)), + memory_task: None, user_segments: Vec::new(), }; pod.apply_prune_from_manifest(); @@ -2288,7 +2458,10 @@ impl Pod, St> { worker: Some(worker), store, session_id, - head_hash: state.head_hash, + session_head: Arc::new(AsyncMutex::new(SessionHead { + session_id, + head_hash: state.head_hash, + })), pwd: common.pwd, scope: SharedScope::new(common.scope), hook_builder: HookRegistryBuilder::new(), @@ -2315,7 +2488,8 @@ impl Pod, St> { pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), - extract_pointer: Mutex::new(extract_pointer), + extract_pointer: Arc::new(Mutex::new(extract_pointer)), + memory_task: None, user_segments: state.user_segments, }; pod.apply_prune_from_manifest(); diff --git a/crates/pod/tests/compact_events_test.rs b/crates/pod/tests/compact_events_test.rs index 2b6184d3..1334a8f6 100644 --- a/crates/pod/tests/compact_events_test.rs +++ b/crates/pod/tests/compact_events_test.rs @@ -1,8 +1,8 @@ //! Compact lifecycle `Event` broadcasting. //! //! Covers three paths: -//! - `try_post_run_compact` success → `CompactStart + CompactDone` -//! - `try_post_run_compact` failure → `CompactStart + CompactFailed` +//! - `try_pre_run_compact` success → `CompactStart + CompactDone` +//! - `try_pre_run_compact` failure → `CompactStart + CompactFailed` //! - mid-turn `do_compact_and_resume` success → `CompactStart + CompactDone` //! (driven by `compact_request_threshold` → `PreRequestAction::Yield`) @@ -96,7 +96,7 @@ fn write_summary_tool_use_events(call_id: &str, text: &str) -> Vec { ] } -// A low compact_threshold guarantees `try_post_run_compact` will fire +// A low compact_threshold guarantees `try_pre_run_compact` will fire // the first time we check after a run. const POST_RUN_MANIFEST_TOML: &str = r#" [pod] @@ -228,7 +228,7 @@ async fn compact_broadcasts_only_new_system_messages_not_retained_ones() { } #[tokio::test] -async fn post_run_compact_success_broadcasts_start_and_done() { +async fn pre_run_compact_success_broadcasts_start_and_done() { // Responses: (1) first run returns short text, (2) compact worker // emits write_summary then closes (two LLM calls inside the compact // worker: one for write_summary, one that the compact loop consumes @@ -247,7 +247,7 @@ async fn post_run_compact_success_broadcasts_start_and_done() { // Drain run events so only compact events remain in `rx`. let _ = drain(&mut rx); - pod.try_post_run_compact().await.unwrap(); + pod.try_pre_run_compact().await; let events = drain(&mut rx); let kinds: Vec<&str> = events @@ -412,7 +412,7 @@ async fn compact_resets_extract_pointer_so_phase1_can_fire_again() { // Compact runs. Without the fix the in-memory pointer would still // reference the old session's history_len. - pod.try_post_run_compact().await.unwrap(); + pod.try_pre_run_compact().await; assert!( pod.extract_pointer().is_none(), "extract_pointer must be reset to None after compact (matches cold-restore on the new session)" @@ -463,7 +463,7 @@ async fn extract_threshold_zero_is_disabled() { } #[tokio::test] -async fn post_run_compact_failure_broadcasts_start_and_failed() { +async fn pre_run_compact_failure_broadcasts_start_and_failed() { // Only the first run has a response. Compaction will run the // compact worker which immediately exhausts the mock → failure. let client = MockClient::new(vec![single_text_events("hi")]); @@ -476,7 +476,7 @@ async fn post_run_compact_failure_broadcasts_start_and_failed() { let _ = drain(&mut rx); // Best-effort: returns Ok(()) even on failure, but emits CompactFailed. - pod.try_post_run_compact().await.unwrap(); + pod.try_pre_run_compact().await; let events = drain(&mut rx); let kinds: Vec<&str> = events @@ -497,3 +497,85 @@ async fn post_run_compact_failure_broadcasts_start_and_failed() { "unexpected CompactDone in {kinds:?}" ); } + +// --------------------------------------------------------------------------- +// Detached post-run memory jobs (`spawn_post_run_memory_jobs` / +// `wait_for_memory_jobs`). Covers the detach round-trip and the structural +// invariant that the cloned memory-task Pod shares `SessionHead` with the +// source Pod, so that `save_extension` from the background extract does not +// leave the next turn's `save_user_input` looking at a stale head_hash. + +const EXTRACT_NO_COMPACT_MANIFEST: &str = r#" +[pod] +name = "test-pod" +pwd = "./" + +[model] +scheme = "anthropic" +model_id = "test-model" + +[worker] +max_tokens = 100 + +[memory] +extract_threshold = 1 + +[[scope.allow]] +target = "./" +permission = "write" +"#; + +#[tokio::test] +async fn spawn_and_wait_drives_extract_to_completion() { + let client = MockClient::new(vec![ + text_events_with_usage("hi", 1000), + write_extracted_tool_use_events("ec1"), + single_text_events("done"), + ]); + let mut pod = make_pod_with_manifest(EXTRACT_NO_COMPACT_MANIFEST, client).await; + + pod.run_text("first").await.unwrap(); + assert!( + pod.extract_pointer().is_none(), + "extract has not run yet — pointer must be None" + ); + + pod.spawn_post_run_memory_jobs(); + pod.wait_for_memory_jobs().await; + + assert!( + pod.extract_pointer().is_some(), + "spawn + wait must complete extract; pointer should be set" + ); +} + +#[tokio::test] +async fn detached_extract_does_not_fork_session_log() { + // Source pod and the cloned memory-task pod share `SessionHead` via + // `Arc>`. The detached extract advances head_hash through + // `save_extension`; the next `run` must see that same head_hash so + // `ensure_head_or_fork` does not spawn a new session. + let client = MockClient::new(vec![ + text_events_with_usage("hi", 1000), + write_extracted_tool_use_events("ec1"), + single_text_events("done"), + text_events_with_usage("ok", 1000), + ]); + let mut pod = make_pod_with_manifest(EXTRACT_NO_COMPACT_MANIFEST, client).await; + + pod.run_text("first").await.unwrap(); + let session_before = pod.session_id(); + + pod.spawn_post_run_memory_jobs(); + pod.wait_for_memory_jobs().await; + + pod.run_text("second").await.unwrap(); + let session_after = pod.session_id(); + + assert_eq!( + session_before, session_after, + "detached extract's save_extension and the next turn's save_user_input \ + must share head_hash through SessionHead — a fork here means the clone \ + carried its own head_hash" + ); +} diff --git a/crates/pod/tests/controller_test.rs b/crates/pod/tests/controller_test.rs index c559caee..f73a8573 100644 --- a/crates/pod/tests/controller_test.rs +++ b/crates/pod/tests/controller_test.rs @@ -173,7 +173,7 @@ async fn wait_for_status(handle: &PodHandle, status: PodStatus) { // --------------------------------------------------------------------------- #[tokio::test] -async fn run_end_enters_busy_until_post_run_finishes_and_broadcasts_status() { +async fn run_end_returns_to_idle_without_busy_status() { let client = MockClient::new(simple_text_events()); let pod = make_pod(client).await; let handle = spawn_controller(pod).await; @@ -182,7 +182,7 @@ async fn run_end_enters_busy_until_post_run_finishes_and_broadcasts_status() { handle.send(Method::run_text("Hello")).await.unwrap(); let mut saw_run_end = false; - let mut saw_busy_status = false; + let mut saw_idle_status = false; let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); loop { tokio::select! { @@ -191,10 +191,8 @@ async fn run_end_enters_busy_until_post_run_finishes_and_broadcasts_status() { Ok(Event::RunEnd { result: protocol::RunResult::Finished }) => { saw_run_end = true; } - Ok(Event::Status { - status: PodStatus::Busy, - }) if saw_run_end => { - saw_busy_status = true; + Ok(Event::Status { status: PodStatus::Idle }) if saw_run_end => { + saw_idle_status = true; break; } Ok(_) => {} @@ -207,10 +205,10 @@ async fn run_end_enters_busy_until_post_run_finishes_and_broadcasts_status() { assert!(saw_run_end, "expected RunEnd::Finished"); assert!( - saw_busy_status, - "expected busy status immediately after RunEnd" + saw_idle_status, + "expected idle status immediately after RunEnd" ); - wait_for_status(&handle, PodStatus::Idle).await; + assert_eq!(handle.shared_state.get_status(), PodStatus::Idle); } #[tokio::test] @@ -237,51 +235,6 @@ async fn attach_history_includes_current_status() { } } -#[tokio::test] -async fn pause_while_busy_is_idempotent_not_not_running() { - let client = MockClient::new(simple_text_events()); - let pod = make_pod(client).await; - let handle = spawn_controller(pod).await; - let mut rx = handle.subscribe(); - - handle.send(Method::run_text("Hello")).await.unwrap(); - - let mut saw_busy = false; - let mut saw_idle = false; - let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2); - loop { - tokio::select! { - event = rx.recv() => { - match event { - Ok(Event::RunEnd { .. }) => { - handle.send(Method::Pause).await.unwrap(); - } - Ok(Event::Status { status: PodStatus::Busy }) => { - saw_busy = true; - } - Ok(Event::Status { status: PodStatus::Idle }) if saw_busy => { - saw_idle = true; - break; - } - Ok(Event::Error { - code: protocol::ErrorCode::NotRunning, - .. - }) if saw_busy && !saw_idle => { - panic!("Pause while Busy should be an idempotent no-op"); - } - Ok(_) => {} - Err(_) => break, - } - } - _ = tokio::time::sleep_until(deadline) => break, - } - } - - assert!(saw_busy, "expected Busy status"); - assert!(saw_idle, "expected final Idle status"); - assert_eq!(handle.shared_state.get_status(), PodStatus::Idle); -} - #[tokio::test] async fn shared_state_starts_idle() { let client = MockClient::new(simple_text_events()); diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index f8736d3e..8deb3620 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -440,10 +440,6 @@ pub enum PodStatus { Idle, Running, Paused, - /// The worker turn has ended, but the controller is still performing - /// post-run jobs (memory extract / consolidate / compact) and cannot - /// accept a new turn immediately. - Busy, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -763,18 +759,18 @@ mod tests { #[test] fn event_status_format() { let event = Event::Status { - status: PodStatus::Busy, + status: PodStatus::Running, }; let json = serde_json::to_string(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert_eq!(parsed["event"], "status"); - assert_eq!(parsed["data"]["status"], "busy"); + assert_eq!(parsed["data"]["status"], "running"); let decoded: Event = serde_json::from_str(&json).unwrap(); assert!(matches!( decoded, Event::Status { - status: PodStatus::Busy + status: PodStatus::Running } )); } diff --git a/crates/tools/src/task.rs b/crates/tools/src/task.rs index e410b537..2eed7dd0 100644 --- a/crates/tools/src/task.rs +++ b/crates/tools/src/task.rs @@ -546,7 +546,7 @@ mod tests { assert_eq!(tasks[1].status, TaskStatus::Completed); } - /// Wrap snapshot text the way `Pod::try_post_run_compact` does, so tests + /// Wrap snapshot text the way `Pod::try_pre_run_compact` does, so tests /// exercise the exact format that goes through the session log. fn wrap_snapshot_system_message(snapshot: &str) -> String { format!( @@ -655,7 +655,7 @@ mod tests { #[test] fn synthetic_compact_tasklist_pair_is_well_formed() { - // Mirrors `Pod::try_post_run_compact`'s synthetic insertion: + // Mirrors `Pod::try_pre_run_compact`'s synthetic insertion: // a system snapshot message followed by a TaskList tool_call/tool_result // pair sharing the `compact-tasklist` id. Verify the structural // contract every provider request builder relies on (matched call_id, diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index a7663b54..d570f74d 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -49,9 +49,6 @@ pub struct App { pub running: bool, /// True while the Pod is in `PodStatus::Paused`. pub paused: bool, - /// True after worker `RunEnd` while controller post-run work is still - /// blocking the next method. - pub busy: bool, pub run_requests: usize, /// Sum of `input_tokens - cache_read_input_tokens` across the /// current turn's LLM requests — i.e. the net tokens this turn @@ -89,7 +86,6 @@ impl App { pod_status: PodStatus::Idle, running: false, paused: false, - busy: false, run_requests: 0, run_upload_tokens: 0, run_output_tokens: 0, @@ -111,8 +107,7 @@ impl App { self.pod_status = status; self.running = status == PodStatus::Running; self.paused = status == PodStatus::Paused; - self.busy = status == PodStatus::Busy; - if self.running || self.busy { + if self.running { self.quit_confirm = None; } } @@ -296,10 +291,6 @@ impl App { } pub fn submit_input(&mut self) -> Option { - if self.busy { - self.push_error("Pod is finishing post-run work; wait for idle before submitting."); - return None; - } let segments = self.input.submit_segments(); if segments_are_blank(&segments) { // Empty Enter only does something meaningful when the Pod @@ -640,7 +631,7 @@ impl App { }); self.set_pod_status(match result { RunResult::Paused => PodStatus::Paused, - RunResult::Finished | RunResult::LimitReached => PodStatus::Busy, + RunResult::Finished | RunResult::LimitReached => PodStatus::Idle, }); self.run_requests = 0; self.run_upload_tokens = 0; diff --git a/crates/tui/src/main.rs b/crates/tui/src/main.rs index 2aa53b3a..9463e4b9 100644 --- a/crates/tui/src/main.rs +++ b/crates/tui/src/main.rs @@ -435,10 +435,6 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option { KeyCode::Char('x') if ctrl => Some(match app.pod_status { PodStatus::Running => Some(Method::Cancel), PodStatus::Paused | PodStatus::Idle => Some(Method::Shutdown), - PodStatus::Busy => { - app.push_error("Pod is finishing post-run work; wait for idle or press Ctrl-C twice to exit the TUI."); - None - } }), KeyCode::Char('d') if ctrl => { app.quit = true; @@ -577,7 +573,7 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option { const CONFIRM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3); /// Running → send `Method::Pause`. -/// Idle / Paused / Busy → 2-tap to quit the TUI (the Pod keeps running). +/// Idle / Paused → 2-tap to quit the TUI (the Pod keeps running). fn handle_pause_or_quit(app: &mut App) -> Option { if app.pod_status == PodStatus::Running { return Some(Method::Pause); diff --git a/crates/tui/src/ui.rs b/crates/tui/src/ui.rs index 440e7cbb..406552db 100644 --- a/crates/tui/src/ui.rs +++ b/crates/tui/src/ui.rs @@ -877,18 +877,6 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) { " — Enter to resume, type to start new turn", Style::default().fg(Color::DarkGray), )); - } else if app.busy { - spans.push(Span::raw(" | ")); - spans.push(Span::styled( - "busy", - Style::default() - .fg(Color::Yellow) - .add_modifier(Modifier::BOLD), - )); - spans.push(Span::styled( - " — finishing post-run work", - Style::default().fg(Color::DarkGray), - )); } else { spans.push(Span::styled(" idle", Style::default().fg(Color::DarkGray))); } diff --git a/docs/compaction.md b/docs/compaction.md index 250475f4..876ccf54 100644 --- a/docs/compaction.md +++ b/docs/compaction.md @@ -23,8 +23,8 @@ Pod::handle_worker_result → persist_turn(旧セッションに記録) → compact() → resume() -[ターンの合間 — Pod::run 完了後] -Controller::try_post_run_compact ← proactive +[ターンの合間 — 次の Pod::run 冒頭] +Pod::try_pre_run_compact ← proactive → input_tokens > post_run_threshold なら compact() (best-effort) ``` @@ -66,7 +66,7 @@ pub struct ToolOutput { ### トリガー(2段階の閾値) -1. **ターンの合間 (Controller)**: `try_post_run_compact()` で `input_tokens > post_run_threshold` → best-effort +1. **ターンの合間 (次 turn 冒頭)**: `try_pre_run_compact()` で `input_tokens > post_run_threshold` → best-effort 2. **リクエストの合間 (CompactInterceptor)**: `pre_llm_request` で `input_tokens > request_threshold` → `PreRequestAction::Yield` **ターンの合間が proactive (小さい閾値)**: diff --git a/tickets/pod-post-run-detach.md b/tickets/pod-post-run-detach.md index f0dd9894..7781f5a1 100644 --- a/tickets/pod-post-run-detach.md +++ b/tickets/pod-post-run-detach.md @@ -66,3 +66,9 @@ - `crates/pod/src/controller.rs` `run_post_run_jobs` / `finish_controller_run` - `crates/pod/src/pod.rs` `try_post_run_extract` / `try_post_run_consolidate` / `try_post_run_compact` - `crates/protocol/src/lib.rs` `PodStatus` 定義 + +## Review + +- 状態: Approve +- レビュー詳細: [./pod-post-run-detach.review.md](./pod-post-run-detach.review.md) +- 日付: 2026-05-04 (Round 3) diff --git a/tickets/pod-post-run-detach.review.md b/tickets/pod-post-run-detach.review.md new file mode 100644 index 00000000..73bccb0e --- /dev/null +++ b/tickets/pod-post-run-detach.review.md @@ -0,0 +1,132 @@ +# Review: Pod: memory の post-run 同期実行を解消し、Busy 状態を撤廃する + +## 前提・要件の確認 + +- **Phase 1 (extract) の detach**: post-run から detached spawn に移行。`crates/pod/src/pod.rs:236` `spawn_post_run_memory_jobs` で `tokio::spawn` 経由に切り替わり、`extract_in_flight` の CAS は `try_post_run_extract` 内 (`crates/pod/src/pod.rs:1758`) でそのまま維持されている。 +- **Phase 2 (consolidate) の独立化**: controller の post-run チェーンから外れ、spawn したタスク内で extract 完了直後に呼ばれる (`crates/pod/src/pod.rs:253`)。Phase 1 直後の連鎖実行は「staging 変化を契機とした独立タスク」の最小実装として doc 要件と整合する。 +- **compact の post-run 切り離し**: 旧 `try_post_run_compact` が `try_pre_run_compact` にリネームされ、`run` / `resume` / `run_for_notification` 冒頭の `ensure_session_head` 直後で呼ばれる (`crates/pod/src/pod.rs:906, 1080, 1097`)。doc (`docs/compaction.md`) も同期して更新済み。 +- **`PodStatus::Busy` 撤廃**: `crates/protocol/src/lib.rs:440` で enum から削除。controller / TUI / tests から Busy 経路が削除されている。 +- **shutdown 時の wait**: `pod.wait_for_memory_jobs()` がメインループ break 後に呼ばれ (`crates/pod/src/controller.rs:684`)、in-flight な extract / consolidate を join する。compact は inline 実行のため、turn 完了で必ず終わっており別途待つ必要なし。 + +未達の要件: +- **compact 開始時に in-flight extract を待つ同期**(ticket 要件「compact を次ターン冒頭に移すなら、その冒頭で『現在 in-flight な extract があれば完了を待つ』一段の同期を挟む」)が `try_pre_run_compact` に組み込まれていない。`crates/pod/src/pod.rs:1239` を見ると compact_state チェック直後に `compact()` を呼ぶだけで、`extract_in_flight` も `memory_task` join もない。 + +## アーキテクチャ・スコープ + +- レイヤ境界・クレート命名・依存追加方法はチケット範囲内で逸脱なし。 +- `crates/llm-worker/src/llm_client/client.rs` への `impl Clone for Box` 追加は、Pod を `clone_for_memory_task` で複製するために必要となった補助。スコープ境界としては「LLM 抽象に Clone を許す」付随変更で、低レベル基盤の責務範囲内であり許容範囲。 +- `crates/tools/src/task.rs` は doc コメントの API 名変更追従のみ。 +- `Pod::clone_for_memory_task` で「memory ジョブ用に Pod をまるごと複製してデタッチ」というアプローチを取ったことが、本実装の最大の構造的判断。これは下記 Blocking で挙げる持続性の問題を生んでいる: **session-store 側に並走する writer を入れても安全な排他層が無い**ため、controller と memory タスクが同一 jsonl に同時に追記する設計はアーキテクチャ的に破綻している。 + +## 指摘事項 + +### Blocking + +- **memory タスクと controller が同一 session log に競合書き込みする (head_hash race)** — `crates/pod/src/pod.rs:193` `clone_for_memory_task` は `session_id` を値で、`head_hash` を `Option::clone()` で持ち出すだけで、`store` (FsStore は `append` モード書き) の排他は何もない。`run_extract_once` 内 `session_store::save_extension` (`crates/pod/src/pod.rs:1911`) は cloned Pod 自身の `head_hash` を進めるが、controller 側の `head_hash` は据え置きのまま。 + - 帰結 1: memory タスクが完了して save_extension を書いた直後に次ターンを始めると、`ensure_session_head` の `ensure_head_or_fork` (`crates/session-store/src/session.rs:117`) が「store_head ≠ self.head_hash」を検知して **毎回 spurious fork** が走る。意図しないセッション ID 切り替わりが extract 完了ごとに発生する重大な機能リグレッション。 + - 帰結 2: 同じターン内で controller が `save_user_input` / `save_delta` を書いた後に memory タスクが save_extension を書くと、両者とも stale な `prev_hash: X` を載せたエントリが jsonl に並ぶ (chain が分岐する)。`collect_state` は線形読みなのでクラッシュは起きないが、`read_head_hash` が直前の Extension の hash を返すため次ターン頭で fork が起きる。 + - 帰結 3: fork した側 (新 session) には Extension エントリが残らないため、resume 時に extract pointer が None で復元される。staging との整合は壊れていなくても、復元した Pod は無駄に entry 0 から再走査する。 + - 旧実装が inline `.await` だったときは、save_extension が完了してから次ターンが始まるので head_hash が常に進んだ状態で観測でき、この race は構造的に発生しなかった。detach するなら head_hash / session_id の所有権を 1 本化する (memory タスクから writeback、または extract の永続化を別経路に切り出す) 等の対応が必要。 + +- **compact 開始時に in-flight extract を待つ同期がない** — チケット要件 §Phase 1 detach 末尾に明記された「compact を次ターン冒頭に移すなら、その冒頭で『現在 in-flight な extract があれば完了を待つ』一段の同期」が未実装。`crates/pod/src/pod.rs:1239` の `try_pre_run_compact` は `extract_in_flight` も `memory_task` も観測しない。 + - extract が走っている間に compact がセッションを fork すると、extract が `save_extension` を書く先は旧セッション側になり、共有 `Arc>` の値は新セッションに対して意味をなさない `processed_through_history_len` を持つことになる。 + - 単純対処は `try_pre_run_compact` の頭で `wait_for_memory_jobs` 相当 (もしくは `extract_in_flight` の spinning wait) を入れること。chickeness よりは、clone-detach 設計そのものを見直したほうが早い可能性がある。 + +### Non-blocking / Follow-up + +- `crates/tui/src/app.rs:54, 92, 114, 299` および `crates/tui/src/ui.rs:880` に `App::busy: bool` が残っている。Busy 撤廃後は常時 `false` が代入されるだけのデッドフィールドで、`submit_input` の guard と ui の `busy` 表示分岐は到達不能。チケット §`PodStatus::Busy` の撤廃 で「Busy を観測している経路を整理する」が要件に挙がっているので、この残骸も併せて消す方針が筋。 +- `crates/pod/src/pod.rs:236` `spawn_post_run_memory_jobs` の二段 `if` は読みづらい。`is_finished` 後の `handle.abort()` は no-op だがロジックが冗長。`if let Some(handle) = self.memory_task.take_if(|h| h.is_finished()) { drop(handle); }` 相当に整理できる。 +- `clone_for_memory_task` で `worker.set_cache_key(Some(self.session_id.to_string()))` を埋めているが、もしこの後 compact が新 session_id を割り当てると memory タスク側の cache_key と合わなくなる。prompt cache 効率の観点では現状でも問題ないが、上記 Blocking 修正と一緒に再評価したい。 +- `crates/pod/src/pod.rs:248` で worker の system_prompt を `unwrap_or_default().to_string()` しているのは、抽出 Worker 側で別 prompt を貼り直すから問題はないが、`expect("worker present")` と組み合わさると意図が読みづらい。コメントで「extract worker は `try_post_run_extract` 内で Worker を使い捨て生成するため、ここでの worker は実は使われない」と明示しておくと将来の事故が防げる。 +- detached path のテストが無い。要件に直結する観点 (Busy が観測されないこと、shutdown が wait すること、extract 完了後に save_extension が走っても次ターンが fork しないこと) にカバレッジが無いので、Blocking を直したら回帰テストとして追加したい。 +- TODO.md に追加された `tui-context-usage-indicator` 行と新規 `tickets/tui-context-usage-indicator.md` は本チケットの diff に紛れているが、別件。コミット切り分けの確認推奨。 + +### Nits + +- `crates/pod/src/pod.rs:1735-1738` のコメントに「Called by the Controller before spawning the background memory task」とあるが、実際に呼んでいるのは `spawn_post_run_memory_jobs` 内の spawn 後タスク。文言が紛らわしい。 +- `crates/pod/src/pod.rs:1232-1238` のドックで「Best-effort: failures are logged and surfaced, but do not abort the user turn that triggered the check」とあるが、関数自体は `Result<(), PodError>` を返し常に `Ok(())`。コメントと整合。問題はないが、`run` 側で `?` を付けている (`crates/pod/src/pod.rs:906`) ので将来 `Err` を返すようになった瞬間にユーザーターンを abort してしまう。コメントの宣言通り `Ok` 確定なら戻り値は `()` でよい。 + +## 判断 + +**Request changes** — Busy 撤廃 / detach / compact 前倒しという外形は要件通りだが、(1) cloned Pod が共有 `head_hash` 抜きで session log に直書きする構造的レース、(2) compact 前に in-flight extract を待つ同期の欠落、の 2 点が要件・運用上の正しさを破っている。とくに (1) は extract が完了するたびに spurious session fork を引き起こす機能リグレッションで、merge 前の修正が必須。 + +--- + +## Round 2 (再レビュー) + +### 前提・要件の確認 (差分) + +- **Blocking 1 (head_hash race) 解消**: `Pod` から個別 `head_hash` フィールドを削除し、`SessionHead { session_id, head_hash }` を `Arc>` で controller / cloned-memory-task 双方が共有する構造に変更 (`crates/pod/src/pod.rs:39-43, 71, 306-309`)。`save_user_input` / `save_delta` / `save_turn_end` / `save_usage` / `save_run_completed` / `save_run_errored` / `save_pod_scope` / `save_extension` / `record_metric` が全て同じ mutex 越しに head_hash を読み書きする (`crates/pod/src/pod.rs:441-447, 468-475, 540-548, 1207-1236, 1378, 1398-1407, 1422-1438, 1445-1457, 1467-1478, 2014-2024`)。compact (`crates/pod/src/pod.rs:1703-1736`) も lock 内で session_id 切り替えを行う。`extract_pointer` も `Arc>` 化されてコピー先と共有 (`crates/pod/src/pod.rs:182-187, 247`)。これで cloned Pod の `save_extension` が書いた head_hash は controller 側にも即座に反映され、`ensure_head_or_fork` での spurious fork は構造的に発生しない。設計上の race は解消。 +- **Blocking 2 (pre-run extract wait) 実装**: `run` / `run_for_notification` / `resume` の冒頭で `compact_state.exceeds_post_run(...)` を満たす場合のみ `memory_task.take().await` で in-flight ジョブを join してから `try_pre_run_compact` へ進む (`crates/pod/src/pod.rs:926-946, 1123-1143, 1159-1179`)。閾値を超えていない通常ターンでは join せず detach のままなので、要件の「compact 直前にだけ wait」が成立。`memory_task` は `JoinHandle<()>` 一本で持たれ、必ず完了する (内部に LLM 呼び出しタイムアウトは Worker 側に委ねるが、追加の無限ループは無い) ので `await` が永久 block するパスは見えない。 +- **デッドコード掃除**: `App::busy` フィールドおよびその参照 (`submit_input` の早期 return / `ui::draw_status` の busy 表示 / `handle_key` Ctrl-X 分岐) を完全削除 (`crates/tui/src/app.rs`, `crates/tui/src/ui.rs`, `crates/tui/src/main.rs`)。`grep PodStatus::Busy` も TUI / Pod / protocol 全域で commentary 1 件のみ (`crates/pod/src/pod.rs:1325` のコメント、過去経緯の説明として妥当)。 +- **テスト追加**: `controller_test.rs` の `run_end_enters_busy_until_post_run_finishes_and_broadcasts_status` を `run_end_returns_to_idle_without_busy_status` にリネーム + 期待値変更。`pause_while_busy_is_idempotent_not_not_running` を削除 (Busy が無いので意味を失った)。`compact_events_test.rs` は `try_post_run_compact` → `try_pre_run_compact` 変名追従のみ。**detach 経路に固有のテスト (spawn 後の次ターンが fork しないこと、shutdown で memory_task が join されること、pre-run の extract 待ち合わせ) は依然として未追加**。 + +### 残っている指摘事項 + +#### Blocking +- なし。前回ラウンドの両 Blocking は解消。 + +#### Non-blocking / Follow-up + +- **detach 経路の動作テストが未追加**: 前回 follow-up に挙げた以下が引き続き未カバー。Round 1 では Blocking に隠れていたが、今回は単独の品質課題として残る。 + - 同 turn 内の `save_extension` と次ターンの `save_user_input` が同じ jsonl に並ぶケースで fork しないこと (`session_head` mutex の serialization が機能していること)。 + - `wait_for_memory_jobs` が shutdown 時に in-flight な extract / consolidate を join し切ること。 + - `compact_state.exceeds_post_run(...)` を満たす状態で memory_task が走行中に次ターンを投げると、compact が memory_task の完了を待ってから走ること。 + - 推奨は `crates/pod/tests/controller_test.rs` に `MockClient` を遅延つきで返す形のテストを 1〜2 本追加。 +- **`run` / `run_for_notification` / `resume` で同じ memory_task 整理 + pre-compact wait ブロックが 3 回コピペされている** (`crates/pod/src/pod.rs:926-946, 1123-1143, 1159-1179`)。ヘルパ (`async fn ensure_pre_run_state(&mut self) -> Result<(), PodError>` 等) に 1 本化したほうが、将来 wait 条件を変更したときの抜け漏れを防げる。 +- **`spawn_post_run_memory_jobs` の二段 `if` が依然冗長** (`crates/pod/src/pod.rs:254-264`)。「未完了なら何もしない、完了済みなら take して drop してから新しく spawn」という意図を `take_if(|h| h.is_finished())` 等で 1 段に書ける。Round 1 の指摘そのまま。 +- **`clone_for_memory_task` の `worker.set_cache_key(Some(self.session_id.to_string()))`** (`crates/pod/src/pod.rs:216`) は clone 時点の session_id を貼っている。compact が走って session_id が変わる前に memory_task は join されるよう pre-run wait が入ったので race にはならないが、cloned worker は extract worker 用に作っているだけで実体は `run_extract_once` 内で `extract_worker = Worker::new(...).system_prompt(...)` (`crates/pod/src/pod.rs:1956`) として作り直される。**したがって cloned Pod の `worker` は実際には使われていない**。`clone_for_memory_task` で `worker` をビルドしている処理 (`crates/pod/src/pod.rs:211-216`) はメモリ・初期化コストの無駄であり、`Option::None` で十分か、せめてコメントで「使われない / placeholder」と明示すべき。 +- **`pending_attachments` を空 Arc で再生成、`pending_scope_snapshot` は共有という非対称** (`crates/pod/src/pod.rs:237, 244`)。memory タスクは scope 変更も attachment も生まないので実害は無いが、意図がコメントに無いと将来事故る。 +- **スコープ外混入が継続**: `TODO.md` の `tui-context-usage-indicator` 行追加と `tickets/tui-context-usage-indicator.md` 新規が本チケットの未コミット差分に依然として残っている。前回 follow-up でコミット切り分けを指摘済みだが解消されていない。コミット時に別チケットとして分離する判断を再度推奨。 + +#### Nits + +- `crates/pod/src/pod.rs:1834-1839` の doc コメント「Called by the Controller before spawning the background memory task」は誤り。実際に呼んでいるのは `spawn_post_run_memory_jobs` が `tokio::spawn` した async block の内部 (`crates/pod/src/pod.rs:268`)。「Spawned background memory task が compact 確定前に呼び出す Phase 1 entry point」程度の文言に。 +- `crates/pod/src/pod.rs:1322-1328` `try_pre_run_compact` の戻り値は依然 `Result<(), PodError>` だが本体は `Ok(())` 確定。`run` 側 (`crates/pod/src/pod.rs:946, 1143, 1179`) で `?` で受けているので、将来 `Err` を返すようになった瞬間にユーザーターンを abort してしまう。「best-effort で abort しない」という設計意図と整合させるなら戻り値を `()` にするか、本体側で必ず `Ok(())` に正規化するコメントを残すか。 +- `crates/pod/src/pod.rs:253` 直後の空白 2 行 + 関数定義は 1 行多い。 + +### Round 2 判断 + +**Approve with follow-up** — 前回ラウンドの 2 件の Blocking はいずれも構造的に解消されている。`SessionHead` の `Arc>` 共有は cloned Pod / controller 双方の head_hash / session_id 進行を直列化し、ticket 要件の「detach しても session log の整合性を壊さない」を満たす。compact 前 wait も threshold 一致時にだけ join する形で必要十分。残課題は (a) detach 経路の回帰テスト追加、(b) `run` / `resume` / `run_for_notification` のコピペ整理、(c) スコープ外コミットの切り分け、で、いずれも本チケットを閉じてから別チケット ないし follow-up commit として処理可能。 + +--- + +## Round 3 (再レビュー) + +### 前提・要件の確認 (差分) + +Round 2 の follow-up 6 点に対する解消状況を確認した。 + +- **#1 detach 経路の固有テスト追加**: `crates/pod/tests/compact_events_test.rs:528-581` に 2 本追加。 + - `spawn_and_wait_drives_extract_to_completion` (line 529) — `pod.spawn_post_run_memory_jobs() + wait_for_memory_jobs().await` の round-trip で `extract_pointer` が `None → Some` に進むことを assert。`spawn`/`wait` の API contract と extract の完了条件をカバー。 + - `detached_extract_does_not_fork_session_log` (line 553) — Round 1 Blocking 1 の構造的不変条件 (`SessionHead` 共有が `save_extension` と `save_user_input` の chain を直列化する) を行動レベルで検証。`session_id_before == session_id_after` を比較するので、cloned Pod が独立 head_hash を持っていた場合の `ensure_head_or_fork` 経由の spurious fork を検知できる。 + - 残りの 2 観点 (shutdown wait の正常完了 / pre-run compact の memory_task 待ち合わせのタイミング) は本ラウンドでも未追加だが、ユーザーのコメントで「テスト容易性の限界」として明示されている。後述の判断参照。 +- **#2 ヘルパ集約**: `prepare_for_run` (`crates/pod/src/pod.rs:938-948`) に 1 本化。`run` (line 963)、`run_for_notification` (line 1138)、`resume` (line 1152) の prelude が全て `self.prepare_for_run().await?;` の 1 行で済むように整理。同期順序は「`ensure_interceptor_installed` → `ensure_system_prompt_materialized` → `cleanup_finished_memory_task` → `ensure_session_head` → (compact 必要時のみ `join_memory_task`) → `try_pre_run_compact`」(`crates/pod/src/pod.rs:939-947`) で、Round 2 で個別に書かれていた順序と意味的に等価。`cleanup_finished_memory_task` を `ensure_session_head` の前に移したのは finished handle を早めに drop するだけで semantics に影響なし。 +- **#3 `clone_for_memory_task` の worker 縮約**: `set_system_prompt` / `set_request_config` / `set_cache_key` の copy をすべて削除し、`Worker::new(client.clone()).set_history(...)` の最小 snapshot に縮約 (`crates/pod/src/pod.rs:216-218`)。コメント (line 210-215) で「extract/consolidate は内部で worker を作り直すので system_prompt / request_config / cache_key は不要」と意図を明示。`run_extract_once` 内 (line 1922) で `extract_worker = Worker::new(client).system_prompt(extract_system_prompt)` として作り直されるという既存挙動と整合する。 +- **#4 `spawn_post_run_memory_jobs` のフラット化**: 二段 `if` を `cleanup_finished_memory_task() + if self.memory_task.is_some() { return; }` (`crates/pod/src/pod.rs:259-262`) に整理。`take_if` での 1 段化案より「先に finished を掃除 → in-flight があれば skip」という 2 ステップの構造が読み取りやすく、結果的に意図がはっきりした。 +- **#5 戻り値整理**: `try_pre_run_compact` (`crates/pod/src/pod.rs:1296`) と `wait_for_memory_jobs` (`crates/pod/src/pod.rs:199`) の戻り値を `()` に変更。`prepare_for_run` 内 (line 946) と controller の shutdown 直前 (`crates/pod/src/controller.rs:684`) の呼び出し側からも `?` / `if let Err(...)` が消えた。「best-effort で user turn を abort しない」設計意図がシグネチャレベルで保証される。 +- **#6 スコープ外混入の切り分け**: 既コミット (`632d63d docs(tickets): 追加:タスクリストの表示とコンテキスト長インジケータ`) で `tui-context-usage-indicator.md` / `tui-task-display.md` および `TODO.md` 行が別チケットとして分離済み。本チケットの差分は本体実装 + 本 review.md + ticket 末尾の Review 状態のみ。 + +### 動作確認 + +- `cargo check --workspace --tests` clean (warning 1 件は既存の `end_scope` dead-code、本チケット無関係)。 +- `cargo test -p pod --test compact_events_test` 8 本全て pass (新規 2 本含む)。 +- `cargo test -p pod --test controller_test` 21 本全て pass (renamed `run_end_returns_to_idle_without_busy_status` 含む)。 +- ユーザー報告の `cargo test --workspace` 50 group pass を裏取り済み。 + +### 残っている指摘事項 + +#### Blocking +- なし。 + +#### Non-blocking / Follow-up +- **未追加の detach テスト 2 観点 (shutdown wait の正常完了 / pre-run compact の memory_task 待ち合わせ)**: ユーザー側で「テスト容易性の限界による断念」として明示。前者は `controller` task の終了パス (`crates/pod/src/controller.rs:684`) を `wait_for_memory_jobs` の代入前後で観測する必要があるが、現在の `controller_test.rs` フレームには in-flight job を確実に「shutdown 時に未完了」状態に置く手段がない (MockClient が同期に response を返すため)。後者は `compact_state` 閾値を超えた状態で memory_task が走行中という条件をテスト 1 本で再現する必要があり、`MockClient` のレスポンス遅延フックがない現状では確定的に作れない。**判断**: 構造的不変条件 (`SessionHead` mutex / `extract_in_flight` CAS) は Round 2 で型レベル + 既存テストで検証済みであり、`detached_extract_does_not_fork_session_log` がその不変条件を行動レベルで cover している。残り 2 観点はテスト基盤側の課題 (e.g. `MockClient` への delay hook 追加) として `tickets/` 別件で扱える性質であり、本チケットの merge を block しない。 +- **`prepare_for_run` の名前**: prelude / pre-run setup として汎用名なので将来「pre_run_X」系の helper が増えた時に衝突しうる。本チケット範囲外の整理候補として留意。 +- **`should_pre_run_compact` と `try_pre_run_compact` の二重判定**: `should_pre_run_compact` は `disabled / just_compacted / exceeds_post_run` を見るが (`crates/pod/src/pod.rs:927-931`)、`try_pre_run_compact` は `disabled / just_compacted` 早期 return + `exceeds_post_run` 二度判定で同じ判断ロジックを再評価する (`crates/pod/src/pod.rs:1297-1304`)。コメント (line 922-925) で「defensive に重複している」と明示してあるので意図的だが、片方を `should_pre_run_compact()` の呼び出しに置き換える形で 1 本化しても同じ defensive 性は保てる。可読性向上の余地。 + +#### Nits +- `crates/pod/src/pod.rs:1802` の `try_post_run_extract` doc コメント「Called by the Controller before spawning the background memory task」が依然として残っている。Round 2 の Nits として既に指摘済み。実際の呼び出しは `spawn_post_run_memory_jobs` が tokio::spawn した async block 内 (`crates/pod/src/pod.rs:266`)。修正コストは小さい。 + +### Round 3 判断 + +**Approve** — Round 2 で挙げた follow-up #1〜#5 はすべて妥当に解消され、新規 regression は確認できない。`prepare_for_run` への集約で 3 経路の prelude 順序が単一の真理になり、将来「compact 前に何を待つか」を変更したい時の抜け漏れリスクが消えた。`clone_for_memory_task` の worker 縮約は無駄な init を削るだけでなく「memory タスクから見える worker は client + history のみが意味を持つ」という設計境界を明示している。`try_pre_run_compact` / `wait_for_memory_jobs` の `()` 返却は best-effort セマンティクスを型で保証する。新規テスト 2 本のうち `detached_extract_does_not_fork_session_log` は Round 1 Blocking 1 の構造的不変条件 (`SessionHead` 共有) を行動でカバーする実効的な assertion になっている。スコープ外混入も切り分け済み。残った Non-blocking 項目はいずれもテスト基盤拡張 / 命名整理レベルで、別チケットで個別に拾える性質。本チケットは閉じて良い。