//! consolidation (memory.consolidation) post-run trigger. //! //! Covers the gating, lock and cleanup behaviour without exercising the //! full sub-worker tool loop: //! //! - no `[memory]` section → no-op //! - `[memory]` present but no thresholds → no-op //! - staging empty → skip //! - staging below thresholds → skip + lock not acquired //! - staging above threshold → sub-worker runs, consumed entries removed //! - existing live lock → skip without error //! //! The sub-worker is fed a no-op LLM response (plain text) so it returns //! immediately. The post-run path then exercises lock acquisition, //! cleanup, and the empty-payload fast path. use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use async_trait::async_trait; use futures::Stream; use llm_worker::Worker; use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; use llm_worker::llm_client::{ClientError, LlmClient, Request}; use memory::WorkspaceLayout; use memory::extract::{ExtractedPayload, write_staging}; use memory::schema::SourceRef; use pod_store::{CombinedStore, FsPodStore}; use session_store::FsStore; type TestStore = CombinedStore; use tokio::sync::broadcast; use pod::{Event, Pod}; #[derive(Clone)] struct MockClient { responses: Arc>>, call_count: Arc, } impl MockClient { fn new(responses: Vec>) -> Self { Self { responses: Arc::new(responses), call_count: Arc::new(AtomicUsize::new(0)), } } } #[async_trait] impl LlmClient for MockClient { fn clone_boxed(&self) -> Box { Box::new(self.clone()) } async fn stream( &self, _request: Request, ) -> Result> + Send>>, ClientError> { let count = self.call_count.fetch_add(1, Ordering::SeqCst); if count >= self.responses.len() { return Err(ClientError::Config("mock client exhausted".into())); } let events = self.responses[count].clone(); let stream = futures::stream::iter(events.into_iter().map(Ok)); Ok(Box::pin(stream)) } } fn done(text: &str) -> Vec { vec![ LlmEvent::text_block_start(0), LlmEvent::text_delta(0, text), LlmEvent::text_block_stop(0, None), LlmEvent::Status(StatusEvent { status: ResponseStatus::Completed, }), ] } const NO_MEMORY_TOML: &str = r#" [pod] name = "test-pod" [model] scheme = "anthropic" model_id = "test-model" [worker] max_tokens = 100 [[scope.allow]] target = "./" permission = "write" "#; const MEMORY_NO_THRESHOLDS_TOML: &str = r#" [pod] name = "test-pod" [model] scheme = "anthropic" model_id = "test-model" [worker] max_tokens = 100 [memory] [[scope.allow]] target = "./" permission = "write" "#; const FILES_THRESHOLD_TOML: &str = r#" [pod] name = "test-pod" [model] scheme = "anthropic" model_id = "test-model" [worker] max_tokens = 100 [memory] consolidation_threshold_files = 2 [[scope.allow]] target = "./" permission = "write" "#; const ZERO_THRESHOLDS_TOML: &str = r#" [pod] name = "test-pod" [model] scheme = "anthropic" model_id = "test-model" [worker] max_tokens = 100 [memory] consolidation_threshold_files = 0 consolidation_threshold_bytes = 0 [[scope.allow]] target = "./" permission = "write" "#; async fn make_pod_with( manifest_toml: &str, pwd: std::path::PathBuf, client: MockClient, ) -> Pod { let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); let store_tmp = tempfile::tempdir().unwrap(); let store = CombinedStore::new( FsStore::new(store_tmp.path()).unwrap(), FsPodStore::new(store_tmp.path().join("pods")).unwrap(), ); std::mem::forget(store_tmp); let scope = pod::Scope::writable(&pwd).unwrap(); let worker = Worker::new(client); Pod::new(manifest, worker, store, pwd, scope).await.unwrap() } fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec { let mut ids = Vec::new(); for i in 0..n { let (id, _) = write_staging( layout, SourceRef { segment_id: format!("s-{i}"), range: [i as u64, i as u64], }, ExtractedPayload::default(), ) .unwrap(); ids.push(id); } ids } fn attach_event_receiver(pod: &mut Pod) -> broadcast::Receiver { let (tx, rx) = broadcast::channel(16); pod.attach_event_tx(tx); rx } fn collect_memory_worker_reasons(rx: &mut broadcast::Receiver) -> Vec { let mut reasons = Vec::new(); loop { match rx.try_recv() { Ok(Event::MemoryWorker(event)) => reasons.push(event.reason), Ok(_) => {} Err(broadcast::error::TryRecvError::Empty) => break, Err(err) => panic!("unexpected broadcast receive error: {err}"), } } reasons } fn read_audit_jsonl(layout: &WorkspaceLayout) -> Vec { let text = std::fs::read_to_string(layout.audit_current_log_path()).unwrap(); text.lines() .map(|line| serde_json::from_str::(line).unwrap()) .collect() } #[tokio::test] async fn no_memory_section_is_a_noop() { let pwd = tempfile::tempdir().unwrap(); let client = MockClient::new(vec![]); let mut pod = make_pod_with(NO_MEMORY_TOML, pwd.path().to_path_buf(), client).await; pod.try_post_run_consolidate() .await .expect("missing memory section must skip cleanly"); } #[tokio::test] async fn no_thresholds_is_a_noop() { let pwd = tempfile::tempdir().unwrap(); let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); write_n_staging(&layout, 5); let client = MockClient::new(vec![]); let mut pod = make_pod_with(MEMORY_NO_THRESHOLDS_TOML, pwd.path().to_path_buf(), client).await; pod.try_post_run_consolidate() .await .expect("consolidation disabled when both thresholds are None"); // No staging entries removed. assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 5); } #[tokio::test] async fn zero_thresholds_treated_as_disabled() { // Without the `Some(0) → None` collapse, `total_files >= 0` and // `total_bytes >= 0` would always evaluate true and consolidation would // fire on every post-run with any staging activity. let pwd = tempfile::tempdir().unwrap(); let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); write_n_staging(&layout, 5); let client = MockClient::new(vec![]); let mut pod = make_pod_with(ZERO_THRESHOLDS_TOML, pwd.path().to_path_buf(), client).await; pod.try_post_run_consolidate() .await .expect("zero thresholds must collapse to disabled, not fire on every staging entry"); assert_eq!( memory::consolidate::list_staging_entries(&layout).len(), 5, "staging must be untouched when both thresholds are zero" ); let lock_path = layout.staging_dir().join(".consolidation.lock"); assert!(!lock_path.exists(), "no lock should be acquired"); } #[tokio::test] async fn empty_staging_skips() { let pwd = tempfile::tempdir().unwrap(); let client = MockClient::new(vec![]); let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; pod.try_post_run_consolidate().await.unwrap(); // No mock calls expected. } #[tokio::test] async fn empty_staging_skip_is_audit_only() { let pwd = tempfile::tempdir().unwrap(); let client = MockClient::new(vec![]); let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; let mut rx = attach_event_receiver(&mut pod); pod.try_post_run_consolidate().await.unwrap(); assert!(collect_memory_worker_reasons(&mut rx).is_empty()); } #[tokio::test] async fn invalid_only_staging_is_distinct_from_no_staging() { let pwd = tempfile::tempdir().unwrap(); let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); std::fs::create_dir_all(layout.staging_dir()).unwrap(); let invalid_id = uuid::Uuid::now_v7(); let invalid_path = layout.staging_dir().join(format!("{invalid_id}.json")); std::fs::write( &invalid_path, serde_json::json!({ "source": { "session_id": "legacy-session", "range": [0, 1] }, "requests": [] }) .to_string(), ) .unwrap(); let client = MockClient::new(vec![]); let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; let mut rx = attach_event_receiver(&mut pod); pod.try_post_run_consolidate().await.unwrap(); assert!(invalid_path.exists(), "invalid staging is not auto-deleted"); let reasons = collect_memory_worker_reasons(&mut rx); assert_eq!(reasons, vec!["no_valid_staging_entries invalid=1"]); let audit = read_audit_jsonl(&layout); let last = audit.last().unwrap(); assert_eq!(last["reason"], "no_valid_staging_entries invalid=1"); assert_eq!(last["consolidation"]["staging_count"], 0); assert_eq!(last["consolidation"]["invalid_staging_count"], 1); } #[tokio::test] async fn below_threshold_skip_is_audit_only() { let pwd = tempfile::tempdir().unwrap(); let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); write_n_staging(&layout, 1); // threshold is 2 let client = MockClient::new(vec![]); let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; let mut rx = attach_event_receiver(&mut pod); pod.try_post_run_consolidate().await.unwrap(); assert!(collect_memory_worker_reasons(&mut rx).is_empty()); let audit = read_audit_jsonl(&layout); let reason = audit.last().unwrap()["reason"] .as_str() .expect("audit reason must be a string"); assert!(reason.starts_with("threshold_not_reached ")); } #[tokio::test] async fn completed_event_survives_terminal_empty_drain_skip() { let pwd = tempfile::tempdir().unwrap(); let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); write_n_staging(&layout, 2); // threshold is 2 — fires. let client = MockClient::new(vec![done("ok")]); let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; let mut rx = attach_event_receiver(&mut pod); pod.try_post_run_consolidate().await.unwrap(); let reasons = collect_memory_worker_reasons(&mut rx); assert_eq!(reasons.len(), 2); assert!(reasons[0].starts_with("staging_threshold_reached files=2 bytes=")); assert_eq!(reasons[1], "completed_no_record_changes"); let audit = read_audit_jsonl(&layout); assert_eq!(audit.last().unwrap()["reason"], "no_staging_entries"); } #[tokio::test] async fn below_threshold_skips_and_does_not_take_lock() { let pwd = tempfile::tempdir().unwrap(); let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); write_n_staging(&layout, 1); // threshold is 2 let client = MockClient::new(vec![]); let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; pod.try_post_run_consolidate().await.unwrap(); // Staging untouched. assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 1); // Lock file must not exist. let lock_path = layout.staging_dir().join(".consolidation.lock"); assert!(!lock_path.exists(), "lock file should not be created"); } #[tokio::test] async fn fires_on_threshold_and_cleans_up_consumed_entries() { let pwd = tempfile::tempdir().unwrap(); let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); write_n_staging(&layout, 2); // threshold is 2 — fires. // Sub-worker is given a single text-only response. The consolidation prompt // tells it to call memory tools; the mock skips those, but `Worker::run` // returns Ok regardless once the LLM closes with a final text. let client = MockClient::new(vec![done("ok")]); let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; pod.try_post_run_consolidate().await.unwrap(); // Consumed entries removed. assert!( memory::consolidate::list_staging_entries(&layout).is_empty(), "consumed staging entries must be cleaned up" ); // Lock removed too. let lock_path = layout.staging_dir().join(".consolidation.lock"); assert!(!lock_path.exists(), "lock file must be removed on success"); } #[tokio::test] async fn in_flight_guard_skips_reentry_without_clearing() { use std::sync::atomic::Ordering; let pwd = tempfile::tempdir().unwrap(); let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); write_n_staging(&layout, 2); let client = MockClient::new(vec![]); let mut pod = make_pod_with( FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client.clone(), ) .await; // Pre-set the in-flight flag as if another concurrent caller had // entered run_consolidate_once. The CAS at the top of // try_post_run_consolidate must take the early return without // touching staging or the LLM, and must leave the flag intact for // the holder to clear. let in_flight = pod.consolidation_in_flight_handle(); in_flight.store(true, Ordering::Release); pod.try_post_run_consolidate().await.unwrap(); assert!( in_flight.load(Ordering::Acquire), "reentry skip must not clear the in-flight flag — that's the holder's job" ); assert_eq!( memory::consolidate::list_staging_entries(&layout).len(), 2, "staging must remain untouched on reentry skip" ); assert_eq!( client.call_count.load(Ordering::SeqCst), 0, "no LLM calls should fire on reentry skip" ); // Sanity: when the flag is cleared, the same pod fires normally and // resets the flag itself (i.e. it isn't accidentally sticky). in_flight.store(false, Ordering::Release); let client2 = MockClient::new(vec![done("ok")]); let mut pod2 = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client2).await; pod2.try_post_run_consolidate().await.unwrap(); assert!( !pod2 .consolidation_in_flight_handle() .load(Ordering::Acquire), "in-flight flag must be cleared after a normal run" ); } #[tokio::test] async fn coalesce_loop_terminates_with_one_iteration_when_snapshot_drains_staging() { use std::sync::atomic::Ordering; // Coalesce semantics from `docs/plan/memory.md` §並走防止: a single // run consumes the snapshot taken at acquire time; the loop // re-evaluates against any post-snapshot extract additions. With no // concurrent additions, the second iteration sees an empty staging // and bails out — exercised here by counting LLM calls. let pwd = tempfile::tempdir().unwrap(); let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); write_n_staging(&layout, 4); // Provide just one mock response. If the loop wrongly re-enters // run_consolidate_once after Completed, the second sub-worker run // would exhaust the mock and surface as an error. let client = MockClient::new(vec![done("ok")]); let mut pod = make_pod_with( FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client.clone(), ) .await; pod.try_post_run_consolidate().await.unwrap(); assert_eq!( client.call_count.load(Ordering::SeqCst), 1, "Coalesce must terminate once the staging snapshot is drained — got an extra LLM call" ); assert!( memory::consolidate::list_staging_entries(&layout).is_empty(), "staging must be empty after the single iteration" ); } #[tokio::test] async fn live_lock_held_by_other_pod_skips() { let pwd = tempfile::tempdir().unwrap(); let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); write_n_staging(&layout, 3); // Pre-acquire lock with this test's PID — definitely alive — and // *don't* release it. The consolidation path must skip without error. let _live_lock = memory::consolidate::StagingLock::acquire( &layout, std::process::id(), "other-pod", Vec::new(), ) .unwrap(); let client = MockClient::new(vec![]); let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; pod.try_post_run_consolidate() .await .expect("InUse lock must surface as graceful skip"); // Staging untouched: lock holder owns the snapshot, not us. assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 3); }