merge: memory-audit-log

This commit is contained in:
Keisuke Hirata 2026-05-25 03:38:03 +09:00
commit 235ddba9c5
17 changed files with 1534 additions and 60 deletions

1
Cargo.lock generated
View File

@ -1796,6 +1796,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"sha2 0.11.0",
"tempfile",
"thiserror 2.0.18",
"tokio",

View File

@ -14,6 +14,7 @@ manifest = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
sha2 = { workspace = true }
serde_yaml = "0.9.34"
thiserror = { workspace = true }
tracing = { workspace = true }

444
crates/memory/src/audit.rs Normal file
View File

@ -0,0 +1,444 @@
//! Append-only JSONL audit log for memory workers and tools.
//!
//! The log is evidence-only observability data under
//! `.insomnia/memory/_logs/current.log`. It is intentionally separate from
//! `_staging` and `_usage`, and consolidation never consumes it. Operators can
//! follow the latest stream with:
//!
//! ```text
//! tail -f .insomnia/memory/_logs/current.log
//! ```
use std::collections::BTreeMap;
use std::fs::{self, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use uuid::Uuid;
use crate::workspace::WorkspaceLayout;
/// Memory worker that emitted a lifecycle audit event.
///
/// Serialized in `snake_case` (for example `memory_extract`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditWorker {
    MemoryExtract,
    MemoryConsolidation,
}

impl AuditWorker {
    /// Short static label for this worker.
    ///
    /// The label is not the serialized name: `MemoryExtract` serializes as
    /// `memory_extract` but is labelled `extract`.
    pub fn label(self) -> &'static str {
        match self {
            AuditWorker::MemoryExtract => "extract",
            AuditWorker::MemoryConsolidation => "consolidation",
        }
    }
}
/// Lifecycle stage recorded for a worker run.
///
/// Serialized in `snake_case` (for example `started`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerLifecycleStatus {
    Started,
    Completed,
    Skipped,
    Failed,
    Cancelled,
}

impl WorkerLifecycleStatus {
    /// Short static label for this status.
    ///
    /// Labels are not always the serialized names: `Started` is labelled
    /// `running` and `Completed` is labelled `done`.
    pub fn label(self) -> &'static str {
        match self {
            WorkerLifecycleStatus::Started => "running",
            WorkerLifecycleStatus::Completed => "done",
            WorkerLifecycleStatus::Skipped => "skipped",
            WorkerLifecycleStatus::Failed => "failed",
            WorkerLifecycleStatus::Cancelled => "cancelled",
        }
    }
}
/// What caused a worker run to be scheduled.
///
/// Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditTrigger {
    SessionEnd,
    TurnThreshold,
    TokenThreshold,
    StagingBacklog,
    Idle,
    Manual,
    StartupRecovery,
    Unknown,
}

impl AuditTrigger {
    /// Static `snake_case` label for this trigger.
    pub fn label(self) -> &'static str {
        match self {
            AuditTrigger::SessionEnd => "session_end",
            AuditTrigger::TurnThreshold => "turn_threshold",
            AuditTrigger::TokenThreshold => "token_threshold",
            AuditTrigger::StagingBacklog => "staging_backlog",
            AuditTrigger::Idle => "idle",
            AuditTrigger::Manual => "manual",
            AuditTrigger::StartupRecovery => "startup_recovery",
            AuditTrigger::Unknown => "unknown",
        }
    }
}
/// Outcome of a single audited record operation or record usage.
///
/// Serialized in `snake_case` (`success`, `failed`, `skipped`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AuditStatus {
Success,
Failed,
Skipped,
}
/// Optional model identification attached to a worker lifecycle event.
///
/// Every field is optional; a `None` field is omitted from the serialized
/// output and a missing field deserializes to `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ModelAudit {
// NOTE(review): there is no `#[serde(rename = ...)]` here, so this field is
// written to the log under the JSON key `ref_` — confirm that is intended.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ref_: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub scheme: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model_id: Option<String>,
}
/// Optional token counters attached to a worker lifecycle event.
///
/// Each counter is omitted from the serialized output when `None`;
/// `Default` yields all counters unset.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct UsageAudit {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub input_tokens: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub output_tokens: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub total_tokens: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cache_read_input_tokens: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cache_creation_input_tokens: Option<u64>,
}
/// Extract-specific details attached to a worker lifecycle event.
///
/// Optional fields are omitted when `None` and list fields are omitted when
/// empty; `staging_count` is always serialized.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ExtractAudit {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub session_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub segment_id: Option<String>,
// NOTE(review): presumably `[start, end]`; inclusivity of the bounds is not
// visible in this file — confirm against the producer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub entry_range: Option<[u64; 2]>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub history_range: Option<[u64; 2]>,
#[serde(default)]
pub staging_count: usize,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub staging_ids: Vec<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub staging_paths: Vec<String>,
}
/// Consolidation-specific details attached to a worker lifecycle event.
///
/// `staging_count`, `staging_bytes` and `operations` are always serialized;
/// `consumed_staging_ids` is omitted when empty.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ConsolidationAudit {
#[serde(default)]
pub staging_count: usize,
#[serde(default)]
pub staging_bytes: u64,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub consumed_staging_ids: Vec<String>,
#[serde(default)]
pub operations: OperationCounts,
}
/// Per-operation counters for record changes.
///
/// Every counter defaults to `0` when absent on deserialization.
/// `operation_counts_from_snapshots` in this module fills only `write`,
/// `edit` and `delete`; the remaining counters are set by other callers.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct OperationCounts {
#[serde(default)]
pub write: usize,
#[serde(default)]
pub edit: usize,
#[serde(default)]
pub delete: usize,
#[serde(default)]
pub drop: usize,
#[serde(default)]
pub merge: usize,
#[serde(default)]
pub trim: usize,
}
impl OperationCounts {
    /// Sum of all six counters (`write`, `edit`, `delete`, `drop`, `merge`,
    /// `trim`).
    pub fn total_record_changes(&self) -> usize {
        let counters = [
            self.write,
            self.edit,
            self.delete,
            self.drop,
            self.merge,
            self.trim,
        ];
        counters.iter().sum()
    }
}
/// Payload of a `worker_lifecycle` audit event.
///
/// `run_id`, `worker`, `status`, `trigger` and `reason` are always present;
/// the optional detail sections are omitted from the output when `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerLifecycleAudit {
// NOTE(review): presumably shared by all lifecycle events of one worker run —
// confirm against the callers.
pub run_id: Uuid,
pub worker: AuditWorker,
pub status: WorkerLifecycleStatus,
pub trigger: AuditTrigger,
pub reason: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub model: Option<ModelAudit>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub usage: Option<UsageAudit>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub extract: Option<ExtractAudit>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub consolidation: Option<ConsolidationAudit>,
}
/// Payload of a `record_operation` audit event (a mutation attempt on one
/// record).
///
/// `before_hash`, `after_hash` and `reason` are omitted from the output when
/// `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RecordOperationAudit {
// Free-form operation name chosen by the caller (e.g. "delete").
pub op: String,
pub status: AuditStatus,
pub kind: String,
pub slug: String,
pub path: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub before_hash: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub after_hash: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}
/// Payload of a `record_usage` audit event (a non-mutating access).
///
/// Only `op`, `status` and `kind` are mandatory; every other field is omitted
/// from the output when `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RecordUsageAudit {
// Free-form operation name chosen by the caller.
pub op: String,
pub status: AuditStatus,
pub kind: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub slug: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub query: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub result_count: Option<usize>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
}
/// The event-specific part of an audit log line.
///
/// Internally tagged: the variant is written as an `event` field whose value
/// is the `snake_case` variant name (`worker_lifecycle`, `record_operation`,
/// `record_usage`), next to the variant's own fields.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum AuditPayload {
WorkerLifecycle(WorkerLifecycleAudit),
RecordOperation(RecordOperationAudit),
RecordUsage(RecordUsageAudit),
}
/// One line of the audit log: an id, a UTC timestamp and the payload.
///
/// The payload is flattened, so its `event` tag and fields sit at the top
/// level of the JSON object alongside `id` and `occurred_at`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuditEvent {
pub id: Uuid,
pub occurred_at: DateTime<Utc>,
#[serde(flatten)]
pub payload: AuditPayload,
}
impl AuditEvent {
pub fn new(payload: AuditPayload) -> Self {
Self {
id: Uuid::now_v7(),
occurred_at: Utc::now(),
payload,
}
}
}
/// Point-in-time fingerprint of one record file, as collected by
/// `snapshot_records`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordSnapshot {
pub kind: String,
pub slug: String,
pub path: PathBuf,
// Content hash as returned by `file_hash` (`sha256:` followed by hex).
pub hash: String,
}
/// Append one audit event to `.insomnia/memory/_logs/current.log`.
///
/// The event is serialized as a single JSON line. The log's parent directory
/// and the log file itself are created on demand.
///
/// # Errors
///
/// Returns an error if the directory cannot be created, if the event cannot
/// be serialized (reported as [`io::ErrorKind::InvalidData`]), or if the file
/// cannot be opened or written.
pub fn append_audit_event(layout: &WorkspaceLayout, event: &AuditEvent) -> io::Result<()> {
    let path = layout.audit_current_log_path();
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    let mut line = serde_json::to_string(event)
        .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))?;
    line.push('\n');
    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
    // Hand the record and its newline to the file in one `write_all` call.
    // `writeln!` on an unbuffered `File` emits the payload and the trailing
    // newline as separate writes, which would let another appender's record
    // land between them and corrupt the JSONL stream.
    file.write_all(line.as_bytes())
}
/// Wrap `audit` in a freshly stamped [`AuditEvent`] and append it to the
/// audit log.
///
/// # Errors
///
/// Propagates any error from [`append_audit_event`].
pub fn append_worker_lifecycle(
    layout: &WorkspaceLayout,
    audit: WorkerLifecycleAudit,
) -> io::Result<()> {
    let event = AuditEvent::new(AuditPayload::WorkerLifecycle(audit));
    append_audit_event(layout, &event)
}
/// Wrap `audit` in a freshly stamped [`AuditEvent`] and append it to the
/// audit log as a `record_operation` event.
///
/// # Errors
///
/// Propagates any error from [`append_audit_event`].
pub fn append_record_operation(
    layout: &WorkspaceLayout,
    audit: RecordOperationAudit,
) -> io::Result<()> {
    let event = AuditEvent::new(AuditPayload::RecordOperation(audit));
    append_audit_event(layout, &event)
}
/// Wrap `audit` in a freshly stamped [`AuditEvent`] and append it to the
/// audit log as a `record_usage` event.
///
/// # Errors
///
/// Propagates any error from [`append_audit_event`].
pub fn append_record_usage(layout: &WorkspaceLayout, audit: RecordUsageAudit) -> io::Result<()> {
    let event = AuditEvent::new(AuditPayload::RecordUsage(audit));
    append_audit_event(layout, &event)
}
/// Hash the contents of the file at `path`.
///
/// Returns `Ok(None)` when the file does not exist and `Ok(Some(hash))`
/// (see [`hash_bytes`]) when it could be read.
///
/// # Errors
///
/// Returns any read error other than [`io::ErrorKind::NotFound`].
pub fn file_hash(path: &Path) -> io::Result<Option<String>> {
    let contents = match fs::read(path) {
        Ok(contents) => contents,
        Err(err) => {
            return if err.kind() == io::ErrorKind::NotFound {
                Ok(None)
            } else {
                Err(err)
            };
        }
    };
    Ok(Some(hash_bytes(&contents)))
}
/// SHA-256 of `bytes`, rendered as `sha256:` followed by lowercase hex.
pub fn hash_bytes(bytes: &[u8]) -> String {
    use std::fmt::Write as _;

    const PREFIX: &str = "sha256:";
    let digest = Sha256::digest(bytes);
    let mut rendered = String::with_capacity(PREFIX.len() + digest.len() * 2);
    rendered.push_str(PREFIX);
    for byte in digest.iter() {
        // The formatting result is ignored, as before: this only appends to
        // the in-memory `String`.
        let _ = write!(&mut rendered, "{byte:02x}");
    }
    rendered
}
/// Fingerprint the summary file and every `*.md` file in the decisions,
/// requests and knowledge directories.
///
/// The map is keyed by `"{kind}/{slug}"`. Files that are missing or cannot
/// be read are left out without reporting an error.
pub fn snapshot_records(layout: &WorkspaceLayout) -> BTreeMap<String, RecordSnapshot> {
    let mut records = BTreeMap::new();
    snapshot_one(&mut records, "summary", "summary", layout.summary_path());
    let record_dirs = [
        ("decision", layout.decisions_dir()),
        ("request", layout.requests_dir()),
        ("knowledge", layout.knowledge_dir()),
    ];
    for (kind, dir) in record_dirs {
        snapshot_dir(&mut records, kind, dir);
    }
    records
}
/// Derive write/edit/delete counts by diffing two snapshots.
///
/// A key only in `after` counts as a write, a key in both with a different
/// hash counts as an edit, and a key only in `before` counts as a delete.
/// The other counters are left at `0`.
pub fn operation_counts_from_snapshots(
    before: &BTreeMap<String, RecordSnapshot>,
    after: &BTreeMap<String, RecordSnapshot>,
) -> OperationCounts {
    let mut counts = OperationCounts::default();
    for (key, current) in after {
        if let Some(previous) = before.get(key) {
            if previous.hash != current.hash {
                counts.edit += 1;
            }
        } else {
            counts.write += 1;
        }
    }
    counts.delete += before
        .keys()
        .filter(|key| !after.contains_key(*key))
        .count();
    counts
}
/// Snapshot every regular `*.md` file directly inside `dir` under `kind`.
///
/// The slug is the file name without its `.md` suffix. An unreadable
/// directory, unreadable entries, non-files and non-UTF-8 names are skipped.
fn snapshot_dir(out: &mut BTreeMap<String, RecordSnapshot>, kind: &str, dir: PathBuf) {
    let Ok(listing) = fs::read_dir(dir) else {
        return;
    };
    for path in listing.flatten().map(|entry| entry.path()) {
        if !path.is_file() {
            continue;
        }
        // The slug is an owned `String`, so `path` can be moved afterwards.
        let slug = path
            .file_name()
            .and_then(|name| name.to_str())
            .and_then(|name| name.strip_suffix(".md"))
            .map(str::to_string);
        if let Some(slug) = slug {
            snapshot_one(out, kind, &slug, path);
        }
    }
}
/// Hash the file at `path` and store it in `out` under `"{kind}/{slug}"`.
///
/// Does nothing when `path` is not a regular file or cannot be hashed.
fn snapshot_one(out: &mut BTreeMap<String, RecordSnapshot>, kind: &str, slug: &str, path: PathBuf) {
    if path.is_file() {
        if let Ok(Some(hash)) = file_hash(&path) {
            let key = format!("{kind}/{slug}");
            let snapshot = RecordSnapshot {
                kind: kind.to_string(),
                slug: slug.to_string(),
                path,
                hash,
            };
            out.insert(key, snapshot);
        }
    }
}
// Unit tests for the audit log writer, snapshot diffing and hash formatting.
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
// Creates a workspace layout rooted in a fresh temp dir. The `TempDir` is
// returned too so the directory stays alive for the duration of the test.
fn setup() -> (TempDir, WorkspaceLayout) {
let dir = TempDir::new().unwrap();
let layout = WorkspaceLayout::new(dir.path().to_path_buf());
(dir, layout)
}
// One lifecycle event must produce one parseable JSON line whose flattened
// fields use the snake_case serialized names.
#[test]
fn appends_jsonl_to_current_log() {
let (_dir, layout) = setup();
let run_id = Uuid::now_v7();
append_worker_lifecycle(
&layout,
WorkerLifecycleAudit {
run_id,
worker: AuditWorker::MemoryExtract,
status: WorkerLifecycleStatus::Started,
trigger: AuditTrigger::TokenThreshold,
reason: "tokens_threshold_reached".to_string(),
model: None,
usage: None,
extract: None,
consolidation: None,
},
)
.unwrap();
let text = fs::read_to_string(layout.audit_current_log_path()).unwrap();
let value: serde_json::Value = serde_json::from_str(text.trim()).unwrap();
assert_eq!(value["event"], "worker_lifecycle");
assert_eq!(value["worker"], "memory_extract");
assert_eq!(value["status"], "started");
assert_eq!(value["run_id"], run_id.to_string());
}
// Diffing two snapshots: `a.md` is rewritten (edit), `gone.md` is removed
// (delete) and `created.md` is new (write).
#[test]
fn counts_created_edited_deleted_records() {
let (dir, layout) = setup();
// NOTE(review): assumes this path is what `layout.decisions_dir()` resolves
// to — confirm against `WorkspaceLayout`.
let decision_dir = dir.path().join(".insomnia/memory/decisions");
fs::create_dir_all(&decision_dir).unwrap();
fs::write(decision_dir.join("a.md"), "old").unwrap();
fs::write(decision_dir.join("gone.md"), "old").unwrap();
let before = snapshot_records(&layout);
fs::write(decision_dir.join("a.md"), "new").unwrap();
fs::remove_file(decision_dir.join("gone.md")).unwrap();
fs::write(decision_dir.join("created.md"), "new").unwrap();
let after = snapshot_records(&layout);
let counts = operation_counts_from_snapshots(&before, &after);
assert_eq!(counts.write, 1);
assert_eq!(counts.edit, 1);
assert_eq!(counts.delete, 1);
}
// The rendered hash is the `sha256:` prefix plus 64 hex characters.
#[test]
fn hash_has_sha256_prefix() {
assert_eq!(hash_bytes(b"abc").len(), "sha256:".len() + 64);
assert!(hash_bytes(b"abc").starts_with("sha256:"));
}
}

View File

@ -6,6 +6,7 @@
//! crate) must not touch these directories — Pod is responsible for
//! denying them at the Scope level when memory is enabled.
pub mod audit;
pub mod consolidate;
pub mod error;
pub mod extract;

View File

@ -0,0 +1,152 @@
//! `MemoryDelete` tool for removing memory / knowledge records with audit logging.
use std::sync::Arc;
use async_trait::async_trait;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use serde::Deserialize;
use crate::audit::{AuditStatus, RecordOperationAudit, append_record_operation, file_hash};
use crate::tool::MemoryToolKind;
use crate::workspace::WorkspaceLayout;
// Tool description attached to the `MemoryDelete` metadata in `delete_tool`.
const DESCRIPTION: &str = "Delete an existing memory or knowledge record selected by `kind` + `slug`. \
For `summary` omit `slug`; for the others `slug` is required. The delete is audited and cannot target \
workflow or staging/log files.";
// Input parameters of `MemoryDelete`, deserialized from the tool's JSON input.
// NOTE(review): this is a plain `//` comment on purpose — `///` docs on a
// `schemars::JsonSchema` type are typically exported as schema descriptions,
// which would change the generated input schema.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct DeleteParams {
/// Kind of record to delete.
kind: MemoryToolKind,
/// Slug. Required for everything except `summary`; forbidden for `summary`.
#[serde(default)]
slug: Option<String>,
}
/// Tool implementation behind `MemoryDelete`.
///
/// Holds the workspace layout used both to resolve the record path and to
/// locate the audit log.
struct MemoryDeleteTool {
layout: WorkspaceLayout,
}
#[async_trait]
impl Tool for MemoryDeleteTool {
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let params: DeleteParams = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid MemoryDelete input: {e}")))?;
let path = params
.kind
.resolve_path(&self.layout, params.slug.as_deref())?;
let kind = params.kind.to_string();
let slug = audit_slug(&params.kind, params.slug.as_deref());
let before_hash = file_hash(&path).ok().flatten();
if before_hash.is_none() {
let reason = format!("record not found: {}", path.display());
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "delete".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::ExecutionFailed(reason));
}
if let Err(err) = std::fs::remove_file(&path) {
let reason = format!("failed to delete {}: {err}", path.display());
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "delete".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::ExecutionFailed(reason));
}
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "delete".to_string(),
status: AuditStatus::Success,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: None,
},
);
Ok(ToolOutput {
summary: format!("Deleted {}", path.display()),
content: None,
})
}
}
/// Build the `MemoryDelete` tool definition for the given workspace layout.
///
/// Each call of the returned factory yields the tool metadata (name,
/// description, input schema derived from `DeleteParams`) and a new tool
/// instance holding a clone of `layout`. If the schema cannot be converted to
/// a JSON value, an empty object is used instead.
pub fn delete_tool(layout: WorkspaceLayout) -> ToolDefinition {
    Arc::new(move || {
        let input_schema = serde_json::to_value(schemars::schema_for!(DeleteParams))
            .unwrap_or(serde_json::json!({}));
        let tool: Arc<dyn Tool> = Arc::new(MemoryDeleteTool {
            layout: layout.clone(),
        });
        let meta = ToolMeta::new("MemoryDelete")
            .description(DESCRIPTION)
            .input_schema(input_schema);
        (meta, tool)
    })
}
/// Slug to record in the audit log: always `summary` for the summary record,
/// otherwise the supplied slug or `<missing>` when none was given.
fn audit_slug(kind: &MemoryToolKind, slug: Option<&str>) -> String {
    let label = if matches!(kind, MemoryToolKind::Summary) {
        "summary"
    } else {
        slug.unwrap_or("<missing>")
    };
    label.to_string()
}
// Unit test for the `MemoryDelete` tool.
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use tempfile::TempDir;
// Deleting an existing decision record removes the file and appends a
// successful `record_operation` / `delete` event to the audit log.
#[tokio::test]
async fn delete_removes_file_and_audits() {
let dir = TempDir::new().unwrap();
let layout = WorkspaceLayout::new(dir.path().to_path_buf());
std::fs::create_dir_all(layout.decisions_dir()).unwrap();
let path = layout.decisions_dir().join("obsolete.md");
let now = Utc::now().to_rfc3339();
// Seed a record with frontmatter followed by a short body.
std::fs::write(
&path,
format!(
"---\ncreated_at: {now}\nupdated_at: {now}\nsources: []\nstatus: open\n---\nold"
),
)
.unwrap();
// `delete_tool` returns a factory; calling it yields `(meta, tool)`.
let (_, tool) = delete_tool(layout.clone())();
let out = tool
.execute(r#"{"kind":"decision","slug":"obsolete"}"#)
.await
.unwrap();
assert!(out.summary.contains("Deleted"));
assert!(!path.exists());
let log = std::fs::read_to_string(layout.audit_current_log_path()).unwrap();
assert!(log.contains(r#""event":"record_operation""#));
assert!(log.contains(r#""op":"delete""#));
assert!(log.contains(r#""status":"success""#));
}
}

View File

@ -12,6 +12,9 @@ use async_trait::async_trait;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use serde::Deserialize;
use crate::audit::{
AuditStatus, RecordOperationAudit, append_record_operation, file_hash, hash_bytes,
};
use crate::linter::{LintReport, Linter, WriteMode};
use crate::tool::MemoryToolKind;
use crate::workspace::WorkspaceLayout;
@ -62,30 +65,94 @@ impl Tool for EditTool {
let path = params
.kind
.resolve_path(&self.layout, params.slug.as_deref())?;
let kind = params.kind.to_string();
let slug = audit_slug(&params.kind, params.slug.as_deref());
let current_bytes = std::fs::read(&path).map_err(|e| match e.kind() {
std::io::ErrorKind::NotFound => ToolError::ExecutionFailed(format!(
"record not found (use MemoryWrite to create): {}",
path.display()
)),
_ => ToolError::ExecutionFailed(format!("read failed at {}: {e}", path.display())),
})?;
let current_text = std::str::from_utf8(&current_bytes).map_err(|_| {
ToolError::InvalidArgument(format!("file is not valid UTF-8: {}", path.display()))
})?;
let current_bytes = match std::fs::read(&path) {
Ok(bytes) => bytes,
Err(e) => {
let reason = match e.kind() {
std::io::ErrorKind::NotFound => format!(
"record not found (use MemoryWrite to create): {}",
path.display()
),
_ => format!("read failed at {}: {e}", path.display()),
};
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "edit".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash: None,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::ExecutionFailed(reason));
}
};
let before_hash = Some(hash_bytes(&current_bytes));
let current_text = match std::str::from_utf8(&current_bytes) {
Ok(text) => text,
Err(_) => {
let reason = format!("file is not valid UTF-8: {}", path.display());
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "edit".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::InvalidArgument(reason));
}
};
let count = current_text.matches(&params.old_string).count();
if count == 0 {
return Err(ToolError::InvalidArgument(format!(
"old_string not found in {}",
path.display()
)));
let reason = format!("old_string not found in {}", path.display());
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "edit".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::InvalidArgument(reason));
}
if !params.replace_all && count > 1 {
return Err(ToolError::InvalidArgument(format!(
let reason = format!(
"old_string occurs {count} times in {}; pass replace_all: true or narrow the snippet",
path.display()
)));
);
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "edit".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::InvalidArgument(reason));
}
let new_text = if params.replace_all {
@ -97,12 +164,58 @@ impl Tool for EditTool {
let report = self.linter.lint(&path, &new_text, WriteMode::Update);
if report.has_errors() {
return Err(ToolError::InvalidArgument(format_report(&report)));
let reason = format_report(&report);
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "edit".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::InvalidArgument(reason));
}
std::fs::write(&path, new_text.as_bytes()).map_err(|e| {
ToolError::ExecutionFailed(format!("failed to write {}: {e}", path.display()))
})?;
if let Err(e) = std::fs::write(&path, new_text.as_bytes()) {
let reason = format!("failed to write {}: {e}", path.display());
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "edit".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::ExecutionFailed(reason));
}
let after_hash = file_hash(&path).ok().flatten();
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "edit".to_string(),
status: AuditStatus::Success,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash,
reason: if report.warnings.is_empty() {
None
} else {
Some(format!("{} warning(s)", report.warnings.len()))
},
},
);
let summary = format!(
"Edited {} ({} replacement{}){}",
@ -118,6 +231,13 @@ impl Tool for EditTool {
}
}
/// Slug to record in the audit log: always `summary` for the summary record,
/// otherwise the supplied slug or `<missing>` when none was given.
fn audit_slug(kind: &MemoryToolKind, slug: Option<&str>) -> String {
    let label = if matches!(kind, MemoryToolKind::Summary) {
        "summary"
    } else {
        slug.unwrap_or("<missing>")
    };
    label.to_string()
}
fn format_report(report: &LintReport) -> String {
use std::fmt::Write as _;
let mut buf = String::from("memory linter rejected the edit:");

View File

@ -5,11 +5,14 @@
//! to know the on-disk layout — Search returns `{slug, kind, ...}` and
//! that pair feeds straight into Read / Edit.
mod delete;
mod edit;
mod query;
mod read;
mod write;
pub use delete::delete_tool;
use std::path::PathBuf;
use llm_worker::tool::ToolError;
@ -34,6 +37,17 @@ pub enum MemoryToolKind {
Knowledge,
}
/// Renders the kind as its lowercase name (`summary`, `decision`, `request`,
/// `knowledge`).
impl std::fmt::Display for MemoryToolKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Summary => "summary",
            Self::Decision => "decision",
            Self::Request => "request",
            Self::Knowledge => "knowledge",
        };
        f.write_str(name)
    }
}
impl MemoryToolKind {
pub fn as_str(self) -> &'static str {
match self {

View File

@ -7,8 +7,9 @@
//! enumerate what records exist without knowing what's inside them.
//!
//! - `MemoryQuery` walks `.insomnia/memory/{summary.md,decisions/,
//! requests/}`. `.insomnia/workflow/` and `.insomnia/memory/_staging/`
//! are excluded by construction.
//! requests/}`. `.insomnia/workflow/`, `.insomnia/memory/_staging/`,
//! `.insomnia/memory/_usage/`, and `.insomnia/memory/_logs/` are excluded
//! by construction.
//! - `KnowledgeQuery` walks `.insomnia/knowledge/*.md` and supports a
//! `kind` filter against the Knowledge frontmatter's `kind` field.
//!
@ -23,6 +24,7 @@ use async_trait::async_trait;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use serde::{Deserialize, Serialize};
use crate::audit::{AuditStatus, RecordUsageAudit, append_record_usage};
use crate::schema::{KnowledgeFrontmatter, split_frontmatter};
use crate::workspace::WorkspaceLayout;
@ -128,7 +130,25 @@ impl Tool for MemoryQueryTool {
let params: MemoryQueryParams = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid MemoryQuery input: {e}")))?;
let needle = match params.query.as_deref() {
Some(q) => Some(validate_query(q)?),
Some(q) => match validate_query(q) {
Ok(q) => Some(q),
Err(err) => {
let _ = append_record_usage(
&self.layout,
RecordUsageAudit {
op: "query".to_string(),
status: AuditStatus::Failed,
kind: "memory".to_string(),
slug: None,
path: None,
query: params.query.clone(),
result_count: None,
reason: Some(err.to_string()),
},
);
return Err(err);
}
},
None => None,
};
@ -194,6 +214,23 @@ impl Tool for MemoryQueryTool {
Some(q) => format!("{} hit(s) for {q:?}", records.len()),
None => format!("{} record(s)", records.len()),
};
let _ = append_record_usage(
&self.layout,
RecordUsageAudit {
op: "query".to_string(),
status: AuditStatus::Success,
kind: "memory".to_string(),
slug: None,
path: None,
query: params.query.clone(),
result_count: Some(records.len()),
reason: if records.len() >= limit {
Some("result_limit_reached".to_string())
} else {
None
},
},
);
Ok(ToolOutput {
summary,
content: Some(body),
@ -208,7 +245,25 @@ impl Tool for KnowledgeQueryTool {
ToolError::InvalidArgument(format!("invalid KnowledgeQuery input: {e}"))
})?;
let needle = match params.query.as_deref() {
Some(q) => Some(validate_query(q)?),
Some(q) => match validate_query(q) {
Ok(q) => Some(q),
Err(err) => {
let _ = append_record_usage(
&self.layout,
RecordUsageAudit {
op: "query".to_string(),
status: AuditStatus::Failed,
kind: "knowledge".to_string(),
slug: None,
path: None,
query: params.query.clone(),
result_count: None,
reason: Some(err.to_string()),
},
);
return Err(err);
}
},
None => None,
};
let kind_filter = params.kind.as_deref();
@ -272,6 +327,23 @@ impl Tool for KnowledgeQueryTool {
Some(q) => format!("{} hit(s) for {q:?}", records.len()),
None => format!("{} record(s)", records.len()),
};
let _ = append_record_usage(
&self.layout,
RecordUsageAudit {
op: "query".to_string(),
status: AuditStatus::Success,
kind: "knowledge".to_string(),
slug: None,
path: None,
query: params.query.clone(),
result_count: Some(records.len()),
reason: if records.len() >= limit {
Some("result_limit_reached".to_string())
} else {
None
},
},
);
Ok(ToolOutput {
summary,
content: Some(body),

View File

@ -11,6 +11,7 @@ use async_trait::async_trait;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use serde::Deserialize;
use crate::audit::{AuditStatus, RecordUsageAudit, append_record_usage};
use crate::tool::MemoryToolKind;
use crate::usage::{self, UsageSource};
use crate::workspace::WorkspaceLayout;
@ -51,13 +52,32 @@ impl Tool for ReadTool {
let path = params
.kind
.resolve_path(&self.layout, params.slug.as_deref())?;
let kind = params.kind.to_string();
let slug = audit_slug(&params.kind, params.slug.as_deref());
let bytes = std::fs::read(&path).map_err(|e| match e.kind() {
std::io::ErrorKind::NotFound => {
ToolError::ExecutionFailed(format!("record not found: {}", path.display()))
let bytes = match std::fs::read(&path) {
Ok(bytes) => bytes,
Err(e) => {
let reason = match e.kind() {
std::io::ErrorKind::NotFound => format!("record not found: {}", path.display()),
_ => format!("read failed at {}: {e}", path.display()),
};
let _ = append_record_usage(
&self.layout,
RecordUsageAudit {
op: "read".to_string(),
status: AuditStatus::Failed,
kind,
slug: Some(slug),
path: Some(path.display().to_string()),
query: None,
result_count: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::ExecutionFailed(reason));
}
_ => ToolError::ExecutionFailed(format!("read failed at {}: {e}", path.display())),
})?;
};
let text = String::from_utf8_lossy(&bytes).into_owned();
if let Some(segment_id) = self.usage_session_id.as_deref() {
@ -97,6 +117,24 @@ impl Tool for ReadTool {
)
};
let _ = append_record_usage(
&self.layout,
RecordUsageAudit {
op: "read".to_string(),
status: AuditStatus::Success,
kind,
slug: Some(slug),
path: Some(path.display().to_string()),
query: None,
result_count: Some(rendered.line_count),
reason: if rendered.truncated {
Some("truncated".to_string())
} else {
None
},
},
);
Ok(ToolOutput {
summary,
content: Some(rendered.body),
@ -104,6 +142,13 @@ impl Tool for ReadTool {
}
}
/// Slug to record in the audit log: always `summary` for the summary record,
/// otherwise the supplied slug or `<missing>` when none was given.
fn audit_slug(kind: &MemoryToolKind, slug: Option<&str>) -> String {
    let label = if matches!(kind, MemoryToolKind::Summary) {
        "summary"
    } else {
        slug.unwrap_or("<missing>")
    };
    label.to_string()
}
struct Rendered {
body: String,
line_count: usize,

View File

@ -12,6 +12,9 @@ use async_trait::async_trait;
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use serde::Deserialize;
use crate::audit::{
AuditStatus, RecordOperationAudit, append_record_operation, file_hash, hash_bytes,
};
use crate::linter::{LintReport, Linter, WriteMode};
use crate::tool::MemoryToolKind;
use crate::workspace::WorkspaceLayout;
@ -46,8 +49,11 @@ impl Tool for WriteTool {
let path = params
.kind
.resolve_path(&self.layout, params.slug.as_deref())?;
let kind = params.kind.to_string();
let slug = audit_slug(&params.kind, params.slug.as_deref());
let already_exists = path.exists();
let before_hash = file_hash(&path).ok().flatten();
let already_exists = before_hash.is_some();
let mode = if already_exists {
WriteMode::Update
} else {
@ -56,20 +62,77 @@ impl Tool for WriteTool {
let report = self.linter.lint(&path, &params.content, mode);
if report.has_errors() {
return Err(ToolError::InvalidArgument(format_report(&report)));
let reason = format_report(&report);
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "write".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::InvalidArgument(reason));
}
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(|e| {
ToolError::ExecutionFailed(format!(
"failed to create directory {}: {e}",
parent.display()
))
})?;
if let Err(e) = std::fs::create_dir_all(parent) {
let reason = format!("failed to create directory {}: {e}", parent.display());
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "write".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::ExecutionFailed(reason));
}
}
std::fs::write(&path, params.content.as_bytes()).map_err(|e| {
ToolError::ExecutionFailed(format!("failed to write {}: {e}", path.display()))
})?;
if let Err(e) = std::fs::write(&path, params.content.as_bytes()) {
let reason = format!("failed to write {}: {e}", path.display());
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "write".to_string(),
status: AuditStatus::Failed,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash: None,
reason: Some(reason.clone()),
},
);
return Err(ToolError::ExecutionFailed(reason));
}
let after_hash = Some(hash_bytes(params.content.as_bytes()));
let _ = append_record_operation(
&self.layout,
RecordOperationAudit {
op: "write".to_string(),
status: AuditStatus::Success,
kind,
slug,
path: path.display().to_string(),
before_hash,
after_hash,
reason: if report.warnings.is_empty() {
None
} else {
Some(format!("{} warning(s)", report.warnings.len()))
},
},
);
let summary = format!(
"{} {}{}",
@ -88,6 +151,13 @@ impl Tool for WriteTool {
}
}
/// Slug to record in the audit log: always `summary` for the summary record,
/// otherwise the supplied slug or `<missing>` when none was given.
fn audit_slug(kind: &MemoryToolKind, slug: Option<&str>) -> String {
    let label = if matches!(kind, MemoryToolKind::Summary) {
        "summary"
    } else {
        slug.unwrap_or("<missing>")
    };
    label.to_string()
}
fn format_report(report: &LintReport) -> String {
use std::fmt::Write as _;
let mut buf = String::from("memory linter rejected the write:");

View File

@ -11,6 +11,7 @@
//! - `<root>/.insomnia/memory/decisions/<slug>.md`
//! - `<root>/.insomnia/memory/requests/<slug>.md`
//! - `<root>/.insomnia/memory/_staging/<id>.json`
//! - `<root>/.insomnia/memory/_logs/current.log` (append-only audit log)
//!
//! `memory/` is reserved for session-derived / generated state;
//! Workflows are human-managed and live one level up under
@ -24,6 +25,7 @@ use std::path::{Path, PathBuf};
use crate::Slug;
use crate::error::LintError;
#[cfg(test)]
use lint_common::RecordLintError;
const INSOMNIA_DIR: &str = ".insomnia";
@ -35,7 +37,9 @@ const DECISIONS_DIR: &str = "decisions";
const REQUESTS_DIR: &str = "requests";
const STAGING_DIR: &str = "_staging";
const USAGE_DIR: &str = "_usage";
const LOGS_DIR: &str = "_logs";
const USAGE_EVENTS_FILE: &str = "events.jsonl";
const AUDIT_CURRENT_LOG_FILE: &str = "current.log";
/// What kind of record a path under the memory tree represents.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -137,6 +141,18 @@ impl WorkspaceLayout {
self.usage_dir().join(USAGE_EVENTS_FILE)
}
/// Directory that holds the memory audit logs (`_logs` under the memory directory).
pub fn audit_logs_dir(&self) -> PathBuf {
    let memory_root = self.memory_dir();
    memory_root.join(LOGS_DIR)
}
/// Path of the current memory audit log file inside the audit logs directory.
///
/// This is the tail-friendly file; operators can watch live memory worker
/// and tool events with:
/// `tail -f .insomnia/memory/_logs/current.log`.
pub fn audit_current_log_path(&self) -> PathBuf {
    let logs_dir = self.audit_logs_dir();
    logs_dir.join(AUDIT_CURRENT_LOG_FILE)
}
/// Path of the decision record for `slug`: `<slug>.md` inside the decisions directory.
pub fn decision_path(&self, slug: &Slug) -> PathBuf {
    let decisions_root = self.decisions_dir();
    let file_name = format!("{slug}.md");
    decisions_root.join(file_name)
}
@ -156,7 +172,7 @@ impl WorkspaceLayout {
/// Classify a path under the memory tree. Returns `None` if the
/// path is not under `.insomnia/memory/` or `.insomnia/knowledge/`
/// of this workspace, or if it lives in
/// `_staging/` / `_usage/` (opaque subsystem-owned trees).
/// `_staging/` / `_usage/` / `_logs/` (opaque subsystem-owned trees).
///
/// On a conventional path that's *almost* a record but malformed
/// (e.g. `.insomnia/memory/decisions/Foo.md` with an invalid slug),
@ -189,7 +205,7 @@ impl WorkspaceLayout {
slug: None,
}));
}
if first == STAGING_DIR || first == USAGE_DIR {
if first == STAGING_DIR || first == USAGE_DIR || first == LOGS_DIR {
// Linter opts out of subsystem-owned opaque trees.
return Ok(None);
}
@ -300,6 +316,14 @@ mod tests {
assert!(cp.is_none());
}
#[test]
fn logs_tree_is_opaque_to_classifier() {
    // Paths under `_logs/` belong to the audit subsystem, so the classifier
    // must succeed but report no record for them.
    let workspace = layout();
    let log_path = PathBuf::from("/ws/.insomnia/memory/_logs/current.log");
    let classified = workspace.classify(&log_path).unwrap();
    assert!(classified.is_none());
}
#[test]
fn outside_returns_none() {
assert!(

View File

@ -513,6 +513,7 @@ where
));
worker.register_tool(memory::tool::write_tool(layout.clone()));
worker.register_tool(memory::tool::edit_tool(layout.clone()));
worker.register_tool(memory::tool::delete_tool(layout.clone()));
worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg));
worker.register_tool(memory::tool::knowledge_query_tool(layout, query_cfg));
}

View File

@ -2596,6 +2596,25 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// `Some(0)` means disabled, same as `None`. Otherwise the
// `tokens_since >= 0` comparison would fire on every post-run.
let Some(threshold) = memory_cfg.extract_threshold.filter(|n| *n > 0) else {
let layout = memory::WorkspaceLayout::resolve(&memory_cfg, &self.pwd);
let model = memory_cfg
.extract_model
.as_ref()
.unwrap_or(&self.manifest.model);
WorkerAuditBase::new(
memory::audit::AuditWorker::MemoryExtract,
memory::audit::AuditTrigger::TokenThreshold,
Some(model_audit_from_manifest(model)),
)
.emit(
&layout,
self.event_tx.as_ref(),
memory::audit::WorkerLifecycleStatus::Skipped,
"extract_threshold_disabled",
None,
None,
None,
);
return Ok(());
};
@ -2607,6 +2626,25 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
let layout = memory::WorkspaceLayout::resolve(&memory_cfg, &self.pwd);
let model = memory_cfg
.extract_model
.as_ref()
.unwrap_or(&self.manifest.model);
WorkerAuditBase::new(
memory::audit::AuditWorker::MemoryExtract,
memory::audit::AuditTrigger::TokenThreshold,
Some(model_audit_from_manifest(model)),
)
.emit(
&layout,
self.event_tx.as_ref(),
memory::audit::WorkerLifecycleStatus::Skipped,
"extract_already_in_flight",
None,
None,
None,
);
return Ok(());
}
let result = self.run_extract_once(&memory_cfg, threshold).await;
@ -2644,6 +2682,18 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
) -> Result<ExtractDecision, PodError> {
use memory::extract;
let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd);
let model = memory_cfg
.extract_model
.as_ref()
.unwrap_or(&self.manifest.model);
let audit = WorkerAuditBase::new(
memory::audit::AuditWorker::MemoryExtract,
memory::audit::AuditTrigger::TokenThreshold,
Some(model_audit_from_manifest(model)),
);
let event_tx = self.event_tx.as_ref();
let pointer_snapshot = self
.extract_pointer
.lock()
@ -2656,6 +2706,17 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let tokens_since = self.tokens_added_since(processed_history_len);
if tokens_since < threshold {
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Skipped,
format!(
"token_threshold_not_reached tokens_since={tokens_since} threshold={threshold}"
),
None,
None,
None,
);
return Ok(ExtractDecision::Skipped);
}
@ -2666,6 +2727,18 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.history()
.len();
if current_history_len <= processed_history_len {
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Skipped,
"no_new_history_items",
None,
Some(memory::audit::ExtractAudit {
history_range: Some([processed_history_len as u64, current_history_len as u64]),
..Default::default()
}),
None,
);
return Ok(ExtractDecision::Skipped);
}
@ -2677,6 +2750,15 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.read_all(self.session_id(), self.segment_id())?
.len();
if entries_now == 0 {
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Skipped,
"empty_segment_log",
None,
None,
None,
);
return Ok(ExtractDecision::Skipped);
}
let end_entry = entries_now - 1;
@ -2685,42 +2767,118 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.map(|p| p.processed_through_entry + 1)
.unwrap_or(0);
if start_entry > end_entry {
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Skipped,
"no_new_segment_entries",
None,
Some(memory::audit::ExtractAudit {
session_id: Some(self.session_id().to_string()),
segment_id: Some(self.segment_id().to_string()),
entry_range: Some([start_entry as u64, end_entry as u64]),
history_range: Some([processed_history_len as u64, current_history_len as u64]),
..Default::default()
}),
None,
);
return Ok(ExtractDecision::Skipped);
}
let extract_audit_base = memory::audit::ExtractAudit {
session_id: Some(self.session_id().to_string()),
segment_id: Some(self.segment_id().to_string()),
entry_range: Some([start_entry as u64, end_entry as u64]),
history_range: Some([processed_history_len as u64, current_history_len as u64]),
..Default::default()
};
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Started,
format!("token_threshold_reached tokens_since={tokens_since} threshold={threshold}"),
None,
Some(extract_audit_base.clone()),
None,
);
let items_to_extract = self.worker.as_ref().expect("worker present").history()
[processed_history_len..current_history_len]
.to_vec();
let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd);
let extract_worker_max_turns = memory_cfg
.extract_worker_max_turns
.or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS);
let client = self.build_extractor_client(memory_cfg)?;
let client = match self.build_extractor_client(memory_cfg) {
Ok(client) => client,
Err(err) => {
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Failed,
format!("client_build_failed: {err}"),
None,
Some(extract_audit_base),
None,
);
return Err(err);
}
};
let memory_language = memory_language(memory_cfg);
let extract_system_prompt = self
.prompts
.memory_extract_system(memory_language)
.map_err(PodError::PromptCatalog)?;
let extract_system_prompt = match self.prompts.memory_extract_system(memory_language) {
Ok(prompt) => prompt,
Err(err) => {
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Failed,
format!("prompt_render_failed: {err}"),
None,
Some(extract_audit_base),
None,
);
return Err(PodError::PromptCatalog(err));
}
};
let mut extract_worker = Worker::new(client).system_prompt(extract_system_prompt);
extract_worker.set_cache_key(Some(self.segment_id().to_string()));
extract_worker.set_max_turns(extract_worker_max_turns);
let usage_capture = Arc::new(Mutex::new(None));
let usage_capture_for_worker = usage_capture.clone();
extract_worker.on_usage(move |event| {
*usage_capture_for_worker
.lock()
.expect("memory extract usage capture poisoned") =
Some(usage_audit_from_event(event));
});
let ctx = Arc::new(extract::ExtractWorkerContext::new());
extract_worker.register_tool(extract::write_extracted_tool(ctx.clone()));
let input_text = extract::build_extract_input(&items_to_extract);
extract_worker
.run(input_text)
.await
.map_err(PodError::Worker)?;
if let Err(err) = extract_worker.run(input_text).await {
let usage = usage_capture
.lock()
.expect("memory extract usage capture poisoned")
.clone();
audit.emit(
&layout,
event_tx,
lifecycle_status_for_worker_error(&err),
format!("worker_failed: {err}"),
usage,
Some(extract_audit_base),
None,
);
return Err(PodError::Worker(err));
}
let payload = ctx.take_payload().unwrap_or_else(|| {
tracing::warn!(
"extract worker did not call write_extracted; \
advancing pointer with empty payload"
"extract worker did not call write_extracted; advancing pointer with empty payload"
);
extract::ExtractedPayload::default()
});
@ -2733,15 +2891,32 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
segment_id: source_segment_id.to_string(),
range: [start_entry as u64, end_entry as u64],
};
let (id, _) = extract::write_staging(&layout, source, payload)
.map_err(PodError::ExtractStaging)?;
let (id, _) = match extract::write_staging(&layout, source, payload) {
Ok(result) => result,
Err(err) => {
let usage = usage_capture
.lock()
.expect("memory extract usage capture poisoned")
.clone();
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Failed,
format!("staging_write_failed: {err}"),
usage,
Some(extract_audit_base),
None,
);
return Err(PodError::ExtractStaging(err));
}
};
id.to_string()
};
let pointer_payload = extract::ExtractPointerPayload {
processed_through_entry: end_entry,
processed_through_history_len: current_history_len,
staging_id,
staging_id: staging_id.clone(),
};
let payload_value = serde_json::to_value(&pointer_payload)
.expect("ExtractPointerPayload is always JSON-serializable");
@ -2756,6 +2931,37 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.lock()
.expect("extract_pointer poisoned") = Some(pointer_payload);
let mut extract_audit = extract_audit_base;
if !staging_id.is_empty() {
extract_audit.staging_count = 1;
extract_audit.staging_ids.push(staging_id.clone());
extract_audit.staging_paths.push(
layout
.staging_dir()
.join(format!("{staging_id}.json"))
.display()
.to_string(),
);
}
let usage = usage_capture
.lock()
.expect("memory extract usage capture poisoned")
.clone();
let reason = if staging_id.is_empty() {
"completed_no_staging_output"
} else {
"completed_staging_written"
};
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Completed,
reason,
usage,
Some(extract_audit),
None,
);
Ok(ExtractDecision::Completed)
}
@ -2799,6 +3005,25 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let files_threshold = memory_cfg.consolidation_threshold_files.filter(|n| *n > 0);
let bytes_threshold = memory_cfg.consolidation_threshold_bytes.filter(|n| *n > 0);
if files_threshold.is_none() && bytes_threshold.is_none() {
let layout = memory::WorkspaceLayout::resolve(&memory_cfg, &self.pwd);
let model = memory_cfg
.consolidation_model
.as_ref()
.unwrap_or(&self.manifest.model);
WorkerAuditBase::new(
memory::audit::AuditWorker::MemoryConsolidation,
memory::audit::AuditTrigger::StagingBacklog,
Some(model_audit_from_manifest(model)),
)
.emit(
&layout,
self.event_tx.as_ref(),
memory::audit::WorkerLifecycleStatus::Skipped,
"consolidation_threshold_disabled",
None,
None,
None,
);
return Ok(());
}
@ -2808,6 +3033,25 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
let layout = memory::WorkspaceLayout::resolve(&memory_cfg, &self.pwd);
let model = memory_cfg
.consolidation_model
.as_ref()
.unwrap_or(&self.manifest.model);
WorkerAuditBase::new(
memory::audit::AuditWorker::MemoryConsolidation,
memory::audit::AuditTrigger::StagingBacklog,
Some(model_audit_from_manifest(model)),
)
.emit(
&layout,
self.event_tx.as_ref(),
memory::audit::WorkerLifecycleStatus::Skipped,
"consolidation_already_in_flight",
None,
None,
None,
);
return Ok(());
}
let result = self
@ -2843,21 +3087,57 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
use memory::consolidate;
let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd);
let model = memory_cfg
.consolidation_model
.as_ref()
.unwrap_or(&self.manifest.model);
let audit = WorkerAuditBase::new(
memory::audit::AuditWorker::MemoryConsolidation,
memory::audit::AuditTrigger::StagingBacklog,
Some(model_audit_from_manifest(model)),
);
let event_tx = self.event_tx.as_ref();
let entries = consolidate::list_staging_entries(&layout);
if entries.is_empty() {
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Skipped,
"no_staging_entries",
None,
None,
Some(memory::audit::ConsolidationAudit::default()),
);
return Ok(ConsolidateDecision::Skipped);
}
let total_files = entries.len();
let total_bytes: u64 = entries.iter().map(|e| e.bytes).sum();
let consumed_ids: Vec<uuid::Uuid> = entries.iter().map(|e| e.id).collect();
let base_consolidation = memory::audit::ConsolidationAudit {
staging_count: total_files,
staging_bytes: total_bytes,
consumed_staging_ids: consumed_ids.iter().map(ToString::to_string).collect(),
operations: memory::audit::OperationCounts::default(),
};
let files_hit = files_threshold.is_some_and(|n| total_files >= n);
let bytes_hit = bytes_threshold.is_some_and(|n| total_bytes >= n);
if !files_hit && !bytes_hit {
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Skipped,
format!(
"threshold_not_reached files={total_files} bytes={total_bytes} files_threshold={files_threshold:?} bytes_threshold={bytes_threshold:?}"
),
None,
None,
Some(base_consolidation),
);
return Ok(ConsolidateDecision::Skipped);
}
let consumed_ids: Vec<uuid::Uuid> = entries.iter().map(|e| e.id).collect();
let lock = match consolidate::StagingLock::acquire(
&layout,
std::process::id(),
@ -2866,15 +3146,56 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
) {
Ok(l) => l,
Err(memory::consolidate::LockError::InUse { .. }) => {
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Skipped,
"staging_lock_in_use",
None,
None,
Some(base_consolidation),
);
return Ok(ConsolidateDecision::Skipped);
}
Err(e) => return Err(PodError::ConsolidationLock(e)),
Err(e) => {
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Failed,
format!("staging_lock_failed: {e}"),
None,
None,
Some(base_consolidation),
);
return Err(PodError::ConsolidationLock(e));
}
};
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Started,
format!("staging_threshold_reached files={total_files} bytes={total_bytes}"),
None,
None,
Some(base_consolidation.clone()),
);
let before_records = memory::audit::snapshot_records(&layout);
let client = match self.build_consolidator_client(memory_cfg) {
Ok(c) => c,
Err(e) => {
lock.release_only();
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Failed,
format!("client_build_failed: {e}"),
None,
None,
Some(base_consolidation),
);
return Err(e);
}
};
@ -2884,12 +3205,30 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
Ok(p) => p,
Err(e) => {
lock.release_only();
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Failed,
format!("prompt_render_failed: {e}"),
None,
None,
Some(base_consolidation),
);
return Err(PodError::PromptCatalog(e));
}
};
let mut worker = Worker::new(client).system_prompt(consolidation_system_prompt);
worker.set_cache_key(Some(self.segment_id().to_string()));
let usage_capture = Arc::new(Mutex::new(None));
let usage_capture_for_worker = usage_capture.clone();
worker.on_usage(move |event| {
*usage_capture_for_worker
.lock()
.expect("memory consolidation usage capture poisoned") =
Some(usage_audit_from_event(event));
});
// Memory tools are self-contained — they bypass ScopedFs and write
// directly under the workspace via WorkspaceLayout. Resident
// knowledge injection (`Pod::set_resident_knowledge_injection`) is
@ -2904,6 +3243,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
));
worker.register_tool(memory::tool::write_tool(layout.clone()));
worker.register_tool(memory::tool::edit_tool(layout.clone()));
worker.register_tool(memory::tool::delete_tool(layout.clone()));
worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg));
worker.register_tool(memory::tool::knowledge_query_tool(
layout.clone(),
@ -2922,19 +3262,159 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
consolidate::build_consolidate_input(&layout, &entries, &tidy, &usage_report);
let run_result = worker.run(input_text).await;
let usage = usage_capture
.lock()
.expect("memory consolidation usage capture poisoned")
.clone();
match run_result {
Ok(_) => {
lock.release_with_cleanup(&layout);
let after_records = memory::audit::snapshot_records(&layout);
let mut consolidation = base_consolidation;
consolidation.operations =
memory::audit::operation_counts_from_snapshots(&before_records, &after_records);
let reason = if consolidation.operations.total_record_changes() == 0 {
"completed_no_record_changes"
} else {
"completed_record_changes"
};
audit.emit(
&layout,
event_tx,
memory::audit::WorkerLifecycleStatus::Completed,
reason,
usage,
None,
Some(consolidation),
);
Ok(ConsolidateDecision::Completed)
}
Err(e) => {
lock.release_only();
audit.emit(
&layout,
event_tx,
lifecycle_status_for_worker_error(&e),
format!("worker_failed: {e}"),
usage,
None,
Some(base_consolidation),
);
Err(PodError::Worker(e))
}
}
}
}
/// Translate a worker error into the lifecycle status written to the audit log.
///
/// `WorkerError::Cancelled` is reported as `Cancelled`; every other error is
/// reported as `Failed`.
fn lifecycle_status_for_worker_error(err: &WorkerError) -> memory::audit::WorkerLifecycleStatus {
    match err {
        WorkerError::Cancelled => memory::audit::WorkerLifecycleStatus::Cancelled,
        _ => memory::audit::WorkerLifecycleStatus::Failed,
    }
}
/// Copy the token counters of an LLM usage event into the audit-log usage record.
///
/// The five counters are carried over one-to-one; nothing is derived or summed here.
fn usage_audit_from_event(
    event: &llm_worker::llm_client::event::UsageEvent,
) -> memory::audit::UsageAudit {
    let llm_worker::llm_client::event::UsageEvent {
        input_tokens,
        output_tokens,
        total_tokens,
        cache_read_input_tokens,
        cache_creation_input_tokens,
        ..
    } = event;
    memory::audit::UsageAudit {
        input_tokens: *input_tokens,
        output_tokens: *output_tokens,
        total_tokens: *total_tokens,
        cache_read_input_tokens: *cache_read_input_tokens,
        cache_creation_input_tokens: *cache_creation_input_tokens,
    }
}
/// Build the audit-log description of the model a memory worker runs with.
///
/// `ref_` and `model_id` are cloned from the manifest; `scheme`, when present,
/// is recorded as its `Debug` rendering.
fn model_audit_from_manifest(model: &manifest::ModelManifest) -> memory::audit::ModelAudit {
    let ref_ = model.ref_.clone();
    let scheme = model
        .scheme
        .as_ref()
        .map(|scheme| format!("{scheme:?}"));
    let model_id = model.model_id.clone();
    memory::audit::ModelAudit {
        ref_,
        scheme,
        model_id,
    }
}
/// Broadcast a memory worker lifecycle event on the event channel.
///
/// Does nothing when `event_tx` is `None`. The event carries the worker,
/// status and trigger labels, the run id, the reason, a compact
/// `memory <worker> <status>: <reason>` message and the current timestamp in
/// milliseconds. The result of the send is deliberately discarded.
fn emit_memory_worker_event(
    event_tx: Option<&broadcast::Sender<Event>>,
    run_id: uuid::Uuid,
    worker: memory::audit::AuditWorker,
    status: memory::audit::WorkerLifecycleStatus,
    trigger: memory::audit::AuditTrigger,
    reason: &str,
) {
    if let Some(sender) = event_tx {
        let worker_label = worker.label();
        let status_label = status.label();
        let message = format!("memory {worker_label} {status_label}: {reason}");
        let payload = protocol::MemoryWorkerEvent {
            worker: worker_label.to_string(),
            status: status_label.to_string(),
            run_id: run_id.to_string(),
            trigger: trigger.label().to_string(),
            reason: reason.to_string(),
            message,
            timestamp_ms: segment_log::now_millis() as i64,
        };
        let _ = sender.send(Event::MemoryWorker(payload));
    }
}
/// Identity shared by every audit record emitted for one memory worker run.
///
/// Built once through `WorkerAuditBase::new` and reused for each `emit` call,
/// so all lifecycle records and UI events of the run carry the same `run_id`,
/// worker, trigger and model.
#[derive(Debug, Clone)]
struct WorkerAuditBase {
/// Run identifier, generated in `new` with `uuid::Uuid::now_v7()`.
run_id: uuid::Uuid,
/// Which memory worker the records belong to.
worker: memory::audit::AuditWorker,
/// What triggered the worker run.
trigger: memory::audit::AuditTrigger,
/// Model description attached to each record, when one is supplied.
model: Option<memory::audit::ModelAudit>,
}
impl WorkerAuditBase {
    /// Create the audit context for one worker run, generating a fresh
    /// UUIDv7 `run_id` that every later `emit` call reuses.
    fn new(
        worker: memory::audit::AuditWorker,
        trigger: memory::audit::AuditTrigger,
        model: Option<memory::audit::ModelAudit>,
    ) -> Self {
        let run_id = uuid::Uuid::now_v7();
        Self {
            run_id,
            worker,
            trigger,
            model,
        }
    }

    /// Record one lifecycle transition of this run.
    ///
    /// First appends a worker-lifecycle record to the audit log under
    /// `layout`, then broadcasts the matching UI event on `event_tx` (if any).
    /// The result of the log append is discarded, so a logging problem never
    /// propagates to the caller.
    ///
    /// `usage`, `extract` and `consolidation` are optional detail sections
    /// stored verbatim in the audit record; they are not part of the UI event.
    fn emit(
        &self,
        layout: &memory::WorkspaceLayout,
        event_tx: Option<&broadcast::Sender<Event>>,
        status: memory::audit::WorkerLifecycleStatus,
        reason: impl Into<String>,
        usage: Option<memory::audit::UsageAudit>,
        extract: Option<memory::audit::ExtractAudit>,
        consolidation: Option<memory::audit::ConsolidationAudit>,
    ) {
        let reason: String = reason.into();
        let record = memory::audit::WorkerLifecycleAudit {
            run_id: self.run_id,
            worker: self.worker,
            status,
            trigger: self.trigger,
            reason: reason.clone(),
            model: self.model.clone(),
            usage,
            extract,
            consolidation,
        };
        // Best-effort: the audit log is observability data only.
        let _ = memory::audit::append_worker_lifecycle(layout, record);
        emit_memory_worker_event(
            event_tx,
            self.run_id,
            self.worker,
            status,
            self.trigger,
            reason.as_str(),
        );
    }
}
fn memory_language(cfg: &manifest::MemoryConfig) -> &str {
cfg.language
.as_deref()

View File

@ -425,6 +425,11 @@ pub enum Event {
result: serde_json::Value,
},
Alert(Alert),
/// Latest memory extract/consolidation lifecycle event for UI observability.
///
/// This is not part of LLM history or prompt context; clients may display it
/// briefly as operational status.
MemoryWorker(MemoryWorkerEvent),
/// Pod has started compacting the current session.
///
/// Fired immediately before a compaction run. Success is signalled by
@ -460,6 +465,19 @@ pub struct Alert {
pub timestamp_ms: i64,
}
/// Payload of `Event::MemoryWorker`: one lifecycle update from a memory
/// extract/consolidation worker, intended for UI observability.
///
/// All descriptive fields are plain strings so clients can display them
/// without knowing the pod-side enums.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryWorkerEvent {
/// Short worker label, e.g. `extract`.
pub worker: String,
/// Short lifecycle status label, e.g. `done`.
pub status: String,
/// Identifier of the worker run, as a string (a UUID in string form).
pub run_id: String,
/// Label of what triggered the run, e.g. `token_threshold`.
pub trigger: String,
/// Reason for this transition, e.g. `completed_staging_written`.
pub reason: String,
/// Human-readable compact form for actionbar rendering.
pub message: String,
/// Milliseconds since the Unix epoch.
pub timestamp_ms: i64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AlertLevel {

View File

@ -89,6 +89,8 @@ pub struct App {
pub context_window: u64,
pub turn_index: usize,
pub current_tool: Option<String>,
/// Latest memory extract/consolidation lifecycle event for actionbar observability.
pub latest_memory_worker_event: Option<String>,
/// Normal composer input that is submitted as `Method::Run`.
pub input: InputBuffer,
/// Separate command-line input. It is never submitted as a user message.
@ -148,6 +150,7 @@ impl App {
context_window: 0,
turn_index: 0,
current_tool: None,
latest_memory_worker_event: None,
input: InputBuffer::new(),
command_input: InputBuffer::new(),
input_mode: CommandInputMode::Composer,
@ -844,6 +847,9 @@ impl App {
message: alert.message,
});
}
Event::MemoryWorker(event) => {
self.latest_memory_worker_event = Some(event.message);
}
Event::Snapshot {
entries,
greeting,
@ -2083,6 +2089,26 @@ mod completion_flow_tests {
assert_eq!(app.run_output_tokens, 9);
}
#[test]
fn memory_worker_event_updates_actionbar_state() {
    // The app must keep the event's `message` as the latest actionbar text.
    let expected_message = "memory extract done: completed_staging_written";

    let mut app = App::new("test".into());
    let event = protocol::MemoryWorkerEvent {
        worker: "extract".into(),
        status: "done".into(),
        run_id: "00000000-0000-0000-0000-000000000000".into(),
        trigger: "token_threshold".into(),
        reason: "completed_staging_written".into(),
        message: expected_message.into(),
        timestamp_ms: 0,
    };
    app.handle_pod_event(Event::MemoryWorker(event));

    let shown = app.latest_memory_worker_event.as_deref();
    assert_eq!(shown, Some(expected_message));
}
#[test]
fn compact_done_resets_session_context_tokens() {
let mut app = App::new("test".into());

View File

@ -1218,6 +1218,11 @@ fn draw_actionbar(frame: &mut Frame, app: &App, area: Rect) {
"Alt-q edit queued Alt-c clear queued",
Style::default().fg(Color::DarkGray),
));
} else if let Some(memory_event) = app.latest_memory_worker_event.as_deref() {
left.push(Span::styled(
truncate_with_ellipsis(memory_event, 72),
Style::default().fg(Color::Blue),
));
}
let mut right: Vec<Span<'static>> = Vec::new();

View File

@ -6,7 +6,7 @@ Your job is to take extract activity-log staging entries together with the works
2. **Tidy step** — clean up the existing records that the integration step didn't already touch.
You have:
- `MemoryRead`, `MemoryWrite`, `MemoryEdit` for memory and knowledge records.
- `MemoryRead`, `MemoryWrite`, `MemoryEdit`, `MemoryDelete` for memory and knowledge records.
- `MemoryQuery` for memory-side records (summary / decisions / requests).
- `KnowledgeQuery` for knowledge records — use it to find existing slugs before creating new ones.