401 lines
12 KiB
Rust
401 lines
12 KiB
Rust
//! consolidation (memory.consolidation) post-run trigger.
|
|
//!
|
|
//! Covers the gating, lock and cleanup behaviour without exercising the
|
|
//! full sub-worker tool loop:
|
|
//!
|
|
//! - no `[memory]` section → no-op
|
|
//! - `[memory]` present but no thresholds → no-op
|
|
//! - staging empty → skip
|
|
//! - staging below thresholds → skip + lock not acquired
|
|
//! - staging above threshold → sub-worker runs, consumed entries removed
|
|
//! - existing live lock → skip without error
|
|
//!
|
|
//! The sub-worker is fed a no-op LLM response (plain text) so it returns
|
|
//! immediately. The post-run path then exercises lock acquisition,
|
|
//! cleanup, and the empty-payload fast path.
|
|
|
|
use std::pin::Pin;
|
|
use std::sync::Arc;
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
|
use async_trait::async_trait;
|
|
use futures::Stream;
|
|
use llm_worker::Worker;
|
|
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
|
|
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
|
use memory::WorkspaceLayout;
|
|
use memory::extract::{ExtractedPayload, write_staging};
|
|
use memory::schema::SourceRef;
|
|
use session_store::FsStore;
|
|
|
|
use pod::Pod;
|
|
|
|
#[derive(Clone)]
|
|
struct MockClient {
|
|
responses: Arc<Vec<Vec<LlmEvent>>>,
|
|
call_count: Arc<AtomicUsize>,
|
|
}
|
|
|
|
impl MockClient {
|
|
fn new(responses: Vec<Vec<LlmEvent>>) -> Self {
|
|
Self {
|
|
responses: Arc::new(responses),
|
|
call_count: Arc::new(AtomicUsize::new(0)),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl LlmClient for MockClient {
|
|
fn clone_boxed(&self) -> Box<dyn LlmClient> {
|
|
Box::new(self.clone())
|
|
}
|
|
|
|
async fn stream(
|
|
&self,
|
|
_request: Request,
|
|
) -> Result<Pin<Box<dyn Stream<Item = Result<LlmEvent, ClientError>> + Send>>, ClientError>
|
|
{
|
|
let count = self.call_count.fetch_add(1, Ordering::SeqCst);
|
|
if count >= self.responses.len() {
|
|
return Err(ClientError::Config("mock client exhausted".into()));
|
|
}
|
|
let events = self.responses[count].clone();
|
|
let stream = futures::stream::iter(events.into_iter().map(Ok));
|
|
Ok(Box::pin(stream))
|
|
}
|
|
}
|
|
|
|
fn done(text: &str) -> Vec<LlmEvent> {
|
|
vec![
|
|
LlmEvent::text_block_start(0),
|
|
LlmEvent::text_delta(0, text),
|
|
LlmEvent::text_block_stop(0, None),
|
|
LlmEvent::Status(StatusEvent {
|
|
status: ResponseStatus::Completed,
|
|
}),
|
|
]
|
|
}
|
|
|
|
const NO_MEMORY_TOML: &str = r#"
|
|
[pod]
|
|
name = "test-pod"
|
|
|
|
[model]
|
|
scheme = "anthropic"
|
|
model_id = "test-model"
|
|
|
|
[worker]
|
|
max_tokens = 100
|
|
|
|
[[scope.allow]]
|
|
target = "./"
|
|
permission = "write"
|
|
"#;
|
|
|
|
const MEMORY_NO_THRESHOLDS_TOML: &str = r#"
|
|
[pod]
|
|
name = "test-pod"
|
|
|
|
[model]
|
|
scheme = "anthropic"
|
|
model_id = "test-model"
|
|
|
|
[worker]
|
|
max_tokens = 100
|
|
|
|
[memory]
|
|
|
|
[[scope.allow]]
|
|
target = "./"
|
|
permission = "write"
|
|
"#;
|
|
|
|
const FILES_THRESHOLD_TOML: &str = r#"
|
|
[pod]
|
|
name = "test-pod"
|
|
|
|
[model]
|
|
scheme = "anthropic"
|
|
model_id = "test-model"
|
|
|
|
[worker]
|
|
max_tokens = 100
|
|
|
|
[memory]
|
|
consolidation_threshold_files = 2
|
|
|
|
[[scope.allow]]
|
|
target = "./"
|
|
permission = "write"
|
|
"#;
|
|
|
|
const ZERO_THRESHOLDS_TOML: &str = r#"
|
|
[pod]
|
|
name = "test-pod"
|
|
|
|
[model]
|
|
scheme = "anthropic"
|
|
model_id = "test-model"
|
|
|
|
[worker]
|
|
max_tokens = 100
|
|
|
|
[memory]
|
|
consolidation_threshold_files = 0
|
|
consolidation_threshold_bytes = 0
|
|
|
|
[[scope.allow]]
|
|
target = "./"
|
|
permission = "write"
|
|
"#;
|
|
|
|
async fn make_pod_with(
|
|
manifest_toml: &str,
|
|
pwd: std::path::PathBuf,
|
|
client: MockClient,
|
|
) -> Pod<MockClient, FsStore> {
|
|
let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap();
|
|
|
|
let store_tmp = tempfile::tempdir().unwrap();
|
|
let store = FsStore::new(store_tmp.path()).await.unwrap();
|
|
std::mem::forget(store_tmp);
|
|
|
|
let scope = pod::Scope::writable(&pwd).unwrap();
|
|
let worker = Worker::new(client);
|
|
Pod::new(manifest, worker, store, pwd, scope).await.unwrap()
|
|
}
|
|
|
|
fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec<uuid::Uuid> {
|
|
let mut ids = Vec::new();
|
|
for i in 0..n {
|
|
let (id, _) = write_staging(
|
|
layout,
|
|
SourceRef {
|
|
session_id: format!("s-{i}"),
|
|
range: [i as u64, i as u64],
|
|
},
|
|
ExtractedPayload::default(),
|
|
)
|
|
.unwrap();
|
|
ids.push(id);
|
|
}
|
|
ids
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn no_memory_section_is_a_noop() {
|
|
let pwd = tempfile::tempdir().unwrap();
|
|
let client = MockClient::new(vec![]);
|
|
let mut pod = make_pod_with(NO_MEMORY_TOML, pwd.path().to_path_buf(), client).await;
|
|
pod.try_post_run_consolidate()
|
|
.await
|
|
.expect("missing memory section must skip cleanly");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn no_thresholds_is_a_noop() {
|
|
let pwd = tempfile::tempdir().unwrap();
|
|
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
|
write_n_staging(&layout, 5);
|
|
|
|
let client = MockClient::new(vec![]);
|
|
let mut pod = make_pod_with(MEMORY_NO_THRESHOLDS_TOML, pwd.path().to_path_buf(), client).await;
|
|
pod.try_post_run_consolidate()
|
|
.await
|
|
.expect("consolidation disabled when both thresholds are None");
|
|
|
|
// No staging entries removed.
|
|
assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 5);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn zero_thresholds_treated_as_disabled() {
|
|
// Without the `Some(0) → None` collapse, `total_files >= 0` and
|
|
// `total_bytes >= 0` would always evaluate true and consolidation would
|
|
// fire on every post-run with any staging activity.
|
|
let pwd = tempfile::tempdir().unwrap();
|
|
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
|
write_n_staging(&layout, 5);
|
|
|
|
let client = MockClient::new(vec![]);
|
|
let mut pod = make_pod_with(ZERO_THRESHOLDS_TOML, pwd.path().to_path_buf(), client).await;
|
|
pod.try_post_run_consolidate()
|
|
.await
|
|
.expect("zero thresholds must collapse to disabled, not fire on every staging entry");
|
|
|
|
assert_eq!(
|
|
memory::consolidate::list_staging_entries(&layout).len(),
|
|
5,
|
|
"staging must be untouched when both thresholds are zero"
|
|
);
|
|
let lock_path = layout.staging_dir().join(".consolidation.lock");
|
|
assert!(!lock_path.exists(), "no lock should be acquired");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn empty_staging_skips() {
|
|
let pwd = tempfile::tempdir().unwrap();
|
|
let client = MockClient::new(vec![]);
|
|
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
|
pod.try_post_run_consolidate().await.unwrap();
|
|
// No mock calls expected.
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn below_threshold_skips_and_does_not_take_lock() {
|
|
let pwd = tempfile::tempdir().unwrap();
|
|
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
|
write_n_staging(&layout, 1); // threshold is 2
|
|
|
|
let client = MockClient::new(vec![]);
|
|
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
|
pod.try_post_run_consolidate().await.unwrap();
|
|
|
|
// Staging untouched.
|
|
assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 1);
|
|
// Lock file must not exist.
|
|
let lock_path = layout.staging_dir().join(".consolidation.lock");
|
|
assert!(!lock_path.exists(), "lock file should not be created");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn fires_on_threshold_and_cleans_up_consumed_entries() {
|
|
let pwd = tempfile::tempdir().unwrap();
|
|
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
|
write_n_staging(&layout, 2); // threshold is 2 — fires.
|
|
|
|
// Sub-worker is given a single text-only response. The consolidation prompt
|
|
// tells it to call memory tools; the mock skips those, but `Worker::run`
|
|
// returns Ok regardless once the LLM closes with a final text.
|
|
let client = MockClient::new(vec![done("ok")]);
|
|
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
|
pod.try_post_run_consolidate().await.unwrap();
|
|
|
|
// Consumed entries removed.
|
|
assert!(
|
|
memory::consolidate::list_staging_entries(&layout).is_empty(),
|
|
"consumed staging entries must be cleaned up"
|
|
);
|
|
// Lock removed too.
|
|
let lock_path = layout.staging_dir().join(".consolidation.lock");
|
|
assert!(!lock_path.exists(), "lock file must be removed on success");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn in_flight_guard_skips_reentry_without_clearing() {
|
|
use std::sync::atomic::Ordering;
|
|
|
|
let pwd = tempfile::tempdir().unwrap();
|
|
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
|
write_n_staging(&layout, 2);
|
|
|
|
let client = MockClient::new(vec![]);
|
|
let mut pod = make_pod_with(
|
|
FILES_THRESHOLD_TOML,
|
|
pwd.path().to_path_buf(),
|
|
client.clone(),
|
|
)
|
|
.await;
|
|
|
|
// Pre-set the in-flight flag as if another concurrent caller had
|
|
// entered run_consolidate_once. The CAS at the top of
|
|
// try_post_run_consolidate must take the early return without
|
|
// touching staging or the LLM, and must leave the flag intact for
|
|
// the holder to clear.
|
|
let in_flight = pod.consolidation_in_flight_handle();
|
|
in_flight.store(true, Ordering::Release);
|
|
|
|
pod.try_post_run_consolidate().await.unwrap();
|
|
|
|
assert!(
|
|
in_flight.load(Ordering::Acquire),
|
|
"reentry skip must not clear the in-flight flag — that's the holder's job"
|
|
);
|
|
assert_eq!(
|
|
memory::consolidate::list_staging_entries(&layout).len(),
|
|
2,
|
|
"staging must remain untouched on reentry skip"
|
|
);
|
|
assert_eq!(
|
|
client.call_count.load(Ordering::SeqCst),
|
|
0,
|
|
"no LLM calls should fire on reentry skip"
|
|
);
|
|
|
|
// Sanity: when the flag is cleared, the same pod fires normally and
|
|
// resets the flag itself (i.e. it isn't accidentally sticky).
|
|
in_flight.store(false, Ordering::Release);
|
|
let client2 = MockClient::new(vec![done("ok")]);
|
|
let mut pod2 = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client2).await;
|
|
pod2.try_post_run_consolidate().await.unwrap();
|
|
assert!(
|
|
!pod2
|
|
.consolidation_in_flight_handle()
|
|
.load(Ordering::Acquire),
|
|
"in-flight flag must be cleared after a normal run"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn coalesce_loop_terminates_with_one_iteration_when_snapshot_drains_staging() {
|
|
use std::sync::atomic::Ordering;
|
|
|
|
// Coalesce semantics from `docs/plan/memory.md` §並走防止: a single
|
|
// run consumes the snapshot taken at acquire time; the loop
|
|
// re-evaluates against any post-snapshot extract additions. With no
|
|
// concurrent additions, the second iteration sees an empty staging
|
|
// and bails out — exercised here by counting LLM calls.
|
|
let pwd = tempfile::tempdir().unwrap();
|
|
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
|
write_n_staging(&layout, 4);
|
|
|
|
// Provide just one mock response. If the loop wrongly re-enters
|
|
// run_consolidate_once after Completed, the second sub-worker run
|
|
// would exhaust the mock and surface as an error.
|
|
let client = MockClient::new(vec![done("ok")]);
|
|
let mut pod = make_pod_with(
|
|
FILES_THRESHOLD_TOML,
|
|
pwd.path().to_path_buf(),
|
|
client.clone(),
|
|
)
|
|
.await;
|
|
pod.try_post_run_consolidate().await.unwrap();
|
|
|
|
assert_eq!(
|
|
client.call_count.load(Ordering::SeqCst),
|
|
1,
|
|
"Coalesce must terminate once the staging snapshot is drained — got an extra LLM call"
|
|
);
|
|
assert!(
|
|
memory::consolidate::list_staging_entries(&layout).is_empty(),
|
|
"staging must be empty after the single iteration"
|
|
);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn live_lock_held_by_other_pod_skips() {
|
|
let pwd = tempfile::tempdir().unwrap();
|
|
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
|
write_n_staging(&layout, 3);
|
|
|
|
// Pre-acquire lock with this test's PID — definitely alive — and
|
|
// *don't* release it. The consolidation path must skip without error.
|
|
let _live_lock = memory::consolidate::StagingLock::acquire(
|
|
&layout,
|
|
std::process::id(),
|
|
"other-pod",
|
|
Vec::new(),
|
|
)
|
|
.unwrap();
|
|
|
|
let client = MockClient::new(vec![]);
|
|
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
|
pod.try_post_run_consolidate()
|
|
.await
|
|
.expect("InUse lock must surface as graceful skip");
|
|
|
|
// Staging untouched: lock holder owns the snapshot, not us.
|
|
assert_eq!(memory::consolidate::list_staging_entries(&layout).len(), 3);
|
|
}
|