fix: suppress memory idle skip notices
This commit is contained in:
parent
5b3b16c4b2
commit
5872a53ec1
|
|
@ -21,6 +21,10 @@ use uuid::Uuid;
|
||||||
|
|
||||||
use crate::workspace::WorkspaceLayout;
|
use crate::workspace::WorkspaceLayout;
|
||||||
|
|
||||||
|
fn is_zero_usize(value: &usize) -> bool {
|
||||||
|
*value == 0
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "snake_case")]
|
#[serde(rename_all = "snake_case")]
|
||||||
pub enum AuditWorker {
|
pub enum AuditWorker {
|
||||||
|
|
@ -141,6 +145,8 @@ pub struct ExtractAudit {
|
||||||
pub struct ConsolidationAudit {
|
pub struct ConsolidationAudit {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub staging_count: usize,
|
pub staging_count: usize,
|
||||||
|
#[serde(default, skip_serializing_if = "is_zero_usize")]
|
||||||
|
pub invalid_staging_count: usize,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub staging_bytes: u64,
|
pub staging_bytes: u64,
|
||||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
|
|
||||||
|
|
@ -26,5 +26,7 @@ pub use input::{
|
||||||
render_tidy_hints,
|
render_tidy_hints,
|
||||||
};
|
};
|
||||||
pub use lock::{LockError, LockRecord, StagingLock};
|
pub use lock::{LockError, LockRecord, StagingLock};
|
||||||
pub use staging::{StagingEntry, list_staging_entries};
|
pub use staging::{
|
||||||
|
StagingEntriesSnapshot, StagingEntry, list_staging_entries, list_staging_entries_snapshot,
|
||||||
|
};
|
||||||
pub use tidy::{TidyHints, collect_tidy_hints};
|
pub use tidy::{TidyHints, collect_tidy_hints};
|
||||||
|
|
|
||||||
|
|
@ -26,34 +26,58 @@ pub struct StagingEntry {
|
||||||
pub bytes: u64,
|
pub bytes: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// staging directory の検査結果。`entries` は current schema として読めた
|
||||||
|
/// staging のみで、`invalid_count` は `.json` だが staging として採用できなかった
|
||||||
|
/// ファイル数。
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
pub struct StagingEntriesSnapshot {
|
||||||
|
pub entries: Vec<StagingEntry>,
|
||||||
|
pub invalid_count: usize,
|
||||||
|
}
|
||||||
|
|
||||||
/// `<staging_dir>/*.json` を読んで UUIDv7 順に並べた [`StagingEntry`]
|
/// `<staging_dir>/*.json` を読んで UUIDv7 順に並べた [`StagingEntry`]
|
||||||
/// 配列を返す。staging_dir が存在しなければ空配列。読めないファイルや
|
/// 配列を返す。staging_dir が存在しなければ空配列。読めないファイルや
|
||||||
/// JSON parse 失敗は `tracing::warn!` してスキップ(壊れた個別ファイルが
|
/// JSON parse 失敗は `tracing::warn!` してスキップ(壊れた個別ファイルが
|
||||||
/// consolidation 全体を止めないように)。
|
/// consolidation 全体を止めないように)。
|
||||||
pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec<StagingEntry> {
|
pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec<StagingEntry> {
|
||||||
|
list_staging_entries_snapshot(layout).entries
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `<staging_dir>/*.json` を読んで valid staging と invalid staging 件数を返す。
|
||||||
|
/// invalid は自動 migration / 削除 / archive せず、観測可能にするための件数だけを
|
||||||
|
/// 呼び出し側へ渡す。
|
||||||
|
pub fn list_staging_entries_snapshot(layout: &WorkspaceLayout) -> StagingEntriesSnapshot {
|
||||||
let dir = layout.staging_dir();
|
let dir = layout.staging_dir();
|
||||||
let entries = match std::fs::read_dir(&dir) {
|
let entries = match std::fs::read_dir(&dir) {
|
||||||
Ok(it) => it,
|
Ok(it) => it,
|
||||||
Err(_) => return Vec::new(),
|
Err(_) => return StagingEntriesSnapshot::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut out: Vec<StagingEntry> = Vec::new();
|
let mut out: Vec<StagingEntry> = Vec::new();
|
||||||
|
let mut invalid_count = 0;
|
||||||
for entry in entries.flatten() {
|
for entry in entries.flatten() {
|
||||||
let path = entry.path();
|
let path = entry.path();
|
||||||
if !path.is_file() {
|
if !path.is_file() {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
let stem = match path.file_stem().and_then(|s| s.to_str()) {
|
|
||||||
Some(s) => s,
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
|
let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
|
||||||
if ext != "json" {
|
if ext != "json" {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
let stem = match path.file_stem().and_then(|s| s.to_str()) {
|
||||||
|
Some(s) => s,
|
||||||
|
None => {
|
||||||
|
invalid_count += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
let id = match Uuid::parse_str(stem) {
|
let id = match Uuid::parse_str(stem) {
|
||||||
Ok(u) => u,
|
Ok(u) => u,
|
||||||
Err(_) => continue,
|
Err(e) => {
|
||||||
|
invalid_count += 1;
|
||||||
|
tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry id");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
let bytes = match std::fs::metadata(&path) {
|
let bytes = match std::fs::metadata(&path) {
|
||||||
Ok(m) => m.len(),
|
Ok(m) => m.len(),
|
||||||
|
|
@ -62,6 +86,7 @@ pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec<StagingEntry> {
|
||||||
let raw = match std::fs::read_to_string(&path) {
|
let raw = match std::fs::read_to_string(&path) {
|
||||||
Ok(s) => s,
|
Ok(s) => s,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
invalid_count += 1;
|
||||||
tracing::warn!(path = %path.display(), error = %e, "failed to read staging entry");
|
tracing::warn!(path = %path.display(), error = %e, "failed to read staging entry");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
@ -69,6 +94,7 @@ pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec<StagingEntry> {
|
||||||
let record = match serde_json::from_str::<StagingRecord>(&raw) {
|
let record = match serde_json::from_str::<StagingRecord>(&raw) {
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
invalid_count += 1;
|
||||||
tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry");
|
tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
@ -81,7 +107,10 @@ pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec<StagingEntry> {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
out.sort_by_key(|e| e.id);
|
out.sort_by_key(|e| e.id);
|
||||||
out
|
StagingEntriesSnapshot {
|
||||||
|
entries: out,
|
||||||
|
invalid_count,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
@ -116,17 +145,26 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn skips_lock_file_and_garbage() {
|
fn skips_lock_file_and_counts_invalid_json() {
|
||||||
let tmp = tempfile::TempDir::new().unwrap();
|
let tmp = tempfile::TempDir::new().unwrap();
|
||||||
let layout = WorkspaceLayout::new(tmp.path().to_path_buf());
|
let layout = WorkspaceLayout::new(tmp.path().to_path_buf());
|
||||||
let (_id, _) = write_staging(&layout, source("s", [0, 1]), empty_payload()).unwrap();
|
let (_id, _) = write_staging(&layout, source("s", [0, 1]), empty_payload()).unwrap();
|
||||||
|
|
||||||
// Drop a non-UUID json file and a bare lock file alongside.
|
// Drop a non-UUID json file, an unparsable UUID-named json file, and
|
||||||
|
// a bare lock file alongside. Lock files are not `.json`; invalid
|
||||||
|
// `.json` files are surfaced separately instead of being mistaken for
|
||||||
|
// an empty staging directory.
|
||||||
std::fs::write(layout.staging_dir().join("not-a-uuid.json"), "{}").unwrap();
|
std::fs::write(layout.staging_dir().join("not-a-uuid.json"), "{}").unwrap();
|
||||||
|
let bad_id = Uuid::now_v7();
|
||||||
|
std::fs::write(layout.staging_dir().join(format!("{bad_id}.json")), "{").unwrap();
|
||||||
std::fs::write(layout.staging_dir().join(".consolidation.lock"), "{}").unwrap();
|
std::fs::write(layout.staging_dir().join(".consolidation.lock"), "{}").unwrap();
|
||||||
|
|
||||||
let entries = list_staging_entries(&layout);
|
let entries = list_staging_entries(&layout);
|
||||||
assert_eq!(entries.len(), 1);
|
assert_eq!(entries.len(), 1);
|
||||||
|
|
||||||
|
let snapshot = list_staging_entries_snapshot(&layout);
|
||||||
|
assert_eq!(snapshot.entries.len(), 1);
|
||||||
|
assert_eq!(snapshot.invalid_count, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
|
|
||||||
|
|
@ -3098,16 +3098,26 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
);
|
);
|
||||||
let event_tx = self.event_tx.as_ref();
|
let event_tx = self.event_tx.as_ref();
|
||||||
|
|
||||||
let entries = consolidate::list_staging_entries(&layout);
|
let staging_snapshot = consolidate::list_staging_entries_snapshot(&layout);
|
||||||
|
let invalid_staging_count = staging_snapshot.invalid_count;
|
||||||
|
let entries = staging_snapshot.entries;
|
||||||
if entries.is_empty() {
|
if entries.is_empty() {
|
||||||
|
let reason = if invalid_staging_count == 0 {
|
||||||
|
"no_staging_entries".to_string()
|
||||||
|
} else {
|
||||||
|
format!("no_valid_staging_entries invalid={invalid_staging_count}")
|
||||||
|
};
|
||||||
audit.emit(
|
audit.emit(
|
||||||
&layout,
|
&layout,
|
||||||
event_tx,
|
event_tx,
|
||||||
memory::audit::WorkerLifecycleStatus::Skipped,
|
memory::audit::WorkerLifecycleStatus::Skipped,
|
||||||
"no_staging_entries",
|
reason,
|
||||||
None,
|
None,
|
||||||
None,
|
None,
|
||||||
Some(memory::audit::ConsolidationAudit::default()),
|
Some(memory::audit::ConsolidationAudit {
|
||||||
|
invalid_staging_count,
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
);
|
);
|
||||||
return Ok(ConsolidateDecision::Skipped);
|
return Ok(ConsolidateDecision::Skipped);
|
||||||
}
|
}
|
||||||
|
|
@ -3117,6 +3127,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
let consumed_ids: Vec<uuid::Uuid> = entries.iter().map(|e| e.id).collect();
|
let consumed_ids: Vec<uuid::Uuid> = entries.iter().map(|e| e.id).collect();
|
||||||
let base_consolidation = memory::audit::ConsolidationAudit {
|
let base_consolidation = memory::audit::ConsolidationAudit {
|
||||||
staging_count: total_files,
|
staging_count: total_files,
|
||||||
|
invalid_staging_count,
|
||||||
staging_bytes: total_bytes,
|
staging_bytes: total_bytes,
|
||||||
consumed_staging_ids: consumed_ids.iter().map(ToString::to_string).collect(),
|
consumed_staging_ids: consumed_ids.iter().map(ToString::to_string).collect(),
|
||||||
operations: memory::audit::OperationCounts::default(),
|
operations: memory::audit::OperationCounts::default(),
|
||||||
|
|
@ -3404,17 +3415,38 @@ impl WorkerAuditBase {
|
||||||
consolidation,
|
consolidation,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
emit_memory_worker_event(
|
if should_emit_memory_worker_event(self.worker, status, &reason) {
|
||||||
event_tx,
|
emit_memory_worker_event(
|
||||||
self.run_id,
|
event_tx,
|
||||||
self.worker,
|
self.run_id,
|
||||||
status,
|
self.worker,
|
||||||
self.trigger,
|
status,
|
||||||
&reason,
|
self.trigger,
|
||||||
);
|
&reason,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn should_emit_memory_worker_event(
|
||||||
|
worker: memory::audit::AuditWorker,
|
||||||
|
status: memory::audit::WorkerLifecycleStatus,
|
||||||
|
reason: &str,
|
||||||
|
) -> bool {
|
||||||
|
if worker == memory::audit::AuditWorker::MemoryConsolidation
|
||||||
|
&& status == memory::audit::WorkerLifecycleStatus::Skipped
|
||||||
|
{
|
||||||
|
return !is_idle_consolidation_skip_reason(reason);
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_idle_consolidation_skip_reason(reason: &str) -> bool {
|
||||||
|
reason == "no_staging_entries"
|
||||||
|
|| reason == "consolidation_threshold_disabled"
|
||||||
|
|| reason.starts_with("threshold_not_reached")
|
||||||
|
}
|
||||||
|
|
||||||
fn memory_language(cfg: &manifest::MemoryConfig) -> &str {
|
fn memory_language(cfg: &manifest::MemoryConfig) -> &str {
|
||||||
cfg.language
|
cfg.language
|
||||||
.as_deref()
|
.as_deref()
|
||||||
|
|
@ -4247,6 +4279,45 @@ fn current_pwd() -> Result<PathBuf, PodError> {
|
||||||
.map_err(|source| PodError::InvalidPwd { pwd: cwd, source })
|
.map_err(|source| PodError::InvalidPwd { pwd: cwd, source })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod memory_worker_event_tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn suppresses_idle_consolidation_skip_worker_events() {
|
||||||
|
assert!(!should_emit_memory_worker_event(
|
||||||
|
memory::audit::AuditWorker::MemoryConsolidation,
|
||||||
|
memory::audit::WorkerLifecycleStatus::Skipped,
|
||||||
|
"no_staging_entries",
|
||||||
|
));
|
||||||
|
assert!(!should_emit_memory_worker_event(
|
||||||
|
memory::audit::AuditWorker::MemoryConsolidation,
|
||||||
|
memory::audit::WorkerLifecycleStatus::Skipped,
|
||||||
|
"threshold_not_reached files=1 bytes=64 min_files=2 min_bytes=1048576",
|
||||||
|
));
|
||||||
|
assert!(!should_emit_memory_worker_event(
|
||||||
|
memory::audit::AuditWorker::MemoryConsolidation,
|
||||||
|
memory::audit::WorkerLifecycleStatus::Skipped,
|
||||||
|
"consolidation_threshold_disabled",
|
||||||
|
));
|
||||||
|
assert!(should_emit_memory_worker_event(
|
||||||
|
memory::audit::AuditWorker::MemoryConsolidation,
|
||||||
|
memory::audit::WorkerLifecycleStatus::Skipped,
|
||||||
|
"no_valid_staging_entries invalid=1",
|
||||||
|
));
|
||||||
|
assert!(should_emit_memory_worker_event(
|
||||||
|
memory::audit::AuditWorker::MemoryConsolidation,
|
||||||
|
memory::audit::WorkerLifecycleStatus::Completed,
|
||||||
|
"completed",
|
||||||
|
));
|
||||||
|
assert!(should_emit_memory_worker_event(
|
||||||
|
memory::audit::AuditWorker::MemoryExtract,
|
||||||
|
memory::audit::WorkerLifecycleStatus::Skipped,
|
||||||
|
"threshold_not_reached files=1",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod build_summary_prompt_tests {
|
mod build_summary_prompt_tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
|
||||||
|
|
@ -27,8 +27,9 @@ use memory::WorkspaceLayout;
|
||||||
use memory::extract::{ExtractedPayload, write_staging};
|
use memory::extract::{ExtractedPayload, write_staging};
|
||||||
use memory::schema::SourceRef;
|
use memory::schema::SourceRef;
|
||||||
use session_store::FsStore;
|
use session_store::FsStore;
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
use pod::Pod;
|
use pod::{Event, Pod};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct MockClient {
|
struct MockClient {
|
||||||
|
|
@ -183,6 +184,32 @@ fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec<uuid::Uuid> {
|
||||||
ids
|
ids
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn attach_event_receiver(pod: &mut Pod<MockClient, FsStore>) -> broadcast::Receiver<Event> {
|
||||||
|
let (tx, rx) = broadcast::channel(16);
|
||||||
|
pod.attach_event_tx(tx);
|
||||||
|
rx
|
||||||
|
}
|
||||||
|
|
||||||
|
fn collect_memory_worker_reasons(rx: &mut broadcast::Receiver<Event>) -> Vec<String> {
|
||||||
|
let mut reasons = Vec::new();
|
||||||
|
loop {
|
||||||
|
match rx.try_recv() {
|
||||||
|
Ok(Event::MemoryWorker(event)) => reasons.push(event.reason),
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(broadcast::error::TryRecvError::Empty) => break,
|
||||||
|
Err(err) => panic!("unexpected broadcast receive error: {err}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
reasons
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_audit_jsonl(layout: &WorkspaceLayout) -> Vec<serde_json::Value> {
|
||||||
|
let text = std::fs::read_to_string(layout.audit_current_log_path()).unwrap();
|
||||||
|
text.lines()
|
||||||
|
.map(|line| serde_json::from_str::<serde_json::Value>(line).unwrap())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn no_memory_section_is_a_noop() {
|
async fn no_memory_section_is_a_noop() {
|
||||||
let pwd = tempfile::tempdir().unwrap();
|
let pwd = tempfile::tempdir().unwrap();
|
||||||
|
|
@ -242,6 +269,79 @@ async fn empty_staging_skips() {
|
||||||
// No mock calls expected.
|
// No mock calls expected.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn empty_staging_skip_is_audit_only() {
|
||||||
|
let pwd = tempfile::tempdir().unwrap();
|
||||||
|
let client = MockClient::new(vec![]);
|
||||||
|
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
||||||
|
let mut rx = attach_event_receiver(&mut pod);
|
||||||
|
|
||||||
|
pod.try_post_run_consolidate().await.unwrap();
|
||||||
|
|
||||||
|
assert!(collect_memory_worker_reasons(&mut rx).is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invalid_only_staging_is_distinct_from_no_staging() {
|
||||||
|
let pwd = tempfile::tempdir().unwrap();
|
||||||
|
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
||||||
|
std::fs::create_dir_all(layout.staging_dir()).unwrap();
|
||||||
|
let invalid_id = uuid::Uuid::now_v7();
|
||||||
|
let invalid_path = layout.staging_dir().join(format!("{invalid_id}.json"));
|
||||||
|
std::fs::write(&invalid_path, "{").unwrap();
|
||||||
|
|
||||||
|
let client = MockClient::new(vec![]);
|
||||||
|
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
||||||
|
let mut rx = attach_event_receiver(&mut pod);
|
||||||
|
|
||||||
|
pod.try_post_run_consolidate().await.unwrap();
|
||||||
|
|
||||||
|
assert!(invalid_path.exists(), "invalid staging is not auto-deleted");
|
||||||
|
let reasons = collect_memory_worker_reasons(&mut rx);
|
||||||
|
assert_eq!(reasons, vec!["no_valid_staging_entries invalid=1"]);
|
||||||
|
|
||||||
|
let audit = read_audit_jsonl(&layout);
|
||||||
|
let last = audit.last().unwrap();
|
||||||
|
assert_eq!(last["reason"], "no_valid_staging_entries invalid=1");
|
||||||
|
assert_eq!(last["consolidation"]["staging_count"], 0);
|
||||||
|
assert_eq!(last["consolidation"]["invalid_staging_count"], 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn below_threshold_skip_is_audit_only() {
|
||||||
|
let pwd = tempfile::tempdir().unwrap();
|
||||||
|
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
||||||
|
write_n_staging(&layout, 1); // threshold is 2
|
||||||
|
|
||||||
|
let client = MockClient::new(vec![]);
|
||||||
|
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
||||||
|
let mut rx = attach_event_receiver(&mut pod);
|
||||||
|
|
||||||
|
pod.try_post_run_consolidate().await.unwrap();
|
||||||
|
|
||||||
|
assert!(collect_memory_worker_reasons(&mut rx).is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn completed_event_survives_terminal_empty_drain_skip() {
|
||||||
|
let pwd = tempfile::tempdir().unwrap();
|
||||||
|
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
||||||
|
write_n_staging(&layout, 2); // threshold is 2 — fires.
|
||||||
|
|
||||||
|
let client = MockClient::new(vec![done("ok")]);
|
||||||
|
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
||||||
|
let mut rx = attach_event_receiver(&mut pod);
|
||||||
|
|
||||||
|
pod.try_post_run_consolidate().await.unwrap();
|
||||||
|
|
||||||
|
let reasons = collect_memory_worker_reasons(&mut rx);
|
||||||
|
assert_eq!(reasons.len(), 2);
|
||||||
|
assert!(reasons[0].starts_with("staging_threshold_reached files=2 bytes="));
|
||||||
|
assert_eq!(reasons[1], "completed_no_record_changes");
|
||||||
|
let audit = read_audit_jsonl(&layout);
|
||||||
|
assert_eq!(audit.last().unwrap()["reason"], "no_staging_entries");
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn below_threshold_skips_and_does_not_take_lock() {
|
async fn below_threshold_skips_and_does_not_take_lock() {
|
||||||
let pwd = tempfile::tempdir().unwrap();
|
let pwd = tempfile::tempdir().unwrap();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user