diff --git a/crates/memory/src/audit.rs b/crates/memory/src/audit.rs index 5bd8c72c..c9e10135 100644 --- a/crates/memory/src/audit.rs +++ b/crates/memory/src/audit.rs @@ -21,6 +21,10 @@ use uuid::Uuid; use crate::workspace::WorkspaceLayout; +fn is_zero_usize(value: &usize) -> bool { + *value == 0 +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum AuditWorker { @@ -141,6 +145,8 @@ pub struct ExtractAudit { pub struct ConsolidationAudit { #[serde(default)] pub staging_count: usize, + #[serde(default, skip_serializing_if = "is_zero_usize")] + pub invalid_staging_count: usize, #[serde(default)] pub staging_bytes: u64, #[serde(default, skip_serializing_if = "Vec::is_empty")] diff --git a/crates/memory/src/consolidate/mod.rs b/crates/memory/src/consolidate/mod.rs index 68e4c5ab..0aed4d9c 100644 --- a/crates/memory/src/consolidate/mod.rs +++ b/crates/memory/src/consolidate/mod.rs @@ -26,5 +26,7 @@ pub use input::{ render_tidy_hints, }; pub use lock::{LockError, LockRecord, StagingLock}; -pub use staging::{StagingEntry, list_staging_entries}; +pub use staging::{ + StagingEntriesSnapshot, StagingEntry, list_staging_entries, list_staging_entries_snapshot, +}; pub use tidy::{TidyHints, collect_tidy_hints}; diff --git a/crates/memory/src/consolidate/staging.rs b/crates/memory/src/consolidate/staging.rs index 08d55403..20b37a51 100644 --- a/crates/memory/src/consolidate/staging.rs +++ b/crates/memory/src/consolidate/staging.rs @@ -26,34 +26,58 @@ pub struct StagingEntry { pub bytes: u64, } +/// staging directory の検査結果。`entries` は current schema として読めた +/// staging のみで、`invalid_count` は `.json` だが staging として採用できなかった +/// ファイル数。 +#[derive(Debug, Clone, Default)] +pub struct StagingEntriesSnapshot { + pub entries: Vec, + pub invalid_count: usize, +} + /// `/*.json` を読んで UUIDv7 順に並べた [`StagingEntry`] /// 配列を返す。staging_dir が存在しなければ空配列。読めないファイルや /// JSON parse 失敗は `tracing::warn!` してスキップ(壊れた個別ファイルが /// consolidation 全体を止めないように)。 pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec { + list_staging_entries_snapshot(layout).entries +} + +/// `/*.json` を読んで valid staging と invalid staging 件数を返す。 +/// invalid は自動 migration / 削除 / archive せず、観測可能にするための件数だけを +/// 呼び出し側へ渡す。 +pub fn list_staging_entries_snapshot(layout: &WorkspaceLayout) -> StagingEntriesSnapshot { let dir = layout.staging_dir(); let entries = match std::fs::read_dir(&dir) { Ok(it) => it, - Err(_) => return Vec::new(), + Err(_) => return StagingEntriesSnapshot::default(), }; let mut out: Vec = Vec::new(); + let mut invalid_count = 0; for entry in entries.flatten() { let path = entry.path(); if !path.is_file() { continue; } - let stem = match path.file_stem().and_then(|s| s.to_str()) { - Some(s) => s, - None => continue, - }; let ext = path.extension().and_then(|s| s.to_str()).unwrap_or(""); if ext != "json" { continue; } + let stem = match path.file_stem().and_then(|s| s.to_str()) { + Some(s) => s, + None => { + invalid_count += 1; + continue; + } + }; let id = match Uuid::parse_str(stem) { Ok(u) => u, - Err(_) => continue, + Err(e) => { + invalid_count += 1; + tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry id"); + continue; + } }; let bytes = match std::fs::metadata(&path) { Ok(m) => m.len(), @@ -62,6 +86,7 @@ pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec { let raw = match std::fs::read_to_string(&path) { Ok(s) => s, Err(e) => { + invalid_count += 1; tracing::warn!(path = %path.display(), error = %e, "failed to read staging entry"); continue; } @@ -69,6 +94,7 @@ pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec { let record = match serde_json::from_str::(&raw) { Ok(r) => r, Err(e) => { + invalid_count += 1; tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry"); continue; } @@ -81,7 +107,10 @@ pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec { }); } out.sort_by_key(|e| e.id); - out + StagingEntriesSnapshot { + entries: out, + invalid_count, + } } #[cfg(test)] @@ -116,17 +145,26 @@ mod tests { } #[test] - fn skips_lock_file_and_garbage() { + fn skips_lock_file_and_counts_invalid_json() { let tmp = tempfile::TempDir::new().unwrap(); let layout = WorkspaceLayout::new(tmp.path().to_path_buf()); let (_id, _) = write_staging(&layout, source("s", [0, 1]), empty_payload()).unwrap(); - // Drop a non-UUID json file and a bare lock file alongside. + // Drop a non-UUID json file, an unparsable UUID-named json file, and + // a bare lock file alongside. Lock files are not `.json`; invalid + // `.json` files are surfaced separately instead of being mistaken for + // an empty staging directory. std::fs::write(layout.staging_dir().join("not-a-uuid.json"), "{}").unwrap(); + let bad_id = Uuid::now_v7(); + std::fs::write(layout.staging_dir().join(format!("{bad_id}.json")), "{").unwrap(); std::fs::write(layout.staging_dir().join(".consolidation.lock"), "{}").unwrap(); let entries = list_staging_entries(&layout); assert_eq!(entries.len(), 1); + + let snapshot = list_staging_entries_snapshot(&layout); + assert_eq!(snapshot.entries.len(), 1); + assert_eq!(snapshot.invalid_count, 2); } #[test] diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 5f20252c..3358a80b 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -3098,16 +3098,26 @@ impl Pod { ); let event_tx = self.event_tx.as_ref(); - let entries = consolidate::list_staging_entries(&layout); + let staging_snapshot = consolidate::list_staging_entries_snapshot(&layout); + let invalid_staging_count = staging_snapshot.invalid_count; + let entries = staging_snapshot.entries; if entries.is_empty() { + let reason = if invalid_staging_count == 0 { + "no_staging_entries".to_string() + } else { + format!("no_valid_staging_entries invalid={invalid_staging_count}") + }; audit.emit( &layout, event_tx, memory::audit::WorkerLifecycleStatus::Skipped, - "no_staging_entries", + reason, None, None, - Some(memory::audit::ConsolidationAudit::default()), + Some(memory::audit::ConsolidationAudit { + invalid_staging_count, + ..Default::default() + }), ); return Ok(ConsolidateDecision::Skipped); } @@ -3117,6 +3127,7 @@ impl Pod { let consumed_ids: Vec = entries.iter().map(|e| e.id).collect(); let base_consolidation = memory::audit::ConsolidationAudit { staging_count: total_files, + invalid_staging_count, staging_bytes: total_bytes, consumed_staging_ids: consumed_ids.iter().map(ToString::to_string).collect(), operations: memory::audit::OperationCounts::default(), @@ -3404,17 +3415,38 @@ impl WorkerAuditBase { consolidation, }, ); - emit_memory_worker_event( - event_tx, - self.run_id, - self.worker, - status, - self.trigger, - &reason, - ); + if should_emit_memory_worker_event(self.worker, status, &reason) { + emit_memory_worker_event( + event_tx, + self.run_id, + self.worker, + status, + self.trigger, + &reason, + ); + } } } +fn should_emit_memory_worker_event( + worker: memory::audit::AuditWorker, + status: memory::audit::WorkerLifecycleStatus, + reason: &str, +) -> bool { + if worker == memory::audit::AuditWorker::MemoryConsolidation + && status == memory::audit::WorkerLifecycleStatus::Skipped + { + return !is_idle_consolidation_skip_reason(reason); + } + true +} + +fn is_idle_consolidation_skip_reason(reason: &str) -> bool { + reason == "no_staging_entries" + || reason == "consolidation_threshold_disabled" + || reason.starts_with("threshold_not_reached") +} + fn memory_language(cfg: &manifest::MemoryConfig) -> &str { cfg.language .as_deref() @@ -4247,6 +4279,45 @@ fn current_pwd() -> Result { .map_err(|source| PodError::InvalidPwd { pwd: cwd, source }) } +#[cfg(test)] +mod memory_worker_event_tests { + use super::*; + + #[test] + fn suppresses_idle_consolidation_skip_worker_events() { + assert!(!should_emit_memory_worker_event( + memory::audit::AuditWorker::MemoryConsolidation, + memory::audit::WorkerLifecycleStatus::Skipped, + "no_staging_entries", + )); + assert!(!should_emit_memory_worker_event( + memory::audit::AuditWorker::MemoryConsolidation, + memory::audit::WorkerLifecycleStatus::Skipped, + "threshold_not_reached files=1 bytes=64 min_files=2 min_bytes=1048576", + )); + assert!(!should_emit_memory_worker_event( + memory::audit::AuditWorker::MemoryConsolidation, + memory::audit::WorkerLifecycleStatus::Skipped, + "consolidation_threshold_disabled", + )); + assert!(should_emit_memory_worker_event( + memory::audit::AuditWorker::MemoryConsolidation, + memory::audit::WorkerLifecycleStatus::Skipped, + "no_valid_staging_entries invalid=1", + )); + assert!(should_emit_memory_worker_event( + memory::audit::AuditWorker::MemoryConsolidation, + memory::audit::WorkerLifecycleStatus::Completed, + "completed", + )); + assert!(should_emit_memory_worker_event( + memory::audit::AuditWorker::MemoryExtract, + memory::audit::WorkerLifecycleStatus::Skipped, + "threshold_not_reached files=1", + )); + } +} + #[cfg(test)] mod build_summary_prompt_tests { use super::*; diff --git a/crates/pod/tests/consolidation_test.rs b/crates/pod/tests/consolidation_test.rs index 2c45a8e5..97c9bcf7 100644 --- a/crates/pod/tests/consolidation_test.rs +++ b/crates/pod/tests/consolidation_test.rs @@ -27,8 +27,9 @@ use memory::WorkspaceLayout; use memory::extract::{ExtractedPayload, write_staging}; use memory::schema::SourceRef; use session_store::FsStore; +use tokio::sync::broadcast; -use pod::Pod; +use pod::{Event, Pod}; #[derive(Clone)] struct MockClient { @@ -183,6 +184,32 @@ fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec { ids } +fn attach_event_receiver(pod: &mut Pod) -> broadcast::Receiver { + let (tx, rx) = broadcast::channel(16); + pod.attach_event_tx(tx); + rx +} + +fn collect_memory_worker_reasons(rx: &mut broadcast::Receiver) -> Vec { + let mut reasons = Vec::new(); + loop { + match rx.try_recv() { + Ok(Event::MemoryWorker(event)) => reasons.push(event.reason), + Ok(_) => {} + Err(broadcast::error::TryRecvError::Empty) => break, + Err(err) => panic!("unexpected broadcast receive error: {err}"), + } + } + reasons +} + +fn read_audit_jsonl(layout: &WorkspaceLayout) -> Vec { + let text = std::fs::read_to_string(layout.audit_current_log_path()).unwrap(); + text.lines() + .map(|line| serde_json::from_str::(line).unwrap()) + .collect() +} + #[tokio::test] async fn no_memory_section_is_a_noop() { let pwd = tempfile::tempdir().unwrap(); @@ -242,6 +269,79 @@ async fn empty_staging_skips() { // No mock calls expected. } +#[tokio::test] +async fn empty_staging_skip_is_audit_only() { + let pwd = tempfile::tempdir().unwrap(); + let client = MockClient::new(vec![]); + let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; + let mut rx = attach_event_receiver(&mut pod); + + pod.try_post_run_consolidate().await.unwrap(); + + assert!(collect_memory_worker_reasons(&mut rx).is_empty()); +} + +#[tokio::test] +async fn invalid_only_staging_is_distinct_from_no_staging() { + let pwd = tempfile::tempdir().unwrap(); + let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); + std::fs::create_dir_all(layout.staging_dir()).unwrap(); + let invalid_id = uuid::Uuid::now_v7(); + let invalid_path = layout.staging_dir().join(format!("{invalid_id}.json")); + std::fs::write(&invalid_path, "{").unwrap(); + + let client = MockClient::new(vec![]); + let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; + let mut rx = attach_event_receiver(&mut pod); + + pod.try_post_run_consolidate().await.unwrap(); + + assert!(invalid_path.exists(), "invalid staging is not auto-deleted"); + let reasons = collect_memory_worker_reasons(&mut rx); + assert_eq!(reasons, vec!["no_valid_staging_entries invalid=1"]); + + let audit = read_audit_jsonl(&layout); + let last = audit.last().unwrap(); + assert_eq!(last["reason"], "no_valid_staging_entries invalid=1"); + assert_eq!(last["consolidation"]["staging_count"], 0); + assert_eq!(last["consolidation"]["invalid_staging_count"], 1); +} + +#[tokio::test] +async fn below_threshold_skip_is_audit_only() { + let pwd = tempfile::tempdir().unwrap(); + let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); + write_n_staging(&layout, 1); // threshold is 2 + + let client = MockClient::new(vec![]); + let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; + let mut rx = attach_event_receiver(&mut pod); + + pod.try_post_run_consolidate().await.unwrap(); + + assert!(collect_memory_worker_reasons(&mut rx).is_empty()); +} + +#[tokio::test] +async fn completed_event_survives_terminal_empty_drain_skip() { + let pwd = tempfile::tempdir().unwrap(); + let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); + write_n_staging(&layout, 2); // threshold is 2 — fires. + + let client = MockClient::new(vec![done("ok")]); + let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; + let mut rx = attach_event_receiver(&mut pod); + + pod.try_post_run_consolidate().await.unwrap(); + + let reasons = collect_memory_worker_reasons(&mut rx); + assert_eq!(reasons.len(), 2); + assert!(reasons[0].starts_with("staging_threshold_reached files=2 bytes=")); + assert_eq!(reasons[1], "completed_no_record_changes"); + let audit = read_audit_jsonl(&layout); + assert_eq!(audit.last().unwrap()["reason"], "no_staging_entries"); +} + #[tokio::test] async fn below_threshold_skips_and_does_not_take_lock() { let pwd = tempfile::tempdir().unwrap();