From 7c14b51bacea633c20e096b53674d02a99987907 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 25 May 2026 03:24:04 +0900 Subject: [PATCH] memory: add audit log events --- Cargo.lock | 1 + crates/memory/Cargo.toml | 1 + crates/memory/src/audit.rs | 444 +++++++++++++++ crates/memory/src/lib.rs | 1 + crates/memory/src/tool/delete.rs | 152 ++++++ crates/memory/src/tool/edit.rs | 160 +++++- crates/memory/src/tool/mod.rs | 14 + crates/memory/src/tool/query.rs | 80 ++- crates/memory/src/tool/read.rs | 55 +- crates/memory/src/tool/write.rs | 92 +++- crates/memory/src/workspace.rs | 28 +- crates/pod/src/controller.rs | 1 + crates/pod/src/pod.rs | 514 +++++++++++++++++- crates/protocol/src/lib.rs | 18 + crates/tui/src/app.rs | 26 + crates/tui/src/ui.rs | 5 + .../internal/memory_consolidation_system.md | 2 +- 17 files changed, 1534 insertions(+), 60 deletions(-) create mode 100644 crates/memory/src/audit.rs create mode 100644 crates/memory/src/tool/delete.rs diff --git a/Cargo.lock b/Cargo.lock index 08a37e13..4a1cbde9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1796,6 +1796,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "sha2 0.11.0", "tempfile", "thiserror 2.0.18", "tokio", diff --git a/crates/memory/Cargo.toml b/crates/memory/Cargo.toml index 316e4280..7a09c0e8 100644 --- a/crates/memory/Cargo.toml +++ b/crates/memory/Cargo.toml @@ -14,6 +14,7 @@ manifest = { workspace = true } schemars = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +sha2 = { workspace = true } serde_yaml = "0.9.34" thiserror = { workspace = true } tracing = { workspace = true } diff --git a/crates/memory/src/audit.rs b/crates/memory/src/audit.rs new file mode 100644 index 00000000..5bd8c72c --- /dev/null +++ b/crates/memory/src/audit.rs @@ -0,0 +1,444 @@ +//! Append-only JSONL audit log for memory workers and tools. +//! +//! The log is evidence-only observability data under +//! `.insomnia/memory/_logs/current.log`. It is intentionally separate from +//! `_staging` and `_usage`, and consolidation never consumes it. Operators can +//! follow the latest stream with: +//! +//! ```text +//! tail -f .insomnia/memory/_logs/current.log +//! ``` + +use std::collections::BTreeMap; +use std::fs::{self, OpenOptions}; +use std::io::{self, Write}; +use std::path::{Path, PathBuf}; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use uuid::Uuid; + +use crate::workspace::WorkspaceLayout; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AuditWorker { + MemoryExtract, + MemoryConsolidation, +} + +impl AuditWorker { + pub fn label(self) -> &'static str { + match self { + Self::MemoryExtract => "extract", + Self::MemoryConsolidation => "consolidation", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkerLifecycleStatus { + Started, + Completed, + Skipped, + Failed, + Cancelled, +} + +impl WorkerLifecycleStatus { + pub fn label(self) -> &'static str { + match self { + Self::Started => "running", + Self::Completed => "done", + Self::Skipped => "skipped", + Self::Failed => "failed", + Self::Cancelled => "cancelled", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AuditTrigger { + SessionEnd, + TurnThreshold, + TokenThreshold, + StagingBacklog, + Idle, + Manual, + StartupRecovery, + Unknown, +} + +impl AuditTrigger { + pub fn label(self) -> &'static str { + match self { + Self::SessionEnd => "session_end", + Self::TurnThreshold => "turn_threshold", + Self::TokenThreshold => "token_threshold", + Self::StagingBacklog => "staging_backlog", + Self::Idle => "idle", + Self::Manual => "manual", + Self::StartupRecovery => "startup_recovery", + Self::Unknown => "unknown", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AuditStatus { + Success, + Failed, + Skipped, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ModelAudit { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub ref_: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub scheme: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub model_id: Option, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct UsageAudit { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub input_tokens: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub output_tokens: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub total_tokens: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub cache_read_input_tokens: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub cache_creation_input_tokens: Option, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct ExtractAudit { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub session_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub segment_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub entry_range: Option<[u64; 2]>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub history_range: Option<[u64; 2]>, + #[serde(default)] + pub staging_count: usize, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub staging_ids: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub staging_paths: Vec, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct ConsolidationAudit { + #[serde(default)] + pub staging_count: usize, + #[serde(default)] + pub staging_bytes: u64, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub consumed_staging_ids: Vec, + #[serde(default)] + pub operations: OperationCounts, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub struct OperationCounts { + #[serde(default)] + pub write: usize, + #[serde(default)] + pub edit: usize, + #[serde(default)] + pub delete: usize, + #[serde(default)] + pub drop: usize, + #[serde(default)] + pub merge: usize, + #[serde(default)] + pub trim: usize, +} + +impl OperationCounts { + pub fn total_record_changes(&self) -> usize { + self.write + self.edit + self.delete + self.drop + self.merge + self.trim + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkerLifecycleAudit { + pub run_id: Uuid, + pub worker: AuditWorker, + pub status: WorkerLifecycleStatus, + pub trigger: AuditTrigger, + pub reason: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub model: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub usage: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub extract: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub consolidation: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RecordOperationAudit { + pub op: String, + pub status: AuditStatus, + pub kind: String, + pub slug: String, + pub path: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub before_hash: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub after_hash: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct RecordUsageAudit { + pub op: String, + pub status: AuditStatus, + pub kind: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub slug: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub path: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub query: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub result_count: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "event", rename_all = "snake_case")] +pub enum AuditPayload { + WorkerLifecycle(WorkerLifecycleAudit), + RecordOperation(RecordOperationAudit), + RecordUsage(RecordUsageAudit), +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AuditEvent { + pub id: Uuid, + pub occurred_at: DateTime, + #[serde(flatten)] + pub payload: AuditPayload, +} + +impl AuditEvent { + pub fn new(payload: AuditPayload) -> Self { + Self { + id: Uuid::now_v7(), + occurred_at: Utc::now(), + payload, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RecordSnapshot { + pub kind: String, + pub slug: String, + pub path: PathBuf, + pub hash: String, +} + +/// Append one audit event to `.insomnia/memory/_logs/current.log`. +pub fn append_audit_event(layout: &WorkspaceLayout, event: &AuditEvent) -> io::Result<()> { + let path = layout.audit_current_log_path(); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + let line = serde_json::to_string(event) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?; + let mut file = OpenOptions::new().create(true).append(true).open(path)?; + writeln!(file, "{line}")?; + Ok(()) +} + +pub fn append_worker_lifecycle( + layout: &WorkspaceLayout, + audit: WorkerLifecycleAudit, +) -> io::Result<()> { + append_audit_event( + layout, + &AuditEvent::new(AuditPayload::WorkerLifecycle(audit)), + ) +} + +pub fn append_record_operation( + layout: &WorkspaceLayout, + audit: RecordOperationAudit, +) -> io::Result<()> { + append_audit_event( + layout, + &AuditEvent::new(AuditPayload::RecordOperation(audit)), + ) +} + +pub fn append_record_usage(layout: &WorkspaceLayout, audit: RecordUsageAudit) -> io::Result<()> { + append_audit_event(layout, &AuditEvent::new(AuditPayload::RecordUsage(audit))) +} + +pub fn file_hash(path: &Path) -> io::Result> { + match fs::read(path) { + Ok(bytes) => Ok(Some(hash_bytes(&bytes))), + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None), + Err(err) => Err(err), + } +} + +pub fn hash_bytes(bytes: &[u8]) -> String { + let digest = Sha256::digest(bytes); + let mut out = String::with_capacity("sha256:".len() + digest.len() * 2); + out.push_str("sha256:"); + for byte in digest { + use std::fmt::Write as _; + let _ = write!(&mut out, "{byte:02x}"); + } + out +} + +pub fn snapshot_records(layout: &WorkspaceLayout) -> BTreeMap { + let mut out = BTreeMap::new(); + snapshot_one(&mut out, "summary", "summary", layout.summary_path()); + snapshot_dir(&mut out, "decision", layout.decisions_dir()); + snapshot_dir(&mut out, "request", layout.requests_dir()); + snapshot_dir(&mut out, "knowledge", layout.knowledge_dir()); + out +} + +pub fn operation_counts_from_snapshots( + before: &BTreeMap, + after: &BTreeMap, +) -> OperationCounts { + let mut counts = OperationCounts::default(); + for (key, after_record) in after { + match before.get(key) { + None => counts.write += 1, + Some(before_record) if before_record.hash != after_record.hash => counts.edit += 1, + Some(_) => {} + } + } + for key in before.keys() { + if !after.contains_key(key) { + counts.delete += 1; + } + } + counts +} + +fn snapshot_dir(out: &mut BTreeMap, kind: &str, dir: PathBuf) { + let entries = match fs::read_dir(dir) { + Ok(entries) => entries, + Err(_) => return, + }; + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } + let Some(name) = path.file_name().and_then(|n| n.to_str()) else { + continue; + }; + let Some(slug) = name.strip_suffix(".md").map(str::to_string) else { + continue; + }; + snapshot_one(out, kind, &slug, path); + } +} + +fn snapshot_one(out: &mut BTreeMap, kind: &str, slug: &str, path: PathBuf) { + if !path.is_file() { + return; + } + let Ok(Some(hash)) = file_hash(&path) else { + return; + }; + out.insert( + format!("{kind}/{slug}"), + RecordSnapshot { + kind: kind.to_string(), + slug: slug.to_string(), + path, + hash, + }, + ); +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn setup() -> (TempDir, WorkspaceLayout) { + let dir = TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(dir.path().to_path_buf()); + (dir, layout) + } + + #[test] + fn appends_jsonl_to_current_log() { + let (_dir, layout) = setup(); + let run_id = Uuid::now_v7(); + append_worker_lifecycle( + &layout, + WorkerLifecycleAudit { + run_id, + worker: AuditWorker::MemoryExtract, + status: WorkerLifecycleStatus::Started, + trigger: AuditTrigger::TokenThreshold, + reason: "tokens_threshold_reached".to_string(), + model: None, + usage: None, + extract: None, + consolidation: None, + }, + ) + .unwrap(); + + let text = fs::read_to_string(layout.audit_current_log_path()).unwrap(); + let value: serde_json::Value = serde_json::from_str(text.trim()).unwrap(); + assert_eq!(value["event"], "worker_lifecycle"); + assert_eq!(value["worker"], "memory_extract"); + assert_eq!(value["status"], "started"); + assert_eq!(value["run_id"], run_id.to_string()); + } + + #[test] + fn counts_created_edited_deleted_records() { + let (dir, layout) = setup(); + let decision_dir = dir.path().join(".insomnia/memory/decisions"); + fs::create_dir_all(&decision_dir).unwrap(); + fs::write(decision_dir.join("a.md"), "old").unwrap(); + fs::write(decision_dir.join("gone.md"), "old").unwrap(); + let before = snapshot_records(&layout); + + fs::write(decision_dir.join("a.md"), "new").unwrap(); + fs::remove_file(decision_dir.join("gone.md")).unwrap(); + fs::write(decision_dir.join("created.md"), "new").unwrap(); + let after = snapshot_records(&layout); + + let counts = operation_counts_from_snapshots(&before, &after); + assert_eq!(counts.write, 1); + assert_eq!(counts.edit, 1); + assert_eq!(counts.delete, 1); + } + + #[test] + fn hash_has_sha256_prefix() { + assert_eq!(hash_bytes(b"abc").len(), "sha256:".len() + 64); + assert!(hash_bytes(b"abc").starts_with("sha256:")); + } +} diff --git a/crates/memory/src/lib.rs b/crates/memory/src/lib.rs index 25fb35b4..fa7719b2 100644 --- a/crates/memory/src/lib.rs +++ b/crates/memory/src/lib.rs @@ -6,6 +6,7 @@ //! crate) must not touch these directories — Pod is responsible for //! denying them at the Scope level when memory is enabled. +pub mod audit; pub mod consolidate; pub mod error; pub mod extract; diff --git a/crates/memory/src/tool/delete.rs b/crates/memory/src/tool/delete.rs new file mode 100644 index 00000000..16ffc939 --- /dev/null +++ b/crates/memory/src/tool/delete.rs @@ -0,0 +1,152 @@ +//! `MemoryDelete` tool for removing memory / knowledge records with audit logging. + +use std::sync::Arc; + +use async_trait::async_trait; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use serde::Deserialize; + +use crate::audit::{AuditStatus, RecordOperationAudit, append_record_operation, file_hash}; +use crate::tool::MemoryToolKind; +use crate::workspace::WorkspaceLayout; + +const DESCRIPTION: &str = "Delete an existing memory or knowledge record selected by `kind` + `slug`. \ +For `summary` omit `slug`; for the others `slug` is required. The delete is audited and cannot target \ +workflow or staging/log files."; + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct DeleteParams { + /// Kind of record to delete. + kind: MemoryToolKind, + /// Slug. Required for everything except `summary`; forbidden for `summary`. + #[serde(default)] + slug: Option, +} + +struct MemoryDeleteTool { + layout: WorkspaceLayout, +} + +#[async_trait] +impl Tool for MemoryDeleteTool { + async fn execute(&self, input_json: &str) -> Result { + let params: DeleteParams = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid MemoryDelete input: {e}")))?; + let path = params + .kind + .resolve_path(&self.layout, params.slug.as_deref())?; + let kind = params.kind.to_string(); + let slug = audit_slug(¶ms.kind, params.slug.as_deref()); + let before_hash = file_hash(&path).ok().flatten(); + if before_hash.is_none() { + let reason = format!("record not found: {}", path.display()); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "delete".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::ExecutionFailed(reason)); + } + + if let Err(err) = std::fs::remove_file(&path) { + let reason = format!("failed to delete {}: {err}", path.display()); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "delete".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::ExecutionFailed(reason)); + } + + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "delete".to_string(), + status: AuditStatus::Success, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: None, + }, + ); + + Ok(ToolOutput { + summary: format!("Deleted {}", path.display()), + content: None, + }) + } +} + +pub fn delete_tool(layout: WorkspaceLayout) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(DeleteParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("MemoryDelete") + .description(DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(MemoryDeleteTool { + layout: layout.clone(), + }); + (meta, tool) + }) +} + +fn audit_slug(kind: &MemoryToolKind, slug: Option<&str>) -> String { + match kind { + MemoryToolKind::Summary => "summary".to_string(), + _ => slug.unwrap_or("").to_string(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use tempfile::TempDir; + + #[tokio::test] + async fn delete_removes_file_and_audits() { + let dir = TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(dir.path().to_path_buf()); + std::fs::create_dir_all(layout.decisions_dir()).unwrap(); + let path = layout.decisions_dir().join("obsolete.md"); + let now = Utc::now().to_rfc3339(); + std::fs::write( + &path, + format!( + "---\ncreated_at: {now}\nupdated_at: {now}\nsources: []\nstatus: open\n---\nold" + ), + ) + .unwrap(); + + let (_, tool) = delete_tool(layout.clone())(); + let out = tool + .execute(r#"{"kind":"decision","slug":"obsolete"}"#) + .await + .unwrap(); + assert!(out.summary.contains("Deleted")); + assert!(!path.exists()); + let log = std::fs::read_to_string(layout.audit_current_log_path()).unwrap(); + assert!(log.contains(r#""event":"record_operation""#)); + assert!(log.contains(r#""op":"delete""#)); + assert!(log.contains(r#""status":"success""#)); + } +} diff --git a/crates/memory/src/tool/edit.rs b/crates/memory/src/tool/edit.rs index a657d517..79b8a81b 100644 --- a/crates/memory/src/tool/edit.rs +++ b/crates/memory/src/tool/edit.rs @@ -12,6 +12,9 @@ use async_trait::async_trait; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use serde::Deserialize; +use crate::audit::{ + AuditStatus, RecordOperationAudit, append_record_operation, file_hash, hash_bytes, +}; use crate::linter::{LintReport, Linter, WriteMode}; use crate::tool::MemoryToolKind; use crate::workspace::WorkspaceLayout; @@ -62,30 +65,94 @@ impl Tool for EditTool { let path = params .kind .resolve_path(&self.layout, params.slug.as_deref())?; + let kind = params.kind.to_string(); + let slug = audit_slug(¶ms.kind, params.slug.as_deref()); - let current_bytes = std::fs::read(&path).map_err(|e| match e.kind() { - std::io::ErrorKind::NotFound => ToolError::ExecutionFailed(format!( - "record not found (use MemoryWrite to create): {}", - path.display() - )), - _ => ToolError::ExecutionFailed(format!("read failed at {}: {e}", path.display())), - })?; - let current_text = std::str::from_utf8(¤t_bytes).map_err(|_| { - ToolError::InvalidArgument(format!("file is not valid UTF-8: {}", path.display())) - })?; + let current_bytes = match std::fs::read(&path) { + Ok(bytes) => bytes, + Err(e) => { + let reason = match e.kind() { + std::io::ErrorKind::NotFound => format!( + "record not found (use MemoryWrite to create): {}", + path.display() + ), + _ => format!("read failed at {}: {e}", path.display()), + }; + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "edit".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash: None, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::ExecutionFailed(reason)); + } + }; + let before_hash = Some(hash_bytes(¤t_bytes)); + let current_text = match std::str::from_utf8(¤t_bytes) { + Ok(text) => text, + Err(_) => { + let reason = format!("file is not valid UTF-8: {}", path.display()); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "edit".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::InvalidArgument(reason)); + } + }; let count = current_text.matches(¶ms.old_string).count(); if count == 0 { - return Err(ToolError::InvalidArgument(format!( - "old_string not found in {}", - path.display() - ))); + let reason = format!("old_string not found in {}", path.display()); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "edit".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::InvalidArgument(reason)); } if !params.replace_all && count > 1 { - return Err(ToolError::InvalidArgument(format!( + let reason = format!( "old_string occurs {count} times in {}; pass replace_all: true or narrow the snippet", path.display() - ))); + ); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "edit".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::InvalidArgument(reason)); } let new_text = if params.replace_all { @@ -97,12 +164,58 @@ impl Tool for EditTool { let report = self.linter.lint(&path, &new_text, WriteMode::Update); if report.has_errors() { - return Err(ToolError::InvalidArgument(format_report(&report))); + let reason = format_report(&report); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "edit".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::InvalidArgument(reason)); } - std::fs::write(&path, new_text.as_bytes()).map_err(|e| { - ToolError::ExecutionFailed(format!("failed to write {}: {e}", path.display())) - })?; + if let Err(e) = std::fs::write(&path, new_text.as_bytes()) { + let reason = format!("failed to write {}: {e}", path.display()); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "edit".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::ExecutionFailed(reason)); + } + let after_hash = file_hash(&path).ok().flatten(); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "edit".to_string(), + status: AuditStatus::Success, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash, + reason: if report.warnings.is_empty() { + None + } else { + Some(format!("{} warning(s)", report.warnings.len())) + }, + }, + ); let summary = format!( "Edited {} ({} replacement{}){}", @@ -118,6 +231,13 @@ impl Tool for EditTool { } } +fn audit_slug(kind: &MemoryToolKind, slug: Option<&str>) -> String { + match kind { + MemoryToolKind::Summary => "summary".to_string(), + _ => slug.unwrap_or("").to_string(), + } +} + fn format_report(report: &LintReport) -> String { use std::fmt::Write as _; let mut buf = String::from("memory linter rejected the edit:"); diff --git a/crates/memory/src/tool/mod.rs b/crates/memory/src/tool/mod.rs index a6802c28..bcd8192b 100644 --- a/crates/memory/src/tool/mod.rs +++ b/crates/memory/src/tool/mod.rs @@ -5,11 +5,14 @@ //! to know the on-disk layout — Search returns `{slug, kind, ...}` and //! that pair feeds straight into Read / Edit. +mod delete; mod edit; mod query; mod read; mod write; +pub use delete::delete_tool; + use std::path::PathBuf; use llm_worker::tool::ToolError; @@ -34,6 +37,17 @@ pub enum MemoryToolKind { Knowledge, } +impl std::fmt::Display for MemoryToolKind { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + Self::Summary => "summary", + Self::Decision => "decision", + Self::Request => "request", + Self::Knowledge => "knowledge", + }) + } +} + impl MemoryToolKind { pub fn as_str(self) -> &'static str { match self { diff --git a/crates/memory/src/tool/query.rs b/crates/memory/src/tool/query.rs index 0ab3c946..deb80940 100644 --- a/crates/memory/src/tool/query.rs +++ b/crates/memory/src/tool/query.rs @@ -7,8 +7,9 @@ //! enumerate what records exist without knowing what's inside them. //! //! - `MemoryQuery` walks `.insomnia/memory/{summary.md,decisions/, -//! requests/}`. `.insomnia/workflow/` and `.insomnia/memory/_staging/` -//! are excluded by construction. +//! requests/}`. `.insomnia/workflow/`, `.insomnia/memory/_staging/`, +//! `.insomnia/memory/_usage/`, and `.insomnia/memory/_logs/` are excluded +//! by construction. //! - `KnowledgeQuery` walks `.insomnia/knowledge/*.md` and supports a //! `kind` filter against the Knowledge frontmatter's `kind` field. //! @@ -23,6 +24,7 @@ use async_trait::async_trait; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use serde::{Deserialize, Serialize}; +use crate::audit::{AuditStatus, RecordUsageAudit, append_record_usage}; use crate::schema::{KnowledgeFrontmatter, split_frontmatter}; use crate::workspace::WorkspaceLayout; @@ -128,7 +130,25 @@ impl Tool for MemoryQueryTool { let params: MemoryQueryParams = serde_json::from_str(input_json) .map_err(|e| ToolError::InvalidArgument(format!("invalid MemoryQuery input: {e}")))?; let needle = match params.query.as_deref() { - Some(q) => Some(validate_query(q)?), + Some(q) => match validate_query(q) { + Ok(q) => Some(q), + Err(err) => { + let _ = append_record_usage( + &self.layout, + RecordUsageAudit { + op: "query".to_string(), + status: AuditStatus::Failed, + kind: "memory".to_string(), + slug: None, + path: None, + query: params.query.clone(), + result_count: None, + reason: Some(err.to_string()), + }, + ); + return Err(err); + } + }, None => None, }; @@ -194,6 +214,23 @@ impl Tool for MemoryQueryTool { Some(q) => format!("{} hit(s) for {q:?}", records.len()), None => format!("{} record(s)", records.len()), }; + let _ = append_record_usage( + &self.layout, + RecordUsageAudit { + op: "query".to_string(), + status: AuditStatus::Success, + kind: "memory".to_string(), + slug: None, + path: None, + query: params.query.clone(), + result_count: Some(records.len()), + reason: if records.len() >= limit { + Some("result_limit_reached".to_string()) + } else { + None + }, + }, + ); Ok(ToolOutput { summary, content: Some(body), @@ -208,7 +245,25 @@ impl Tool for KnowledgeQueryTool { ToolError::InvalidArgument(format!("invalid KnowledgeQuery input: {e}")) })?; let needle = match params.query.as_deref() { - Some(q) => Some(validate_query(q)?), + Some(q) => match validate_query(q) { + Ok(q) => Some(q), + Err(err) => { + let _ = append_record_usage( + &self.layout, + RecordUsageAudit { + op: "query".to_string(), + status: AuditStatus::Failed, + kind: "knowledge".to_string(), + slug: None, + path: None, + query: params.query.clone(), + result_count: None, + reason: Some(err.to_string()), + }, + ); + return Err(err); + } + }, None => None, }; let kind_filter = params.kind.as_deref(); @@ -272,6 +327,23 @@ impl Tool for KnowledgeQueryTool { Some(q) => format!("{} hit(s) for {q:?}", records.len()), None => format!("{} record(s)", records.len()), }; + let _ = append_record_usage( + &self.layout, + RecordUsageAudit { + op: "query".to_string(), + status: AuditStatus::Success, + kind: "knowledge".to_string(), + slug: None, + path: None, + query: params.query.clone(), + result_count: Some(records.len()), + reason: if records.len() >= limit { + Some("result_limit_reached".to_string()) + } else { + None + }, + }, + ); Ok(ToolOutput { summary, content: Some(body), diff --git a/crates/memory/src/tool/read.rs b/crates/memory/src/tool/read.rs index 0ad8ba28..9d839b88 100644 --- a/crates/memory/src/tool/read.rs +++ b/crates/memory/src/tool/read.rs @@ -11,6 +11,7 @@ use async_trait::async_trait; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use serde::Deserialize; +use crate::audit::{AuditStatus, RecordUsageAudit, append_record_usage}; use crate::tool::MemoryToolKind; use crate::usage::{self, UsageSource}; use crate::workspace::WorkspaceLayout; @@ -51,13 +52,32 @@ impl Tool for ReadTool { let path = params .kind .resolve_path(&self.layout, params.slug.as_deref())?; + let kind = params.kind.to_string(); + let slug = audit_slug(¶ms.kind, params.slug.as_deref()); - let bytes = std::fs::read(&path).map_err(|e| match e.kind() { - std::io::ErrorKind::NotFound => { - ToolError::ExecutionFailed(format!("record not found: {}", path.display())) + let bytes = match std::fs::read(&path) { + Ok(bytes) => bytes, + Err(e) => { + let reason = match e.kind() { + std::io::ErrorKind::NotFound => format!("record not found: {}", path.display()), + _ => format!("read failed at {}: {e}", path.display()), + }; + let _ = append_record_usage( + &self.layout, + RecordUsageAudit { + op: "read".to_string(), + status: AuditStatus::Failed, + kind, + slug: Some(slug), + path: Some(path.display().to_string()), + query: None, + result_count: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::ExecutionFailed(reason)); } - _ => ToolError::ExecutionFailed(format!("read failed at {}: {e}", path.display())), - })?; + }; let text = String::from_utf8_lossy(&bytes).into_owned(); if let Some(segment_id) = self.usage_session_id.as_deref() { @@ -97,6 +117,24 @@ impl Tool for ReadTool { ) }; + let _ = append_record_usage( + &self.layout, + RecordUsageAudit { + op: "read".to_string(), + status: AuditStatus::Success, + kind, + slug: Some(slug), + path: Some(path.display().to_string()), + query: None, + result_count: Some(rendered.line_count), + reason: if rendered.truncated { + Some("truncated".to_string()) + } else { + None + }, + }, + ); + Ok(ToolOutput { summary, content: Some(rendered.body), @@ -104,6 +142,13 @@ impl Tool for ReadTool { } } +fn audit_slug(kind: &MemoryToolKind, slug: Option<&str>) -> String { + match kind { + MemoryToolKind::Summary => "summary".to_string(), + _ => slug.unwrap_or("").to_string(), + } +} + struct Rendered { body: String, line_count: usize, diff --git a/crates/memory/src/tool/write.rs b/crates/memory/src/tool/write.rs index 1ad3e79f..f3837c5d 100644 --- a/crates/memory/src/tool/write.rs +++ b/crates/memory/src/tool/write.rs @@ -12,6 +12,9 @@ use async_trait::async_trait; use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; use serde::Deserialize; +use crate::audit::{ + AuditStatus, RecordOperationAudit, append_record_operation, file_hash, hash_bytes, +}; use crate::linter::{LintReport, Linter, WriteMode}; use crate::tool::MemoryToolKind; use crate::workspace::WorkspaceLayout; @@ -46,8 +49,11 @@ impl Tool for WriteTool { let path = params .kind .resolve_path(&self.layout, params.slug.as_deref())?; + let kind = params.kind.to_string(); + let slug = audit_slug(¶ms.kind, params.slug.as_deref()); - let already_exists = path.exists(); + let before_hash = file_hash(&path).ok().flatten(); + let already_exists = before_hash.is_some(); let mode = if already_exists { WriteMode::Update } else { @@ -56,20 +62,77 @@ impl Tool for WriteTool { let report = self.linter.lint(&path, ¶ms.content, mode); if report.has_errors() { - return Err(ToolError::InvalidArgument(format_report(&report))); + let reason = format_report(&report); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "write".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::InvalidArgument(reason)); } if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent).map_err(|e| { - ToolError::ExecutionFailed(format!( - "failed to create directory {}: {e}", - parent.display() - )) - })?; + if let Err(e) = std::fs::create_dir_all(parent) { + let reason = format!("failed to create directory {}: {e}", parent.display()); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "write".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::ExecutionFailed(reason)); + } } - std::fs::write(&path, params.content.as_bytes()).map_err(|e| { - ToolError::ExecutionFailed(format!("failed to write {}: {e}", path.display())) - })?; + if let Err(e) = std::fs::write(&path, params.content.as_bytes()) { + let reason = format!("failed to write {}: {e}", path.display()); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "write".to_string(), + status: AuditStatus::Failed, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash: None, + reason: Some(reason.clone()), + }, + ); + return Err(ToolError::ExecutionFailed(reason)); + } + let after_hash = Some(hash_bytes(params.content.as_bytes())); + let _ = append_record_operation( + &self.layout, + RecordOperationAudit { + op: "write".to_string(), + status: AuditStatus::Success, + kind, + slug, + path: path.display().to_string(), + before_hash, + after_hash, + reason: if report.warnings.is_empty() { + None + } else { + Some(format!("{} warning(s)", report.warnings.len())) + }, + }, + ); let summary = format!( "{} {}{}", @@ -88,6 +151,13 @@ impl Tool for WriteTool { } } +fn audit_slug(kind: &MemoryToolKind, slug: Option<&str>) -> String { + match kind { + MemoryToolKind::Summary => "summary".to_string(), + _ => slug.unwrap_or("").to_string(), + } +} + fn format_report(report: &LintReport) -> String { use std::fmt::Write as _; let mut buf = String::from("memory linter rejected the write:"); diff --git a/crates/memory/src/workspace.rs b/crates/memory/src/workspace.rs index f1d40902..86bc7d20 100644 --- a/crates/memory/src/workspace.rs +++ b/crates/memory/src/workspace.rs @@ -11,6 +11,7 @@ //! - `/.insomnia/memory/decisions/.md` //! - `/.insomnia/memory/requests/.md` //! - `/.insomnia/memory/_staging/.json` +//! - `/.insomnia/memory/_logs/current.log` (append-only audit log) //! //! `memory/` is reserved for session-derived / generated state; //! Workflows are human-managed and live one level up under @@ -24,6 +25,7 @@ use std::path::{Path, PathBuf}; use crate::Slug; use crate::error::LintError; +#[cfg(test)] use lint_common::RecordLintError; const INSOMNIA_DIR: &str = ".insomnia"; @@ -35,7 +37,9 @@ const DECISIONS_DIR: &str = "decisions"; const REQUESTS_DIR: &str = "requests"; const STAGING_DIR: &str = "_staging"; const USAGE_DIR: &str = "_usage"; +const LOGS_DIR: &str = "_logs"; const USAGE_EVENTS_FILE: &str = "events.jsonl"; +const AUDIT_CURRENT_LOG_FILE: &str = "current.log"; /// What kind of record a path under the memory tree represents. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -137,6 +141,18 @@ impl WorkspaceLayout { self.usage_dir().join(USAGE_EVENTS_FILE) } + pub fn audit_logs_dir(&self) -> PathBuf { + self.memory_dir().join(LOGS_DIR) + } + + /// Tail-friendly latest memory audit log path. + /// + /// Operators can inspect live memory worker and tool events with: + /// `tail -f .insomnia/memory/_logs/current.log`. + pub fn audit_current_log_path(&self) -> PathBuf { + self.audit_logs_dir().join(AUDIT_CURRENT_LOG_FILE) + } + pub fn decision_path(&self, slug: &Slug) -> PathBuf { self.decisions_dir().join(format!("{slug}.md")) } @@ -156,7 +172,7 @@ impl WorkspaceLayout { /// Classify a path under the memory tree. Returns `None` if the /// path is not under `.insomnia/memory/` or `.insomnia/knowledge/` /// of this workspace, or if it lives in - /// `_staging/` / `_usage/` (opaque subsystem-owned trees). + /// `_staging/` / `_usage/` / `_logs/` (opaque subsystem-owned trees). /// /// On a conventional path that's *almost* a record but malformed /// (e.g. `.insomnia/memory/decisions/Foo.md` with an invalid slug), @@ -189,7 +205,7 @@ impl WorkspaceLayout { slug: None, })); } - if first == STAGING_DIR || first == USAGE_DIR { + if first == STAGING_DIR || first == USAGE_DIR || first == LOGS_DIR { // Linter opts out of subsystem-owned opaque trees. return Ok(None); } @@ -300,6 +316,14 @@ mod tests { assert!(cp.is_none()); } + #[test] + fn logs_tree_is_opaque_to_classifier() { + let cp = layout() + .classify(&PathBuf::from("/ws/.insomnia/memory/_logs/current.log")) + .unwrap(); + assert!(cp.is_none()); + } + #[test] fn outside_returns_none() { assert!( diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 2bab6f17..9e160d40 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -513,6 +513,7 @@ where )); worker.register_tool(memory::tool::write_tool(layout.clone())); worker.register_tool(memory::tool::edit_tool(layout.clone())); + worker.register_tool(memory::tool::delete_tool(layout.clone())); worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg)); worker.register_tool(memory::tool::knowledge_query_tool(layout, query_cfg)); } diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index f5cc55aa..5f20252c 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -2596,6 +2596,25 @@ impl Pod { // `Some(0)` means disabled, same as `None`. Otherwise the // `tokens_since >= 0` comparison would fire on every post-run. let Some(threshold) = memory_cfg.extract_threshold.filter(|n| *n > 0) else { + let layout = memory::WorkspaceLayout::resolve(&memory_cfg, &self.pwd); + let model = memory_cfg + .extract_model + .as_ref() + .unwrap_or(&self.manifest.model); + WorkerAuditBase::new( + memory::audit::AuditWorker::MemoryExtract, + memory::audit::AuditTrigger::TokenThreshold, + Some(model_audit_from_manifest(model)), + ) + .emit( + &layout, + self.event_tx.as_ref(), + memory::audit::WorkerLifecycleStatus::Skipped, + "extract_threshold_disabled", + None, + None, + None, + ); return Ok(()); }; @@ -2607,6 +2626,25 @@ impl Pod { .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) .is_err() { + let layout = memory::WorkspaceLayout::resolve(&memory_cfg, &self.pwd); + let model = memory_cfg + .extract_model + .as_ref() + .unwrap_or(&self.manifest.model); + WorkerAuditBase::new( + memory::audit::AuditWorker::MemoryExtract, + memory::audit::AuditTrigger::TokenThreshold, + Some(model_audit_from_manifest(model)), + ) + .emit( + &layout, + self.event_tx.as_ref(), + memory::audit::WorkerLifecycleStatus::Skipped, + "extract_already_in_flight", + None, + None, + None, + ); return Ok(()); } let result = self.run_extract_once(&memory_cfg, threshold).await; @@ -2644,6 +2682,18 @@ impl Pod { ) -> Result { use memory::extract; + let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd); + let model = memory_cfg + .extract_model + .as_ref() + .unwrap_or(&self.manifest.model); + let audit = WorkerAuditBase::new( + memory::audit::AuditWorker::MemoryExtract, + memory::audit::AuditTrigger::TokenThreshold, + Some(model_audit_from_manifest(model)), + ); + let event_tx = self.event_tx.as_ref(); + let pointer_snapshot = self .extract_pointer .lock() @@ -2656,6 +2706,17 @@ impl Pod { let tokens_since = self.tokens_added_since(processed_history_len); if tokens_since < threshold { + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Skipped, + format!( + "token_threshold_not_reached tokens_since={tokens_since} threshold={threshold}" + ), + None, + None, + None, + ); return Ok(ExtractDecision::Skipped); } @@ -2666,6 +2727,18 @@ impl Pod { .history() .len(); if current_history_len <= processed_history_len { + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Skipped, + "no_new_history_items", + None, + Some(memory::audit::ExtractAudit { + history_range: Some([processed_history_len as u64, current_history_len as u64]), + ..Default::default() + }), + None, + ); return Ok(ExtractDecision::Skipped); } @@ -2677,6 +2750,15 @@ impl Pod { .read_all(self.session_id(), self.segment_id())? .len(); if entries_now == 0 { + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Skipped, + "empty_segment_log", + None, + None, + None, + ); return Ok(ExtractDecision::Skipped); } let end_entry = entries_now - 1; @@ -2685,42 +2767,118 @@ impl Pod { .map(|p| p.processed_through_entry + 1) .unwrap_or(0); if start_entry > end_entry { + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Skipped, + "no_new_segment_entries", + None, + Some(memory::audit::ExtractAudit { + session_id: Some(self.session_id().to_string()), + segment_id: Some(self.segment_id().to_string()), + entry_range: Some([start_entry as u64, end_entry as u64]), + history_range: Some([processed_history_len as u64, current_history_len as u64]), + ..Default::default() + }), + None, + ); return Ok(ExtractDecision::Skipped); } + let extract_audit_base = memory::audit::ExtractAudit { + session_id: Some(self.session_id().to_string()), + segment_id: Some(self.segment_id().to_string()), + entry_range: Some([start_entry as u64, end_entry as u64]), + history_range: Some([processed_history_len as u64, current_history_len as u64]), + ..Default::default() + }; + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Started, + format!("token_threshold_reached tokens_since={tokens_since} threshold={threshold}"), + None, + Some(extract_audit_base.clone()), + None, + ); + let items_to_extract = self.worker.as_ref().expect("worker present").history() [processed_history_len..current_history_len] .to_vec(); - let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd); let extract_worker_max_turns = memory_cfg .extract_worker_max_turns .or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS); - let client = self.build_extractor_client(memory_cfg)?; + let client = match self.build_extractor_client(memory_cfg) { + Ok(client) => client, + Err(err) => { + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Failed, + format!("client_build_failed: {err}"), + None, + Some(extract_audit_base), + None, + ); + return Err(err); + } + }; let memory_language = memory_language(memory_cfg); - let extract_system_prompt = self - .prompts - .memory_extract_system(memory_language) - .map_err(PodError::PromptCatalog)?; + let extract_system_prompt = match self.prompts.memory_extract_system(memory_language) { + Ok(prompt) => prompt, + Err(err) => { + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Failed, + format!("prompt_render_failed: {err}"), + None, + Some(extract_audit_base), + None, + ); + return Err(PodError::PromptCatalog(err)); + } + }; let mut extract_worker = Worker::new(client).system_prompt(extract_system_prompt); extract_worker.set_cache_key(Some(self.segment_id().to_string())); extract_worker.set_max_turns(extract_worker_max_turns); + let usage_capture = Arc::new(Mutex::new(None)); + let usage_capture_for_worker = usage_capture.clone(); + extract_worker.on_usage(move |event| { + *usage_capture_for_worker + .lock() + .expect("memory extract usage capture poisoned") = + Some(usage_audit_from_event(event)); + }); + let ctx = Arc::new(extract::ExtractWorkerContext::new()); extract_worker.register_tool(extract::write_extracted_tool(ctx.clone())); let input_text = extract::build_extract_input(&items_to_extract); - extract_worker - .run(input_text) - .await - .map_err(PodError::Worker)?; + if let Err(err) = extract_worker.run(input_text).await { + let usage = usage_capture + .lock() + .expect("memory extract usage capture poisoned") + .clone(); + audit.emit( + &layout, + event_tx, + lifecycle_status_for_worker_error(&err), + format!("worker_failed: {err}"), + usage, + Some(extract_audit_base), + None, + ); + return Err(PodError::Worker(err)); + } let payload = ctx.take_payload().unwrap_or_else(|| { tracing::warn!( - "extract worker did not call write_extracted; \ - advancing pointer with empty payload" + "extract worker did not call write_extracted; advancing pointer with empty payload" ); extract::ExtractedPayload::default() }); @@ -2733,15 +2891,32 @@ impl Pod { segment_id: source_segment_id.to_string(), range: [start_entry as u64, end_entry as u64], }; - let (id, _) = extract::write_staging(&layout, source, payload) - .map_err(PodError::ExtractStaging)?; + let (id, _) = match extract::write_staging(&layout, source, payload) { + Ok(result) => result, + Err(err) => { + let usage = usage_capture + .lock() + .expect("memory extract usage capture poisoned") + .clone(); + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Failed, + format!("staging_write_failed: {err}"), + usage, + Some(extract_audit_base), + None, + ); + return Err(PodError::ExtractStaging(err)); + } + }; id.to_string() }; let pointer_payload = extract::ExtractPointerPayload { processed_through_entry: end_entry, processed_through_history_len: current_history_len, - staging_id, + staging_id: staging_id.clone(), }; let payload_value = serde_json::to_value(&pointer_payload) .expect("ExtractPointerPayload is always JSON-serializable"); @@ -2756,6 +2931,37 @@ impl Pod { .lock() .expect("extract_pointer poisoned") = Some(pointer_payload); + let mut extract_audit = extract_audit_base; + if !staging_id.is_empty() { + extract_audit.staging_count = 1; + extract_audit.staging_ids.push(staging_id.clone()); + extract_audit.staging_paths.push( + layout + .staging_dir() + .join(format!("{staging_id}.json")) + .display() + .to_string(), + ); + } + let usage = usage_capture + .lock() + .expect("memory extract usage capture poisoned") + .clone(); + let reason = if staging_id.is_empty() { + "completed_no_staging_output" + } else { + "completed_staging_written" + }; + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Completed, + reason, + usage, + Some(extract_audit), + None, + ); + Ok(ExtractDecision::Completed) } @@ -2799,6 +3005,25 @@ impl Pod { let files_threshold = memory_cfg.consolidation_threshold_files.filter(|n| *n > 0); let bytes_threshold = memory_cfg.consolidation_threshold_bytes.filter(|n| *n > 0); if files_threshold.is_none() && bytes_threshold.is_none() { + let layout = memory::WorkspaceLayout::resolve(&memory_cfg, &self.pwd); + let model = memory_cfg + .consolidation_model + .as_ref() + .unwrap_or(&self.manifest.model); + WorkerAuditBase::new( + memory::audit::AuditWorker::MemoryConsolidation, + memory::audit::AuditTrigger::StagingBacklog, + Some(model_audit_from_manifest(model)), + ) + .emit( + &layout, + self.event_tx.as_ref(), + memory::audit::WorkerLifecycleStatus::Skipped, + "consolidation_threshold_disabled", + None, + None, + None, + ); return Ok(()); } @@ -2808,6 +3033,25 @@ impl Pod { .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) .is_err() { + let layout = memory::WorkspaceLayout::resolve(&memory_cfg, &self.pwd); + let model = memory_cfg + .consolidation_model + .as_ref() + .unwrap_or(&self.manifest.model); + WorkerAuditBase::new( + memory::audit::AuditWorker::MemoryConsolidation, + memory::audit::AuditTrigger::StagingBacklog, + Some(model_audit_from_manifest(model)), + ) + .emit( + &layout, + self.event_tx.as_ref(), + memory::audit::WorkerLifecycleStatus::Skipped, + "consolidation_already_in_flight", + None, + None, + None, + ); return Ok(()); } let result = self @@ -2843,21 +3087,57 @@ impl Pod { use memory::consolidate; let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd); + let model = memory_cfg + .consolidation_model + .as_ref() + .unwrap_or(&self.manifest.model); + let audit = WorkerAuditBase::new( + memory::audit::AuditWorker::MemoryConsolidation, + memory::audit::AuditTrigger::StagingBacklog, + Some(model_audit_from_manifest(model)), + ); + let event_tx = self.event_tx.as_ref(); let entries = consolidate::list_staging_entries(&layout); if entries.is_empty() { + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Skipped, + "no_staging_entries", + None, + None, + Some(memory::audit::ConsolidationAudit::default()), + ); return Ok(ConsolidateDecision::Skipped); } let total_files = entries.len(); let total_bytes: u64 = entries.iter().map(|e| e.bytes).sum(); + let consumed_ids: Vec = entries.iter().map(|e| e.id).collect(); + let base_consolidation = memory::audit::ConsolidationAudit { + staging_count: total_files, + staging_bytes: total_bytes, + consumed_staging_ids: consumed_ids.iter().map(ToString::to_string).collect(), + operations: memory::audit::OperationCounts::default(), + }; let files_hit = files_threshold.is_some_and(|n| total_files >= n); let bytes_hit = bytes_threshold.is_some_and(|n| total_bytes >= n); if !files_hit && !bytes_hit { + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Skipped, + format!( + "threshold_not_reached files={total_files} bytes={total_bytes} files_threshold={files_threshold:?} bytes_threshold={bytes_threshold:?}" + ), + None, + None, + Some(base_consolidation), + ); return Ok(ConsolidateDecision::Skipped); } - let consumed_ids: Vec = entries.iter().map(|e| e.id).collect(); let lock = match consolidate::StagingLock::acquire( &layout, std::process::id(), @@ -2866,15 +3146,56 @@ impl Pod { ) { Ok(l) => l, Err(memory::consolidate::LockError::InUse { .. }) => { + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Skipped, + "staging_lock_in_use", + None, + None, + Some(base_consolidation), + ); return Ok(ConsolidateDecision::Skipped); } - Err(e) => return Err(PodError::ConsolidationLock(e)), + Err(e) => { + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Failed, + format!("staging_lock_failed: {e}"), + None, + None, + Some(base_consolidation), + ); + return Err(PodError::ConsolidationLock(e)); + } }; + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Started, + format!("staging_threshold_reached files={total_files} bytes={total_bytes}"), + None, + None, + Some(base_consolidation.clone()), + ); + + let before_records = memory::audit::snapshot_records(&layout); + let client = match self.build_consolidator_client(memory_cfg) { Ok(c) => c, Err(e) => { lock.release_only(); + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Failed, + format!("client_build_failed: {e}"), + None, + None, + Some(base_consolidation), + ); return Err(e); } }; @@ -2884,12 +3205,30 @@ impl Pod { Ok(p) => p, Err(e) => { lock.release_only(); + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Failed, + format!("prompt_render_failed: {e}"), + None, + None, + Some(base_consolidation), + ); return Err(PodError::PromptCatalog(e)); } }; let mut worker = Worker::new(client).system_prompt(consolidation_system_prompt); worker.set_cache_key(Some(self.segment_id().to_string())); + let usage_capture = Arc::new(Mutex::new(None)); + let usage_capture_for_worker = usage_capture.clone(); + worker.on_usage(move |event| { + *usage_capture_for_worker + .lock() + .expect("memory consolidation usage capture poisoned") = + Some(usage_audit_from_event(event)); + }); + // Memory tools are self-contained — they bypass ScopedFs and write // directly under the workspace via WorkspaceLayout. Resident // knowledge injection (`Pod::set_resident_knowledge_injection`) is @@ -2904,6 +3243,7 @@ impl Pod { )); worker.register_tool(memory::tool::write_tool(layout.clone())); worker.register_tool(memory::tool::edit_tool(layout.clone())); + worker.register_tool(memory::tool::delete_tool(layout.clone())); worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg)); worker.register_tool(memory::tool::knowledge_query_tool( layout.clone(), @@ -2922,19 +3262,159 @@ impl Pod { consolidate::build_consolidate_input(&layout, &entries, &tidy, &usage_report); let run_result = worker.run(input_text).await; + let usage = usage_capture + .lock() + .expect("memory consolidation usage capture poisoned") + .clone(); match run_result { Ok(_) => { lock.release_with_cleanup(&layout); + let after_records = memory::audit::snapshot_records(&layout); + let mut consolidation = base_consolidation; + consolidation.operations = + memory::audit::operation_counts_from_snapshots(&before_records, &after_records); + let reason = if consolidation.operations.total_record_changes() == 0 { + "completed_no_record_changes" + } else { + "completed_record_changes" + }; + audit.emit( + &layout, + event_tx, + memory::audit::WorkerLifecycleStatus::Completed, + reason, + usage, + None, + Some(consolidation), + ); Ok(ConsolidateDecision::Completed) } Err(e) => { lock.release_only(); + audit.emit( + &layout, + event_tx, + lifecycle_status_for_worker_error(&e), + format!("worker_failed: {e}"), + usage, + None, + Some(base_consolidation), + ); Err(PodError::Worker(e)) } } } } +fn lifecycle_status_for_worker_error(err: &WorkerError) -> memory::audit::WorkerLifecycleStatus { + if matches!(err, WorkerError::Cancelled) { + memory::audit::WorkerLifecycleStatus::Cancelled + } else { + memory::audit::WorkerLifecycleStatus::Failed + } +} + +fn usage_audit_from_event( + event: &llm_worker::llm_client::event::UsageEvent, +) -> memory::audit::UsageAudit { + memory::audit::UsageAudit { + input_tokens: event.input_tokens, + output_tokens: event.output_tokens, + total_tokens: event.total_tokens, + cache_read_input_tokens: event.cache_read_input_tokens, + cache_creation_input_tokens: event.cache_creation_input_tokens, + } +} + +fn model_audit_from_manifest(model: &manifest::ModelManifest) -> memory::audit::ModelAudit { + memory::audit::ModelAudit { + ref_: model.ref_.clone(), + scheme: model.scheme.map(|scheme| format!("{scheme:?}")), + model_id: model.model_id.clone(), + } +} + +fn emit_memory_worker_event( + event_tx: Option<&broadcast::Sender>, + run_id: uuid::Uuid, + worker: memory::audit::AuditWorker, + status: memory::audit::WorkerLifecycleStatus, + trigger: memory::audit::AuditTrigger, + reason: &str, +) { + let Some(event_tx) = event_tx else { + return; + }; + let message = format!("memory {} {}: {reason}", worker.label(), status.label()); + let _ = event_tx.send(Event::MemoryWorker(protocol::MemoryWorkerEvent { + worker: worker.label().to_string(), + status: status.label().to_string(), + run_id: run_id.to_string(), + trigger: trigger.label().to_string(), + reason: reason.to_string(), + message, + timestamp_ms: segment_log::now_millis() as i64, + })); +} + +#[derive(Debug, Clone)] +struct WorkerAuditBase { + run_id: uuid::Uuid, + worker: memory::audit::AuditWorker, + trigger: memory::audit::AuditTrigger, + model: Option, +} + +impl WorkerAuditBase { + fn new( + worker: memory::audit::AuditWorker, + trigger: memory::audit::AuditTrigger, + model: Option, + ) -> Self { + Self { + run_id: uuid::Uuid::now_v7(), + worker, + trigger, + model, + } + } + + fn emit( + &self, + layout: &memory::WorkspaceLayout, + event_tx: Option<&broadcast::Sender>, + status: memory::audit::WorkerLifecycleStatus, + reason: impl Into, + usage: Option, + extract: Option, + consolidation: Option, + ) { + let reason = reason.into(); + let _ = memory::audit::append_worker_lifecycle( + layout, + memory::audit::WorkerLifecycleAudit { + run_id: self.run_id, + worker: self.worker, + status, + trigger: self.trigger, + reason: reason.clone(), + model: self.model.clone(), + usage, + extract, + consolidation, + }, + ); + emit_memory_worker_event( + event_tx, + self.run_id, + self.worker, + status, + self.trigger, + &reason, + ); + } +} + fn memory_language(cfg: &manifest::MemoryConfig) -> &str { cfg.language .as_deref() diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 74c63de1..06e08d3c 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -425,6 +425,11 @@ pub enum Event { result: serde_json::Value, }, Alert(Alert), + /// Latest memory extract/consolidation lifecycle event for UI observability. + /// + /// This is not part of LLM history or prompt context; clients may display it + /// briefly as operational status. + MemoryWorker(MemoryWorkerEvent), /// Pod has started compacting the current session. /// /// Fired immediately before a compaction run. Success is signalled by @@ -460,6 +465,19 @@ pub struct Alert { pub timestamp_ms: i64, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MemoryWorkerEvent { + pub worker: String, + pub status: String, + pub run_id: String, + pub trigger: String, + pub reason: String, + /// Human-readable compact form for actionbar rendering. + pub message: String, + /// Milliseconds since the Unix epoch. + pub timestamp_ms: i64, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] pub enum AlertLevel { diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 051fef7b..08218376 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -89,6 +89,8 @@ pub struct App { pub context_window: u64, pub turn_index: usize, pub current_tool: Option, + /// Latest memory extract/consolidation lifecycle event for actionbar observability. + pub latest_memory_worker_event: Option, /// Normal composer input that is submitted as `Method::Run`. pub input: InputBuffer, /// Separate command-line input. It is never submitted as a user message. @@ -148,6 +150,7 @@ impl App { context_window: 0, turn_index: 0, current_tool: None, + latest_memory_worker_event: None, input: InputBuffer::new(), command_input: InputBuffer::new(), input_mode: CommandInputMode::Composer, @@ -844,6 +847,9 @@ impl App { message: alert.message, }); } + Event::MemoryWorker(event) => { + self.latest_memory_worker_event = Some(event.message); + } Event::Snapshot { entries, greeting, @@ -2083,6 +2089,26 @@ mod completion_flow_tests { assert_eq!(app.run_output_tokens, 9); } + #[test] + fn memory_worker_event_updates_actionbar_state() { + let mut app = App::new("test".into()); + + app.handle_pod_event(Event::MemoryWorker(protocol::MemoryWorkerEvent { + worker: "extract".into(), + status: "done".into(), + run_id: "00000000-0000-0000-0000-000000000000".into(), + trigger: "token_threshold".into(), + reason: "completed_staging_written".into(), + message: "memory extract done: completed_staging_written".into(), + timestamp_ms: 0, + })); + + assert_eq!( + app.latest_memory_worker_event.as_deref(), + Some("memory extract done: completed_staging_written") + ); + } + #[test] fn compact_done_resets_session_context_tokens() { let mut app = App::new("test".into()); diff --git a/crates/tui/src/ui.rs b/crates/tui/src/ui.rs index 52a9ccc4..b79b0bf7 100644 --- a/crates/tui/src/ui.rs +++ b/crates/tui/src/ui.rs @@ -1218,6 +1218,11 @@ fn draw_actionbar(frame: &mut Frame, app: &App, area: Rect) { "Alt-q edit queued Alt-c clear queued", Style::default().fg(Color::DarkGray), )); + } else if let Some(memory_event) = app.latest_memory_worker_event.as_deref() { + left.push(Span::styled( + truncate_with_ellipsis(memory_event, 72), + Style::default().fg(Color::Blue), + )); } let mut right: Vec> = Vec::new(); diff --git a/resources/prompts/internal/memory_consolidation_system.md b/resources/prompts/internal/memory_consolidation_system.md index bbde4573..f0251740 100644 --- a/resources/prompts/internal/memory_consolidation_system.md +++ b/resources/prompts/internal/memory_consolidation_system.md @@ -6,7 +6,7 @@ Your job is to take extract activity-log staging entries together with the works 2. **Tidy step** — clean up the existing records that the integration step didn't already touch. You have: -- `MemoryRead`, `MemoryWrite`, `MemoryEdit` for memory and knowledge records. +- `MemoryRead`, `MemoryWrite`, `MemoryEdit`, `MemoryDelete` for memory and knowledge records. - `MemoryQuery` for memory-side records (summary / decisions / requests). - `KnowledgeQuery` for knowledge records — use it to find existing slugs before creating new ones.