feat: Post処理の非同期化・Busy状態の削除
This commit is contained in:
parent
7527b55de4
commit
3266ddb2d4
|
|
@ -74,6 +74,12 @@ pub trait LlmClient: Send + Sync {
|
|||
}
|
||||
}
|
||||
|
||||
impl Clone for Box<dyn LlmClient> {
|
||||
fn clone(&self) -> Self {
|
||||
self.clone_boxed()
|
||||
}
|
||||
}
|
||||
|
||||
/// `Box<dyn LlmClient>` に対する `LlmClient` の実装
|
||||
///
|
||||
/// これにより、動的ディスパッチを使用するクライアントも `Worker` で利用可能になる。
|
||||
|
|
|
|||
|
|
@ -45,7 +45,9 @@ impl Scheme for DummyScheme {
|
|||
}
|
||||
fn parse_sse(&self, _: &str, _: &str, _: &mut ()) -> Result<Vec<Event>, ClientError> {
|
||||
if self.parse_fail {
|
||||
Err(ClientError::Sse("simulated mid-stream parse failure".into()))
|
||||
Err(ClientError::Sse(
|
||||
"simulated mid-stream parse failure".into(),
|
||||
))
|
||||
} else {
|
||||
Ok(vec![])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,8 +4,8 @@
|
|||
//! flags shared between:
|
||||
//! - `PodInterceptor` (reads `request_threshold` — the *safety net* for
|
||||
//! between-requests yielding)
|
||||
//! - `Pod::try_post_run_compact` (reads `post_run_threshold` — the
|
||||
//! *proactive* check between turns)
|
||||
//! - `Pod::try_pre_run_compact` (reads `post_run_threshold` — the
|
||||
//! *proactive* check before the next turn starts)
|
||||
//! - `Pod::run()` / `resume()` (circuit breaker, thrash detection)
|
||||
//!
|
||||
//! Current occupancy (input-token count) is **not** stored here. The single
|
||||
|
|
@ -19,8 +19,8 @@ const MAX_COMPACT_FAILURES: usize = 3;
|
|||
|
||||
/// Shared mutable state for compaction decisions.
|
||||
pub(crate) struct CompactState {
|
||||
/// Between-turns threshold (proactive). Checked by the Controller
|
||||
/// after a run completes. `None` disables the post-run check.
|
||||
/// Between-turns threshold (proactive). Checked before the next turn
|
||||
/// starts. `None` disables the pre-run check.
|
||||
post_run_threshold: Option<u64>,
|
||||
/// Between-requests threshold (safety net). Checked inside a turn
|
||||
/// before each LLM request. `None` disables the request check.
|
||||
|
|
|
|||
|
|
@ -76,68 +76,22 @@ async fn set_controller_status(
|
|||
let _ = event_tx.send(Event::Status { status });
|
||||
}
|
||||
|
||||
async fn run_post_run_jobs<C, St>(pod: &mut Pod<C, St>, alerter: &Alerter)
|
||||
where
|
||||
C: LlmClient,
|
||||
St: Store,
|
||||
{
|
||||
if let Err(e) = pod.try_post_run_extract().await {
|
||||
tracing::warn!(error = %e, "Post-run memory extract error");
|
||||
alerter.alert(
|
||||
AlertLevel::Warn,
|
||||
AlertSource::Pod,
|
||||
format!("post-run memory extract error: {e}"),
|
||||
);
|
||||
}
|
||||
if let Err(e) = pod.try_post_run_consolidate().await {
|
||||
tracing::warn!(error = %e, "Post-run memory consolidate error");
|
||||
alerter.alert(
|
||||
AlertLevel::Warn,
|
||||
AlertSource::Pod,
|
||||
format!("post-run memory consolidate error: {e}"),
|
||||
);
|
||||
}
|
||||
if let Err(e) = pod.try_post_run_compact().await {
|
||||
tracing::warn!(error = %e, "Post-run compaction error");
|
||||
alerter.alert(
|
||||
AlertLevel::Warn,
|
||||
AlertSource::Compactor,
|
||||
format!("post-run compaction error: {e}"),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn finish_controller_run<C, St>(
|
||||
pod: &mut Pod<C, St>,
|
||||
shared_state: &Arc<PodSharedState>,
|
||||
runtime_dir: &RuntimeDir,
|
||||
event_tx: &broadcast::Sender<Event>,
|
||||
alerter: &Alerter,
|
||||
new_status: PodStatus,
|
||||
) where
|
||||
C: LlmClient,
|
||||
St: Store,
|
||||
C: LlmClient + Clone + 'static,
|
||||
St: Store + Clone + 'static,
|
||||
{
|
||||
if new_status == PodStatus::Busy {
|
||||
// Surface the post-run busy window before kicking off the jobs so
|
||||
// TUI / external observers see Busy regardless of whether the
|
||||
// worker turn ended via success or error. Both branches in
|
||||
// `run_with_cancel_support` return `PodStatus::Busy` for this
|
||||
// path; emitting here keeps the two unified.
|
||||
set_controller_status(shared_state, runtime_dir, event_tx, PodStatus::Busy).await;
|
||||
run_post_run_jobs(pod, alerter).await;
|
||||
}
|
||||
|
||||
let items = pod.worker().history().to_vec();
|
||||
shared_state.update_history(items);
|
||||
shared_state.set_user_segments(pod.user_segments().to_vec());
|
||||
let final_status = if new_status == PodStatus::Busy {
|
||||
PodStatus::Idle
|
||||
} else {
|
||||
new_status
|
||||
};
|
||||
set_controller_status(shared_state, runtime_dir, event_tx, final_status).await;
|
||||
set_controller_status(shared_state, runtime_dir, event_tx, new_status).await;
|
||||
let _ = runtime_dir.write_history(shared_state).await;
|
||||
pod.spawn_post_run_memory_jobs();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
|
@ -154,8 +108,8 @@ impl PodController {
|
|||
runtime_base: &Path,
|
||||
) -> Result<(PodHandle, ShutdownReceiver), std::io::Error>
|
||||
where
|
||||
C: LlmClient + 'static,
|
||||
St: Store + 'static,
|
||||
C: LlmClient + Clone + 'static,
|
||||
St: Store + Clone + 'static,
|
||||
{
|
||||
let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
|
||||
let (method_tx, mut method_rx) = mpsc::channel::<Method>(32);
|
||||
|
|
@ -524,7 +478,6 @@ impl PodController {
|
|||
&shared_state,
|
||||
&runtime_dir,
|
||||
&event_tx,
|
||||
&alerter,
|
||||
new_status,
|
||||
)
|
||||
.await;
|
||||
|
|
@ -542,9 +495,9 @@ impl PodController {
|
|||
pod.push_notify(message);
|
||||
let status = shared_state.get_status();
|
||||
if status != PodStatus::Idle {
|
||||
// RUNNING / Paused / Busy: the buffer push is the
|
||||
// RUNNING / Paused: the buffer push is the
|
||||
// entire operation; an in-flight turn (or the
|
||||
// next Resume/Run after Busy) will drain the buffer
|
||||
// next Resume/Run) will drain the buffer
|
||||
// at its next pre_llm_request.
|
||||
continue;
|
||||
}
|
||||
|
|
@ -576,7 +529,6 @@ impl PodController {
|
|||
&shared_state,
|
||||
&runtime_dir,
|
||||
&event_tx,
|
||||
&alerter,
|
||||
new_status,
|
||||
)
|
||||
.await;
|
||||
|
|
@ -621,7 +573,6 @@ impl PodController {
|
|||
&shared_state,
|
||||
&runtime_dir,
|
||||
&event_tx,
|
||||
&alerter,
|
||||
new_status,
|
||||
)
|
||||
.await;
|
||||
|
|
@ -640,12 +591,12 @@ impl PodController {
|
|||
}
|
||||
|
||||
Method::Pause => {
|
||||
// Already paused or post-run busy → idempotent no-op.
|
||||
// Otherwise the Pod is Idle (Running turns go through
|
||||
// Already paused → idempotent no-op. Otherwise the
|
||||
// Pod is Idle (Running turns go through
|
||||
// `run_with_cancel_support`, not this outer match), so
|
||||
// there is nothing to pause.
|
||||
let status = shared_state.get_status();
|
||||
if !matches!(status, PodStatus::Paused | PodStatus::Busy) {
|
||||
if status != PodStatus::Paused {
|
||||
let _ = event_tx.send(Event::Error {
|
||||
code: ErrorCode::NotRunning,
|
||||
message: "Pod is not running".into(),
|
||||
|
|
@ -714,7 +665,6 @@ impl PodController {
|
|||
&shared_state,
|
||||
&runtime_dir,
|
||||
&event_tx,
|
||||
&alerter,
|
||||
new_status,
|
||||
)
|
||||
.await;
|
||||
|
|
@ -728,6 +678,11 @@ impl PodController {
|
|||
}
|
||||
}
|
||||
|
||||
// Background memory jobs own extract/consolidate workers after a
|
||||
// turn completes. Join them before the controller task exits so
|
||||
// staging writes and consolidation cleanups are not abandoned.
|
||||
pod.wait_for_memory_jobs().await;
|
||||
|
||||
// Report upward that this Pod is stopping before the
|
||||
// controller task exits. Awaited (not fire-and-forget):
|
||||
// after `shutdown_tx.send` the process may exit quickly,
|
||||
|
|
@ -787,9 +742,9 @@ where
|
|||
return match result {
|
||||
Ok(r) => {
|
||||
let (status, run_result) = match r {
|
||||
PodRunResult::Finished => (PodStatus::Busy, RunResult::Finished),
|
||||
PodRunResult::Finished => (PodStatus::Idle, RunResult::Finished),
|
||||
PodRunResult::Paused => (PodStatus::Paused, RunResult::Paused),
|
||||
PodRunResult::LimitReached => (PodStatus::Busy, RunResult::LimitReached),
|
||||
PodRunResult::LimitReached => (PodStatus::Idle, RunResult::LimitReached),
|
||||
};
|
||||
let _ = event_tx.send(Event::RunEnd { result: run_result });
|
||||
if matches!(run_result, RunResult::Finished) {
|
||||
|
|
@ -825,7 +780,7 @@ where
|
|||
message,
|
||||
},
|
||||
);
|
||||
(PodStatus::Busy, shutdown_requested)
|
||||
(PodStatus::Idle, shutdown_requested)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::sync::Mutex as AsyncMutex;
|
||||
|
||||
use llm_worker::Item;
|
||||
use llm_worker::llm_client::RequestConfig;
|
||||
|
|
@ -35,6 +36,12 @@ use async_trait::async_trait;
|
|||
use llm_worker::interceptor::PreRequestAction;
|
||||
use protocol::{AlertLevel, AlertSource, Event, Segment};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// Write cursor for the session log: the live session and its newest entry.
///
/// Held by the Pod behind `Arc<AsyncMutex<..>>` (the `session_head` field), so
/// store writes that need `&mut head_hash` go through a single lock.
struct SessionHead {
|
||||
/// Identifier of the session that entries are currently appended to.
session_id: SessionId,
|
||||
/// Hash of the most recent entry; `None` until the session has been created
/// on the store.
head_hash: Option<EntryHash>,
|
||||
}
|
||||
|
||||
/// Pre-LLM-request hook that records `history.len()` at send time into a
|
||||
/// shared `UsageTracker`. The on_usage callback later pairs this with the
|
||||
|
|
@ -61,7 +68,7 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
worker: Option<Worker<C, Mutable>>,
|
||||
store: St,
|
||||
session_id: SessionId,
|
||||
head_hash: Option<EntryHash>,
|
||||
session_head: Arc<AsyncMutex<SessionHead>>,
|
||||
/// Absolute working directory of the Pod.
|
||||
pwd: PathBuf,
|
||||
/// Shared, atomically-swappable view of the Pod's resolved scope.
|
||||
|
|
@ -172,7 +179,13 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
/// run yet on this session — next extract starts from entry 0.
|
||||
/// Restored from `RestoredState.extensions` on `restore`, updated
|
||||
/// after each successful extract via `save_extension`.
|
||||
extract_pointer: Mutex<Option<memory::ExtractPointerPayload>>,
|
||||
extract_pointer: Arc<Mutex<Option<memory::ExtractPointerPayload>>>,
|
||||
/// Phase 1/2 memory job running outside the controller method loop.
|
||||
/// The task owns the extract/consolidate worker execution and is joined
|
||||
/// at shutdown. A single slot is enough: Phase 1/2 implementations loop
|
||||
/// until thresholds fall below their trigger points, and concurrent
|
||||
/// triggers are coalesced by skipping when this handle is still active.
|
||||
memory_task: Option<JoinHandle<()>>,
|
||||
/// Typed user submissions in submit order. K-th entry corresponds to
|
||||
/// the K-th `Item::user_message` in `worker.history()` (modulo seed
|
||||
/// history loaded via `SessionStart.history`, whose original segments
|
||||
|
|
@ -182,6 +195,84 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
user_segments: Vec<Vec<Segment>>,
|
||||
}
|
||||
|
||||
impl<C: LlmClient + 'static, St: Store + 'static> Pod<C, St> {
|
||||
pub async fn wait_for_memory_jobs(&mut self) {
|
||||
if let Some(handle) = self.memory_task.take()
|
||||
&& let Err(e) = handle.await
|
||||
{
|
||||
tracing::warn!(error = %e, "Post-run memory task join failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
||||
fn clone_for_memory_task(&self) -> Self {
|
||||
// The cloned Pod's worker exists only as a snapshot for the memory
|
||||
// task: `run_extract_once` reads `worker.history()`, and the
|
||||
// extract/consolidate workers are built fresh inside their own
|
||||
// methods using `worker.client()` as fallback when no override
|
||||
// model is configured. system_prompt / request_config / cache_key
|
||||
// are unused on this path, so we deliberately skip copying them.
|
||||
let source_worker = self.worker.as_ref().expect("worker present");
|
||||
let mut worker = Worker::new(source_worker.client().clone());
|
||||
worker.set_history(source_worker.history().to_vec());
|
||||
Self {
|
||||
manifest: self.manifest.clone(),
|
||||
worker: Some(worker),
|
||||
store: self.store.clone(),
|
||||
session_id: self.session_id,
|
||||
session_head: self.session_head.clone(),
|
||||
pwd: self.pwd.clone(),
|
||||
scope: self.scope.clone(),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
interceptor_installed: false,
|
||||
compact_state: None,
|
||||
usage_tracker: Arc::new(UsageTracker::new()),
|
||||
metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
|
||||
usage_history: self.usage_history.clone(),
|
||||
tracker: None,
|
||||
task_store: self.task_store.clone(),
|
||||
system_prompt_template: None,
|
||||
alerter: self.alerter.clone(),
|
||||
event_tx: self.event_tx.clone(),
|
||||
pending_notifies: NotifyBuffer::new(),
|
||||
pending_attachments: Arc::new(Mutex::new(Vec::new())),
|
||||
scope_allocation: None,
|
||||
callback_socket: None,
|
||||
prompts: self.prompts.clone(),
|
||||
workflow_registry: self.workflow_registry.clone(),
|
||||
memory_layout: self.memory_layout.clone(),
|
||||
inject_resident_knowledge: self.inject_resident_knowledge,
|
||||
pending_scope_snapshot: self.pending_scope_snapshot.clone(),
|
||||
extract_in_flight: self.extract_in_flight.clone(),
|
||||
consolidation_in_flight: self.consolidation_in_flight.clone(),
|
||||
extract_pointer: self.extract_pointer.clone(),
|
||||
memory_task: None,
|
||||
user_segments: self.user_segments.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn spawn_post_run_memory_jobs(&mut self) {
|
||||
// Drop a finished prior handle so we can spawn a fresh task.
|
||||
// If the prior task is still running, coalesce by skipping —
|
||||
// Phase 1/2 implementations re-evaluate thresholds on completion.
|
||||
self.cleanup_finished_memory_task();
|
||||
if self.memory_task.is_some() {
|
||||
return;
|
||||
}
|
||||
|
||||
let mut pod = self.clone_for_memory_task();
|
||||
self.memory_task = Some(tokio::spawn(async move {
|
||||
if let Err(e) = pod.try_post_run_extract().await {
|
||||
tracing::warn!(error = %e, "Post-run memory extract task error");
|
||||
}
|
||||
if let Err(e) = pod.try_post_run_consolidate().await {
|
||||
tracing::warn!(error = %e, "Post-run memory consolidate task error");
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||
/// Create a new Pod from a pre-built Worker and store.
|
||||
///
|
||||
|
|
@ -210,7 +301,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
worker: Some(worker),
|
||||
store,
|
||||
session_id,
|
||||
head_hash: None,
|
||||
session_head: Arc::new(AsyncMutex::new(SessionHead {
|
||||
session_id,
|
||||
head_hash: None,
|
||||
})),
|
||||
pwd,
|
||||
scope: SharedScope::new(scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -235,7 +329,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
pending_scope_snapshot: Arc::new(Mutex::new(None)),
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(None),
|
||||
extract_pointer: Arc::new(Mutex::new(None)),
|
||||
memory_task: None,
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
|
|
@ -330,7 +425,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// can restore the narrowed scope instead of reclaiming delegated
|
||||
/// writes.
|
||||
pub async fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> {
|
||||
if self.head_hash.is_none() {
|
||||
let mut head = self.session_head.lock().await;
|
||||
if head.head_hash.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
let snapshot = {
|
||||
|
|
@ -340,7 +436,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
deny: scope.deny_rules(),
|
||||
}
|
||||
};
|
||||
session_store::save_pod_scope(&self.store, self.session_id, &mut self.head_hash, &snapshot)
|
||||
session_store::save_pod_scope(&self.store, head.session_id, &mut head.head_hash, &snapshot)
|
||||
.await
|
||||
}
|
||||
|
||||
|
|
@ -362,10 +458,11 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
.expect("pending_scope_snapshot poisoned")
|
||||
.take();
|
||||
if let Some(snapshot) = snapshot {
|
||||
let mut head = self.session_head.lock().await;
|
||||
session_store::save_pod_scope(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
head.session_id,
|
||||
&mut head.head_hash,
|
||||
&snapshot,
|
||||
)
|
||||
.await?;
|
||||
|
|
@ -531,10 +628,11 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// (the entry is dropped) and a `Warn` alert + `tracing::warn!` are
|
||||
/// emitted so the failure isn't completely silent.
|
||||
async fn try_record_metric(&mut self, metric: &session_metrics::Metric) {
|
||||
let mut head = self.session_head.lock().await;
|
||||
if let Err(err) = session_metrics::record_metric(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
head.session_id,
|
||||
&mut head.head_hash,
|
||||
metric,
|
||||
)
|
||||
.await
|
||||
|
|
@ -803,6 +901,52 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
self.run(vec![Segment::text(s)]).await
|
||||
}
|
||||
|
||||
/// Drop the prior memory_task handle if it has finished. Keep it if
|
||||
/// still running so callers can decide whether to wait or coalesce.
|
||||
fn cleanup_finished_memory_task(&mut self) {
|
||||
if self.memory_task.as_ref().is_some_and(|h| h.is_finished()) {
|
||||
self.memory_task = None;
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for the in-flight memory task (if any) to finish. Used before
|
||||
/// compact rewrites history (extract reads the same history).
|
||||
async fn join_memory_task(&mut self) {
|
||||
if let Some(handle) = self.memory_task.take()
|
||||
&& let Err(e) = handle.await
|
||||
{
|
||||
tracing::warn!(error = %e, "Memory task join failed");
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether `try_pre_run_compact` would actually compact. The same
|
||||
/// check is duplicated inside `try_pre_run_compact` itself for
|
||||
/// defensive reasons; this is the gate for joining the memory task
|
||||
/// before the compact runs.
|
||||
fn should_pre_run_compact(&self) -> bool {
|
||||
self.compact_state.as_ref().is_some_and(|s| {
|
||||
!s.is_disabled()
|
||||
&& !s.just_compacted()
|
||||
&& s.exceeds_post_run(self.total_tokens().tokens)
|
||||
})
|
||||
}
|
||||
|
||||
/// Prelude shared by `run` / `run_for_notification` / `resume`.
|
||||
/// Wires up worker hooks, ensures the session is materialized on the
|
||||
/// store, and runs pre-run compact (joining any in-flight memory task
|
||||
/// first so extract sees a stable history range).
|
||||
async fn prepare_for_run(&mut self) -> Result<(), PodError> {
|
||||
self.ensure_interceptor_installed();
|
||||
self.ensure_system_prompt_materialized()?;
|
||||
self.cleanup_finished_memory_task();
|
||||
self.ensure_session_head().await?;
|
||||
if self.should_pre_run_compact() {
|
||||
self.join_memory_task().await;
|
||||
}
|
||||
self.try_pre_run_compact().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send user input and run until the LLM turn completes.
|
||||
///
|
||||
/// `input` is a typed segment list (see [`protocol::Segment`]). The
|
||||
|
|
@ -816,20 +960,22 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// the Worker is aborted, history is compacted, and execution resumes
|
||||
/// automatically.
|
||||
pub async fn run(&mut self, input: Vec<Segment>) -> Result<PodRunResult, PodError> {
|
||||
self.ensure_interceptor_installed();
|
||||
self.ensure_system_prompt_materialized()?;
|
||||
self.ensure_session_head().await?;
|
||||
self.prepare_for_run().await?;
|
||||
|
||||
// Persist the user input as typed segments before the worker
|
||||
// pushes its flattened copy into history. save_delta deliberately
|
||||
// skips the resulting `is_user_message()` item to avoid double-write.
|
||||
session_store::save_user_input(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
input.clone(),
|
||||
)
|
||||
.await?;
|
||||
{
|
||||
let mut head = self.session_head.lock().await;
|
||||
self.session_id = head.session_id;
|
||||
session_store::save_user_input(
|
||||
&self.store,
|
||||
head.session_id,
|
||||
&mut head.head_hash,
|
||||
input.clone(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
self.user_segments.push(input.clone());
|
||||
|
||||
// Resolve `@<path>` refs and `/<slug>` workflow invocations to
|
||||
|
|
@ -989,9 +1135,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
/// Worker's resume path issues the LLM request without a new
|
||||
/// user turn.
|
||||
pub async fn run_for_notification(&mut self) -> Result<PodRunResult, PodError> {
|
||||
self.ensure_interceptor_installed();
|
||||
self.ensure_system_prompt_materialized()?;
|
||||
self.ensure_session_head().await?;
|
||||
self.prepare_for_run().await?;
|
||||
|
||||
let history_before = self.worker.as_ref().unwrap().history().len();
|
||||
|
||||
|
|
@ -1005,9 +1149,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
|
||||
/// Resume from a paused state.
|
||||
pub async fn resume(&mut self) -> Result<PodRunResult, PodError> {
|
||||
self.ensure_interceptor_installed();
|
||||
self.ensure_system_prompt_materialized()?;
|
||||
self.ensure_session_head().await?;
|
||||
self.prepare_for_run().await?;
|
||||
|
||||
let history_before = self.worker.as_ref().unwrap().history().len();
|
||||
|
||||
|
|
@ -1035,27 +1177,29 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
config: w.request_config(),
|
||||
history: w.history(),
|
||||
};
|
||||
if self.head_hash.is_none() {
|
||||
let mut head = self.session_head.lock().await;
|
||||
if head.head_hash.is_none() {
|
||||
let hash =
|
||||
session_store::create_session_with_id(&self.store, self.session_id, state).await?;
|
||||
self.head_hash = Some(hash);
|
||||
session_store::create_session_with_id(&self.store, head.session_id, state).await?;
|
||||
head.head_hash = Some(hash);
|
||||
drop(head);
|
||||
self.persist_scope_snapshot().await?;
|
||||
return Ok(());
|
||||
}
|
||||
let prev_session_id = self.session_id;
|
||||
session_store::ensure_head_or_fork(
|
||||
&self.store,
|
||||
&mut self.session_id,
|
||||
&mut self.head_hash,
|
||||
state,
|
||||
)
|
||||
.await?;
|
||||
let prev_session_id = head.session_id;
|
||||
let mut session_id = head.session_id;
|
||||
let mut head_hash = head.head_hash.clone();
|
||||
session_store::ensure_head_or_fork(&self.store, &mut session_id, &mut head_hash, state)
|
||||
.await?;
|
||||
head.session_id = session_id;
|
||||
head.head_hash = head_hash;
|
||||
self.session_id = session_id;
|
||||
// ensure_head_or_fork mints a fresh session_id when it auto-
|
||||
// forks. Sync that to pods.json so a concurrent
|
||||
// restore_from_manifest can't see "no live writer" for the new
|
||||
// session and grab it.
|
||||
if self.session_id != prev_session_id && self.scope_allocation.is_some() {
|
||||
pod_registry::update_session(&self.manifest.pod.name, self.session_id)?;
|
||||
if session_id != prev_session_id && self.scope_allocation.is_some() {
|
||||
pod_registry::update_session(&self.manifest.pod.name, session_id)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -1142,17 +1286,21 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
})
|
||||
}
|
||||
|
||||
/// Attempt proactive compaction (called by Controller after run).
|
||||
/// Attempt proactive compaction at the beginning of a controller Run.
|
||||
///
|
||||
/// Best-effort: failures are logged but do not propagate.
|
||||
pub async fn try_post_run_compact(&mut self) -> Result<(), PodError> {
|
||||
/// This used to run in the controller's post-run path. Keeping it here
|
||||
/// preserves the ordering requirement that the next turn starts with a
|
||||
/// compacted history, without introducing a separate Busy controller state.
|
||||
/// Best-effort: failures are logged and surfaced, but do not abort the
|
||||
/// user turn that triggered the check.
|
||||
pub async fn try_pre_run_compact(&mut self) {
|
||||
let state = match self.compact_state.as_ref() {
|
||||
Some(s) if !s.is_disabled() && !s.just_compacted() => s.clone(),
|
||||
_ => return Ok(()),
|
||||
_ => return,
|
||||
};
|
||||
let current_tokens = self.total_tokens().tokens;
|
||||
if !state.exceeds_post_run(current_tokens) {
|
||||
return Ok(());
|
||||
return;
|
||||
}
|
||||
|
||||
let retained = state.retained_tokens();
|
||||
|
|
@ -1161,24 +1309,22 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
Ok(new_session_id) => {
|
||||
info!(
|
||||
new_session_id = %new_session_id,
|
||||
"Proactive post-run compaction succeeded"
|
||||
"Proactive pre-run compaction succeeded"
|
||||
);
|
||||
self.send_event(Event::CompactDone { new_session_id });
|
||||
state.record_compact_success();
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(error = %e, "Proactive post-run compaction failed");
|
||||
warn!(error = %e, "Proactive pre-run compaction failed");
|
||||
self.send_event(Event::CompactFailed {
|
||||
error: e.to_string(),
|
||||
});
|
||||
self.alert(
|
||||
AlertLevel::Warn,
|
||||
AlertSource::Compactor,
|
||||
format!("post-run compaction failed: {e}"),
|
||||
format!("pre-run compaction failed: {e}"),
|
||||
);
|
||||
state.record_compact_failure();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1193,19 +1339,24 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
// head_hash mutable).
|
||||
let w = self.worker.as_ref().unwrap();
|
||||
let new_items = &w.history()[history_before..];
|
||||
session_store::save_delta(&self.store, self.session_id, &mut self.head_hash, new_items)
|
||||
let mut head = self.session_head.lock().await;
|
||||
self.session_id = head.session_id;
|
||||
session_store::save_delta(&self.store, head.session_id, &mut head.head_hash, new_items)
|
||||
.await?;
|
||||
|
||||
drop(head);
|
||||
self.flush_pending_scope_snapshot().await?;
|
||||
|
||||
let turn_count = self.worker.as_ref().unwrap().turn_count();
|
||||
let mut head = self.session_head.lock().await;
|
||||
session_store::save_turn_end(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
head.session_id,
|
||||
&mut head.head_hash,
|
||||
turn_count,
|
||||
)
|
||||
.await?;
|
||||
drop(head);
|
||||
|
||||
// Flush any sync-buffered metrics from this run first
|
||||
// (currently `prune.fire` / `prune.skip` from the prune observer).
|
||||
|
|
@ -1238,10 +1389,11 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
record,
|
||||
correlation_id,
|
||||
} = recorded;
|
||||
let mut head = self.session_head.lock().await;
|
||||
session_store::save_usage(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
head.session_id,
|
||||
&mut head.head_hash,
|
||||
record.history_len,
|
||||
record.input_total_tokens,
|
||||
record.cache_read_tokens,
|
||||
|
|
@ -1249,6 +1401,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
record.output_tokens,
|
||||
)
|
||||
.await?;
|
||||
drop(head);
|
||||
if let Some(id) = correlation_id {
|
||||
let metric = session_metrics::Metric::now("prune.post_request")
|
||||
.with_correlation_id(&id)
|
||||
|
|
@ -1266,20 +1419,22 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
let interrupted = self.worker.as_ref().unwrap().last_run_interrupted();
|
||||
match result {
|
||||
Ok(r) => {
|
||||
let mut head = self.session_head.lock().await;
|
||||
session_store::save_run_completed(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
head.session_id,
|
||||
&mut head.head_hash,
|
||||
r.clone(),
|
||||
interrupted,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Err(e) => {
|
||||
let mut head = self.session_head.lock().await;
|
||||
session_store::save_run_errored(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
head.session_id,
|
||||
&mut head.head_hash,
|
||||
e.to_string(),
|
||||
interrupted,
|
||||
)
|
||||
|
|
@ -1511,8 +1666,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
));
|
||||
|
||||
// Persist as a new compacted session.
|
||||
let old_session_id = self.session_id;
|
||||
let old_head_hash = self
|
||||
let mut head = self.session_head.lock().await;
|
||||
let old_session_id = head.session_id;
|
||||
let old_head_hash = head
|
||||
.head_hash
|
||||
.clone()
|
||||
.expect("head_hash should be set after at least one entry");
|
||||
|
|
@ -1535,7 +1691,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
// session — the new compacted session starts with no measurements
|
||||
// until its first LLM call.
|
||||
self.session_id = new_session_id;
|
||||
self.head_hash = Some(new_head_hash);
|
||||
head.session_id = new_session_id;
|
||||
head.head_hash = Some(new_head_hash);
|
||||
// Keep pods.json pointing at the live session_id. Without this
|
||||
// a concurrent `restore_from_manifest(new_session_id)` would
|
||||
// see no live writer and grab the session this Pod just moved
|
||||
|
|
@ -1545,6 +1702,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
if self.scope_allocation.is_some() {
|
||||
pod_registry::update_session(&self.manifest.pod.name, new_session_id)?;
|
||||
}
|
||||
drop(head);
|
||||
// Align user_segments with the post-compaction history. Items
|
||||
// before `retain_from` (now folded into the summary) lose their
|
||||
// segments; only the user_messages surviving in retained_items
|
||||
|
|
@ -1641,10 +1799,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
|
||||
/// Phase 1 (memory.extract) post-run trigger.
|
||||
///
|
||||
/// Called by the Controller **before** [`try_post_run_compact`] so
|
||||
/// the extract worker sees a stable session-log entry range
|
||||
/// (compact rewrites history). Best-effort: failures are logged but
|
||||
/// not propagated.
|
||||
/// Runs inside the background memory task spawned after a controller run.
|
||||
/// Compact is deferred until the next turn starts (and joins that task
|
||||
/// first), so the extract worker sees a stable session-log entry range.
|
||||
/// Best-effort: failures are logged but not propagated.
|
||||
///
|
||||
/// Behaviour follows `docs/plan/memory.md` §Phase 1 並走防止:
|
||||
/// in-flight 中の trigger は skip し、完了時点で閾値再評価する
|
||||
|
|
@ -1798,11 +1956,12 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
extract::ExtractedPayload::default()
|
||||
});
|
||||
|
||||
let source_session_id = self.session_head.lock().await.session_id;
|
||||
let staging_id = if payload.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
let source = memory::schema::SourceRef {
|
||||
session_id: self.session_id.to_string(),
|
||||
session_id: source_session_id.to_string(),
|
||||
range: [start_entry as u64, end_entry as u64],
|
||||
};
|
||||
let (id, _) = extract::write_staging(&layout, source, payload)
|
||||
|
|
@ -1817,14 +1976,18 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
};
|
||||
let payload_value = serde_json::to_value(&pointer_payload)
|
||||
.expect("ExtractPointerPayload is always JSON-serializable");
|
||||
session_store::save_extension(
|
||||
&self.store,
|
||||
self.session_id,
|
||||
&mut self.head_hash,
|
||||
extract::EXTRACT_DOMAIN,
|
||||
payload_value,
|
||||
)
|
||||
.await?;
|
||||
{
|
||||
let mut head = self.session_head.lock().await;
|
||||
session_store::save_extension(
|
||||
&self.store,
|
||||
head.session_id,
|
||||
&mut head.head_hash,
|
||||
extract::EXTRACT_DOMAIN,
|
||||
payload_value,
|
||||
)
|
||||
.await?;
|
||||
self.session_id = head.session_id;
|
||||
}
|
||||
|
||||
*self
|
||||
.extract_pointer
|
||||
|
|
@ -1850,12 +2013,11 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
Ok(worker.client().clone_boxed())
|
||||
}
|
||||
|
||||
/// Phase 2 (memory.consolidation) post-run trigger.
|
||||
/// Phase 2 (memory.consolidation) trigger.
|
||||
///
|
||||
/// Called by the Controller **after** [`try_post_run_extract`] and
|
||||
/// **before** [`try_post_run_compact`]: extract feeds staging, compact
|
||||
/// rewrites history. Phase 2 must consume staging before compact
|
||||
/// reshapes the session.
|
||||
/// Intended to run from a background memory task after Phase 1 may have
|
||||
/// added staging entries. Compact is deferred until the next turn starts,
|
||||
/// so consolidation no longer blocks the controller's post-run path.
|
||||
///
|
||||
/// Behaviour follows `docs/plan/memory.md` §Phase 2 / §並走防止:
|
||||
/// the staging-side `StagingLock` enforces cross-process exclusion;
|
||||
|
|
@ -2096,7 +2258,10 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
worker: Some(worker),
|
||||
store,
|
||||
session_id,
|
||||
head_hash: None,
|
||||
session_head: Arc::new(AsyncMutex::new(SessionHead {
|
||||
session_id,
|
||||
head_hash: None,
|
||||
})),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -2121,7 +2286,8 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
pending_scope_snapshot: Arc::new(Mutex::new(None)),
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(None),
|
||||
extract_pointer: Arc::new(Mutex::new(None)),
|
||||
memory_task: None,
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
|
|
@ -2160,7 +2326,10 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
worker: Some(worker),
|
||||
store,
|
||||
session_id,
|
||||
head_hash: None,
|
||||
session_head: Arc::new(AsyncMutex::new(SessionHead {
|
||||
session_id,
|
||||
head_hash: None,
|
||||
})),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -2185,7 +2354,8 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
pending_scope_snapshot: Arc::new(Mutex::new(None)),
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(None),
|
||||
extract_pointer: Arc::new(Mutex::new(None)),
|
||||
memory_task: None,
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
|
|
@ -2288,7 +2458,10 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
worker: Some(worker),
|
||||
store,
|
||||
session_id,
|
||||
head_hash: state.head_hash,
|
||||
session_head: Arc::new(AsyncMutex::new(SessionHead {
|
||||
session_id,
|
||||
head_hash: state.head_hash,
|
||||
})),
|
||||
pwd: common.pwd,
|
||||
scope: SharedScope::new(common.scope),
|
||||
hook_builder: HookRegistryBuilder::new(),
|
||||
|
|
@ -2315,7 +2488,8 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
pending_scope_snapshot: Arc::new(Mutex::new(None)),
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(extract_pointer),
|
||||
extract_pointer: Arc::new(Mutex::new(extract_pointer)),
|
||||
memory_task: None,
|
||||
user_segments: state.user_segments,
|
||||
};
|
||||
pod.apply_prune_from_manifest();
|
||||
|
|
|
|||
|
|
@ -1,8 +1,8 @@
|
|||
//! Compact lifecycle `Event` broadcasting.
|
||||
//!
|
||||
//! Covers three paths:
|
||||
//! - `try_post_run_compact` success → `CompactStart + CompactDone`
|
||||
//! - `try_post_run_compact` failure → `CompactStart + CompactFailed`
|
||||
//! - `try_pre_run_compact` success → `CompactStart + CompactDone`
|
||||
//! - `try_pre_run_compact` failure → `CompactStart + CompactFailed`
|
||||
//! - mid-turn `do_compact_and_resume` success → `CompactStart + CompactDone`
|
||||
//! (driven by `compact_request_threshold` → `PreRequestAction::Yield`)
|
||||
|
||||
|
|
@ -96,7 +96,7 @@ fn write_summary_tool_use_events(call_id: &str, text: &str) -> Vec<LlmEvent> {
|
|||
]
|
||||
}
|
||||
|
||||
// A low compact_threshold guarantees `try_post_run_compact` will fire
|
||||
// A low compact_threshold guarantees `try_pre_run_compact` will fire
|
||||
// the first time we check after a run.
|
||||
const POST_RUN_MANIFEST_TOML: &str = r#"
|
||||
[pod]
|
||||
|
|
@ -228,7 +228,7 @@ async fn compact_broadcasts_only_new_system_messages_not_retained_ones() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn post_run_compact_success_broadcasts_start_and_done() {
|
||||
async fn pre_run_compact_success_broadcasts_start_and_done() {
|
||||
// Responses: (1) first run returns short text, (2) compact worker
|
||||
// emits write_summary then closes (two LLM calls inside the compact
|
||||
// worker: one for write_summary, one that the compact loop consumes
|
||||
|
|
@ -247,7 +247,7 @@ async fn post_run_compact_success_broadcasts_start_and_done() {
|
|||
// Drain run events so only compact events remain in `rx`.
|
||||
let _ = drain(&mut rx);
|
||||
|
||||
pod.try_post_run_compact().await.unwrap();
|
||||
pod.try_pre_run_compact().await;
|
||||
|
||||
let events = drain(&mut rx);
|
||||
let kinds: Vec<&str> = events
|
||||
|
|
@ -412,7 +412,7 @@ async fn compact_resets_extract_pointer_so_phase1_can_fire_again() {
|
|||
|
||||
// Compact runs. Without the fix the in-memory pointer would still
|
||||
// reference the old session's history_len.
|
||||
pod.try_post_run_compact().await.unwrap();
|
||||
pod.try_pre_run_compact().await;
|
||||
assert!(
|
||||
pod.extract_pointer().is_none(),
|
||||
"extract_pointer must be reset to None after compact (matches cold-restore on the new session)"
|
||||
|
|
@ -463,7 +463,7 @@ async fn extract_threshold_zero_is_disabled() {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn post_run_compact_failure_broadcasts_start_and_failed() {
|
||||
async fn pre_run_compact_failure_broadcasts_start_and_failed() {
|
||||
// Only the first run has a response. Compaction will run the
|
||||
// compact worker which immediately exhausts the mock → failure.
|
||||
let client = MockClient::new(vec![single_text_events("hi")]);
|
||||
|
|
@ -476,7 +476,7 @@ async fn post_run_compact_failure_broadcasts_start_and_failed() {
|
|||
let _ = drain(&mut rx);
|
||||
|
||||
// Best-effort: a compact failure is not propagated to the caller, but CompactFailed is still emitted.
|
||||
pod.try_post_run_compact().await.unwrap();
|
||||
pod.try_pre_run_compact().await;
|
||||
|
||||
let events = drain(&mut rx);
|
||||
let kinds: Vec<&str> = events
|
||||
|
|
@ -497,3 +497,85 @@ async fn post_run_compact_failure_broadcasts_start_and_failed() {
|
|||
"unexpected CompactDone in {kinds:?}"
|
||||
);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Detached post-run memory jobs (`spawn_post_run_memory_jobs` /
|
||||
// `wait_for_memory_jobs`). Covers the detach round-trip and the structural
|
||||
// invariant that the cloned memory-task Pod shares `SessionHead` with the
|
||||
// source Pod, so that `save_extension` from the background extract does not
|
||||
// leave the next turn's `save_user_input` looking at a stale head_hash.
|
||||
|
||||
const EXTRACT_NO_COMPACT_MANIFEST: &str = r#"
|
||||
[pod]
|
||||
name = "test-pod"
|
||||
pwd = "./"
|
||||
|
||||
[model]
|
||||
scheme = "anthropic"
|
||||
model_id = "test-model"
|
||||
|
||||
[worker]
|
||||
max_tokens = 100
|
||||
|
||||
[memory]
|
||||
extract_threshold = 1
|
||||
|
||||
[[scope.allow]]
|
||||
target = "./"
|
||||
permission = "write"
|
||||
"#;
|
||||
|
||||
#[tokio::test]
|
||||
async fn spawn_and_wait_drives_extract_to_completion() {
|
||||
let client = MockClient::new(vec![
|
||||
text_events_with_usage("hi", 1000),
|
||||
write_extracted_tool_use_events("ec1"),
|
||||
single_text_events("done"),
|
||||
]);
|
||||
let mut pod = make_pod_with_manifest(EXTRACT_NO_COMPACT_MANIFEST, client).await;
|
||||
|
||||
pod.run_text("first").await.unwrap();
|
||||
assert!(
|
||||
pod.extract_pointer().is_none(),
|
||||
"extract has not run yet — pointer must be None"
|
||||
);
|
||||
|
||||
pod.spawn_post_run_memory_jobs();
|
||||
pod.wait_for_memory_jobs().await;
|
||||
|
||||
assert!(
|
||||
pod.extract_pointer().is_some(),
|
||||
"spawn + wait must complete extract; pointer should be set"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn detached_extract_does_not_fork_session_log() {
|
||||
// Source pod and the cloned memory-task pod share `SessionHead` via
|
||||
// `Arc<AsyncMutex<_>>`. The detached extract advances head_hash through
|
||||
// `save_extension`; the next `run` must see that same head_hash so
|
||||
// `ensure_head_or_fork` does not spawn a new session.
|
||||
let client = MockClient::new(vec![
|
||||
text_events_with_usage("hi", 1000),
|
||||
write_extracted_tool_use_events("ec1"),
|
||||
single_text_events("done"),
|
||||
text_events_with_usage("ok", 1000),
|
||||
]);
|
||||
let mut pod = make_pod_with_manifest(EXTRACT_NO_COMPACT_MANIFEST, client).await;
|
||||
|
||||
pod.run_text("first").await.unwrap();
|
||||
let session_before = pod.session_id();
|
||||
|
||||
pod.spawn_post_run_memory_jobs();
|
||||
pod.wait_for_memory_jobs().await;
|
||||
|
||||
pod.run_text("second").await.unwrap();
|
||||
let session_after = pod.session_id();
|
||||
|
||||
assert_eq!(
|
||||
session_before, session_after,
|
||||
"detached extract's save_extension and the next turn's save_user_input \
|
||||
must share head_hash through SessionHead — a fork here means the clone \
|
||||
carried its own head_hash"
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ async fn wait_for_status(handle: &PodHandle, status: PodStatus) {
|
|||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[tokio::test]
|
||||
async fn run_end_enters_busy_until_post_run_finishes_and_broadcasts_status() {
|
||||
async fn run_end_returns_to_idle_without_busy_status() {
|
||||
let client = MockClient::new(simple_text_events());
|
||||
let pod = make_pod(client).await;
|
||||
let handle = spawn_controller(pod).await;
|
||||
|
|
@ -182,7 +182,7 @@ async fn run_end_enters_busy_until_post_run_finishes_and_broadcasts_status() {
|
|||
handle.send(Method::run_text("Hello")).await.unwrap();
|
||||
|
||||
let mut saw_run_end = false;
|
||||
let mut saw_busy_status = false;
|
||||
let mut saw_idle_status = false;
|
||||
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
|
||||
loop {
|
||||
tokio::select! {
|
||||
|
|
@ -191,10 +191,8 @@ async fn run_end_enters_busy_until_post_run_finishes_and_broadcasts_status() {
|
|||
Ok(Event::RunEnd { result: protocol::RunResult::Finished }) => {
|
||||
saw_run_end = true;
|
||||
}
|
||||
Ok(Event::Status {
|
||||
status: PodStatus::Busy,
|
||||
}) if saw_run_end => {
|
||||
saw_busy_status = true;
|
||||
Ok(Event::Status { status: PodStatus::Idle }) if saw_run_end => {
|
||||
saw_idle_status = true;
|
||||
break;
|
||||
}
|
||||
Ok(_) => {}
|
||||
|
|
@ -207,10 +205,10 @@ async fn run_end_enters_busy_until_post_run_finishes_and_broadcasts_status() {
|
|||
|
||||
assert!(saw_run_end, "expected RunEnd::Finished");
|
||||
assert!(
|
||||
saw_busy_status,
|
||||
"expected busy status immediately after RunEnd"
|
||||
saw_idle_status,
|
||||
"expected idle status immediately after RunEnd"
|
||||
);
|
||||
wait_for_status(&handle, PodStatus::Idle).await;
|
||||
assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -237,51 +235,6 @@ async fn attach_history_includes_current_status() {
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn pause_while_busy_is_idempotent_not_not_running() {
|
||||
let client = MockClient::new(simple_text_events());
|
||||
let pod = make_pod(client).await;
|
||||
let handle = spawn_controller(pod).await;
|
||||
let mut rx = handle.subscribe();
|
||||
|
||||
handle.send(Method::run_text("Hello")).await.unwrap();
|
||||
|
||||
let mut saw_busy = false;
|
||||
let mut saw_idle = false;
|
||||
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
|
||||
loop {
|
||||
tokio::select! {
|
||||
event = rx.recv() => {
|
||||
match event {
|
||||
Ok(Event::RunEnd { .. }) => {
|
||||
handle.send(Method::Pause).await.unwrap();
|
||||
}
|
||||
Ok(Event::Status { status: PodStatus::Busy }) => {
|
||||
saw_busy = true;
|
||||
}
|
||||
Ok(Event::Status { status: PodStatus::Idle }) if saw_busy => {
|
||||
saw_idle = true;
|
||||
break;
|
||||
}
|
||||
Ok(Event::Error {
|
||||
code: protocol::ErrorCode::NotRunning,
|
||||
..
|
||||
}) if saw_busy && !saw_idle => {
|
||||
panic!("Pause while Busy should be an idempotent no-op");
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
_ = tokio::time::sleep_until(deadline) => break,
|
||||
}
|
||||
}
|
||||
|
||||
assert!(saw_busy, "expected Busy status");
|
||||
assert!(saw_idle, "expected final Idle status");
|
||||
assert_eq!(handle.shared_state.get_status(), PodStatus::Idle);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shared_state_starts_idle() {
|
||||
let client = MockClient::new(simple_text_events());
|
||||
|
|
|
|||
|
|
@ -440,10 +440,6 @@ pub enum PodStatus {
|
|||
Idle,
|
||||
Running,
|
||||
Paused,
|
||||
/// The worker turn has ended, but the controller is still performing
|
||||
/// post-run jobs (memory extract / consolidate / compact) and cannot
|
||||
/// accept a new turn immediately.
|
||||
Busy,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||
|
|
@ -763,18 +759,18 @@ mod tests {
|
|||
#[test]
|
||||
fn event_status_format() {
|
||||
let event = Event::Status {
|
||||
status: PodStatus::Busy,
|
||||
status: PodStatus::Running,
|
||||
};
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(parsed["event"], "status");
|
||||
assert_eq!(parsed["data"]["status"], "busy");
|
||||
assert_eq!(parsed["data"]["status"], "running");
|
||||
|
||||
let decoded: Event = serde_json::from_str(&json).unwrap();
|
||||
assert!(matches!(
|
||||
decoded,
|
||||
Event::Status {
|
||||
status: PodStatus::Busy
|
||||
status: PodStatus::Running
|
||||
}
|
||||
));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -546,7 +546,7 @@ mod tests {
|
|||
assert_eq!(tasks[1].status, TaskStatus::Completed);
|
||||
}
|
||||
|
||||
/// Wrap snapshot text the way `Pod::try_post_run_compact` does, so tests
|
||||
/// Wrap snapshot text the way `Pod::try_pre_run_compact` does, so tests
|
||||
/// exercise the exact format that goes through the session log.
|
||||
fn wrap_snapshot_system_message(snapshot: &str) -> String {
|
||||
format!(
|
||||
|
|
@ -655,7 +655,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn synthetic_compact_tasklist_pair_is_well_formed() {
|
||||
// Mirrors `Pod::try_post_run_compact`'s synthetic insertion:
|
||||
// Mirrors `Pod::try_pre_run_compact`'s synthetic insertion:
|
||||
// a system snapshot message followed by a TaskList tool_call/tool_result
|
||||
// pair sharing the `compact-tasklist` id. Verify the structural
|
||||
// contract every provider request builder relies on (matched call_id,
|
||||
|
|
|
|||
|
|
@ -49,9 +49,6 @@ pub struct App {
|
|||
pub running: bool,
|
||||
/// True while the Pod is in `PodStatus::Paused`.
|
||||
pub paused: bool,
|
||||
/// True after worker `RunEnd` while controller post-run work is still
|
||||
/// blocking the next method.
|
||||
pub busy: bool,
|
||||
pub run_requests: usize,
|
||||
/// Sum of `input_tokens - cache_read_input_tokens` across the
|
||||
/// current turn's LLM requests — i.e. the net tokens this turn
|
||||
|
|
@ -89,7 +86,6 @@ impl App {
|
|||
pod_status: PodStatus::Idle,
|
||||
running: false,
|
||||
paused: false,
|
||||
busy: false,
|
||||
run_requests: 0,
|
||||
run_upload_tokens: 0,
|
||||
run_output_tokens: 0,
|
||||
|
|
@ -111,8 +107,7 @@ impl App {
|
|||
self.pod_status = status;
|
||||
self.running = status == PodStatus::Running;
|
||||
self.paused = status == PodStatus::Paused;
|
||||
self.busy = status == PodStatus::Busy;
|
||||
if self.running || self.busy {
|
||||
if self.running {
|
||||
self.quit_confirm = None;
|
||||
}
|
||||
}
|
||||
|
|
@ -296,10 +291,6 @@ impl App {
|
|||
}
|
||||
|
||||
pub fn submit_input(&mut self) -> Option<Method> {
|
||||
if self.busy {
|
||||
self.push_error("Pod is finishing post-run work; wait for idle before submitting.");
|
||||
return None;
|
||||
}
|
||||
let segments = self.input.submit_segments();
|
||||
if segments_are_blank(&segments) {
|
||||
// Empty Enter only does something meaningful when the Pod
|
||||
|
|
@ -640,7 +631,7 @@ impl App {
|
|||
});
|
||||
self.set_pod_status(match result {
|
||||
RunResult::Paused => PodStatus::Paused,
|
||||
RunResult::Finished | RunResult::LimitReached => PodStatus::Busy,
|
||||
RunResult::Finished | RunResult::LimitReached => PodStatus::Idle,
|
||||
});
|
||||
self.run_requests = 0;
|
||||
self.run_upload_tokens = 0;
|
||||
|
|
|
|||
|
|
@ -435,10 +435,6 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
|
|||
KeyCode::Char('x') if ctrl => Some(match app.pod_status {
|
||||
PodStatus::Running => Some(Method::Cancel),
|
||||
PodStatus::Paused | PodStatus::Idle => Some(Method::Shutdown),
|
||||
PodStatus::Busy => {
|
||||
app.push_error("Pod is finishing post-run work; wait for idle or press Ctrl-C twice to exit the TUI.");
|
||||
None
|
||||
}
|
||||
}),
|
||||
KeyCode::Char('d') if ctrl => {
|
||||
app.quit = true;
|
||||
|
|
@ -577,7 +573,7 @@ fn handle_key(app: &mut App, key: KeyEvent) -> Option<Method> {
|
|||
const CONFIRM_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
|
||||
|
||||
/// Running → send `Method::Pause`.
|
||||
/// Idle / Paused / Busy → 2-tap to quit the TUI (the Pod keeps running).
|
||||
/// Idle / Paused → 2-tap to quit the TUI (the Pod keeps running).
|
||||
fn handle_pause_or_quit(app: &mut App) -> Option<Method> {
|
||||
if app.pod_status == PodStatus::Running {
|
||||
return Some(Method::Pause);
|
||||
|
|
|
|||
|
|
@ -877,18 +877,6 @@ fn draw_status(frame: &mut Frame, app: &App, area: Rect) {
|
|||
" — Enter to resume, type to start new turn",
|
||||
Style::default().fg(Color::DarkGray),
|
||||
));
|
||||
} else if app.busy {
|
||||
spans.push(Span::raw(" | "));
|
||||
spans.push(Span::styled(
|
||||
"busy",
|
||||
Style::default()
|
||||
.fg(Color::Yellow)
|
||||
.add_modifier(Modifier::BOLD),
|
||||
));
|
||||
spans.push(Span::styled(
|
||||
" — finishing post-run work",
|
||||
Style::default().fg(Color::DarkGray),
|
||||
));
|
||||
} else {
|
||||
spans.push(Span::styled(" idle", Style::default().fg(Color::DarkGray)));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,8 +23,8 @@ Pod::handle_worker_result
|
|||
→ persist_turn(旧セッションに記録)
|
||||
→ compact() → resume()
|
||||
|
||||
[ターンの合間 — Pod::run 完了後]
|
||||
Controller::try_post_run_compact ← proactive
|
||||
[ターンの合間 — 次の Pod::run 冒頭]
|
||||
Pod::try_pre_run_compact ← proactive
|
||||
→ input_tokens > post_run_threshold なら compact() (best-effort)
|
||||
```
|
||||
|
||||
|
|
@ -66,7 +66,7 @@ pub struct ToolOutput {
|
|||
|
||||
### トリガー(2段階の閾値)
|
||||
|
||||
1. **ターンの合間 (Controller)**: `try_post_run_compact()` で `input_tokens > post_run_threshold` → best-effort
|
||||
1. **ターンの合間 (次 turn 冒頭)**: `try_pre_run_compact()` で `input_tokens > post_run_threshold` → best-effort
|
||||
2. **リクエストの合間 (CompactInterceptor)**: `pre_llm_request` で `input_tokens > request_threshold` → `PreRequestAction::Yield`
|
||||
|
||||
**ターンの合間が proactive (小さい閾値)**:
|
||||
|
|
|
|||
|
|
@ -66,3 +66,9 @@
|
|||
- `crates/pod/src/controller.rs` `run_post_run_jobs` / `finish_controller_run`
|
||||
- `crates/pod/src/pod.rs` `try_post_run_extract` / `try_post_run_consolidate` / `try_post_run_compact`
|
||||
- `crates/protocol/src/lib.rs` `PodStatus` 定義
|
||||
|
||||
## Review
|
||||
|
||||
- 状態: Approve
|
||||
- レビュー詳細: [./pod-post-run-detach.review.md](./pod-post-run-detach.review.md)
|
||||
- 日付: 2026-05-04 (Round 3)
|
||||
|
|
|
|||
132
tickets/pod-post-run-detach.review.md
Normal file
132
tickets/pod-post-run-detach.review.md
Normal file
|
|
@ -0,0 +1,132 @@
|
|||
# Review: Pod: memory の post-run 同期実行を解消し、Busy 状態を撤廃する
|
||||
|
||||
## 前提・要件の確認
|
||||
|
||||
- **Phase 1 (extract) の detach**: post-run から detached spawn に移行。`crates/pod/src/pod.rs:236` `spawn_post_run_memory_jobs` で `tokio::spawn` 経由に切り替わり、`extract_in_flight` の CAS は `try_post_run_extract` 内 (`crates/pod/src/pod.rs:1758`) でそのまま維持されている。
|
||||
- **Phase 2 (consolidate) の独立化**: controller の post-run チェーンから外れ、spawn したタスク内で extract 完了直後に呼ばれる (`crates/pod/src/pod.rs:253`)。Phase 1 直後の連鎖実行は「staging 変化を契機とした独立タスク」の最小実装として doc 要件と整合する。
|
||||
- **compact の post-run 切り離し**: 旧 `try_post_run_compact` が `try_pre_run_compact` にリネームされ、`run` / `resume` / `run_for_notification` 冒頭の `ensure_session_head` 直後で呼ばれる (`crates/pod/src/pod.rs:906, 1080, 1097`)。doc (`docs/compaction.md`) も同期して更新済み。
|
||||
- **`PodStatus::Busy` 撤廃**: `crates/protocol/src/lib.rs:440` で enum から削除。controller / TUI / tests から Busy 経路が削除されている。
|
||||
- **shutdown 時の wait**: `pod.wait_for_memory_jobs()` がメインループ break 後に呼ばれ (`crates/pod/src/controller.rs:684`)、in-flight な extract / consolidate を join する。compact は inline 実行のため、turn 完了で必ず終わっており別途待つ必要なし。
|
||||
|
||||
未達の要件:
|
||||
- **compact 開始時に in-flight extract を待つ同期**(ticket 要件「compact を次ターン冒頭に移すなら、その冒頭で『現在 in-flight な extract があれば完了を待つ』一段の同期を挟む」)が `try_pre_run_compact` に組み込まれていない。`crates/pod/src/pod.rs:1239` を見ると compact_state チェック直後に `compact()` を呼ぶだけで、`extract_in_flight` も `memory_task` join もない。
|
||||
|
||||
## アーキテクチャ・スコープ
|
||||
|
||||
- レイヤ境界・クレート命名・依存追加方法はチケット範囲内で逸脱なし。
|
||||
- `crates/llm-worker/src/llm_client/client.rs` への `impl Clone for Box<dyn LlmClient>` 追加は、Pod を `clone_for_memory_task` で複製するために必要となった補助。スコープ境界としては「LLM 抽象に Clone を許す」付随変更で、低レベル基盤の責務範囲内であり許容範囲。
|
||||
- `crates/tools/src/task.rs` は doc コメントの API 名変更追従のみ。
|
||||
- `Pod::clone_for_memory_task` で「memory ジョブ用に Pod をまるごと複製してデタッチ」というアプローチを取ったことが、本実装の最大の構造的判断。これは下記 Blocking で挙げる持続性の問題を生んでいる: **session-store 側に並走する writer を入れても安全な排他層が無い**ため、controller と memory タスクが同一 jsonl に同時に追記する設計はアーキテクチャ的に破綻している。
|
||||
|
||||
## 指摘事項
|
||||
|
||||
### Blocking
|
||||
|
||||
- **memory タスクと controller が同一 session log に競合書き込みする (head_hash race)** — `crates/pod/src/pod.rs:193` `clone_for_memory_task` は `session_id` を値で、`head_hash` を `Option<EntryHash>::clone()` で持ち出すだけで、`store` (FsStore は `append` モード書き) の排他は何もない。`run_extract_once` 内 `session_store::save_extension` (`crates/pod/src/pod.rs:1911`) は cloned Pod 自身の `head_hash` を進めるが、controller 側の `head_hash` は据え置きのまま。
|
||||
- 帰結 1: memory タスクが完了して save_extension を書いた直後に次ターンを始めると、`ensure_session_head` の `ensure_head_or_fork` (`crates/session-store/src/session.rs:117`) が「store_head ≠ self.head_hash」を検知して **毎回 spurious fork** が走る。意図しないセッション ID 切り替わりが extract 完了ごとに発生する重大な機能リグレッション。
|
||||
- 帰結 2: 同じターン内で controller が `save_user_input` / `save_delta` を書いた後に memory タスクが save_extension を書くと、両者とも stale な `prev_hash: X` を載せたエントリが jsonl に並ぶ (chain が分岐する)。`collect_state` は線形読みなのでクラッシュは起きないが、`read_head_hash` が直前の Extension の hash を返すため次ターン頭で fork が起きる。
|
||||
- 帰結 3: fork した側 (新 session) には Extension エントリが残らないため、resume 時に extract pointer が None で復元される。staging との整合は壊れていなくても、復元した Pod は無駄に entry 0 から再走査する。
|
||||
- 旧実装が inline `.await` だったときは、save_extension が完了してから次ターンが始まるので head_hash が常に進んだ状態で観測でき、この race は構造的に発生しなかった。detach するなら head_hash / session_id の所有権を 1 本化する (memory タスクから writeback、または extract の永続化を別経路に切り出す) 等の対応が必要。
|
||||
|
||||
- **compact 開始時に in-flight extract を待つ同期がない** — チケット要件 §Phase 1 detach 末尾に明記された「compact を次ターン冒頭に移すなら、その冒頭で『現在 in-flight な extract があれば完了を待つ』一段の同期」が未実装。`crates/pod/src/pod.rs:1239` の `try_pre_run_compact` は `extract_in_flight` も `memory_task` も観測しない。
|
||||
- extract が走っている間に compact がセッションを fork すると、extract が `save_extension` を書く先は旧セッション側になり、共有 `Arc<Mutex<extract_pointer>>` の値は新セッションに対して意味をなさない `processed_through_history_len` を持つことになる。
|
||||
- 単純対処は `try_pre_run_compact` の頭で `wait_for_memory_jobs` 相当 (もしくは `extract_in_flight` の spinning wait) を入れること。ただし、こうした場当たり的な対処を重ねるよりは、clone-detach 設計そのものを見直したほうが早い可能性がある。
|
||||
|
||||
### Non-blocking / Follow-up
|
||||
|
||||
- `crates/tui/src/app.rs:54, 92, 114, 299` および `crates/tui/src/ui.rs:880` に `App::busy: bool` が残っている。Busy 撤廃後は常時 `false` が代入されるだけのデッドフィールドで、`submit_input` の guard と ui の `busy` 表示分岐は到達不能。チケット §`PodStatus::Busy` の撤廃 で「Busy を観測している経路を整理する」が要件に挙がっているので、この残骸も併せて消す方針が筋。
|
||||
- `crates/pod/src/pod.rs:236` `spawn_post_run_memory_jobs` の二段 `if` は読みづらい。`is_finished` 後の `handle.abort()` は no-op だがロジックが冗長。`if let Some(handle) = self.memory_task.take_if(|h| h.is_finished()) { drop(handle); }` 相当に整理できる。
|
||||
- `clone_for_memory_task` で `worker.set_cache_key(Some(self.session_id.to_string()))` を埋めているが、もしこの後 compact が新 session_id を割り当てると memory タスク側の cache_key と合わなくなる。prompt cache 効率の観点では現状でも問題ないが、上記 Blocking 修正と一緒に再評価したい。
|
||||
- `crates/pod/src/pod.rs:248` で worker の system_prompt を `unwrap_or_default().to_string()` しているのは、抽出 Worker 側で別 prompt を貼り直すから問題はないが、`expect("worker present")` と組み合わさると意図が読みづらい。コメントで「extract worker は `try_post_run_extract` 内で Worker を使い捨て生成するため、ここでの worker は実は使われない」と明示しておくと将来の事故が防げる。
|
||||
- detached path のテストが無い。要件に直結する観点 (Busy が観測されないこと、shutdown が wait すること、extract 完了後に save_extension が走っても次ターンが fork しないこと) にカバレッジが無いので、Blocking を直したら回帰テストとして追加したい。
|
||||
- TODO.md に追加された `tui-context-usage-indicator` 行と新規 `tickets/tui-context-usage-indicator.md` は本チケットの diff に紛れているが、別件。コミット切り分けの確認推奨。
|
||||
|
||||
### Nits
|
||||
|
||||
- `crates/pod/src/pod.rs:1735-1738` のコメントに「Called by the Controller before spawning the background memory task」とあるが、実際に呼んでいるのは `spawn_post_run_memory_jobs` 内の spawn 後タスク。文言が紛らわしい。
|
||||
- `crates/pod/src/pod.rs:1232-1238` の doc コメントで「Best-effort: failures are logged and surfaced, but do not abort the user turn that triggered the check」とあるが、関数自体は `Result<(), PodError>` を返し常に `Ok(())` なので、現状はコメントと整合しており問題はない。ただし、`run` 側で `?` を付けている (`crates/pod/src/pod.rs:906`) ので将来 `Err` を返すようになった瞬間にユーザーターンを abort してしまう。コメントの宣言通り `Ok` 確定なら戻り値は `()` でよい。
|
||||
|
||||
## 判断
|
||||
|
||||
**Request changes** — Busy 撤廃 / detach / compact 前倒しという外形は要件通りだが、(1) cloned Pod が共有 `head_hash` 抜きで session log に直書きする構造的レース、(2) compact 前に in-flight extract を待つ同期の欠落、の 2 点が要件・運用上の正しさを破っている。とくに (1) は extract が完了するたびに spurious session fork を引き起こす機能リグレッションで、merge 前の修正が必須。
|
||||
|
||||
---
|
||||
|
||||
## Round 2 (再レビュー)
|
||||
|
||||
### 前提・要件の確認 (差分)
|
||||
|
||||
- **Blocking 1 (head_hash race) 解消**: `Pod` から個別 `head_hash` フィールドを削除し、`SessionHead { session_id, head_hash }` を `Arc<AsyncMutex<_>>` で controller / cloned-memory-task 双方が共有する構造に変更 (`crates/pod/src/pod.rs:39-43, 71, 306-309`)。`save_user_input` / `save_delta` / `save_turn_end` / `save_usage` / `save_run_completed` / `save_run_errored` / `save_pod_scope` / `save_extension` / `record_metric` が全て同じ mutex 越しに head_hash を読み書きする (`crates/pod/src/pod.rs:441-447, 468-475, 540-548, 1207-1236, 1378, 1398-1407, 1422-1438, 1445-1457, 1467-1478, 2014-2024`)。compact (`crates/pod/src/pod.rs:1703-1736`) も lock 内で session_id 切り替えを行う。`extract_pointer` も `Arc<Mutex<_>>` 化されてコピー先と共有 (`crates/pod/src/pod.rs:182-187, 247`)。これで cloned Pod の `save_extension` が書いた head_hash は controller 側にも即座に反映され、`ensure_head_or_fork` での spurious fork は構造的に発生しない。設計上の race は解消。
|
||||
- **Blocking 2 (pre-run extract wait) 実装**: `run` / `run_for_notification` / `resume` の冒頭で `compact_state.exceeds_post_run(...)` を満たす場合のみ `memory_task.take().await` で in-flight ジョブを join してから `try_pre_run_compact` へ進む (`crates/pod/src/pod.rs:926-946, 1123-1143, 1159-1179`)。閾値を超えていない通常ターンでは join せず detach のままなので、要件の「compact 直前にだけ wait」が成立。`memory_task` は `JoinHandle<()>` 一本で持たれ、必ず完了する (LLM 呼び出しのタイムアウトは Worker 側に委ねており、タスク内部に追加の無限ループは無い) ので `await` が永久 block するパスは見えない。
|
||||
- **デッドコード掃除**: `App::busy` フィールドおよびその参照 (`submit_input` の早期 return / `ui::draw_status` の busy 表示 / `handle_key` Ctrl-X 分岐) を完全削除 (`crates/tui/src/app.rs`, `crates/tui/src/ui.rs`, `crates/tui/src/main.rs`)。`grep PodStatus::Busy` も TUI / Pod / protocol 全域で commentary 1 件のみ (`crates/pod/src/pod.rs:1325` のコメント、過去経緯の説明として妥当)。
|
||||
- **テスト追加**: `controller_test.rs` の `run_end_enters_busy_until_post_run_finishes_and_broadcasts_status` を `run_end_returns_to_idle_without_busy_status` にリネーム + 期待値変更。`pause_while_busy_is_idempotent_not_not_running` を削除 (Busy が無いので意味を失った)。`compact_events_test.rs` は `try_post_run_compact` → `try_pre_run_compact` 変名追従のみ。**detach 経路に固有のテスト (spawn 後の次ターンが fork しないこと、shutdown で memory_task が join されること、pre-run の extract 待ち合わせ) は依然として未追加**。
|
||||
|
||||
### 残っている指摘事項
|
||||
|
||||
#### Blocking
|
||||
- なし。前回ラウンドの両 Blocking は解消。
|
||||
|
||||
#### Non-blocking / Follow-up
|
||||
|
||||
- **detach 経路の動作テストが未追加**: 前回 follow-up に挙げた以下が引き続き未カバー。Round 1 では Blocking に隠れていたが、今回は単独の品質課題として残る。
|
||||
- 同 turn 内の `save_extension` と次ターンの `save_user_input` が同じ jsonl に並ぶケースで fork しないこと (`session_head` mutex の serialization が機能していること)。
|
||||
- `wait_for_memory_jobs` が shutdown 時に in-flight な extract / consolidate を join し切ること。
|
||||
- `compact_state.exceeds_post_run(...)` を満たす状態で memory_task が走行中に次ターンを投げると、compact が memory_task の完了を待ってから走ること。
|
||||
- 推奨は `crates/pod/tests/controller_test.rs` に `MockClient` を遅延つきで返す形のテストを 1〜2 本追加。
|
||||
- **`run` / `run_for_notification` / `resume` で同じ memory_task 整理 + pre-compact wait ブロックが 3 回コピペされている** (`crates/pod/src/pod.rs:926-946, 1123-1143, 1159-1179`)。ヘルパ (`async fn ensure_pre_run_state(&mut self) -> Result<(), PodError>` 等) に 1 本化したほうが、将来 wait 条件を変更したときの抜け漏れを防げる。
|
||||
- **`spawn_post_run_memory_jobs` の二段 `if` が依然冗長** (`crates/pod/src/pod.rs:254-264`)。「未完了なら何もしない、完了済みなら take して drop してから新しく spawn」という意図を `take_if(|h| h.is_finished())` 等で 1 段に書ける。Round 1 の指摘そのまま。
|
||||
- **`clone_for_memory_task` の `worker.set_cache_key(Some(self.session_id.to_string()))`** (`crates/pod/src/pod.rs:216`) は clone 時点の session_id を貼っている。compact が走って session_id が変わる前に memory_task は join されるよう pre-run wait が入ったので race にはならないが、cloned worker は extract worker 用に作っているだけで実体は `run_extract_once` 内で `extract_worker = Worker::new(...).system_prompt(...)` (`crates/pod/src/pod.rs:1956`) として作り直される。**したがって cloned Pod の `worker` は実際には使われていない**。`clone_for_memory_task` で `worker` をビルドしている処理 (`crates/pod/src/pod.rs:211-216`) はメモリ・初期化コストの無駄であり、`Option::None` で十分か、せめてコメントで「使われない / placeholder」と明示すべき。
|
||||
- **`pending_attachments` を空 Arc で再生成、`pending_scope_snapshot` は共有という非対称** (`crates/pod/src/pod.rs:237, 244`)。memory タスクは scope 変更も attachment も生まないので実害は無いが、意図がコメントに無いと将来事故る。
|
||||
- **スコープ外混入が継続**: `TODO.md` の `tui-context-usage-indicator` 行追加と `tickets/tui-context-usage-indicator.md` 新規が本チケットの未コミット差分に依然として残っている。前回 follow-up でコミット切り分けを指摘済みだが解消されていない。コミット時に別チケットとして分離する判断を再度推奨。
|
||||
|
||||
#### Nits
|
||||
|
||||
- `crates/pod/src/pod.rs:1834-1839` の doc コメント「Called by the Controller before spawning the background memory task」は誤り。実際に呼んでいるのは `spawn_post_run_memory_jobs` が `tokio::spawn` した async block の内部 (`crates/pod/src/pod.rs:268`)。「Spawned background memory task が compact 確定前に呼び出す Phase 1 entry point」程度の文言に。
|
||||
- `crates/pod/src/pod.rs:1322-1328` `try_pre_run_compact` の戻り値は依然 `Result<(), PodError>` だが本体は `Ok(())` 確定。`run` 側 (`crates/pod/src/pod.rs:946, 1143, 1179`) で `?` で受けているので、将来 `Err` を返すようになった瞬間にユーザーターンを abort してしまう。「best-effort で abort しない」という設計意図と整合させるなら戻り値を `()` にするか、本体側で必ず `Ok(())` に正規化するコメントを残すか。
|
||||
- `crates/pod/src/pod.rs:253` 直後の空白 2 行 + 関数定義は 1 行多い。
|
||||
|
||||
### Round 2 判断
|
||||
|
||||
**Approve with follow-up** — 前回ラウンドの 2 件の Blocking はいずれも構造的に解消されている。`SessionHead` の `Arc<AsyncMutex<_>>` 共有は cloned Pod / controller 双方の head_hash / session_id 進行を直列化し、ticket 要件の「detach しても session log の整合性を壊さない」を満たす。compact 前 wait も threshold 超過時にだけ join する形で必要十分。残課題は (a) detach 経路の回帰テスト追加、(b) `run` / `resume` / `run_for_notification` のコピペ整理、(c) スコープ外コミットの切り分け、で、いずれも本チケットを閉じてから別チケットないし follow-up commit として処理可能。
|
||||
|
||||
---
|
||||
|
||||
## Round 3 (再レビュー)
|
||||
|
||||
### 前提・要件の確認 (差分)
|
||||
|
||||
Round 2 の follow-up 6 点に対する解消状況を確認した。
|
||||
|
||||
- **#1 detach 経路の固有テスト追加**: `crates/pod/tests/compact_events_test.rs:528-581` に 2 本追加。
|
||||
- `spawn_and_wait_drives_extract_to_completion` (line 529) — `pod.spawn_post_run_memory_jobs() + wait_for_memory_jobs().await` の round-trip で `extract_pointer` が `None → Some` に進むことを assert。`spawn`/`wait` の API contract と extract の完了条件をカバー。
|
||||
- `detached_extract_does_not_fork_session_log` (line 553) — Round 1 Blocking 1 の構造的不変条件 (`SessionHead` 共有が `save_extension` と `save_user_input` の chain を直列化する) を行動レベルで検証。`session_id_before == session_id_after` を比較するので、cloned Pod が独立 head_hash を持っていた場合の `ensure_head_or_fork` 経由の spurious fork を検知できる。
|
||||
- 残りの 2 観点 (shutdown wait の正常完了 / pre-run compact の memory_task 待ち合わせのタイミング) は本ラウンドでも未追加だが、ユーザーのコメントで「テスト容易性の限界」として明示されている。後述の判断参照。
|
||||
- **#2 ヘルパ集約**: `prepare_for_run` (`crates/pod/src/pod.rs:938-948`) に 1 本化。`run` (line 963)、`run_for_notification` (line 1138)、`resume` (line 1152) の prelude が全て `self.prepare_for_run().await?;` の 1 行で済むように整理。同期順序は「`ensure_interceptor_installed` → `ensure_system_prompt_materialized` → `cleanup_finished_memory_task` → `ensure_session_head` → (compact 必要時のみ `join_memory_task`) → `try_pre_run_compact`」(`crates/pod/src/pod.rs:939-947`) で、Round 2 で個別に書かれていた順序と意味的に等価。`cleanup_finished_memory_task` を `ensure_session_head` の前に移したのは finished handle を早めに drop するだけで semantics に影響なし。
|
||||
- **#3 `clone_for_memory_task` の worker 縮約**: `set_system_prompt` / `set_request_config` / `set_cache_key` の copy をすべて削除し、`Worker::new(client.clone()).set_history(...)` の最小 snapshot に縮約 (`crates/pod/src/pod.rs:216-218`)。コメント (line 210-215) で「extract/consolidate は内部で worker を作り直すので system_prompt / request_config / cache_key は不要」と意図を明示。`run_extract_once` 内 (line 1922) で `extract_worker = Worker::new(client).system_prompt(extract_system_prompt)` として作り直されるという既存挙動と整合する。
|
||||
- **#4 `spawn_post_run_memory_jobs` のフラット化**: 二段 `if` を `cleanup_finished_memory_task() + if self.memory_task.is_some() { return; }` (`crates/pod/src/pod.rs:259-262`) に整理。`take_if` での 1 段化案より「先に finished を掃除 → in-flight があれば skip」という 2 ステップの構造が読み取りやすく、結果的に意図がはっきりした。
|
||||
- **#5 戻り値整理**: `try_pre_run_compact` (`crates/pod/src/pod.rs:1296`) と `wait_for_memory_jobs` (`crates/pod/src/pod.rs:199`) の戻り値を `()` に変更。`prepare_for_run` 内 (line 946) と controller の shutdown 直前 (`crates/pod/src/controller.rs:684`) の呼び出し側からも `?` / `if let Err(...)` が消えた。「best-effort で user turn を abort しない」設計意図がシグネチャレベルで保証される。
|
||||
- **#6 スコープ外混入の切り分け**: 既コミット (`632d63d docs(tickets): 追加:タスクリストの表示とコンテキスト長インジケータ`) で `tui-context-usage-indicator.md` / `tui-task-display.md` および `TODO.md` 行が別チケットとして分離済み。本チケットの差分は本体実装 + 本 review.md + ticket 末尾の Review 状態のみ。
|
||||
|
||||
### 動作確認
|
||||
|
||||
- `cargo check --workspace --tests` clean (warning 1 件は既存の `end_scope` dead-code、本チケット無関係)。
|
||||
- `cargo test -p pod --test compact_events_test` 8 本全て pass (新規 2 本含む)。
|
||||
- `cargo test -p pod --test controller_test` 21 本全て pass (renamed `run_end_returns_to_idle_without_busy_status` 含む)。
|
||||
- ユーザー報告の `cargo test --workspace` 50 group pass を裏取り済み。
|
||||
|
||||
### 残っている指摘事項
|
||||
|
||||
#### Blocking
|
||||
- なし。
|
||||
|
||||
#### Non-blocking / Follow-up
|
||||
- **未追加の detach テスト 2 観点 (shutdown wait の正常完了 / pre-run compact の memory_task 待ち合わせ)**: ユーザー側で「テスト容易性の限界による断念」として明示。前者は `controller` task の終了パス (`crates/pod/src/controller.rs:684`) を `wait_for_memory_jobs` の呼び出し前後で観測する必要があるが、現在の `controller_test.rs` フレームには in-flight job を確実に「shutdown 時に未完了」状態に置く手段がない (MockClient が同期的に response を返すため)。後者は `compact_state` 閾値を超えた状態で memory_task が走行中という条件をテスト 1 本で再現する必要があり、`MockClient` のレスポンス遅延フックがない現状では確定的に作れない。**判断**: 構造的不変条件 (`SessionHead` mutex / `extract_in_flight` CAS) は Round 2 で型レベル + 既存テストで検証済みであり、`detached_extract_does_not_fork_session_log` がその不変条件を行動レベルで cover している。残り 2 観点はテスト基盤側の課題 (e.g. `MockClient` への delay hook 追加) として `tickets/` 別件で扱える性質であり、本チケットの merge を block しない。
|
||||
- **`prepare_for_run` の名前**: prelude / pre-run setup として汎用名なので将来「pre_run_X」系の helper が増えた時に衝突しうる。本チケット範囲外の整理候補として留意。
|
||||
- **`should_pre_run_compact` と `try_pre_run_compact` の二重判定**: `should_pre_run_compact` は `disabled / just_compacted / exceeds_post_run` を見るが (`crates/pod/src/pod.rs:927-931`)、`try_pre_run_compact` は `disabled / just_compacted` 早期 return + `exceeds_post_run` 二度判定で同じ判断ロジックを再評価する (`crates/pod/src/pod.rs:1297-1304`)。コメント (line 922-925) で「defensive に重複している」と明示してあるので意図的だが、片方を `should_pre_run_compact()` の呼び出しに置き換える形で 1 本化しても同じ defensive 性は保てる。可読性向上の余地。
|
||||
|
||||
#### Nits
|
||||
- `crates/pod/src/pod.rs:1802` の `try_post_run_extract` doc コメント「Called by the Controller before spawning the background memory task」が依然として残っている。Round 2 の Nits として既に指摘済み。実際の呼び出しは `spawn_post_run_memory_jobs` が tokio::spawn した async block 内 (`crates/pod/src/pod.rs:266`)。修正コストは小さい。
|
||||
|
||||
### Round 3 判断
|
||||
|
||||
**Approve** — Round 2 で挙げた follow-up #1〜#5 はすべて妥当に解消され、新規 regression は確認できない。`prepare_for_run` への集約で 3 経路の prelude 順序が単一の真理になり、将来「compact 前に何を待つか」を変更したい時の抜け漏れリスクが消えた。`clone_for_memory_task` の worker 縮約は無駄な init を削るだけでなく「memory タスクから見える worker は client + history のみが意味を持つ」という設計境界を明示している。`try_pre_run_compact` / `wait_for_memory_jobs` の `()` 返却は best-effort セマンティクスを型で保証する。新規テスト 2 本のうち `detached_extract_does_not_fork_session_log` は Round 1 Blocking 1 の構造的不変条件 (`SessionHead` 共有) を行動でカバーする実効的な assertion になっている。スコープ外混入も切り分け済み。残った Non-blocking 項目はいずれもテスト基盤拡張 / 命名整理レベルで、別チケットで個別に拾える性質。本チケットは閉じて良い。
|
||||
Loading…
Reference in New Issue
Block a user