yoi/crates/pod/tests/consolidation_test.rs

523 lines
16 KiB
Rust

//! Tests for the consolidation (`memory.consolidation`) post-run trigger.
//!
//! Covers the gating, lock and cleanup behaviour without exercising the
//! full sub-worker tool loop:
//!
//! - no `[memory]` section → no-op
//! - `[memory]` present but no thresholds → no-op
//! - staging empty → skip
//! - staging below thresholds → skip + lock not acquired
//! - staging above threshold → sub-worker runs, consumed entries removed
//! - existing live lock → skip without error
//!
//! The sub-worker is fed a no-op LLM response (plain text) so it returns
//! immediately. The post-run path then exercises lock acquisition,
//! cleanup, and the empty-payload fast path.
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use futures::Stream;
use llm_worker::Worker;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::{ClientError, LlmClient, Request};
use memory::WorkspaceLayout;
use memory::extract::{ExtractedPayload, write_staging};
use memory::schema::SourceRef;
use pod_store::{CombinedStore, FsPodStore};
use session_store::FsStore;
/// Store type shared by every pod built in this file: a `CombinedStore`
/// pairing the filesystem session store with the filesystem pod store.
type TestStore = CombinedStore<FsStore, FsPodStore>;
use tokio::sync::broadcast;
use pod::{Event, Pod};
/// Scripted stand-in for a real LLM client.
///
/// Both fields sit behind `Arc`, so clones produced by `#[derive(Clone)]`
/// share the same script and the same call counter.
#[derive(Clone)]
struct MockClient {
// One inner `Vec` of events per expected `stream` call, replayed in order.
responses: Arc<Vec<Vec<LlmEvent>>>,
// Number of `stream` calls made so far across all clones.
call_count: Arc<AtomicUsize>,
}
impl MockClient {
    /// Builds a mock that replays `responses` one entry per `stream` call,
    /// starting with a call counter of zero.
    fn new(responses: Vec<Vec<LlmEvent>>) -> Self {
        let responses = Arc::new(responses);
        let call_count = Arc::new(AtomicUsize::new(0));
        Self {
            responses,
            call_count,
        }
    }
}
#[async_trait]
impl LlmClient for MockClient {
    /// Returns a boxed clone; the clone shares the script and call counter.
    fn clone_boxed(&self) -> Box<dyn LlmClient> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }

    /// Replays the next scripted response as a stream of `Ok` events.
    ///
    /// Every call bumps the shared counter, including calls made after the
    /// script has run out; those return
    /// `ClientError::Config("mock client exhausted")`.
    async fn stream(
        &self,
        _request: Request,
    ) -> Result<Pin<Box<dyn Stream<Item = Result<LlmEvent, ClientError>> + Send>>, ClientError>
    {
        // `fetch_add` yields the previous value, i.e. this call's zero-based index.
        let call_index = self.call_count.fetch_add(1, Ordering::SeqCst);
        let scripted = match self.responses.get(call_index) {
            Some(events) => events.clone(),
            None => return Err(ClientError::Config("mock client exhausted".into())),
        };
        let replay = futures::stream::iter(scripted.into_iter().map(Ok));
        Ok(Box::pin(replay))
    }
}
/// Scripts one complete LLM turn: a single text block (index 0) carrying
/// `text`, followed by a `Completed` status event.
fn done(text: &str) -> Vec<LlmEvent> {
    let block_start = LlmEvent::text_block_start(0);
    let block_body = LlmEvent::text_delta(0, text);
    let block_stop = LlmEvent::text_block_stop(0, None);
    let completed = LlmEvent::Status(StatusEvent {
        status: ResponseStatus::Completed,
    });
    vec![block_start, block_body, block_stop, completed]
}
/// Pod manifest with no `[memory]` table at all. Used to check that the
/// post-run consolidation trigger is a no-op when memory is not configured.
const NO_MEMORY_TOML: &str = r#"
[pod]
name = "test-pod"
[model]
scheme = "anthropic"
model_id = "test-model"
[worker]
max_tokens = 100
[[scope.allow]]
target = "./"
permission = "write"
"#;
/// Pod manifest with an empty `[memory]` table: the section is present but
/// neither consolidation threshold is set.
const MEMORY_NO_THRESHOLDS_TOML: &str = r#"
[pod]
name = "test-pod"
[model]
scheme = "anthropic"
model_id = "test-model"
[worker]
max_tokens = 100
[memory]
[[scope.allow]]
target = "./"
permission = "write"
"#;
/// Pod manifest whose `[memory]` table sets only
/// `consolidation_threshold_files = 2`. The tests below stage 1 entry to
/// stay under the threshold and 2 or more to reach it.
const FILES_THRESHOLD_TOML: &str = r#"
[pod]
name = "test-pod"
[model]
scheme = "anthropic"
model_id = "test-model"
[worker]
max_tokens = 100
[memory]
consolidation_threshold_files = 2
[[scope.allow]]
target = "./"
permission = "write"
"#;
/// Pod manifest with both consolidation thresholds explicitly set to `0`.
/// Used by `zero_thresholds_treated_as_disabled` to check that zero is
/// treated as "disabled" rather than "always fire".
const ZERO_THRESHOLDS_TOML: &str = r#"
[pod]
name = "test-pod"
[model]
scheme = "anthropic"
model_id = "test-model"
[worker]
max_tokens = 100
[memory]
consolidation_threshold_files = 0
consolidation_threshold_bytes = 0
[[scope.allow]]
target = "./"
permission = "write"
"#;
/// Builds a `Pod` from `manifest_toml`, rooted at `pwd`, talking to `client`.
///
/// The session and pod stores live in a fresh temporary directory that is
/// separate from `pwd`. Panics if any construction step fails.
async fn make_pod_with(
    manifest_toml: &str,
    pwd: std::path::PathBuf,
    client: MockClient,
) -> Pod<MockClient, TestStore> {
    let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap();

    let store_tmp = tempfile::tempdir().unwrap();
    let session_store = FsStore::new(store_tmp.path()).unwrap();
    let pod_store = FsPodStore::new(store_tmp.path().join("pods")).unwrap();
    let store = CombinedStore::new(session_store, pod_store);
    // Deliberately leak the `TempDir` guard: dropping it would delete the
    // directory while the returned pod's store still points at it.
    std::mem::forget(store_tmp);

    let scope = pod::Scope::writable(&pwd).unwrap();
    Pod::new(manifest, Worker::new(client), store, pwd, scope)
        .await
        .unwrap()
}
/// Writes `n` staging entries with default (empty) payloads into `layout`
/// and returns their ids in creation order.
///
/// Entry `i` uses segment id `s-{i}` and the single-point range `[i, i]`.
/// Panics if any write fails.
fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec<uuid::Uuid> {
    (0..n)
        .map(|index| {
            let position = index as u64;
            let source = SourceRef {
                segment_id: format!("s-{index}"),
                range: [position, position],
            };
            let (id, _) = write_staging(layout, source, ExtractedPayload::default()).unwrap();
            id
        })
        .collect()
}
/// Creates a broadcast channel (capacity 16), hands the sender to `pod`, and
/// returns the receiving end so a test can inspect emitted events.
fn attach_event_receiver(pod: &mut Pod<MockClient, TestStore>) -> broadcast::Receiver<Event> {
    let (sender, receiver) = broadcast::channel(16);
    pod.attach_event_tx(sender);
    receiver
}
fn collect_memory_worker_reasons(rx: &mut broadcast::Receiver<Event>) -> Vec<String> {
let mut reasons = Vec::new();
loop {
match rx.try_recv() {
Ok(Event::MemoryWorker(event)) => reasons.push(event.reason),
Ok(_) => {}
Err(broadcast::error::TryRecvError::Empty) => break,
Err(err) => panic!("unexpected broadcast receive error: {err}"),
}
}
reasons
}
/// Reads the current audit log for `layout` and parses each line as a JSON
/// value, preserving file order.
///
/// Panics if the file cannot be read or any line is not valid JSON.
fn read_audit_jsonl(layout: &WorkspaceLayout) -> Vec<serde_json::Value> {
    let raw = std::fs::read_to_string(layout.audit_current_log_path()).unwrap();
    let mut records = Vec::new();
    for line in raw.lines() {
        let record = serde_json::from_str::<serde_json::Value>(line).unwrap();
        records.push(record);
    }
    records
}
/// With no `[memory]` section in the manifest, the post-run trigger must
/// return `Ok` without doing anything.
#[tokio::test]
async fn no_memory_section_is_a_noop() {
    let workspace = tempfile::tempdir().unwrap();
    let workspace_root = workspace.path().to_path_buf();
    // The mock carries no scripted responses.
    let mut pod = make_pod_with(NO_MEMORY_TOML, workspace_root, MockClient::new(vec![])).await;

    let outcome = pod.try_post_run_consolidate().await;
    outcome.expect("missing memory section must skip cleanly");
}
/// `[memory]` is present but neither threshold is configured: consolidation
/// is disabled and staged entries are left alone.
#[tokio::test]
async fn no_thresholds_is_a_noop() {
    let workspace = tempfile::tempdir().unwrap();
    let root = workspace.path();
    let layout = WorkspaceLayout::new(root.to_path_buf());
    write_n_staging(&layout, 5);

    let mut pod = make_pod_with(
        MEMORY_NO_THRESHOLDS_TOML,
        root.to_path_buf(),
        MockClient::new(vec![]),
    )
    .await;
    pod.try_post_run_consolidate()
        .await
        .expect("consolidation disabled when both thresholds are None");

    // All five staged entries must still be listed.
    let remaining = memory::consolidate::list_staging_entries(&layout);
    assert_eq!(remaining.len(), 5);
}
/// Thresholds of `0` must behave like unset thresholds.
///
/// If `Some(0)` were not collapsed to `None`, the comparisons
/// `total_files >= 0` and `total_bytes >= 0` would always hold and
/// consolidation would fire on every post-run that has staged entries.
#[tokio::test]
async fn zero_thresholds_treated_as_disabled() {
    let workspace = tempfile::tempdir().unwrap();
    let root = workspace.path();
    let layout = WorkspaceLayout::new(root.to_path_buf());
    write_n_staging(&layout, 5);

    let mut pod = make_pod_with(
        ZERO_THRESHOLDS_TOML,
        root.to_path_buf(),
        MockClient::new(vec![]),
    )
    .await;
    pod.try_post_run_consolidate()
        .await
        .expect("zero thresholds must collapse to disabled, not fire on every staging entry");

    let remaining = memory::consolidate::list_staging_entries(&layout);
    assert_eq!(
        remaining.len(),
        5,
        "staging must be untouched when both thresholds are zero"
    );

    let lock_file = layout.staging_dir().join(".consolidation.lock");
    assert!(!lock_file.exists(), "no lock should be acquired");
}
#[tokio::test]
async fn empty_staging_skips() {
let pwd = tempfile::tempdir().unwrap();
let client = MockClient::new(vec![]);
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
pod.try_post_run_consolidate().await.unwrap();
// No mock calls expected.
}
/// Skipping because staging is empty must not emit any `MemoryWorker` event
/// on the pod's broadcast channel.
#[tokio::test]
async fn empty_staging_skip_is_audit_only() {
    let workspace = tempfile::tempdir().unwrap();
    let mut pod = make_pod_with(
        FILES_THRESHOLD_TOML,
        workspace.path().to_path_buf(),
        MockClient::new(vec![]),
    )
    .await;
    let mut events = attach_event_receiver(&mut pod);

    pod.try_post_run_consolidate().await.unwrap();

    let reasons = collect_memory_worker_reasons(&mut events);
    assert!(reasons.is_empty());
}
/// A staging directory holding only an unparseable entry is reported
/// differently from an empty one.
///
/// The invalid file is kept on disk, exactly one `MemoryWorker` event is
/// emitted, and the last audit record carries the invalid-entry count.
#[tokio::test]
async fn invalid_only_staging_is_distinct_from_no_staging() {
    let workspace = tempfile::tempdir().unwrap();
    let layout = WorkspaceLayout::new(workspace.path().to_path_buf());
    std::fs::create_dir_all(layout.staging_dir()).unwrap();

    // Hand-write a staging file whose `source` uses `session_id` instead of
    // the `segment_id` field that `write_n_staging` populates.
    let entry_id = uuid::Uuid::now_v7();
    let invalid_path = layout.staging_dir().join(format!("{entry_id}.json"));
    let legacy_entry = serde_json::json!({
        "source": {
            "session_id": "legacy-session",
            "range": [0, 1]
        },
        "requests": []
    });
    std::fs::write(&invalid_path, legacy_entry.to_string()).unwrap();

    let mut pod = make_pod_with(
        FILES_THRESHOLD_TOML,
        workspace.path().to_path_buf(),
        MockClient::new(vec![]),
    )
    .await;
    let mut events = attach_event_receiver(&mut pod);
    pod.try_post_run_consolidate().await.unwrap();

    assert!(invalid_path.exists(), "invalid staging is not auto-deleted");

    let reasons = collect_memory_worker_reasons(&mut events);
    assert_eq!(reasons, vec!["no_valid_staging_entries invalid=1"]);

    let audit = read_audit_jsonl(&layout);
    let last = audit.last().unwrap();
    assert_eq!(last["reason"], "no_valid_staging_entries invalid=1");
    let consolidation = &last["consolidation"];
    assert_eq!(consolidation["staging_count"], 0);
    assert_eq!(consolidation["invalid_staging_count"], 1);
}
/// A below-threshold skip emits no `MemoryWorker` event but does leave a
/// `threshold_not_reached ...` record at the end of the audit log.
#[tokio::test]
async fn below_threshold_skip_is_audit_only() {
    let workspace = tempfile::tempdir().unwrap();
    let layout = WorkspaceLayout::new(workspace.path().to_path_buf());
    // One staged entry against a files threshold of 2.
    write_n_staging(&layout, 1);

    let mut pod = make_pod_with(
        FILES_THRESHOLD_TOML,
        workspace.path().to_path_buf(),
        MockClient::new(vec![]),
    )
    .await;
    let mut events = attach_event_receiver(&mut pod);
    pod.try_post_run_consolidate().await.unwrap();

    let reasons = collect_memory_worker_reasons(&mut events);
    assert!(reasons.is_empty());

    let audit = read_audit_jsonl(&layout);
    let last = audit.last().unwrap();
    let reason = last["reason"]
        .as_str()
        .expect("audit reason must be a string");
    assert!(reason.starts_with("threshold_not_reached "));
}
/// After a run that reaches the threshold, exactly two `MemoryWorker`
/// events are observed (threshold reached, then completed), while the final
/// audit record is the `no_staging_entries` skip.
#[tokio::test]
async fn completed_event_survives_terminal_empty_drain_skip() {
    let workspace = tempfile::tempdir().unwrap();
    let layout = WorkspaceLayout::new(workspace.path().to_path_buf());
    // Two staged entries meet the files threshold of 2, so the run fires.
    write_n_staging(&layout, 2);

    let scripted = MockClient::new(vec![done("ok")]);
    let mut pod = make_pod_with(
        FILES_THRESHOLD_TOML,
        workspace.path().to_path_buf(),
        scripted,
    )
    .await;
    let mut events = attach_event_receiver(&mut pod);
    pod.try_post_run_consolidate().await.unwrap();

    let reasons = collect_memory_worker_reasons(&mut events);
    assert_eq!(reasons.len(), 2);
    assert!(reasons[0].starts_with("staging_threshold_reached files=2 bytes="));
    assert_eq!(reasons[1], "completed_no_record_changes");

    let audit = read_audit_jsonl(&layout);
    let last = audit.last().unwrap();
    assert_eq!(last["reason"], "no_staging_entries");
}
/// Below the files threshold the trigger leaves staging as it was and never
/// creates the consolidation lock file.
#[tokio::test]
async fn below_threshold_skips_and_does_not_take_lock() {
    let workspace = tempfile::tempdir().unwrap();
    let layout = WorkspaceLayout::new(workspace.path().to_path_buf());
    // One staged entry against a files threshold of 2.
    write_n_staging(&layout, 1);

    let mut pod = make_pod_with(
        FILES_THRESHOLD_TOML,
        workspace.path().to_path_buf(),
        MockClient::new(vec![]),
    )
    .await;
    pod.try_post_run_consolidate().await.unwrap();

    // The single staged entry is still there.
    let remaining = memory::consolidate::list_staging_entries(&layout);
    assert_eq!(remaining.len(), 1);

    // No lock file was left behind.
    let lock_file = layout.staging_dir().join(".consolidation.lock");
    assert!(!lock_file.exists(), "lock file should not be created");
}
/// Reaching the files threshold runs the sub-worker once; afterwards the
/// consumed staging entries and the lock file are both gone.
#[tokio::test]
async fn fires_on_threshold_and_cleans_up_consumed_entries() {
    let workspace = tempfile::tempdir().unwrap();
    let layout = WorkspaceLayout::new(workspace.path().to_path_buf());
    // Two staged entries meet the files threshold of 2, so the run fires.
    write_n_staging(&layout, 2);

    // The sub-worker gets one text-only response. The consolidation prompt
    // asks it to call memory tools; the mock never does, but `Worker::run`
    // still returns Ok once the LLM finishes with a final text.
    let scripted = MockClient::new(vec![done("ok")]);
    let mut pod = make_pod_with(
        FILES_THRESHOLD_TOML,
        workspace.path().to_path_buf(),
        scripted,
    )
    .await;
    pod.try_post_run_consolidate().await.unwrap();

    // Every consumed entry has been removed from staging.
    let remaining = memory::consolidate::list_staging_entries(&layout);
    assert!(
        remaining.is_empty(),
        "consumed staging entries must be cleaned up"
    );

    // The lock file has been released as well.
    let lock_file = layout.staging_dir().join(".consolidation.lock");
    assert!(!lock_file.exists(), "lock file must be removed on success");
}
#[tokio::test]
async fn in_flight_guard_skips_reentry_without_clearing() {
use std::sync::atomic::Ordering;
let pwd = tempfile::tempdir().unwrap();
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
write_n_staging(&layout, 2);
let client = MockClient::new(vec![]);
let mut pod = make_pod_with(
FILES_THRESHOLD_TOML,
pwd.path().to_path_buf(),
client.clone(),
)
.await;
// Pre-set the in-flight flag as if another concurrent caller had
// entered run_consolidate_once. The CAS at the top of
// try_post_run_consolidate must take the early return without
// touching staging or the LLM, and must leave the flag intact for
// the holder to clear.
let in_flight = pod.consolidation_in_flight_handle();
in_flight.store(true, Ordering::Release);
pod.try_post_run_consolidate().await.unwrap();
assert!(
in_flight.load(Ordering::Acquire),
"reentry skip must not clear the in-flight flag — that's the holder's job"
);
assert_eq!(
memory::consolidate::list_staging_entries(&layout).len(),
2,
"staging must remain untouched on reentry skip"
);
assert_eq!(
client.call_count.load(Ordering::SeqCst),
0,
"no LLM calls should fire on reentry skip"
);
// Sanity: when the flag is cleared, the same pod fires normally and
// resets the flag itself (i.e. it isn't accidentally sticky).
in_flight.store(false, Ordering::Release);
let client2 = MockClient::new(vec![done("ok")]);
let mut pod2 = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client2).await;
pod2.try_post_run_consolidate().await.unwrap();
assert!(
!pod2
.consolidation_in_flight_handle()
.load(Ordering::Acquire),
"in-flight flag must be cleared after a normal run"
);
}
#[tokio::test]
async fn coalesce_loop_terminates_with_one_iteration_when_snapshot_drains_staging() {
use std::sync::atomic::Ordering;
// Coalesce semantics from `docs/plan/memory.md` §並走防止: a single
// run consumes the snapshot taken at acquire time; the loop
// re-evaluates against any post-snapshot extract additions. With no
// concurrent additions, the second iteration sees an empty staging
// and bails out — exercised here by counting LLM calls.
let pwd = tempfile::tempdir().unwrap();
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
write_n_staging(&layout, 4);
// Provide just one mock response. If the loop wrongly re-enters
// run_consolidate_once after Completed, the second sub-worker run
// would exhaust the mock and surface as an error.
let client = MockClient::new(vec![done("ok")]);
let mut pod = make_pod_with(
FILES_THRESHOLD_TOML,
pwd.path().to_path_buf(),
client.clone(),
)
.await;
pod.try_post_run_consolidate().await.unwrap();
assert_eq!(
client.call_count.load(Ordering::SeqCst),
1,
"Coalesce must terminate once the staging snapshot is drained — got an extra LLM call"
);
assert!(
memory::consolidate::list_staging_entries(&layout).is_empty(),
"staging must be empty after the single iteration"
);
}
/// When another live holder owns the staging lock, the trigger skips
/// without error and leaves the staged entries in place.
#[tokio::test]
async fn live_lock_held_by_other_pod_skips() {
    let workspace = tempfile::tempdir().unwrap();
    let layout = WorkspaceLayout::new(workspace.path().to_path_buf());
    write_n_staging(&layout, 3);

    // Take the lock under this test process's PID, which is certainly
    // alive, and keep the guard bound until the end of the test so it is
    // never released while the pod runs.
    let holder_pid = std::process::id();
    let _held_lock =
        memory::consolidate::StagingLock::acquire(&layout, holder_pid, "other-pod", Vec::new())
            .unwrap();

    let mut pod = make_pod_with(
        FILES_THRESHOLD_TOML,
        workspace.path().to_path_buf(),
        MockClient::new(vec![]),
    )
    .await;
    pod.try_post_run_consolidate()
        .await
        .expect("InUse lock must surface as graceful skip");

    // The lock holder owns the snapshot, so all three entries must remain.
    let remaining = memory::consolidate::list_staging_entries(&layout);
    assert_eq!(remaining.len(), 3);
}