From d8a7200ea4163bfef10c6f030ecf030c72fc0a22 Mon Sep 17 00:00:00 2001 From: Hare Date: Fri, 1 May 2026 23:00:55 +0900 Subject: [PATCH] =?UTF-8?q?=E3=83=A1=E3=83=A2=E3=83=AAPhase2=E3=81=AE?= =?UTF-8?q?=E5=AE=9F=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 6 +- TODO.md | 1 + crates/manifest/src/config.rs | 10 + crates/manifest/src/defaults.rs | 5 + crates/manifest/src/lib.rs | 21 ++ crates/memory/Cargo.toml | 1 + crates/memory/src/consolidate/input.rs | 324 ++++++++++++++++++++ crates/memory/src/consolidate/lock.rs | 305 +++++++++++++++++++ crates/memory/src/consolidate/mod.rs | 32 ++ crates/memory/src/consolidate/prompt.rs | 69 +++++ crates/memory/src/consolidate/staging.rs | 139 +++++++++ crates/memory/src/consolidate/tidy.rs | 362 +++++++++++++++++++++++ crates/memory/src/lib.rs | 1 + crates/pod/Cargo.toml | 1 + crates/pod/src/controller.rs | 24 ++ crates/pod/src/pod.rs | 213 +++++++++++++ crates/pod/tests/consolidation_test.rs | 277 +++++++++++++++++ tickets/internal-worker-workflow.md | 73 +++++ 18 files changed, 1862 insertions(+), 2 deletions(-) create mode 100644 crates/memory/src/consolidate/input.rs create mode 100644 crates/memory/src/consolidate/lock.rs create mode 100644 crates/memory/src/consolidate/mod.rs create mode 100644 crates/memory/src/consolidate/prompt.rs create mode 100644 crates/memory/src/consolidate/staging.rs create mode 100644 crates/memory/src/consolidate/tidy.rs create mode 100644 crates/pod/tests/consolidation_test.rs create mode 100644 tickets/internal-worker-workflow.md diff --git a/Cargo.lock b/Cargo.lock index d9287522..1ca24238 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1571,9 +1571,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.185" +version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" +checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" [[package]] name = "libredox" @@ -1757,6 +1757,7 @@ version = "0.1.0" dependencies = [ "async-trait", "chrono", + "libc", "llm-worker", "manifest", "schemars", @@ -2140,6 +2141,7 @@ dependencies = [ "toml", "tools", "tracing", + "uuid", ] [[package]] diff --git a/TODO.md b/TODO.md index 8d85c9dc..fc9e5f48 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,6 @@ - [ ] Workflow / Skills - [ ] Workflow 実装 → [tickets/workflow.md](tickets/workflow.md) + - [ ] 内部 Worker / 内部 Pod の Workflow 化 → [tickets/internal-worker-workflow.md](tickets/internal-worker-workflow.md) - [ ] Agent Skills を Workflow として ingest → [tickets/agent-skills.md](tickets/agent-skills.md) - [ ] ツール設計 - [ ] Bash ツール (Permission 層と統合) → [tickets/bash-tool.md](tickets/bash-tool.md) diff --git a/crates/manifest/src/config.rs b/crates/manifest/src/config.rs index 3945e552..221deadc 100644 --- a/crates/manifest/src/config.rs +++ b/crates/manifest/src/config.rs @@ -217,6 +217,16 @@ impl MemoryConfig { extract_worker_max_input_tokens: upper .extract_worker_max_input_tokens .or(self.extract_worker_max_input_tokens), + consolidation_model: upper.consolidation_model.or(self.consolidation_model), + consolidation_worker_max_input_tokens: upper + .consolidation_worker_max_input_tokens + .or(self.consolidation_worker_max_input_tokens), + consolidation_threshold_files: upper + .consolidation_threshold_files + .or(self.consolidation_threshold_files), + consolidation_threshold_bytes: upper + .consolidation_threshold_bytes + .or(self.consolidation_threshold_bytes), } } } diff --git a/crates/manifest/src/defaults.rs b/crates/manifest/src/defaults.rs index 102a0832..3fd5ca6d 100644 --- a/crates/manifest/src/defaults.rs +++ b/crates/manifest/src/defaults.rs @@ -50,3 +50,8 @@ pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5; /// own LLM calls. Exceeding this aborts the extract run. /// See [`crate::MemoryConfig::extract_worker_max_input_tokens`]. pub const MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS: u64 = 30_000; + +/// Cumulative input-token cap for the memory Phase 2 (consolidation) +/// worker's own LLM calls. Exceeding this aborts the consolidation run. +/// See [`crate::MemoryConfig::consolidation_worker_max_input_tokens`]. +pub const MEMORY_CONSOLIDATION_WORKER_MAX_INPUT_TOKENS: u64 = 80_000; diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index ca3f7bbf..5bf83ece 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -85,6 +85,27 @@ pub struct MemoryConfig { /// [`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`]. #[serde(default)] pub extract_worker_max_input_tokens: Option, + /// Optional model for the Phase 2 (consolidation) worker. When + /// `None`, the main pod model is cloned via `clone_boxed()`. + /// Reasoning-class models are recommended. + #[serde(default)] + pub consolidation_model: Option, + /// Cumulative input-token cap for the consolidation worker's own + /// LLM calls. Exceeding this aborts the consolidation run. `None` ⇒ + /// [`defaults::MEMORY_CONSOLIDATION_WORKER_MAX_INPUT_TOKENS`]. + #[serde(default)] + pub consolidation_worker_max_input_tokens: Option, + /// Phase 2 trigger: file-count threshold of `_staging/`. Phase 2 + /// fires when the staging directory has at least this many entries. + /// Either threshold reaching its limit fires Phase 2 (logical OR). + /// `None` for both thresholds ⇒ Phase 2 disabled. + #[serde(default)] + pub consolidation_threshold_files: Option, + /// Phase 2 trigger: byte-size threshold across all `_staging/` + /// entries. Either threshold reaching its limit fires Phase 2. + /// `None` for both thresholds ⇒ Phase 2 disabled. + #[serde(default)] + pub consolidation_threshold_bytes: Option, } /// Pod metadata. diff --git a/crates/memory/Cargo.toml b/crates/memory/Cargo.toml index a896e63b..beafdcb6 100644 --- a/crates/memory/Cargo.toml +++ b/crates/memory/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] async-trait = "0.1.89" chrono = { version = "0.4.44", features = ["serde"] } +libc = "0.2.186" llm-worker = { version = "0.2.1", path = "../llm-worker" } manifest = { version = "0.1.0", path = "../manifest" } schemars = "1.2.1" diff --git a/crates/memory/src/consolidate/input.rs b/crates/memory/src/consolidate/input.rs new file mode 100644 index 00000000..6b4df1dc --- /dev/null +++ b/crates/memory/src/consolidate/input.rs @@ -0,0 +1,324 @@ +//! Phase 2 sub-Worker への最初のユーザー入力を組み立てる。 +//! +//! Phase 1 (`extract::build_extract_input`) と同じ方針で、固定 schema の +//! markdown セクション列にしてサブWorker に渡す。`docs/plan/memory.md` +//! §Phase 2 入力 / §整理材料 の項目に従い: +//! +//! 1. consumed staging エントリ全文(`source` 込み) +//! 2. 既存 `memory/*` 全文(summary / decisions / requests) +//! 3. Knowledge 化候補レポート(メトリクス未完なら空) +//! 4. 整理材料(Linter Warn ベース、メトリクス未完なら明示 invoke 頻度なし) +//! +//! 既存 `knowledge/*` 本文は埋めず、agent に `KnowledgeQuery` 経由で引かせる +//! 設計(`docs/plan/memory.md` §retrieval 経路 / §Phase 2 の Knowledge アクセス)。 + +use std::fmt::Write; + +use crate::consolidate::staging::StagingEntry; +use crate::consolidate::tidy::TidyHints; +use crate::workspace::{RecordKind, WorkspaceLayout}; + +/// Knowledge 化候補レポート。`tickets/memory-usage-metrics.md` の成果物が +/// 出るまでは空で渡す前提(`docs/plan/memory.md` §Knowledge 化候補レポート)。 +/// 空入力時、統合 phase は新規 Knowledge を作らず decisions / requests / +/// summary / 既存 Knowledge update に留まる。 +#[derive(Debug, Default, Clone)] +pub struct KnowledgeCandidateReport { + /// 候補に上がった `(kind, slug, frequency_per_mtoken)` の三つ組。 + /// 空配列を渡すと「候補なし」を意味する。 + pub entries: Vec, +} + +#[derive(Debug, Clone)] +pub struct KnowledgeCandidateEntry { + pub source_kind: &'static str, + pub source_slug: String, + pub frequency_per_mtoken: f64, +} + +impl KnowledgeCandidateReport { + pub fn empty() -> Self { + Self::default() + } + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } +} + +/// Phase 2 sub-Worker の最初の user 入力。 +pub fn build_consolidate_input( + layout: &WorkspaceLayout, + staging: &[StagingEntry], + tidy: &TidyHints, + candidates: &KnowledgeCandidateReport, +) -> String { + let mut out = String::new(); + out.push_str( + "Phase 2 consolidation input. Run the consolidation phase first \ + (fold the staging activity logs into memory and knowledge), then the \ + tidy phase (clean up existing records). Use the memory tools for \ + every write — direct file writes are denied by the pod scope.\n\n", + ); + + out.push_str("## Staging entries (consumed by this run)\n\n"); + out.push_str(&render_staging_records(staging)); + out.push('\n'); + + out.push_str("## Existing memory records (full content)\n\n"); + out.push_str(&render_existing_memory_records(layout)); + out.push('\n'); + + out.push_str("## Knowledge candidate report\n\n"); + out.push_str(&render_candidate_report(candidates)); + out.push('\n'); + + out.push_str("## Tidy hints\n\n"); + out.push_str(&render_tidy_hints(tidy)); + out.push('\n'); + + out.push_str( + "When done, end the turn with a short final assistant message describing \ + what changed.", + ); + out +} + +/// Staging エントリ群を「`### ` ヘッダ + 整形 JSON ブロック」で並べる。 +/// 空配列なら「(none)」と書く。 +pub fn render_staging_records(entries: &[StagingEntry]) -> String { + if entries.is_empty() { + return "(none)\n".to_string(); + } + let mut out = String::new(); + for entry in entries { + let _ = writeln!(&mut out, "### {}", entry.id); + let json = serde_json::to_string_pretty(&entry.record).unwrap_or_else(|_| "{}".into()); + out.push_str("```json\n"); + out.push_str(&json); + out.push_str("\n```\n\n"); + } + out +} + +/// `/.insomnia/memory/{summary.md,decisions/*,requests/*}` を +/// 「`### :` ヘッダ + raw markdown ブロック」で全文渡す。 +pub fn render_existing_memory_records(layout: &WorkspaceLayout) -> String { + let mut out = String::new(); + + let summary = layout.summary_path(); + if let Ok(content) = std::fs::read_to_string(&summary) { + out.push_str("### summary\n"); + out.push_str("```markdown\n"); + out.push_str(content.trim_end_matches('\n')); + out.push_str("\n```\n\n"); + } + + push_kind_records(&mut out, layout, RecordKind::Decision); + push_kind_records(&mut out, layout, RecordKind::Request); + + if out.is_empty() { + return "(none)\n".to_string(); + } + out +} + +fn push_kind_records(out: &mut String, layout: &WorkspaceLayout, kind: RecordKind) { + let dir = match kind { + RecordKind::Decision => layout.decisions_dir(), + RecordKind::Request => layout.requests_dir(), + RecordKind::Knowledge | RecordKind::Summary | RecordKind::Workflow => return, + }; + let entries = match std::fs::read_dir(&dir) { + Ok(it) => it, + Err(_) => return, + }; + let mut paths: Vec<(String, std::path::PathBuf)> = Vec::new(); + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } + let stem = match path.file_stem().and_then(|s| s.to_str()) { + Some(s) => s, + None => continue, + }; + if path.extension().and_then(|s| s.to_str()) != Some("md") { + continue; + } + paths.push((stem.to_string(), path)); + } + paths.sort(); + for (slug, path) in paths { + let Ok(content) = std::fs::read_to_string(&path) else { + continue; + }; + let _ = writeln!(out, "### {}:{}", kind.as_str(), slug); + out.push_str("```markdown\n"); + out.push_str(content.trim_end_matches('\n')); + out.push_str("\n```\n\n"); + } +} + +fn render_candidate_report(report: &KnowledgeCandidateReport) -> String { + if report.is_empty() { + return "(empty — usage metrics pipeline not populated. \ + Do not create new Knowledge records this run.)\n" + .to_string(); + } + let mut out = String::new(); + for c in &report.entries { + let _ = writeln!( + &mut out, + "- {} `{}` — frequency {:.3} invokes/Mtoken", + c.source_kind, c.source_slug, c.frequency_per_mtoken + ); + } + out +} + +/// Tidy hints の Markdown 描画。空ヒントなら "(none)" 1 行。 +pub fn render_tidy_hints(tidy: &TidyHints) -> String { + if tidy.is_empty() { + return "(none)\n".to_string(); + } + let mut out = String::new(); + + if !tidy.replaced_decisions.is_empty() { + out.push_str("**Replaced decisions still on disk** — collapse if the chain has settled:\n"); + for (slug, replaced_by) in &tidy.replaced_decisions { + match replaced_by { + Some(target) => { + let _ = writeln!(&mut out, "- `{slug}` → `{target}`"); + } + None => { + let _ = writeln!(&mut out, "- `{slug}` (no `replaced_by` set)"); + } + } + } + out.push('\n'); + } + + if !tidy.sources_overflow.is_empty() { + out.push_str( + "**Sources overflow** — consider trimming to the most recent entries (git log keeps the rest):\n", + ); + for s in &tidy.sources_overflow { + let _ = writeln!(&mut out, "- {} `{}` ({} sources)", s.kind.as_str(), s.slug, s.count); + } + out.push('\n'); + } + + if !tidy.similar_slug_clusters.is_empty() { + out.push_str("**Similar slug clusters** — evaluate for merge / rename:\n"); + for c in &tidy.similar_slug_clusters { + let joined = c + .slugs + .iter() + .map(|s| format!("`{s}`")) + .collect::>() + .join(", "); + let _ = writeln!(&mut out, "- {}: {}", c.kind.as_str(), joined); + } + out.push('\n'); + } + + out.push_str( + "Explicit-invoke metrics (protection threshold) are not yet wired up; \ + skip drop on long-standing records when uncertain.\n", + ); + out +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::consolidate::tidy::{SimilarSlugCluster, SourcesOverflow}; + use crate::extract::{ExtractedPayload, write_staging}; + use crate::schema::SourceRef; + use chrono::Utc; + use std::path::Path; + + fn now() -> String { + Utc::now().to_rfc3339() + } + + fn write(p: &Path, content: &str) { + if let Some(parent) = p.parent() { + std::fs::create_dir_all(parent).unwrap(); + } + std::fs::write(p, content).unwrap(); + } + + #[test] + fn build_includes_all_sections_when_populated() { + let dir = tempfile::TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(dir.path().to_path_buf()); + + write( + &dir.path().join(".insomnia/memory/summary.md"), + &format!("---\nupdated_at: {n}\n---\nstate of the world\n", n = now()), + ); + write( + &dir.path().join(".insomnia/memory/decisions/dec.md"), + &format!( + "---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: open\n---\nbody\n", + n = now() + ), + ); + let (_id, _) = write_staging( + &layout, + SourceRef { + session_id: "s".into(), + range: [0, 1], + }, + ExtractedPayload::default(), + ) + .unwrap(); + let staging = crate::consolidate::staging::list_staging_entries(&layout); + let tidy = TidyHints { + replaced_decisions: [( + "old".to_string(), + Some("new".to_string()), + )] + .into_iter() + .collect(), + sources_overflow: vec![SourcesOverflow { + kind: RecordKind::Decision, + slug: "dec".into(), + count: 12, + }], + similar_slug_clusters: vec![SimilarSlugCluster { + kind: RecordKind::Decision, + slugs: vec!["a".into(), "ab".into()], + }], + }; + let report = KnowledgeCandidateReport::empty(); + + let out = build_consolidate_input(&layout, &staging, &tidy, &report); + assert!(out.contains("Staging entries")); + assert!(out.contains("Existing memory records")); + assert!(out.contains("Knowledge candidate report")); + assert!(out.contains("Tidy hints")); + assert!(out.contains("state of the world")); + assert!(out.contains("decision:dec")); + assert!(out.contains("Replaced decisions")); + assert!(out.contains("Sources overflow")); + assert!(out.contains("Similar slug clusters")); + assert!(out.contains("usage metrics pipeline not populated")); + } + + #[test] + fn empty_inputs_render_placeholders() { + let dir = tempfile::TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(dir.path().to_path_buf()); + let out = build_consolidate_input( + &layout, + &[], + &TidyHints::default(), + &KnowledgeCandidateReport::empty(), + ); + // Both staging and tidy show "(none)"; existing memory records too. + assert!(out.contains("Staging entries")); + assert!(out.contains("(none)")); + } +} diff --git a/crates/memory/src/consolidate/lock.rs b/crates/memory/src/consolidate/lock.rs new file mode 100644 index 00000000..772387e1 --- /dev/null +++ b/crates/memory/src/consolidate/lock.rs @@ -0,0 +1,305 @@ +//! `_staging/.consolidation.lock` による Phase 2 占有ファイル。 +//! +//! `docs/plan/memory.md` §並走防止 に従い: +//! +//! - ファイルが存在し、記録された Pod が動作している間、その Pod が排他占有 +//! - クラッシュで残った stale lock は、所有者 PID が死んでいれば次回 spawn +//! 時に上書き取得できる +//! - cleanup は consumed ID の staging エントリのみ削除し、実行中に Phase 1 +//! が追加した分は残す +//! +//! 占有判定は Linux/macOS の `kill(pid, 0)` 経由で行う(`ESRCH` で死亡判定)。 +//! Windows は対象外: INSOMNIA は POSIX 環境を前提にしている。 + +use std::fs; +use std::path::{Path, PathBuf}; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::workspace::WorkspaceLayout; + +const LOCK_FILE: &str = ".consolidation.lock"; + +/// 占有ファイルの中身。`pid` で stale 判定し、`pod_name` / `started_at` / +/// `consumed_ids` は診断とクラッシュ復旧時の参照に使う。 +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LockRecord { + pub pid: u32, + pub pod_name: String, + pub started_at: DateTime, + /// この Phase 2 run が起動時スナップショットで確定した consumed staging + /// entry の UUIDv7 列。完了時はこの列のみ削除し、追加分は残す。 + pub consumed_ids: Vec, +} + +/// 占有取得 / 解放のエラー。 +#[derive(Debug, thiserror::Error)] +pub enum LockError { + /// 占有ファイルが既にあり、所有者 PID が生きているのでスキップ。 + #[error("Phase 2 lock held by live pid {pid} (pod {pod_name:?})")] + InUse { pid: u32, pod_name: String }, + #[error("io error at {}: {source}", .path.display())] + Io { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("failed to (de)serialize lock record: {0}")] + Serde(#[from] serde_json::Error), +} + +impl LockError { + fn io(path: impl Into, source: std::io::Error) -> Self { + Self::Io { + path: path.into(), + source, + } + } +} + +/// Phase 2 が走っている間 RAII で持つ占有ハンドル。`Drop` では何もしない — +/// 完了時の cleanup は consumed ID 列削除と一緒に行う必要があるため、明示 +/// 解放 [`StagingLock::release_with_cleanup`] を使う。明示解放しないまま +/// drop された場合は占有ファイルがそのまま残り、次回 spawn 時に PID が +/// 死んでいれば stale 上書きされる。 +#[derive(Debug)] +pub struct StagingLock { + path: PathBuf, + record: LockRecord, +} + +impl StagingLock { + pub fn record(&self) -> &LockRecord { + &self.record + } + + pub fn path(&self) -> &Path { + &self.path + } + + /// 占有取得を試みる。既に live な lock があれば + /// [`LockError::InUse`]、stale 判定なら上書き取得する。 + /// staging dir が無ければ作成する。 + pub fn acquire( + layout: &WorkspaceLayout, + pid: u32, + pod_name: impl Into, + consumed_ids: Vec, + ) -> Result { + let staging_dir = layout.staging_dir(); + fs::create_dir_all(&staging_dir).map_err(|e| LockError::io(&staging_dir, e))?; + let path = staging_dir.join(LOCK_FILE); + + if path.exists() { + let raw = fs::read_to_string(&path).map_err(|e| LockError::io(&path, e))?; + // 壊れた lock は stale とみなして上書き許可。 + if let Ok(existing) = serde_json::from_str::(&raw) { + if pid_is_alive(existing.pid) { + return Err(LockError::InUse { + pid: existing.pid, + pod_name: existing.pod_name, + }); + } + tracing::warn!( + stale_pid = existing.pid, + stale_pod = %existing.pod_name, + "Phase 2 stale lock detected, taking over" + ); + } else { + tracing::warn!(path = %path.display(), "Phase 2 lock unparseable, treating as stale"); + } + } + + let record = LockRecord { + pid, + pod_name: pod_name.into(), + started_at: Utc::now(), + consumed_ids, + }; + let json = serde_json::to_string_pretty(&record)?; + fs::write(&path, json).map_err(|e| LockError::io(&path, e))?; + Ok(Self { path, record }) + } + + /// 占有を解放しつつ consumed ID 列の staging エントリを削除する。 + /// 削除対象が見当たらない場合は黙ってスキップ(既に外部で消えていた等)。 + /// 占有ファイル自体の削除も best-effort: 失敗時は warn を出すだけで + /// エラーは伝播しない(次回 spawn 時に stale 判定で上書きされる)。 + pub fn release_with_cleanup(self, layout: &WorkspaceLayout) { + let staging_dir = layout.staging_dir(); + for id in &self.record.consumed_ids { + let target = staging_dir.join(format!("{id}.json")); + match fs::remove_file(&target) { + Ok(_) => {} + Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} + Err(e) => { + tracing::warn!( + path = %target.display(), + error = %e, + "failed to clean up consumed staging entry" + ); + } + } + } + self.unlink_lock_only(); + } + + /// 占有ファイルだけ削除し、staging エントリには触らない。Phase 2 + /// sub-Worker が途中で失敗した場合に使う: 入力 staging を残したまま + /// 次回再評価で再処理させる(`docs/plan/memory.md` §並走防止 の + /// 「重複作成は同一 slug update に自然収束」運用)。 + pub fn release_only(self) { + self.unlink_lock_only(); + } + + fn unlink_lock_only(&self) { + if let Err(e) = fs::remove_file(&self.path) { + if e.kind() != std::io::ErrorKind::NotFound { + tracing::warn!( + path = %self.path.display(), + error = %e, + "failed to remove Phase 2 lock" + ); + } + } + } +} + +#[cfg(unix)] +fn pid_is_alive(pid: u32) -> bool { + // `kill(0, 0)` and `kill(-1, 0)` are POSIX-special (process group / all + // signalable processes) and would yield false positives. Reject pids + // that don't fit a positive `pid_t` so a corrupted lock file with a + // u32::MAX-ish value is treated as stale instead of magically alive. + if pid == 0 || pid > i32::MAX as u32 { + return false; + } + // SAFETY: `kill` with sig 0 only probes whether the target pid exists + // and the caller has permission to signal it. No signal is delivered. + let rc = unsafe { libc::kill(pid as i32, 0) }; + if rc == 0 { + return true; + } + // EPERM means the process exists but we can't signal it — still alive + // for our purposes. ESRCH means it's gone. + let errno = std::io::Error::last_os_error() + .raw_os_error() + .unwrap_or(libc::EINVAL); + errno != libc::ESRCH +} + +#[cfg(not(unix))] +fn pid_is_alive(_pid: u32) -> bool { + // Unsupported platforms: assume the lock is live so we never overwrite + // someone else's claim. Phase 2 will skip and try again next post-run. + true +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::extract::{ExtractedPayload, write_staging}; + use crate::schema::SourceRef; + + fn make_layout() -> (tempfile::TempDir, WorkspaceLayout) { + let dir = tempfile::TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(dir.path().to_path_buf()); + std::fs::create_dir_all(layout.staging_dir()).unwrap(); + (dir, layout) + } + + #[test] + fn acquire_writes_lock_file() { + let (_dir, layout) = make_layout(); + let lock = StagingLock::acquire(&layout, std::process::id(), "pod", Vec::new()).unwrap(); + let path = layout.staging_dir().join(LOCK_FILE); + assert!(path.exists()); + assert_eq!(lock.record().pid, std::process::id()); + assert_eq!(lock.record().pod_name, "pod"); + } + + #[test] + fn acquire_rejects_when_live_pid_holds_lock() { + let (_dir, layout) = make_layout(); + // Use this test process's pid — it's definitely alive. + let _first = + StagingLock::acquire(&layout, std::process::id(), "pod-a", Vec::new()).unwrap(); + let err = StagingLock::acquire(&layout, std::process::id(), "pod-b", Vec::new()) + .expect_err("expected InUse"); + assert!(matches!(err, LockError::InUse { .. })); + } + + #[test] + fn acquire_overwrites_stale_lock() { + let (_dir, layout) = make_layout(); + // pid 1 is init on linux but for arbitrarily-large pids we'd need + // `kill(pid, 0)` to return ESRCH. Use u32::MAX which is guaranteed + // dead on every platform we target. + let stale = LockRecord { + pid: u32::MAX, + pod_name: "ghost".into(), + started_at: Utc::now(), + consumed_ids: Vec::new(), + }; + std::fs::write( + layout.staging_dir().join(LOCK_FILE), + serde_json::to_string_pretty(&stale).unwrap(), + ) + .unwrap(); + + let lock = StagingLock::acquire(&layout, std::process::id(), "pod", Vec::new()) + .expect("stale lock must be overwritable"); + assert_eq!(lock.record().pid, std::process::id()); + } + + #[test] + fn release_drops_consumed_entries_and_unlinks_lock() { + let (_dir, layout) = make_layout(); + let (id_a, _) = write_staging( + &layout, + SourceRef { + session_id: "s".into(), + range: [0, 0], + }, + ExtractedPayload::default(), + ) + .unwrap(); + let (id_b, _) = write_staging( + &layout, + SourceRef { + session_id: "s".into(), + range: [1, 1], + }, + ExtractedPayload::default(), + ) + .unwrap(); + + let lock = StagingLock::acquire(&layout, std::process::id(), "pod", vec![id_a]).unwrap(); + let lock_path = lock.path().to_path_buf(); + lock.release_with_cleanup(&layout); + + assert!(!lock_path.exists(), "lock file must be removed"); + assert!( + !layout.staging_dir().join(format!("{id_a}.json")).exists(), + "consumed entry must be deleted" + ); + assert!( + layout.staging_dir().join(format!("{id_b}.json")).exists(), + "non-consumed entry must remain" + ); + } + + #[test] + fn release_is_resilient_to_missing_consumed_entries() { + let (_dir, layout) = make_layout(); + let phantom = uuid::Uuid::now_v7(); + let lock = + StagingLock::acquire(&layout, std::process::id(), "pod", vec![phantom]).unwrap(); + let lock_path = lock.path().to_path_buf(); + // No file at /.json — release must not panic. + lock.release_with_cleanup(&layout); + assert!(!lock_path.exists()); + } +} diff --git a/crates/memory/src/consolidate/mod.rs b/crates/memory/src/consolidate/mod.rs new file mode 100644 index 00000000..a103aad4 --- /dev/null +++ b/crates/memory/src/consolidate/mod.rs @@ -0,0 +1,32 @@ +//! Phase 2: 統合 + 整理。 +//! +//! Phase 1 が staging に残した活動ログを `memory/*` / `knowledge/*` に +//! 統合し、続けて既存 record を `outdated | superseded | unused | noisy` +//! の観点で整理する disposable Worker を、Pod 側が組み立てるための +//! ヘルパー群を提供する。Pod は次の手順で sub-Worker を構築する: +//! +//! - [`CONSOLIDATION_SYSTEM_PROMPT`] を sub-Worker の system prompt に +//! - [`build_consolidate_input`] を sub-Worker の最初の user 入力に +//! - memory 専用 Tool (read / write / edit) と Knowledge / memory 検索ツールを登録 +//! - [`StagingLock::acquire`] で並走防止 + consumed ID 確定 +//! - sub-Worker run 完了後、[`StagingLock::release_with_cleanup`] で +//! consumed ID 分の staging のみ削除し、占有ファイルを解放 +//! +//! Knowledge 化候補レポートと使用頻度メトリクスは別チケットで供給される +//! 想定。本モジュール時点では空入力として扱い、prompt 側の説明だけ +//! 残しておく(`docs/plan/memory.md` §Phase 2 / 整理材料)。 + +mod input; +mod lock; +mod prompt; +mod staging; +mod tidy; + +pub use input::{ + KnowledgeCandidateReport, build_consolidate_input, render_existing_memory_records, + render_staging_records, render_tidy_hints, +}; +pub use lock::{LockError, LockRecord, StagingLock}; +pub use prompt::CONSOLIDATION_SYSTEM_PROMPT; +pub use staging::{StagingEntry, list_staging_entries}; +pub use tidy::{TidyHints, collect_tidy_hints}; diff --git a/crates/memory/src/consolidate/prompt.rs b/crates/memory/src/consolidate/prompt.rs new file mode 100644 index 00000000..55c52a80 --- /dev/null +++ b/crates/memory/src/consolidate/prompt.rs @@ -0,0 +1,69 @@ +//! Phase 2 sub-Worker の system prompt。 +//! +//! 内容は `docs/plan/memory-prompts.md` §共通原則 / §Phase 2 統合 + 整理 / +//! §Phase 2 Knowledge 書き込み を縮約。統合 phase / 整理 phase は同じ +//! prompt 1 本で順に進める縛り(agent から見ると 1 セッション内のフェーズ +//! 進行、別 trigger / 別 Worker は持たない、`docs/plan/memory.md` §整理 +//! の扱い)。 + +pub const CONSOLIDATION_SYSTEM_PROMPT: &str = r#"You are the Phase 2 consolidation worker for an INSOMNIA memory subsystem. + +Your job is to take Phase 1 activity-log staging entries together with the workspace's current `memory/*` / `knowledge/*` records, then run two phases back-to-back in this single session: + +1. **Consolidation phase** — fold staging into memory and knowledge. +2. **Tidy phase** — clean up the existing records that the consolidation phase didn't already touch. + +You have: +- `MemoryRead`, `MemoryWrite`, `MemoryEdit` for memory and knowledge records. +- `MemoryQuery` for memory-side records (summary / decisions / requests). +- `KnowledgeQuery` for knowledge records — use it to find existing slugs before creating new ones. + +Your initial user message contains the staging entries, the full memory records, the knowledge candidate report, and the tidy hints. Existing knowledge bodies are NOT in the prompt; pull them through `KnowledgeQuery` + `MemoryRead` when relevant. + +# Common rules (both phases) + +- **Do not invent provenance.** Decisions / Requests `sources` arrays MUST be copied from the staging `source` field for the originating activity log entries. Do not synthesise `session_id` or entry ranges. Do not fabricate `last_sources` for Knowledge. +- **Rewrite is allowed and often preferred over append.** When integrating new information, restructure existing records to raise information density. Preserve the existing claims, rationale, and `sources` while you compress. +- **Update over create.** If an existing slug fits, edit it. Only create a new slug when no existing record fits and you can articulate why. +- **`replaced` over delete.** When a Decision is superseded by a different one, mark the old one `status: replaced` with `replaced_by: `. Do not silently drop it. +- **Don't duplicate static docs.** Skip content that already lives in `AGENTS.md`, `docs/plan/*`, or other fixed project documents. +- **Empty output is fine.** If a staging entry doesn't justify a memory write, skip it. +- **Slug rules.** Slugs are kebab-case, short, recognisable, and must be unique within their kind. Same-slug create is a linter error — use Edit instead. +- **Linter errors come back as tool errors.** When the memory linter rejects a write, read the error, fix the issue (missing frontmatter field, oversized body, unknown reference, etc.), and try again. Do not work around the rule. + +# Consolidation phase + +Walk every staging entry in the input. For each one: + +- Add or update `decisions` / `requests` records as appropriate. Copy `sources` verbatim from the staging entry. +- Update existing knowledge records when the staging activity refines them. Use `KnowledgeQuery` to find candidates before creating anything new. +- **Knowledge creation is gated.** Only create a new `knowledge/.md` when the originating source appears in the supplied "Knowledge candidate report". When the report is empty (the metrics pipeline is still being built), do not create new knowledge — fold the activity into decisions / requests / summary or update existing knowledge instead. +- Rewrite `memory/summary.md` only when needed. Aim for 1–5k tokens. Preserve the high-level shape (current focus, recent decisions, stable facts) while pruning stale items. + +# Tidy phase + +Once the consolidation phase is done, evaluate every existing memory and knowledge record against four categories: + +- `outdated`: was correct, no longer matches the current implementation / policy / operation. +- `superseded`: another record is now the de-facto authoritative one; this one is mostly redundant. +- `unused`: not wrong, but rarely referenced — noise rather than signal. +- `noisy`: useful content but bad shape (overlap, sources accumulation, fractured slugs that should merge). + +A single record may fall into more than one category. Choose one of `drop / merge / split / trim / rewrite`: + +- Prefer `merge` and `trim` over `drop` for anything you'd flag as `unused` or `noisy` — git can reverse you, but a confidently-wrong drop hurts discovery. +- `drop` is allowed for `outdated` / `superseded` records you can justify in the diff. +- `replaced` markers (`status: replaced`) and chains pointed at by the tidy hints should be collapsed in this phase. + +**Protection threshold.** When the tidy hints include explicit-invoke metrics, records with `frequency >= 1.0 invokes/Mtoken` are off-limits to drop / large compression. The metrics pipeline is not always populated; when the input lacks frequency data, behave conservatively and skip drop on long-standing records. + +# Closing the turn + +When both phases are done, write a short final assistant message stating: + +- which staging entries you folded in (by short summary, not by ID), +- which existing records you touched (slug + operation), +- anything you intentionally left alone and why. + +Then end the turn. Do not ask questions — there is no human in the loop for this run. +"#; diff --git a/crates/memory/src/consolidate/staging.rs b/crates/memory/src/consolidate/staging.rs new file mode 100644 index 00000000..96345143 --- /dev/null +++ b/crates/memory/src/consolidate/staging.rs @@ -0,0 +1,139 @@ +//! `_staging/*.json` を列挙して [`StagingRecord`] に展開する読み込みヘルパー。 +//! +//! Phase 2 起動時のスナップショット(consumed ID list 確定)と、整理 phase +//! が終わった後の cleanup の双方で使う。`.consolidation.lock` のような +//! 占有ファイルは UUIDv7 として parse できないので自然に除外される。 +//! +//! [`StagingRecord`] のスキーマは Phase 1 が書き出す側 (`crate::extract`) +//! と単一の真実源 — ここでは読み出す側だけを担当する。 + +use std::path::PathBuf; + +use uuid::Uuid; + +use crate::extract::StagingRecord; +use crate::workspace::WorkspaceLayout; + +/// staging に積まれている 1 件分のエントリ。`id` は UUIDv7 で、ファイル名 +/// `.json` を逆引きしたもの。 +#[derive(Debug, Clone)] +pub struct StagingEntry { + pub id: Uuid, + pub path: PathBuf, + pub record: StagingRecord, + /// このファイルのバイト長。閾値判定 (`consolidation_threshold_bytes`) + /// に使う。 + pub bytes: u64, +} + +/// `/*.json` を読んで UUIDv7 順に並べた [`StagingEntry`] +/// 配列を返す。staging_dir が存在しなければ空配列。読めないファイルや +/// JSON parse 失敗は `tracing::warn!` してスキップ(壊れた個別ファイルが +/// Phase 2 全体を止めないように)。 +pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec { + let dir = layout.staging_dir(); + let entries = match std::fs::read_dir(&dir) { + Ok(it) => it, + Err(_) => return Vec::new(), + }; + + let mut out: Vec = Vec::new(); + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } + let stem = match path.file_stem().and_then(|s| s.to_str()) { + Some(s) => s, + None => continue, + }; + let ext = path.extension().and_then(|s| s.to_str()).unwrap_or(""); + if ext != "json" { + continue; + } + let id = match Uuid::parse_str(stem) { + Ok(u) => u, + Err(_) => continue, + }; + let bytes = match std::fs::metadata(&path) { + Ok(m) => m.len(), + Err(_) => 0, + }; + let raw = match std::fs::read_to_string(&path) { + Ok(s) => s, + Err(e) => { + tracing::warn!(path = %path.display(), error = %e, "failed to read staging entry"); + continue; + } + }; + let record = match serde_json::from_str::(&raw) { + Ok(r) => r, + Err(e) => { + tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry"); + continue; + } + }; + out.push(StagingEntry { + id, + path, + record, + bytes, + }); + } + out.sort_by_key(|e| e.id); + out +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::extract::{ExtractedPayload, write_staging}; + use crate::schema::SourceRef; + + fn empty_payload() -> ExtractedPayload { + ExtractedPayload::default() + } + + fn source(session_id: &str, range: [u64; 2]) -> SourceRef { + SourceRef { + session_id: session_id.into(), + range, + } + } + + #[test] + fn lists_in_uuidv7_order() { + let tmp = tempfile::TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(tmp.path().to_path_buf()); + + let (id1, _) = write_staging(&layout, source("s", [0, 1]), empty_payload()).unwrap(); + let (id2, _) = write_staging(&layout, source("s", [2, 3]), empty_payload()).unwrap(); + let (id3, _) = write_staging(&layout, source("s", [4, 5]), empty_payload()).unwrap(); + + let entries = list_staging_entries(&layout); + let ids: Vec = entries.iter().map(|e| e.id).collect(); + assert_eq!(ids, vec![id1, id2, id3]); + } + + #[test] + fn skips_lock_file_and_garbage() { + let tmp = tempfile::TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(tmp.path().to_path_buf()); + let (_id, _) = write_staging(&layout, source("s", [0, 1]), empty_payload()).unwrap(); + + // Drop a non-UUID json file and a bare lock file alongside. + std::fs::write(layout.staging_dir().join("not-a-uuid.json"), "{}").unwrap(); + std::fs::write(layout.staging_dir().join(".consolidation.lock"), "{}").unwrap(); + + let entries = list_staging_entries(&layout); + assert_eq!(entries.len(), 1); + } + + #[test] + fn missing_dir_returns_empty() { + let tmp = tempfile::TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(tmp.path().to_path_buf()); + // No staging dir at all. + assert!(list_staging_entries(&layout).is_empty()); + } +} diff --git a/crates/memory/src/consolidate/tidy.rs b/crates/memory/src/consolidate/tidy.rs new file mode 100644 index 00000000..34b52bcd --- /dev/null +++ b/crates/memory/src/consolidate/tidy.rs @@ -0,0 +1,362 @@ +//! 整理 phase が prompt 入力に乗せる「整理材料」スキャナ。 +//! +//! `docs/plan/memory.md` §整理(GC 相当)の扱い と +//! `tickets/memory-phase2-consolidation.md` の整理材料リストに従い、 +//! メトリクス未完の現状で機械的に拾えるヒントだけを集める: +//! +//! - `replaced` chain: `status: replaced` の Decision とその `replaced_by` +//! - sources 過多: `sources` / `last_sources` 配列が閾値超過の record +//! - 類似 slug 乱立: 同 kind の slug が Levenshtein 2 以内のクラスター +//! +//! 使用頻度メトリクスベースの保護閾値情報は `tickets/memory-usage-metrics.md` +//! の成果物が出るまで空で渡る。 + +use std::collections::{BTreeMap, BTreeSet}; + +use crate::schema::{ + DecisionFrontmatter, KnowledgeFrontmatter, RequestFrontmatter, split_frontmatter, +}; +use crate::slug::Slug; +use crate::workspace::{RecordKind, WorkspaceLayout}; + +/// `sources` overflow を flag する閾値。`linter::warnings::SOURCES_OVERFLOW_THRESHOLD` +/// と同値(10)を踏襲する。Linter Warn で sources 過多が検出されるラインと +/// 整理 phase で勧告するラインを揃える狙い。 +pub const SOURCES_OVERFLOW_THRESHOLD: usize = 10; +/// 類似 slug クラスタリングの距離。`linter::warnings::SIMILAR_SLUG_DISTANCE` +/// と同値。 +pub const SIMILAR_SLUG_DISTANCE: usize = 2; + +/// 整理 phase 用の機械集計ヒント。空フィールドは「対象なし」を意味する。 +#[derive(Debug, Default, Clone)] +pub struct TidyHints { + /// `status: replaced` で残っている Decision の slug → `replaced_by` map。 + /// `replaced_by` が None でも置き換え滞留として列挙する。 + pub replaced_decisions: BTreeMap>, + /// kind / slug / sources count の三つ組で sources 累積ラインを表す。 + pub sources_overflow: Vec, + /// 同 kind 内で Levenshtein 距離 `<= SIMILAR_SLUG_DISTANCE` のクラスター。 + /// クラスター内の slug は sorted。 + pub similar_slug_clusters: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SourcesOverflow { + pub kind: RecordKind, + pub slug: String, + pub count: usize, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SimilarSlugCluster { + pub kind: RecordKind, + pub slugs: Vec, +} + +impl TidyHints { + pub fn is_empty(&self) -> bool { + self.replaced_decisions.is_empty() + && self.sources_overflow.is_empty() + && self.similar_slug_clusters.is_empty() + } +} + +/// workspace を一通りスキャンして [`TidyHints`] を組み立てる。読めない / +/// parse できない record は黙ってスキップ(Linter は write 経路で守って +/// いるので、ここで顕在化してもどうしようもない)。 +pub fn collect_tidy_hints(layout: &WorkspaceLayout) -> TidyHints { + let mut hints = TidyHints::default(); + + let decisions = read_kind_records(layout, RecordKind::Decision); + let requests = read_kind_records(layout, RecordKind::Request); + let knowledge = read_kind_records(layout, RecordKind::Knowledge); + + for (slug, content) in &decisions { + let fm = parse_yaml::(content); + if let Some(fm) = fm.as_ref() { + if matches!( + fm.status, + crate::schema::DecisionStatus::Replaced + ) { + hints + .replaced_decisions + .insert(slug.clone(), fm.replaced_by.as_ref().map(|s| s.to_string())); + } + if fm.sources.len() > SOURCES_OVERFLOW_THRESHOLD { + hints.sources_overflow.push(SourcesOverflow { + kind: RecordKind::Decision, + slug: slug.clone(), + count: fm.sources.len(), + }); + } + } + } + for (slug, content) in &requests { + if let Some(fm) = parse_yaml::(content) { + if fm.sources.len() > SOURCES_OVERFLOW_THRESHOLD { + hints.sources_overflow.push(SourcesOverflow { + kind: RecordKind::Request, + slug: slug.clone(), + count: fm.sources.len(), + }); + } + } + } + for (slug, content) in &knowledge { + if let Some(fm) = parse_yaml::(content) { + if fm.last_sources.len() > SOURCES_OVERFLOW_THRESHOLD { + hints.sources_overflow.push(SourcesOverflow { + kind: RecordKind::Knowledge, + slug: slug.clone(), + count: fm.last_sources.len(), + }); + } + } + } + hints + .sources_overflow + .sort_by(|a, b| (a.kind.as_str(), a.slug.as_str()).cmp(&(b.kind.as_str(), b.slug.as_str()))); + + let decision_slugs: Vec<&str> = decisions.keys().map(|s| s.as_str()).collect(); + let request_slugs: Vec<&str> = requests.keys().map(|s| s.as_str()).collect(); + let knowledge_slugs: Vec<&str> = knowledge.keys().map(|s| s.as_str()).collect(); + if let Some(c) = cluster_similar(&decision_slugs, RecordKind::Decision) { + hints.similar_slug_clusters.extend(c); + } + if let Some(c) = cluster_similar(&request_slugs, RecordKind::Request) { + hints.similar_slug_clusters.extend(c); + } + if let Some(c) = cluster_similar(&knowledge_slugs, RecordKind::Knowledge) { + hints.similar_slug_clusters.extend(c); + } + hints + .similar_slug_clusters + .sort_by(|a, b| (a.kind.as_str(), &a.slugs).cmp(&(b.kind.as_str(), &b.slugs))); + + hints +} + +/// `/.insomnia/memory//*.md` (Knowledge は +/// `/.insomnia/knowledge/*.md`) を slug ごとに `(slug, full content)` +/// 化して返す。 +fn read_kind_records( + layout: &WorkspaceLayout, + kind: RecordKind, +) -> BTreeMap { + let dir = match kind { + RecordKind::Decision => layout.decisions_dir(), + RecordKind::Request => layout.requests_dir(), + RecordKind::Knowledge => layout.knowledge_dir(), + RecordKind::Summary | RecordKind::Workflow => return BTreeMap::new(), + }; + let mut out: BTreeMap = BTreeMap::new(); + let entries = match std::fs::read_dir(&dir) { + Ok(it) => it, + Err(_) => return out, + }; + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_file() { + continue; + } + let stem = match path.file_stem().and_then(|s| s.to_str()) { + Some(s) => s, + None => continue, + }; + if path.extension().and_then(|s| s.to_str()) != Some("md") { + continue; + } + if Slug::parse(stem).is_err() { + continue; + } + let content = match std::fs::read_to_string(&path) { + Ok(s) => s, + Err(_) => continue, + }; + out.insert(stem.to_string(), content); + } + out +} + +fn parse_yaml(content: &str) -> Option { + let (yaml, _body) = split_frontmatter(content).ok()?; + serde_yaml::from_str::(yaml).ok() +} + +/// Connected-component clustering over the `levenshtein <= SIMILAR_SLUG_DISTANCE` +/// graph among same-kind slugs. Returns each cluster of size >= 2 (singleton +/// clusters are not interesting for the integration phase). Returns `None` +/// when there are no clusters at all. +fn cluster_similar(slugs: &[&str], kind: RecordKind) -> Option> { + if slugs.len() < 2 { + return None; + } + let n = slugs.len(); + let mut parent: Vec = (0..n).collect(); + fn find(parent: &mut [usize], i: usize) -> usize { + if parent[i] == i { + i + } else { + let root = find(parent, parent[i]); + parent[i] = root; + root + } + } + fn union(parent: &mut [usize], a: usize, b: usize) { + let ra = find(parent, a); + let rb = find(parent, b); + if ra != rb { + parent[ra] = rb; + } + } + for i in 0..n { + for j in (i + 1)..n { + if levenshtein(slugs[i], slugs[j]) <= SIMILAR_SLUG_DISTANCE { + union(&mut parent, i, j); + } + } + } + let mut groups: BTreeMap> = BTreeMap::new(); + for i in 0..n { + let root = find(&mut parent, i); + groups.entry(root).or_default().push(slugs[i].to_string()); + } + let mut out: Vec = Vec::new(); + let mut seen_canonical: BTreeSet> = BTreeSet::new(); + for (_, mut group) in groups { + if group.len() < 2 { + continue; + } + group.sort(); + if seen_canonical.insert(group.clone()) { + out.push(SimilarSlugCluster { kind, slugs: group }); + } + } + if out.is_empty() { None } else { Some(out) } +} + +/// Iterative two-row Levenshtein distance over chars (matches the Linter's +/// implementation; kept private to avoid widening that crate-internal API). +fn levenshtein(a: &str, b: &str) -> usize { + let a: Vec = a.chars().collect(); + let b: Vec = b.chars().collect(); + if a.is_empty() { + return b.len(); + } + if b.is_empty() { + return a.len(); + } + let mut prev: Vec = (0..=b.len()).collect(); + let mut curr: Vec = vec![0; b.len() + 1]; + for (i, ca) in a.iter().enumerate() { + curr[0] = i + 1; + for (j, cb) in b.iter().enumerate() { + let cost = if ca == cb { 0 } else { 1 }; + curr[j + 1] = (curr[j] + 1).min(prev[j + 1] + 1).min(prev[j] + cost); + } + std::mem::swap(&mut prev, &mut curr); + } + prev[b.len()] +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use std::path::Path; + + fn now() -> String { + Utc::now().to_rfc3339() + } + + fn write(p: &Path, content: &str) { + if let Some(parent) = p.parent() { + std::fs::create_dir_all(parent).unwrap(); + } + std::fs::write(p, content).unwrap(); + } + + fn workspace() -> (tempfile::TempDir, WorkspaceLayout) { + let dir = tempfile::TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(dir.path().to_path_buf()); + (dir, layout) + } + + #[test] + fn collects_replaced_chain() { + let (dir, layout) = workspace(); + write( + &dir.path().join(".insomnia/memory/decisions/replaced.md"), + &format!( + "---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: replaced\nreplaced_by: winner\n---\n", + n = now() + ), + ); + write( + &dir.path().join(".insomnia/memory/decisions/winner.md"), + &format!( + "---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: open\n---\n", + n = now() + ), + ); + let hints = collect_tidy_hints(&layout); + assert_eq!( + hints.replaced_decisions.get("replaced").cloned(), + Some(Some("winner".into())) + ); + assert!(!hints.replaced_decisions.contains_key("winner")); + } + + #[test] + fn flags_sources_overflow() { + let (dir, layout) = workspace(); + let many_sources: String = (0..15) + .map(|i| format!(" - session_id: s{i}\n range: [{i}, {i}]\n")) + .collect(); + write( + &dir.path().join(".insomnia/memory/decisions/big.md"), + &format!( + "---\ncreated_at: {n}\nupdated_at: {n}\nstatus: open\nsources:\n{m}---\n", + n = now(), + m = many_sources + ), + ); + let hints = collect_tidy_hints(&layout); + assert_eq!(hints.sources_overflow.len(), 1); + assert_eq!(hints.sources_overflow[0].slug, "big"); + assert_eq!(hints.sources_overflow[0].kind, RecordKind::Decision); + assert_eq!(hints.sources_overflow[0].count, 15); + } + + #[test] + fn clusters_similar_slugs() { + let (dir, layout) = workspace(); + for slug in ["db-pool", "db-pol", "db-pools", "alpha"] { + write( + &dir.path() + .join(format!(".insomnia/memory/decisions/{slug}.md")), + &format!( + "---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: open\n---\n", + n = now() + ), + ); + } + let hints = collect_tidy_hints(&layout); + assert_eq!(hints.similar_slug_clusters.len(), 1); + assert_eq!( + hints.similar_slug_clusters[0].slugs, + vec![ + "db-pol".to_string(), + "db-pool".to_string(), + "db-pools".to_string(), + ] + ); + } + + #[test] + fn empty_workspace_yields_empty_hints() { + let (_dir, layout) = workspace(); + let hints = collect_tidy_hints(&layout); + assert!(hints.is_empty()); + } +} diff --git a/crates/memory/src/lib.rs b/crates/memory/src/lib.rs index 94c69dd8..622c9fc2 100644 --- a/crates/memory/src/lib.rs +++ b/crates/memory/src/lib.rs @@ -6,6 +6,7 @@ //! crate) must not touch these directories — Pod is responsible for //! denying them at the Scope level when memory is enabled. +pub mod consolidate; pub mod error; pub mod extract; pub mod linter; diff --git a/crates/pod/Cargo.toml b/crates/pod/Cargo.toml index 16817d45..d55ed05c 100644 --- a/crates/pod/Cargo.toml +++ b/crates/pod/Cargo.toml @@ -27,6 +27,7 @@ fs4 = { version = "0.13.1", features = ["sync"] } libc = "0.2.185" schemars = "1.2.1" memory = { version = "0.1.0", path = "../memory" } +uuid = { version = "1.23.1", features = ["v7"] } [dev-dependencies] async-trait = "0.1.89" diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index d4685ed7..f346d96f 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -402,6 +402,14 @@ impl PodController { format!("post-run memory extract error: {e}"), ); } + if let Err(e) = pod.try_post_run_consolidate().await { + tracing::warn!(error = %e, "Post-run memory consolidate error"); + alerter.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("post-run memory consolidate error: {e}"), + ); + } if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); alerter.alert( @@ -461,6 +469,14 @@ impl PodController { format!("post-run memory extract error: {e}"), ); } + if let Err(e) = pod.try_post_run_consolidate().await { + tracing::warn!(error = %e, "Post-run memory consolidate error"); + alerter.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("post-run memory consolidate error: {e}"), + ); + } if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); alerter.alert( @@ -517,6 +533,14 @@ impl PodController { format!("post-run memory extract error: {e}"), ); } + if let Err(e) = pod.try_post_run_consolidate().await { + tracing::warn!(error = %e, "Post-run memory consolidate error"); + alerter.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("post-run memory consolidate error: {e}"), + ); + } if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); alerter.alert( diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 38231f43..dc219bed 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -136,6 +136,11 @@ pub struct Pod { /// the flag survives across `try_post_run_extract` calls without a /// `&mut self` race. extract_in_flight: Arc, + /// Phase 2 (memory.consolidation) in-process reentry guard. The + /// staging-side `StagingLock` already provides cross-process + /// exclusion, but this AtomicBool keeps a careless concurrent caller + /// inside the same Pod from racing on the staging snapshot. + consolidation_in_flight: Arc, /// Last completed Phase 1 boundary. `None` means no extract has /// run yet on this session — next extract starts from entry 0. /// Restored from `RestoredState.extensions` on `restore`, updated @@ -197,6 +202,7 @@ impl Pod { prompts, inject_resident_knowledge: true, extract_in_flight: Arc::new(AtomicBool::new(false)), + consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Mutex::new(None), user_segments: Vec::new(), }; @@ -1490,6 +1496,173 @@ impl Pod { Ok(ExtractDecision::Completed) } + + /// Build the LlmClient for the Phase 2 (memory.consolidation) Worker. + /// + /// Uses `memory.consolidation_model` from manifest if set, otherwise + /// clones the main client. Mirrors [`build_extractor_client`]. + fn build_consolidator_client( + &self, + memory_cfg: &manifest::MemoryConfig, + ) -> Result, PodError> { + if let Some(ref m) = memory_cfg.consolidation_model { + let client = provider::build_client(m)?; + return Ok(client); + } + let worker = self.worker.as_ref().expect("worker taken during run"); + Ok(worker.client().clone_boxed()) + } + + /// Phase 2 (memory.consolidation) post-run trigger. + /// + /// Called by the Controller **after** [`try_post_run_extract`] and + /// **before** [`try_post_run_compact`]: extract feeds staging, compact + /// rewrites history. Phase 2 must consume staging before compact + /// reshapes the session. + /// + /// Behaviour follows `docs/plan/memory.md` §Phase 2 / §並走防止: + /// the staging-side `StagingLock` enforces cross-process exclusion; + /// `consolidation_in_flight` keeps in-process callers honest. On + /// success, the lock is released *with* consumed-id cleanup; on + /// worker failure, only the lock file is unlinked so the staging + /// entries remain for a future retry. + pub async fn try_post_run_consolidate(&mut self) -> Result<(), PodError> { + let Some(memory_cfg) = self.manifest.memory.clone() else { + return Ok(()); + }; + let files_threshold = memory_cfg.consolidation_threshold_files; + let bytes_threshold = memory_cfg.consolidation_threshold_bytes; + if files_threshold.is_none() && bytes_threshold.is_none() { + return Ok(()); + } + + loop { + if self + .consolidation_in_flight + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + return Ok(()); + } + let result = self + .run_consolidate_once(&memory_cfg, files_threshold, bytes_threshold) + .await; + self.consolidation_in_flight.store(false, Ordering::Release); + + match result { + Ok(ConsolidateDecision::Skipped) => return Ok(()), + Ok(ConsolidateDecision::Completed) => continue, + Err(e) => { + tracing::warn!(error = %e, "Phase 2 consolidation failed"); + self.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("memory Phase 2 consolidation failed: {e}"), + ); + return Ok(()); + } + } + } + } + + /// Single consolidation iteration: snapshot staging, decide whether to + /// fire, run the worker if so, release the lock and clean up consumed + /// IDs. + async fn run_consolidate_once( + &mut self, + memory_cfg: &manifest::MemoryConfig, + files_threshold: Option, + bytes_threshold: Option, + ) -> Result { + use memory::consolidate; + + let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd); + + let entries = consolidate::list_staging_entries(&layout); + if entries.is_empty() { + return Ok(ConsolidateDecision::Skipped); + } + + let total_files = entries.len(); + let total_bytes: u64 = entries.iter().map(|e| e.bytes).sum(); + let files_hit = files_threshold.is_some_and(|n| total_files >= n); + let bytes_hit = bytes_threshold.is_some_and(|n| total_bytes >= n); + if !files_hit && !bytes_hit { + return Ok(ConsolidateDecision::Skipped); + } + + let consumed_ids: Vec = entries.iter().map(|e| e.id).collect(); + let lock = match consolidate::StagingLock::acquire( + &layout, + std::process::id(), + self.manifest.pod.name.clone(), + consumed_ids, + ) { + Ok(l) => l, + Err(memory::consolidate::LockError::InUse { .. }) => { + return Ok(ConsolidateDecision::Skipped); + } + Err(e) => return Err(PodError::ConsolidationLock(e)), + }; + + let cap = memory_cfg + .consolidation_worker_max_input_tokens + .unwrap_or(manifest::defaults::MEMORY_CONSOLIDATION_WORKER_MAX_INPUT_TOKENS); + let client = match self.build_consolidator_client(memory_cfg) { + Ok(c) => c, + Err(e) => { + lock.release_only(); + return Err(e); + } + }; + let mut worker = + Worker::new(client).system_prompt(consolidate::CONSOLIDATION_SYSTEM_PROMPT); + + let input_so_far = Arc::new(std::sync::atomic::AtomicU64::new(0)); + { + let acc = input_so_far.clone(); + worker.on_usage(move |event| { + if let Some(tokens) = event.input_tokens { + acc.fetch_add(tokens, Ordering::Relaxed); + } + }); + } + worker.set_interceptor(MemoryConsolidationWorkerInterceptor { + input_so_far: input_so_far.clone(), + max_input_tokens: cap, + }); + + // Memory tools are self-contained — they bypass ScopedFs and write + // directly under the workspace via WorkspaceLayout. Resident + // knowledge injection (`Pod::set_resident_knowledge_injection`) is + // a Pod-level concern; this disposable Worker is built without it + // by construction, in keeping with `docs/plan/memory.md` §Phase 2 + // のKnowledgeアクセス (agent pulls knowledge through the search + // tool instead of via system-prompt residency). + let query_cfg = memory::tool::QueryConfig::from(memory_cfg); + worker.register_tool(memory::tool::read_tool(layout.clone())); + worker.register_tool(memory::tool::write_tool(layout.clone())); + worker.register_tool(memory::tool::edit_tool(layout.clone())); + worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg)); + worker.register_tool(memory::tool::knowledge_query_tool(layout.clone(), query_cfg)); + + let tidy = consolidate::collect_tidy_hints(&layout); + let candidates = consolidate::KnowledgeCandidateReport::empty(); + let input_text = + consolidate::build_consolidate_input(&layout, &entries, &tidy, &candidates); + + let run_result = worker.run(input_text).await; + match run_result { + Ok(_) => { + lock.release_with_cleanup(&layout); + Ok(ConsolidateDecision::Completed) + } + Err(e) => { + lock.release_only(); + Err(PodError::Worker(e)) + } + } + } } /// Outcome of a single Phase 1 extract iteration. Internal to @@ -1526,6 +1699,40 @@ impl llm_worker::interceptor::Interceptor for MemoryExtractWorkerInterceptor { } } +/// Outcome of a single Phase 2 consolidation iteration. Internal to +/// `try_post_run_consolidate` / `run_consolidate_once`. +enum ConsolidateDecision { + /// Either threshold not met, no staging, or another Pod holds the lock. + Skipped, + /// Consolidation ran. Caller re-evaluates threshold against any + /// staging entries that arrived during the run (Coalesce). + Completed, +} + +/// Pre-request interceptor for the Phase 2 consolidation worker. Same +/// shape as the extract interceptor; kept separate so the abort message +/// names the right subsystem. +struct MemoryConsolidationWorkerInterceptor { + input_so_far: Arc, + max_input_tokens: u64, +} + +#[async_trait] +impl llm_worker::interceptor::Interceptor for MemoryConsolidationWorkerInterceptor { + async fn pre_llm_request( + &self, + _context: &mut Vec, + ) -> llm_worker::interceptor::PreRequestAction { + if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens { + return llm_worker::interceptor::PreRequestAction::Cancel(format!( + "Phase 2 consolidation worker input exceeded {} tokens", + self.max_input_tokens + )); + } + llm_worker::interceptor::PreRequestAction::Continue + } +} + impl Pod, St> { /// Create a Pod entirely from a validated manifest. /// @@ -1596,6 +1803,7 @@ impl Pod, St> { prompts: common.prompts, inject_resident_knowledge: true, extract_in_flight: Arc::new(AtomicBool::new(false)), + consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Mutex::new(None), user_segments: Vec::new(), }; @@ -1653,6 +1861,7 @@ impl Pod, St> { prompts: common.prompts, inject_resident_knowledge: true, extract_in_flight: Arc::new(AtomicBool::new(false)), + consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Mutex::new(None), user_segments: Vec::new(), }; @@ -1762,6 +1971,7 @@ impl Pod, St> { prompts: common.prompts, inject_resident_knowledge: true, extract_in_flight: Arc::new(AtomicBool::new(false)), + consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Mutex::new(extract_pointer), user_segments: state.user_segments, }; @@ -1963,6 +2173,9 @@ pub enum PodError { #[error("memory Phase 1 staging write failed: {0}")] ExtractStaging(#[source] memory::extract::StagingError), + #[error("memory Phase 2 lock acquisition failed: {0}")] + ConsolidationLock(#[source] memory::consolidate::LockError), + #[error("session {session_id} has no entries to restore")] SessionEmpty { session_id: SessionId }, } diff --git a/crates/pod/tests/consolidation_test.rs b/crates/pod/tests/consolidation_test.rs new file mode 100644 index 00000000..6fa78294 --- /dev/null +++ b/crates/pod/tests/consolidation_test.rs @@ -0,0 +1,277 @@ +//! Phase 2 (memory.consolidation) post-run trigger. +//! +//! Covers the gating, lock and cleanup behaviour without exercising the +//! full sub-worker tool loop: +//! +//! - no `[memory]` section → no-op +//! - `[memory]` present but no thresholds → no-op +//! - staging empty → skip +//! - staging below thresholds → skip + lock not acquired +//! - staging above threshold → sub-worker runs, consumed entries removed +//! - existing live lock → skip without error +//! +//! The sub-worker is fed a no-op LLM response (plain text) so it returns +//! immediately. The post-run path then exercises lock acquisition, +//! cleanup, and the empty-payload fast path. + +use std::pin::Pin; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use async_trait::async_trait; +use futures::Stream; +use llm_worker::Worker; +use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent}; +use llm_worker::llm_client::{ClientError, LlmClient, Request}; +use memory::WorkspaceLayout; +use memory::extract::{ExtractedPayload, write_staging}; +use memory::schema::SourceRef; +use session_store::FsStore; + +use pod::Pod; + +#[derive(Clone)] +struct MockClient { + responses: Arc>>, + call_count: Arc, +} + +impl MockClient { + fn new(responses: Vec>) -> Self { + Self { + responses: Arc::new(responses), + call_count: Arc::new(AtomicUsize::new(0)), + } + } +} + +#[async_trait] +impl LlmClient for MockClient { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + async fn stream( + &self, + _request: Request, + ) -> Result> + Send>>, ClientError> + { + let count = self.call_count.fetch_add(1, Ordering::SeqCst); + if count >= self.responses.len() { + return Err(ClientError::Config("mock client exhausted".into())); + } + let events = self.responses[count].clone(); + let stream = futures::stream::iter(events.into_iter().map(Ok)); + Ok(Box::pin(stream)) + } +} + +fn done(text: &str) -> Vec { + vec![ + LlmEvent::text_block_start(0), + LlmEvent::text_delta(0, text), + LlmEvent::text_block_stop(0, None), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ] +} + +const NO_MEMORY_TOML: &str = r#" +[pod] +name = "test-pod" + +[model] +scheme = "anthropic" +model_id = "test-model" + +[worker] +max_tokens = 100 + +[[scope.allow]] +target = "./" +permission = "write" +"#; + +const MEMORY_NO_THRESHOLDS_TOML: &str = r#" +[pod] +name = "test-pod" + +[model] +scheme = "anthropic" +model_id = "test-model" + +[worker] +max_tokens = 100 + +[memory] + +[[scope.allow]] +target = "./" +permission = "write" +"#; + +const FILES_THRESHOLD_TOML: &str = r#" +[pod] +name = "test-pod" + +[model] +scheme = "anthropic" +model_id = "test-model" + +[worker] +max_tokens = 100 + +[memory] +consolidation_threshold_files = 2 + +[[scope.allow]] +target = "./" +permission = "write" +"#; + +async fn make_pod_with( + manifest_toml: &str, + pwd: std::path::PathBuf, + client: MockClient, +) -> Pod { + let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap(); + + let store_tmp = tempfile::tempdir().unwrap(); + let store = FsStore::new(store_tmp.path()).await.unwrap(); + std::mem::forget(store_tmp); + + let scope = pod::Scope::writable(&pwd).unwrap(); + let worker = Worker::new(client); + Pod::new(manifest, worker, store, pwd, scope).await.unwrap() +} + +fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec { + let mut ids = Vec::new(); + for i in 0..n { + let (id, _) = write_staging( + layout, + SourceRef { + session_id: format!("s-{i}"), + range: [i as u64, i as u64], + }, + ExtractedPayload::default(), + ) + .unwrap(); + ids.push(id); + } + ids +} + +#[tokio::test] +async fn no_memory_section_is_a_noop() { + let pwd = tempfile::tempdir().unwrap(); + let client = MockClient::new(vec![]); + let mut pod = make_pod_with(NO_MEMORY_TOML, pwd.path().to_path_buf(), client).await; + pod.try_post_run_consolidate() + .await + .expect("missing memory section must skip cleanly"); +} + +#[tokio::test] +async fn no_thresholds_is_a_noop() { + let pwd = tempfile::tempdir().unwrap(); + let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); + write_n_staging(&layout, 5); + + let client = MockClient::new(vec![]); + let mut pod = make_pod_with(MEMORY_NO_THRESHOLDS_TOML, pwd.path().to_path_buf(), client).await; + pod.try_post_run_consolidate() + .await + .expect("phase 2 disabled when both thresholds are None"); + + // No staging entries removed. + assert_eq!( + memory::consolidate::list_staging_entries(&layout).len(), + 5 + ); +} + +#[tokio::test] +async fn empty_staging_skips() { + let pwd = tempfile::tempdir().unwrap(); + let client = MockClient::new(vec![]); + let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; + pod.try_post_run_consolidate().await.unwrap(); + // No mock calls expected. +} + +#[tokio::test] +async fn below_threshold_skips_and_does_not_take_lock() { + let pwd = tempfile::tempdir().unwrap(); + let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); + write_n_staging(&layout, 1); // threshold is 2 + + let client = MockClient::new(vec![]); + let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; + pod.try_post_run_consolidate().await.unwrap(); + + // Staging untouched. + assert_eq!( + memory::consolidate::list_staging_entries(&layout).len(), + 1 + ); + // Lock file must not exist. + let lock_path = layout.staging_dir().join(".consolidation.lock"); + assert!(!lock_path.exists(), "lock file should not be created"); +} + +#[tokio::test] +async fn fires_on_threshold_and_cleans_up_consumed_entries() { + let pwd = tempfile::tempdir().unwrap(); + let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); + write_n_staging(&layout, 2); // threshold is 2 — fires. + + // Sub-worker is given a single text-only response. The Phase 2 prompt + // tells it to call memory tools; the mock skips those, but `Worker::run` + // returns Ok regardless once the LLM closes with a final text. + let client = MockClient::new(vec![done("ok")]); + let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; + pod.try_post_run_consolidate().await.unwrap(); + + // Consumed entries removed. + assert!( + memory::consolidate::list_staging_entries(&layout).is_empty(), + "consumed staging entries must be cleaned up" + ); + // Lock removed too. + let lock_path = layout.staging_dir().join(".consolidation.lock"); + assert!( + !lock_path.exists(), + "lock file must be removed on success" + ); +} + +#[tokio::test] +async fn live_lock_held_by_other_pod_skips() { + let pwd = tempfile::tempdir().unwrap(); + let layout = WorkspaceLayout::new(pwd.path().to_path_buf()); + write_n_staging(&layout, 3); + + // Pre-acquire lock with this test's PID — definitely alive — and + // *don't* release it. The Phase 2 path must skip without error. + let _live_lock = memory::consolidate::StagingLock::acquire( + &layout, + std::process::id(), + "other-pod", + Vec::new(), + ) + .unwrap(); + + let client = MockClient::new(vec![]); + let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await; + pod.try_post_run_consolidate() + .await + .expect("InUse lock must surface as graceful skip"); + + // Staging untouched: lock holder owns the snapshot, not us. + assert_eq!( + memory::consolidate::list_staging_entries(&layout).len(), + 3 + ); +} diff --git a/tickets/internal-worker-workflow.md b/tickets/internal-worker-workflow.md new file mode 100644 index 00000000..38be2ef0 --- /dev/null +++ b/tickets/internal-worker-workflow.md @@ -0,0 +1,73 @@ +# 内部 Worker / 内部 Pod の Workflow 化 + +## 背景 + +INSOMNIA が内部で固定 prompt を持って disposable Worker / 専用 Pod を立ち上げている経路がいくつかある: + +- Phase 1 活動抽出(`crates/memory/src/extract/prompt.rs::EXTRACT_SYSTEM_PROMPT`) +- Phase 2 統合 + 整理(`tickets/memory-phase2-consolidation.md`、本チケット時点では実装中 / 直前) +- Compact(`PromptCatalog::compact_system`) + +これらは実装内 `&str` 定数や `PromptCatalog` の overlay で管理されており、prompt の調整や運用カスタマイズが「コード変更 + 再ビルド」を要する。一方、ユーザー向け `/` Workflow(`tickets/workflow.md`)は `/.insomnia/memory/workflow/.md` に住み、frontmatter + Markdown 本文 + `requires` Knowledge inject を持つ宣言形式で運用できる。 + +両者を寄せ、内部 Worker / 内部 Pod の prompt + ツール surface + Knowledge 依存を **Workflow と同一仕様で記述** できる経路を用意する。これにより: + +- 内部 prompt の運用調整が workspace 側でできる(コード変更不要) +- Phase 2 の prompt 案 (`docs/plan/memory-prompts.md`) を workspace に直接 ingest できる +- 将来 consolidation を独立 Pod に引き上げる際も、Workflow を submit する形に揃えられる + +## 要件 + +### Workflow の役割拡張 + +`tickets/workflow.md` の Workflow 仕様は「ユーザーが `/` で submit する制約付き作業」だが、本チケットでは **内部トリガー(Pod 内部の状態遷移)から呼び出される Workflow** を一級扱いに広げる。 + +- 同じファイル形式(`memory/workflow/.md`)、同じ frontmatter / Linter +- `user_invocable: false` で `/` 経路から見えなくする +- `auto_invoke` は通常 Pod 用の system prompt 注入仕様のまま(内部 Workflow は通常 OFF) +- 内部 Workflow を識別するキー(例: `internal_role`)と、必要なツール surface を表明する手段を frontmatter に追加する。具体 schema は実装で詰める + +### 内部呼び出し経路 + +Pod 側の既存トリガー(Phase 1 post-run / Phase 2 staging 閾値 / Compact 閾値 等)は固定 `&str` の代わりに Workflow loader 経由で: + +1. 内部識別キーで該当 Workflow を解決(衝突時は workspace 上書き優先、なければ insomnia bundled default) +2. `requires` Knowledge を本文の前に inject +3. Workflow 本文を sub-Worker / sub-Pod の prompt として渡す(system prompt 扱いか初回 submit 扱いかは内部用途で固定し、role ごとに揃える) +4. 既存のツール登録ロジックは Workflow が表明したツール surface に従う + +### Bundled defaults + +ユーザー workspace に該当 Workflow が無い場合に備え、insomnia 同梱の default Workflow を読む層を `PromptCatalog` の overlay と整合する形で持つ。 + +- 既存の Pod prompt 4 層 overlay(builtin / user / workspace / pod-pack)と同じ優先順 +- bundled default の物理配置は実装で決める + +### 関連チケットとの順序 + +- `tickets/workflow.md`(ユーザー向け Workflow 本体)が先行する。本チケットはその仕様を前提に「内部呼び出し経路」を追加する側 +- `tickets/memory-phase2-consolidation.md` は当面 `&str` 定数で実装してよい。本チケット完了時に Workflow 化に乗り換える +- Phase 1 / Compact も同様に role ごとに段階移行 + +## 範囲外 + +- Workflow 仕様自体の本体実装(`tickets/workflow.md`) +- 内部 Workflow の自動生成(consolidation の offer 等。`docs/plan/memory.md` §Offer 経路 / 将来検討) +- 既存 `&str` 定数の物理削除タイミング(移行が完了した role ごとに削除する運用) +- `auto_invoke` 注入予算の最適化(既存 Knowledge 常駐注入予算と合算する規約は `docs/plan/memory.md` 側) + +## 完了条件 + +- 各内部 Worker / 内部 Pod(少なくとも Phase 1 / Phase 2 / Compact のうち、本チケット着手時点で実装済みのもの)が内部識別キー付き Workflow を解決して prompt とツール surface を組み立てる +- workspace で `memory/workflow/.md` を上書きすれば内部 Worker の prompt が変わる +- workspace に該当 Workflow が無い場合、bundled default が使われる +- `user_invocable: false` の内部 Workflow は `/` 候補から除外され、ユーザーからは呼べない +- 内部 Workflow も consolidation の自動書き込み禁止対象のまま(Linter で構造的担保、`workflow.md` と整合) +- 単体テストで bundled default / workspace overlay / ツール surface 表明 + 解決 + 適用がカバーされる + +## 参照 + +- 前提: `tickets/workflow.md` +- 最初の利用者: `tickets/memory-phase2-consolidation.md` +- 関連: `tickets/agent-skills.md`(外部 SKILL ingest 経路。本チケットの内部呼び出し経路とは別軸) +- 設計: `docs/plan/workflow.md`、`docs/plan/memory.md`、`docs/plan/memory-prompts.md`