Compare commits

...

2 Commits

Author SHA1 Message Date
1afe7c53aa
スキルの整理 2026-05-01 23:14:37 +09:00
beb6b686a1
メモリPhase2の実装 2026-05-01 23:00:55 +09:00
25 changed files with 1884 additions and 95 deletions

View File

@ -1,5 +0,0 @@
- [Event broadcast pattern](project_event_broadcast_pattern.md) — Pod は event_tx: Option<broadcast::Sender<Event>> を保持、Controller が attach_notifier と同タイミングで attach
- [Test-path omission precedent](feedback_test_path_omission.md) — 要件に挙がったテストを「共通ヘルパ経由だから省略」した場合は Approve with follow-up が相場
- [cargo add workspace pitfall](feedback_cargo_add_workspace_pitfall.md) — ルート Cargo.toml に [workspace.dependencies] が未定義、workspace = true 指定は現状使えない
- [Out-of-scope diff mixing](feedback_out_of_scope_mixing.md) — スコープ外修正が ticket diff に同居 → [major] Non-blocking で指摘、コミット分割推奨、総合は Approve
- [Explicit out-of-scope violation](feedback_explicit_out_of_scope_violation.md) — 要件/範囲外に反する変更は Request changes、ticket 先更新 or 戻すの二択

View File

@ -1,18 +0,0 @@
---
name: cargo add workspace pitfall
description: ルート Cargo.toml に [workspace.dependencies] が未定義なので workspace = true は使えない
type: feedback
---
ルート `Cargo.toml``[workspace.package]` のみを持ち `[workspace.dependencies]`
定義していない。したがってチケットや PR に
`foo = { workspace = true, features = [...] }` と書かれていても、そのままでは解決しない。
**Why:** プロジェクトの現状流儀として、各クレートは直接バージョン指定する
(例: `crates/session-store/Cargo.toml``uuid = { version = "1", features = [...] }`)。
protocol-design (2026-04-21) レビュー時に発見。
**How to apply:** チケットに `workspace = true` の文言を見たら、
- 実装が直接バージョン指定にしていれば「コードベース流儀に整合」として Follow-up 扱い、
- `workspace = true` のまま書かれていたら「ビルドが通らないはず」として Request changes、
- もしくは `[workspace.dependencies]` を整備する方向の提案を添える。

View File

@ -1,14 +0,0 @@
---
name: Explicit out-of-scope violation is Request-changes
description: Precedent — when implementation contradicts ticket's 要件 or 範囲外 sections verbatim, reviewer should Request changes even if the change seems like a quality-of-life improvement
type: feedback
---
実装が ticket の「要件」または「範囲外」に明記された指示と矛盾している場合、それが「どう見ても spirit としては改善」であっても Request changes が相場。
**Why:** チケットのライフサイクルは CLAUDE.md で "b. 詳細化や前提の変化: `tickets/foo.md` を更新して commit" と定められており、仕様判断は ticket 更新で先に通すのが手順。実装から暗黙に上書きすると、後続チケットが前提にしている文面とのズレが残る。過去の「protocol-tool-result-shape」では要件に「`output` は残す」、範囲外に「`ToolOutput.content` の protocol 化は別チケット」と明記されているにもかかわらず protocol フィールドを `output: String``content: Option<String>` に改名する diff が出て、Blocking として指摘。
**How to apply:**
- 要件・範囲外セクションの明示的な文言に反する変更を見つけたら、たとえ技術的には合理的でも Blocking として扱う
- 対処方針として (1) 元通りに戻す、(2) ticket を先に書き換える、の 2 択を review に書き添える
- 「内部モデル (worker 側 struct など) と名前を揃える」系の動機は特に見落としやすい。protocol / wire 層は内部型と分離しておくのが既定、合わせるなら意思決定を ticket に上げさせる

View File

@ -1,17 +0,0 @@
---
name: Out-of-scope diff mixed into ticket
description: 本チケットのスコープ外修正が同じ diff/作業ツリーに同居していた場合のレビュー判定ルール
type: feedback
---
ticket の実装 diff にスコープ外の修正(別の疎通バグ fix、別レイヤの API 調整等)が同居している状況では、**major 扱いの Non-blocking**= Approve 可、ただし follow-up 指摘)で扱うのが precedent。
**Why:** ユーザー自身が「別コミット候補」と認識した上で差分提示してくるケースが複数回ある。コミット分割はユーザーの git 操作領域CLAUDE.md: Git はユーザー責務なので、reviewer 側は**コミット分割を推奨する**指摘に留める。blocker にはしない。
**How to apply:**
- review.md の Non-blocking セクションで「スコープ外 diff が同居している」項目を [major] で立て、該当ファイルと何が本筋外かを列挙する。
- 「本チケットの review は X 単体の妥当性判定に留め、スコープ外修正の可否まで巻き込まない」ことを明記。
- 本筋(チケット要件)が満たされていれば総合判定は Approve で良い。
- 挙動保存の確認は本筋と同時に行うが、スコープ外変更の影響で疎通確認が混同しないか(どの変更が疎通パスした根拠か)を review.md に一言書く。
precedent: `tickets/llm-capability-ownership.review.md` (2026-04-21)

View File

@ -1,19 +0,0 @@
---
name: Test-path omission precedent
description: 要件で列挙されたテストパスを「共通ヘルパなので省略」した場合の判断相場
type: feedback
---
チケット要件にテストパス (例: 成功/失敗/mid-turn の 3 本) が明示列挙されている場合、
そのうち 1 本を「共通ヘルパ経由だから inspection で担保」として省略する実装が来たら、
**Approve with follow-up** が相場。Blocking にはしない。
**Why:** 共通化されたインスツルメント (例: `send_event`) 1 点だけが共通で、
呼び出し側の制御フロー (async 再帰・フラグ管理・エラー伝播) は個別なのが通例。
ただしビルドと主要パスが動いており、後続チケットでテストを足すだけの差分で済むケースが多い。
protocol-design (2026-04-21) で先例。
**How to apply:** 要件とテストコードを 1:1 で突き合わせ、欠けたパスがあれば
- 制御フローが共通化されていれば Follow-up
- 制御フローが別物 (別関数 / 別状態遷移) なら Request changes
と切り分ける。`send_event` 型のヘルパ共通化は Follow-up 側の判断。

View File

@ -1,20 +0,0 @@
---
name: Event broadcast pattern
description: Pod が protocol::Event を broadcast する公式パターン (Notifier と別経路)
type: project
---
Pod 内部から `protocol::Event` を broadcast する正規ルートは、`Pod` に
`event_tx: Option<broadcast::Sender<Event>>` を持たせて `attach_event_tx`
Controller 側から注入する方式。`Notifier` は `Event::Notification`
replay バッファ専用で、他イベントは通さない。
**Why:** `Notifier` は Notification 型の Warn/Error レベル情報 + late subscriber への
snapshot replay を責務にしており、Event 一般を乗せると意味が噛み合わない。
protocol-design チケットの決定事項 6/7 で確定 (2026-04-21)。
**How to apply:** 新しい Pod 発の Event を追加するときは、
1) `Pod::send_event(&self, event)` ヘルパ (`pod.rs:370-374`) を使う、
2) Controller は `pod.attach_notifier` の直後に `pod.attach_event_tx` を呼ぶ、
3) late subscriber への届きは期待しない (buffer 化が必要なら別チケット化)。
Notifier 経由で新種 Event を流す PR が来たら差し戻し対象。

View File

@ -0,0 +1,22 @@
---
name: worktree-workflow
description: "Worktreeを用いた開発フローを進める。git上の開発に置けるミクロな指示で、プロジェクトの管理に関する指示は提供されていない。"
allowed-tools: "Bash(cd *), Bash(git worktree *), Bash(mkdir *), Bash(cp *), Bash(ln *), Bash(ls *), Bash(find *)"
---
# Worktreeを用いた開発
Goal: 実装を完了させ、ブランチをマージ待ちの状態にする。
`./.worktree`にworktreeを作成します。
エージェントの1セッション=1ワークツリーとしており、ブランチ/イシュー/チケット単位で切ります。
このワークフローにおいては、ブランチはローカルで並行開発するためのマージ後削除の運用とし、Worktreeと同名のbranchを同時に作って進めます。メインのディレクトリのブランチから切るものとして扱います。
```
git worktree add .worktree/<task-name> -n <task-name>
```
## flake.nixの無効化
基本的に、CWDを変更できない場合、.envrcによる自動アクティベートは効かないので無視で構わない。

6
Cargo.lock generated
View File

@ -1571,9 +1571,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.185" version = "0.2.186"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66"
[[package]] [[package]]
name = "libredox" name = "libredox"
@ -1757,6 +1757,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"chrono", "chrono",
"libc",
"llm-worker", "llm-worker",
"manifest", "manifest",
"schemars", "schemars",
@ -2140,6 +2141,7 @@ dependencies = [
"toml", "toml",
"tools", "tools",
"tracing", "tracing",
"uuid",
] ]
[[package]] [[package]]

View File

@ -1,5 +1,6 @@
- [ ] Workflow / Skills - [ ] Workflow / Skills
- [ ] Workflow 実装 → [tickets/workflow.md](tickets/workflow.md) - [ ] Workflow 実装 → [tickets/workflow.md](tickets/workflow.md)
- [ ] 内部 Worker / 内部 Pod の Workflow 化 → [tickets/internal-worker-workflow.md](tickets/internal-worker-workflow.md)
- [ ] Agent Skills を Workflow として ingest → [tickets/agent-skills.md](tickets/agent-skills.md) - [ ] Agent Skills を Workflow として ingest → [tickets/agent-skills.md](tickets/agent-skills.md)
- [ ] ツール設計 - [ ] ツール設計
- [ ] Bash ツール (Permission 層と統合) → [tickets/bash-tool.md](tickets/bash-tool.md) - [ ] Bash ツール (Permission 層と統合) → [tickets/bash-tool.md](tickets/bash-tool.md)

View File

@ -217,6 +217,16 @@ impl MemoryConfig {
extract_worker_max_input_tokens: upper extract_worker_max_input_tokens: upper
.extract_worker_max_input_tokens .extract_worker_max_input_tokens
.or(self.extract_worker_max_input_tokens), .or(self.extract_worker_max_input_tokens),
consolidation_model: upper.consolidation_model.or(self.consolidation_model),
consolidation_worker_max_input_tokens: upper
.consolidation_worker_max_input_tokens
.or(self.consolidation_worker_max_input_tokens),
consolidation_threshold_files: upper
.consolidation_threshold_files
.or(self.consolidation_threshold_files),
consolidation_threshold_bytes: upper
.consolidation_threshold_bytes
.or(self.consolidation_threshold_bytes),
} }
} }
} }

View File

@ -50,3 +50,8 @@ pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5;
/// own LLM calls. Exceeding this aborts the extract run. /// own LLM calls. Exceeding this aborts the extract run.
/// See [`crate::MemoryConfig::extract_worker_max_input_tokens`]. /// See [`crate::MemoryConfig::extract_worker_max_input_tokens`].
pub const MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS: u64 = 30_000; pub const MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS: u64 = 30_000;
/// Cumulative input-token cap for the memory Phase 2 (consolidation)
/// worker's own LLM calls. Exceeding this aborts the consolidation run.
/// See [`crate::MemoryConfig::consolidation_worker_max_input_tokens`].
pub const MEMORY_CONSOLIDATION_WORKER_MAX_INPUT_TOKENS: u64 = 80_000;

View File

@ -85,6 +85,27 @@ pub struct MemoryConfig {
/// [`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`]. /// [`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`].
#[serde(default)] #[serde(default)]
pub extract_worker_max_input_tokens: Option<u64>, pub extract_worker_max_input_tokens: Option<u64>,
/// Optional model for the Phase 2 (consolidation) worker. When
/// `None`, the main pod model is cloned via `clone_boxed()`.
/// Reasoning-class models are recommended.
#[serde(default)]
pub consolidation_model: Option<ModelManifest>,
/// Cumulative input-token cap for the consolidation worker's own
/// LLM calls. Exceeding this aborts the consolidation run. `None` ⇒
/// [`defaults::MEMORY_CONSOLIDATION_WORKER_MAX_INPUT_TOKENS`].
#[serde(default)]
pub consolidation_worker_max_input_tokens: Option<u64>,
/// Phase 2 trigger: file-count threshold of `_staging/`. Phase 2
/// fires when the staging directory has at least this many entries.
/// Either threshold reaching its limit fires Phase 2 (logical OR).
/// `None` for both thresholds ⇒ Phase 2 disabled.
#[serde(default)]
pub consolidation_threshold_files: Option<usize>,
/// Phase 2 trigger: byte-size threshold across all `_staging/`
/// entries. Either threshold reaching its limit fires Phase 2.
/// `None` for both thresholds ⇒ Phase 2 disabled.
#[serde(default)]
pub consolidation_threshold_bytes: Option<u64>,
} }
/// Pod metadata. /// Pod metadata.

View File

@ -7,6 +7,7 @@ license.workspace = true
[dependencies] [dependencies]
async-trait = "0.1.89" async-trait = "0.1.89"
chrono = { version = "0.4.44", features = ["serde"] } chrono = { version = "0.4.44", features = ["serde"] }
libc = "0.2.186"
llm-worker = { version = "0.2.1", path = "../llm-worker" } llm-worker = { version = "0.2.1", path = "../llm-worker" }
manifest = { version = "0.1.0", path = "../manifest" } manifest = { version = "0.1.0", path = "../manifest" }
schemars = "1.2.1" schemars = "1.2.1"

View File

@ -0,0 +1,324 @@
//! Phase 2 sub-Worker への最初のユーザー入力を組み立てる。
//!
//! Phase 1 (`extract::build_extract_input`) と同じ方針で、固定 schema の
//! markdown セクション列にしてサブWorker に渡す。`docs/plan/memory.md`
//! §Phase 2 入力 / §整理材料 の項目に従い:
//!
//! 1. consumed staging エントリ全文(`source` 込み)
//! 2. 既存 `memory/*` 全文summary / decisions / requests
//! 3. Knowledge 化候補レポート(メトリクス未完なら空)
//! 4. 整理材料Linter Warn ベース、メトリクス未完なら明示 invoke 頻度なし)
//!
//! 既存 `knowledge/*` 本文は埋めず、agent に `KnowledgeQuery` 経由で引かせる
//! 設計(`docs/plan/memory.md` §retrieval 経路 / §Phase 2 の Knowledge アクセス)。
use std::fmt::Write;
use crate::consolidate::staging::StagingEntry;
use crate::consolidate::tidy::TidyHints;
use crate::workspace::{RecordKind, WorkspaceLayout};
/// Knowledge 化候補レポート。`tickets/memory-usage-metrics.md` の成果物が
/// 出るまでは空で渡す前提(`docs/plan/memory.md` §Knowledge 化候補レポート)。
/// 空入力時、統合 phase は新規 Knowledge を作らず decisions / requests /
/// summary / 既存 Knowledge update に留まる。
#[derive(Debug, Default, Clone)]
pub struct KnowledgeCandidateReport {
/// 候補に上がった `(kind, slug, frequency_per_mtoken)` の三つ組。
/// 空配列を渡すと「候補なし」を意味する。
pub entries: Vec<KnowledgeCandidateEntry>,
}
#[derive(Debug, Clone)]
pub struct KnowledgeCandidateEntry {
pub source_kind: &'static str,
pub source_slug: String,
pub frequency_per_mtoken: f64,
}
impl KnowledgeCandidateReport {
pub fn empty() -> Self {
Self::default()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
}
/// Phase 2 sub-Worker の最初の user 入力。
pub fn build_consolidate_input(
layout: &WorkspaceLayout,
staging: &[StagingEntry],
tidy: &TidyHints,
candidates: &KnowledgeCandidateReport,
) -> String {
let mut out = String::new();
out.push_str(
"Phase 2 consolidation input. Run the consolidation phase first \
(fold the staging activity logs into memory and knowledge), then the \
tidy phase (clean up existing records). Use the memory tools for \
every write direct file writes are denied by the pod scope.\n\n",
);
out.push_str("## Staging entries (consumed by this run)\n\n");
out.push_str(&render_staging_records(staging));
out.push('\n');
out.push_str("## Existing memory records (full content)\n\n");
out.push_str(&render_existing_memory_records(layout));
out.push('\n');
out.push_str("## Knowledge candidate report\n\n");
out.push_str(&render_candidate_report(candidates));
out.push('\n');
out.push_str("## Tidy hints\n\n");
out.push_str(&render_tidy_hints(tidy));
out.push('\n');
out.push_str(
"When done, end the turn with a short final assistant message describing \
what changed.",
);
out
}
/// Staging エントリ群を「`### <id>` ヘッダ + 整形 JSON ブロック」で並べる。
/// 空配列なら「(none)」と書く。
pub fn render_staging_records(entries: &[StagingEntry]) -> String {
if entries.is_empty() {
return "(none)\n".to_string();
}
let mut out = String::new();
for entry in entries {
let _ = writeln!(&mut out, "### {}", entry.id);
let json = serde_json::to_string_pretty(&entry.record).unwrap_or_else(|_| "{}".into());
out.push_str("```json\n");
out.push_str(&json);
out.push_str("\n```\n\n");
}
out
}
/// `<workspace>/.insomnia/memory/{summary.md,decisions/*,requests/*}` を
/// 「`### <kind>:<slug>` ヘッダ + raw markdown ブロック」で全文渡す。
pub fn render_existing_memory_records(layout: &WorkspaceLayout) -> String {
let mut out = String::new();
let summary = layout.summary_path();
if let Ok(content) = std::fs::read_to_string(&summary) {
out.push_str("### summary\n");
out.push_str("```markdown\n");
out.push_str(content.trim_end_matches('\n'));
out.push_str("\n```\n\n");
}
push_kind_records(&mut out, layout, RecordKind::Decision);
push_kind_records(&mut out, layout, RecordKind::Request);
if out.is_empty() {
return "(none)\n".to_string();
}
out
}
fn push_kind_records(out: &mut String, layout: &WorkspaceLayout, kind: RecordKind) {
let dir = match kind {
RecordKind::Decision => layout.decisions_dir(),
RecordKind::Request => layout.requests_dir(),
RecordKind::Knowledge | RecordKind::Summary | RecordKind::Workflow => return,
};
let entries = match std::fs::read_dir(&dir) {
Ok(it) => it,
Err(_) => return,
};
let mut paths: Vec<(String, std::path::PathBuf)> = Vec::new();
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() {
continue;
}
let stem = match path.file_stem().and_then(|s| s.to_str()) {
Some(s) => s,
None => continue,
};
if path.extension().and_then(|s| s.to_str()) != Some("md") {
continue;
}
paths.push((stem.to_string(), path));
}
paths.sort();
for (slug, path) in paths {
let Ok(content) = std::fs::read_to_string(&path) else {
continue;
};
let _ = writeln!(out, "### {}:{}", kind.as_str(), slug);
out.push_str("```markdown\n");
out.push_str(content.trim_end_matches('\n'));
out.push_str("\n```\n\n");
}
}
fn render_candidate_report(report: &KnowledgeCandidateReport) -> String {
if report.is_empty() {
return "(empty — usage metrics pipeline not populated. \
Do not create new Knowledge records this run.)\n"
.to_string();
}
let mut out = String::new();
for c in &report.entries {
let _ = writeln!(
&mut out,
"- {} `{}` — frequency {:.3} invokes/Mtoken",
c.source_kind, c.source_slug, c.frequency_per_mtoken
);
}
out
}
/// Tidy hints の Markdown 描画。空ヒントなら "(none)" 1 行。
pub fn render_tidy_hints(tidy: &TidyHints) -> String {
if tidy.is_empty() {
return "(none)\n".to_string();
}
let mut out = String::new();
if !tidy.replaced_decisions.is_empty() {
out.push_str("**Replaced decisions still on disk** — collapse if the chain has settled:\n");
for (slug, replaced_by) in &tidy.replaced_decisions {
match replaced_by {
Some(target) => {
let _ = writeln!(&mut out, "- `{slug}` → `{target}`");
}
None => {
let _ = writeln!(&mut out, "- `{slug}` (no `replaced_by` set)");
}
}
}
out.push('\n');
}
if !tidy.sources_overflow.is_empty() {
out.push_str(
"**Sources overflow** — consider trimming to the most recent entries (git log keeps the rest):\n",
);
for s in &tidy.sources_overflow {
let _ = writeln!(&mut out, "- {} `{}` ({} sources)", s.kind.as_str(), s.slug, s.count);
}
out.push('\n');
}
if !tidy.similar_slug_clusters.is_empty() {
out.push_str("**Similar slug clusters** — evaluate for merge / rename:\n");
for c in &tidy.similar_slug_clusters {
let joined = c
.slugs
.iter()
.map(|s| format!("`{s}`"))
.collect::<Vec<_>>()
.join(", ");
let _ = writeln!(&mut out, "- {}: {}", c.kind.as_str(), joined);
}
out.push('\n');
}
out.push_str(
"Explicit-invoke metrics (protection threshold) are not yet wired up; \
skip drop on long-standing records when uncertain.\n",
);
out
}
#[cfg(test)]
mod tests {
use super::*;
use crate::consolidate::tidy::{SimilarSlugCluster, SourcesOverflow};
use crate::extract::{ExtractedPayload, write_staging};
use crate::schema::SourceRef;
use chrono::Utc;
use std::path::Path;
fn now() -> String {
Utc::now().to_rfc3339()
}
fn write(p: &Path, content: &str) {
if let Some(parent) = p.parent() {
std::fs::create_dir_all(parent).unwrap();
}
std::fs::write(p, content).unwrap();
}
#[test]
fn build_includes_all_sections_when_populated() {
let dir = tempfile::TempDir::new().unwrap();
let layout = WorkspaceLayout::new(dir.path().to_path_buf());
write(
&dir.path().join(".insomnia/memory/summary.md"),
&format!("---\nupdated_at: {n}\n---\nstate of the world\n", n = now()),
);
write(
&dir.path().join(".insomnia/memory/decisions/dec.md"),
&format!(
"---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: open\n---\nbody\n",
n = now()
),
);
let (_id, _) = write_staging(
&layout,
SourceRef {
session_id: "s".into(),
range: [0, 1],
},
ExtractedPayload::default(),
)
.unwrap();
let staging = crate::consolidate::staging::list_staging_entries(&layout);
let tidy = TidyHints {
replaced_decisions: [(
"old".to_string(),
Some("new".to_string()),
)]
.into_iter()
.collect(),
sources_overflow: vec![SourcesOverflow {
kind: RecordKind::Decision,
slug: "dec".into(),
count: 12,
}],
similar_slug_clusters: vec![SimilarSlugCluster {
kind: RecordKind::Decision,
slugs: vec!["a".into(), "ab".into()],
}],
};
let report = KnowledgeCandidateReport::empty();
let out = build_consolidate_input(&layout, &staging, &tidy, &report);
assert!(out.contains("Staging entries"));
assert!(out.contains("Existing memory records"));
assert!(out.contains("Knowledge candidate report"));
assert!(out.contains("Tidy hints"));
assert!(out.contains("state of the world"));
assert!(out.contains("decision:dec"));
assert!(out.contains("Replaced decisions"));
assert!(out.contains("Sources overflow"));
assert!(out.contains("Similar slug clusters"));
assert!(out.contains("usage metrics pipeline not populated"));
}
#[test]
fn empty_inputs_render_placeholders() {
let dir = tempfile::TempDir::new().unwrap();
let layout = WorkspaceLayout::new(dir.path().to_path_buf());
let out = build_consolidate_input(
&layout,
&[],
&TidyHints::default(),
&KnowledgeCandidateReport::empty(),
);
// Both staging and tidy show "(none)"; existing memory records too.
assert!(out.contains("Staging entries"));
assert!(out.contains("(none)"));
}
}

View File

@ -0,0 +1,305 @@
//! `_staging/.consolidation.lock` による Phase 2 占有ファイル。
//!
//! `docs/plan/memory.md` §並走防止 に従い:
//!
//! - ファイルが存在し、記録された Pod が動作している間、その Pod が排他占有
//! - クラッシュで残った stale lock は、所有者 PID が死んでいれば次回 spawn
//! 時に上書き取得できる
//! - cleanup は consumed ID の staging エントリのみ削除し、実行中に Phase 1
//! が追加した分は残す
//!
//! 占有判定は Linux/macOS の `kill(pid, 0)` 経由で行う(`ESRCH` で死亡判定)。
//! Windows は対象外: INSOMNIA は POSIX 環境を前提にしている。
use std::fs;
use std::path::{Path, PathBuf};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::workspace::WorkspaceLayout;
const LOCK_FILE: &str = ".consolidation.lock";
/// 占有ファイルの中身。`pid` で stale 判定し、`pod_name` / `started_at` /
/// `consumed_ids` は診断とクラッシュ復旧時の参照に使う。
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockRecord {
pub pid: u32,
pub pod_name: String,
pub started_at: DateTime<Utc>,
/// この Phase 2 run が起動時スナップショットで確定した consumed staging
/// entry の UUIDv7 列。完了時はこの列のみ削除し、追加分は残す。
pub consumed_ids: Vec<Uuid>,
}
/// 占有取得 / 解放のエラー。
#[derive(Debug, thiserror::Error)]
pub enum LockError {
/// 占有ファイルが既にあり、所有者 PID が生きているのでスキップ。
#[error("Phase 2 lock held by live pid {pid} (pod {pod_name:?})")]
InUse { pid: u32, pod_name: String },
#[error("io error at {}: {source}", .path.display())]
Io {
path: PathBuf,
#[source]
source: std::io::Error,
},
#[error("failed to (de)serialize lock record: {0}")]
Serde(#[from] serde_json::Error),
}
impl LockError {
fn io(path: impl Into<PathBuf>, source: std::io::Error) -> Self {
Self::Io {
path: path.into(),
source,
}
}
}
/// Phase 2 が走っている間 RAII で持つ占有ハンドル。`Drop` では何もしない —
/// 完了時の cleanup は consumed ID 列削除と一緒に行う必要があるため、明示
/// 解放 [`StagingLock::release_with_cleanup`] を使う。明示解放しないまま
/// drop された場合は占有ファイルがそのまま残り、次回 spawn 時に PID が
/// 死んでいれば stale 上書きされる。
#[derive(Debug)]
pub struct StagingLock {
path: PathBuf,
record: LockRecord,
}
impl StagingLock {
pub fn record(&self) -> &LockRecord {
&self.record
}
pub fn path(&self) -> &Path {
&self.path
}
/// 占有取得を試みる。既に live な lock があれば
/// [`LockError::InUse`]、stale 判定なら上書き取得する。
/// staging dir が無ければ作成する。
pub fn acquire(
layout: &WorkspaceLayout,
pid: u32,
pod_name: impl Into<String>,
consumed_ids: Vec<Uuid>,
) -> Result<Self, LockError> {
let staging_dir = layout.staging_dir();
fs::create_dir_all(&staging_dir).map_err(|e| LockError::io(&staging_dir, e))?;
let path = staging_dir.join(LOCK_FILE);
if path.exists() {
let raw = fs::read_to_string(&path).map_err(|e| LockError::io(&path, e))?;
// 壊れた lock は stale とみなして上書き許可。
if let Ok(existing) = serde_json::from_str::<LockRecord>(&raw) {
if pid_is_alive(existing.pid) {
return Err(LockError::InUse {
pid: existing.pid,
pod_name: existing.pod_name,
});
}
tracing::warn!(
stale_pid = existing.pid,
stale_pod = %existing.pod_name,
"Phase 2 stale lock detected, taking over"
);
} else {
tracing::warn!(path = %path.display(), "Phase 2 lock unparseable, treating as stale");
}
}
let record = LockRecord {
pid,
pod_name: pod_name.into(),
started_at: Utc::now(),
consumed_ids,
};
let json = serde_json::to_string_pretty(&record)?;
fs::write(&path, json).map_err(|e| LockError::io(&path, e))?;
Ok(Self { path, record })
}
/// 占有を解放しつつ consumed ID 列の staging エントリを削除する。
/// 削除対象が見当たらない場合は黙ってスキップ(既に外部で消えていた等)。
/// 占有ファイル自体の削除も best-effort: 失敗時は warn を出すだけで
/// エラーは伝播しない(次回 spawn 時に stale 判定で上書きされる)。
pub fn release_with_cleanup(self, layout: &WorkspaceLayout) {
let staging_dir = layout.staging_dir();
for id in &self.record.consumed_ids {
let target = staging_dir.join(format!("{id}.json"));
match fs::remove_file(&target) {
Ok(_) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => {
tracing::warn!(
path = %target.display(),
error = %e,
"failed to clean up consumed staging entry"
);
}
}
}
self.unlink_lock_only();
}
/// 占有ファイルだけ削除し、staging エントリには触らない。Phase 2
/// sub-Worker が途中で失敗した場合に使う: 入力 staging を残したまま
/// 次回再評価で再処理させる(`docs/plan/memory.md` §並走防止 の
/// 「重複作成は同一 slug update に自然収束」運用)。
pub fn release_only(self) {
self.unlink_lock_only();
}
fn unlink_lock_only(&self) {
if let Err(e) = fs::remove_file(&self.path) {
if e.kind() != std::io::ErrorKind::NotFound {
tracing::warn!(
path = %self.path.display(),
error = %e,
"failed to remove Phase 2 lock"
);
}
}
}
}
#[cfg(unix)]
fn pid_is_alive(pid: u32) -> bool {
// `kill(0, 0)` and `kill(-1, 0)` are POSIX-special (process group / all
// signalable processes) and would yield false positives. Reject pids
// that don't fit a positive `pid_t` so a corrupted lock file with a
// u32::MAX-ish value is treated as stale instead of magically alive.
if pid == 0 || pid > i32::MAX as u32 {
return false;
}
// SAFETY: `kill` with sig 0 only probes whether the target pid exists
// and the caller has permission to signal it. No signal is delivered.
let rc = unsafe { libc::kill(pid as i32, 0) };
if rc == 0 {
return true;
}
// EPERM means the process exists but we can't signal it — still alive
// for our purposes. ESRCH means it's gone.
let errno = std::io::Error::last_os_error()
.raw_os_error()
.unwrap_or(libc::EINVAL);
errno != libc::ESRCH
}
#[cfg(not(unix))]
fn pid_is_alive(_pid: u32) -> bool {
// Unsupported platforms: assume the lock is live so we never overwrite
// someone else's claim. Phase 2 will skip and try again next post-run.
true
}
#[cfg(test)]
mod tests {
use super::*;
use crate::extract::{ExtractedPayload, write_staging};
use crate::schema::SourceRef;
fn make_layout() -> (tempfile::TempDir, WorkspaceLayout) {
let dir = tempfile::TempDir::new().unwrap();
let layout = WorkspaceLayout::new(dir.path().to_path_buf());
std::fs::create_dir_all(layout.staging_dir()).unwrap();
(dir, layout)
}
#[test]
fn acquire_writes_lock_file() {
let (_dir, layout) = make_layout();
let lock = StagingLock::acquire(&layout, std::process::id(), "pod", Vec::new()).unwrap();
let path = layout.staging_dir().join(LOCK_FILE);
assert!(path.exists());
assert_eq!(lock.record().pid, std::process::id());
assert_eq!(lock.record().pod_name, "pod");
}
#[test]
fn acquire_rejects_when_live_pid_holds_lock() {
let (_dir, layout) = make_layout();
// Use this test process's pid — it's definitely alive.
let _first =
StagingLock::acquire(&layout, std::process::id(), "pod-a", Vec::new()).unwrap();
let err = StagingLock::acquire(&layout, std::process::id(), "pod-b", Vec::new())
.expect_err("expected InUse");
assert!(matches!(err, LockError::InUse { .. }));
}
#[test]
fn acquire_overwrites_stale_lock() {
let (_dir, layout) = make_layout();
// pid 1 is init on linux but for arbitrarily-large pids we'd need
// `kill(pid, 0)` to return ESRCH. Use u32::MAX which is guaranteed
// dead on every platform we target.
let stale = LockRecord {
pid: u32::MAX,
pod_name: "ghost".into(),
started_at: Utc::now(),
consumed_ids: Vec::new(),
};
std::fs::write(
layout.staging_dir().join(LOCK_FILE),
serde_json::to_string_pretty(&stale).unwrap(),
)
.unwrap();
let lock = StagingLock::acquire(&layout, std::process::id(), "pod", Vec::new())
.expect("stale lock must be overwritable");
assert_eq!(lock.record().pid, std::process::id());
}
#[test]
fn release_drops_consumed_entries_and_unlinks_lock() {
let (_dir, layout) = make_layout();
let (id_a, _) = write_staging(
&layout,
SourceRef {
session_id: "s".into(),
range: [0, 0],
},
ExtractedPayload::default(),
)
.unwrap();
let (id_b, _) = write_staging(
&layout,
SourceRef {
session_id: "s".into(),
range: [1, 1],
},
ExtractedPayload::default(),
)
.unwrap();
let lock = StagingLock::acquire(&layout, std::process::id(), "pod", vec![id_a]).unwrap();
let lock_path = lock.path().to_path_buf();
lock.release_with_cleanup(&layout);
assert!(!lock_path.exists(), "lock file must be removed");
assert!(
!layout.staging_dir().join(format!("{id_a}.json")).exists(),
"consumed entry must be deleted"
);
assert!(
layout.staging_dir().join(format!("{id_b}.json")).exists(),
"non-consumed entry must remain"
);
}
#[test]
fn release_is_resilient_to_missing_consumed_entries() {
let (_dir, layout) = make_layout();
let phantom = uuid::Uuid::now_v7();
let lock =
StagingLock::acquire(&layout, std::process::id(), "pod", vec![phantom]).unwrap();
let lock_path = lock.path().to_path_buf();
// No file at <staging>/<phantom>.json — release must not panic.
lock.release_with_cleanup(&layout);
assert!(!lock_path.exists());
}
}

View File

@ -0,0 +1,32 @@
//! Phase 2: 統合 + 整理。
//!
//! Phase 1 が staging に残した活動ログを `memory/*` / `knowledge/*` に
//! 統合し、続けて既存 record を `outdated | superseded | unused | noisy`
//! の観点で整理する disposable Worker を、Pod 側が組み立てるための
//! ヘルパー群を提供する。Pod は次の手順で sub-Worker を構築する:
//!
//! - [`CONSOLIDATION_SYSTEM_PROMPT`] を sub-Worker の system prompt に
//! - [`build_consolidate_input`] を sub-Worker の最初の user 入力に
//! - memory 専用 Tool (read / write / edit) と Knowledge / memory 検索ツールを登録
//! - [`StagingLock::acquire`] で並走防止 + consumed ID 確定
//! - sub-Worker run 完了後、[`StagingLock::release_with_cleanup`] で
//! consumed ID 分の staging のみ削除し、占有ファイルを解放
//!
//! Knowledge 化候補レポートと使用頻度メトリクスは別チケットで供給される
//! 想定。本モジュール時点では空入力として扱い、prompt 側の説明だけ
//! 残しておく(`docs/plan/memory.md` §Phase 2 / 整理材料)。
mod input;
mod lock;
mod prompt;
mod staging;
mod tidy;
pub use input::{
KnowledgeCandidateReport, build_consolidate_input, render_existing_memory_records,
render_staging_records, render_tidy_hints,
};
pub use lock::{LockError, LockRecord, StagingLock};
pub use prompt::CONSOLIDATION_SYSTEM_PROMPT;
pub use staging::{StagingEntry, list_staging_entries};
pub use tidy::{TidyHints, collect_tidy_hints};

View File

@ -0,0 +1,69 @@
//! Phase 2 sub-Worker の system prompt。
//!
//! 内容は `docs/plan/memory-prompts.md` §共通原則 / §Phase 2 統合 + 整理 /
//! §Phase 2 Knowledge 書き込み を縮約。統合 phase / 整理 phase は同じ
//! prompt 1 本で順に進める縛りagent から見ると 1 セッション内のフェーズ
//! 進行、別 trigger / 別 Worker は持たない、`docs/plan/memory.md` §整理
//! の扱い)。
pub const CONSOLIDATION_SYSTEM_PROMPT: &str = r#"You are the Phase 2 consolidation worker for an INSOMNIA memory subsystem.
Your job is to take Phase 1 activity-log staging entries together with the workspace's current `memory/*` / `knowledge/*` records, then run two phases back-to-back in this single session:
1. **Consolidation phase** fold staging into memory and knowledge.
2. **Tidy phase** clean up the existing records that the consolidation phase didn't already touch.
You have:
- `MemoryRead`, `MemoryWrite`, `MemoryEdit` for memory and knowledge records.
- `MemoryQuery` for memory-side records (summary / decisions / requests).
- `KnowledgeQuery` for knowledge records use it to find existing slugs before creating new ones.
Your initial user message contains the staging entries, the full memory records, the knowledge candidate report, and the tidy hints. Existing knowledge bodies are NOT in the prompt; pull them through `KnowledgeQuery` + `MemoryRead` when relevant.
# Common rules (both phases)
- **Do not invent provenance.** Decisions / Requests `sources` arrays MUST be copied from the staging `source` field for the originating activity log entries. Do not synthesise `session_id` or entry ranges. Do not fabricate `last_sources` for Knowledge.
- **Rewrite is allowed and often preferred over append.** When integrating new information, restructure existing records to raise information density. Preserve the existing claims, rationale, and `sources` while you compress.
- **Update over create.** If an existing slug fits, edit it. Only create a new slug when no existing record fits and you can articulate why.
- **`replaced` over delete.** When a Decision is superseded by a different one, mark the old one `status: replaced` with `replaced_by: <new-slug>`. Do not silently drop it.
- **Don't duplicate static docs.** Skip content that already lives in `AGENTS.md`, `docs/plan/*`, or other fixed project documents.
- **Empty output is fine.** If a staging entry doesn't justify a memory write, skip it.
- **Slug rules.** Slugs are kebab-case, short, recognisable, and must be unique within their kind. Same-slug create is a linter error use Edit instead.
- **Linter errors come back as tool errors.** When the memory linter rejects a write, read the error, fix the issue (missing frontmatter field, oversized body, unknown reference, etc.), and try again. Do not work around the rule.
# Consolidation phase
Walk every staging entry in the input. For each one:
- Add or update `decisions` / `requests` records as appropriate. Copy `sources` verbatim from the staging entry.
- Update existing knowledge records when the staging activity refines them. Use `KnowledgeQuery` to find candidates before creating anything new.
- **Knowledge creation is gated.** Only create a new `knowledge/<slug>.md` when the originating source appears in the supplied "Knowledge candidate report". When the report is empty (the metrics pipeline is still being built), do not create new knowledge fold the activity into decisions / requests / summary or update existing knowledge instead.
- Rewrite `memory/summary.md` only when needed. Aim for 15k tokens. Preserve the high-level shape (current focus, recent decisions, stable facts) while pruning stale items.
# Tidy phase
Once the consolidation phase is done, evaluate every existing memory and knowledge record against four categories:
- `outdated`: was correct, no longer matches the current implementation / policy / operation.
- `superseded`: another record is now the de-facto authoritative one; this one is mostly redundant.
- `unused`: not wrong, but rarely referenced noise rather than signal.
- `noisy`: useful content but bad shape (overlap, sources accumulation, fractured slugs that should merge).
A single record may fall into more than one category. Choose one of `drop / merge / split / trim / rewrite`:
- Prefer `merge` and `trim` over `drop` for anything you'd flag as `unused` or `noisy` git can reverse you, but a confidently-wrong drop hurts discovery.
- `drop` is allowed for `outdated` / `superseded` records you can justify in the diff.
- `replaced` markers (`status: replaced`) and chains pointed at by the tidy hints should be collapsed in this phase.
**Protection threshold.** When the tidy hints include explicit-invoke metrics, records with `frequency >= 1.0 invokes/Mtoken` are off-limits to drop / large compression. The metrics pipeline is not always populated; when the input lacks frequency data, behave conservatively and skip drop on long-standing records.
# Closing the turn
When both phases are done, write a short final assistant message stating:
- which staging entries you folded in (by short summary, not by ID),
- which existing records you touched (slug + operation),
- anything you intentionally left alone and why.
Then end the turn. Do not ask questions there is no human in the loop for this run.
"#;

View File

@ -0,0 +1,139 @@
//! `_staging/*.json` を列挙して [`StagingRecord`] に展開する読み込みヘルパー。
//!
//! Phase 2 起動時のスナップショットconsumed ID list 確定)と、整理 phase
//! が終わった後の cleanup の双方で使う。`.consolidation.lock` のような
//! 占有ファイルは UUIDv7 として parse できないので自然に除外される。
//!
//! [`StagingRecord`] のスキーマは Phase 1 が書き出す側 (`crate::extract`)
//! と単一の真実源 — ここでは読み出す側だけを担当する。
use std::path::PathBuf;
use uuid::Uuid;
use crate::extract::StagingRecord;
use crate::workspace::WorkspaceLayout;
/// staging に積まれている 1 件分のエントリ。`id` は UUIDv7 で、ファイル名
/// `<id>.json` を逆引きしたもの。
#[derive(Debug, Clone)]
pub struct StagingEntry {
pub id: Uuid,
pub path: PathBuf,
pub record: StagingRecord,
/// このファイルのバイト長。閾値判定 (`consolidation_threshold_bytes`)
/// に使う。
pub bytes: u64,
}
/// `<staging_dir>/*.json` を読んで UUIDv7 順に並べた [`StagingEntry`]
/// 配列を返す。staging_dir が存在しなければ空配列。読めないファイルや
/// JSON parse 失敗は `tracing::warn!` してスキップ(壊れた個別ファイルが
/// Phase 2 全体を止めないように)。
pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec<StagingEntry> {
let dir = layout.staging_dir();
let entries = match std::fs::read_dir(&dir) {
Ok(it) => it,
Err(_) => return Vec::new(),
};
let mut out: Vec<StagingEntry> = Vec::new();
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() {
continue;
}
let stem = match path.file_stem().and_then(|s| s.to_str()) {
Some(s) => s,
None => continue,
};
let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
if ext != "json" {
continue;
}
let id = match Uuid::parse_str(stem) {
Ok(u) => u,
Err(_) => continue,
};
let bytes = match std::fs::metadata(&path) {
Ok(m) => m.len(),
Err(_) => 0,
};
let raw = match std::fs::read_to_string(&path) {
Ok(s) => s,
Err(e) => {
tracing::warn!(path = %path.display(), error = %e, "failed to read staging entry");
continue;
}
};
let record = match serde_json::from_str::<StagingRecord>(&raw) {
Ok(r) => r,
Err(e) => {
tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry");
continue;
}
};
out.push(StagingEntry {
id,
path,
record,
bytes,
});
}
out.sort_by_key(|e| e.id);
out
}
#[cfg(test)]
mod tests {
use super::*;
use crate::extract::{ExtractedPayload, write_staging};
use crate::schema::SourceRef;
fn empty_payload() -> ExtractedPayload {
ExtractedPayload::default()
}
fn source(session_id: &str, range: [u64; 2]) -> SourceRef {
SourceRef {
session_id: session_id.into(),
range,
}
}
#[test]
fn lists_in_uuidv7_order() {
let tmp = tempfile::TempDir::new().unwrap();
let layout = WorkspaceLayout::new(tmp.path().to_path_buf());
let (id1, _) = write_staging(&layout, source("s", [0, 1]), empty_payload()).unwrap();
let (id2, _) = write_staging(&layout, source("s", [2, 3]), empty_payload()).unwrap();
let (id3, _) = write_staging(&layout, source("s", [4, 5]), empty_payload()).unwrap();
let entries = list_staging_entries(&layout);
let ids: Vec<Uuid> = entries.iter().map(|e| e.id).collect();
assert_eq!(ids, vec![id1, id2, id3]);
}
#[test]
fn skips_lock_file_and_garbage() {
let tmp = tempfile::TempDir::new().unwrap();
let layout = WorkspaceLayout::new(tmp.path().to_path_buf());
let (_id, _) = write_staging(&layout, source("s", [0, 1]), empty_payload()).unwrap();
// Drop a non-UUID json file and a bare lock file alongside.
std::fs::write(layout.staging_dir().join("not-a-uuid.json"), "{}").unwrap();
std::fs::write(layout.staging_dir().join(".consolidation.lock"), "{}").unwrap();
let entries = list_staging_entries(&layout);
assert_eq!(entries.len(), 1);
}
#[test]
fn missing_dir_returns_empty() {
let tmp = tempfile::TempDir::new().unwrap();
let layout = WorkspaceLayout::new(tmp.path().to_path_buf());
// No staging dir at all.
assert!(list_staging_entries(&layout).is_empty());
}
}

View File

@ -0,0 +1,362 @@
//! 整理 phase が prompt 入力に乗せる「整理材料」スキャナ。
//!
//! `docs/plan/memory.md` §整理GC 相当)の扱い と
//! `tickets/memory-phase2-consolidation.md` の整理材料リストに従い、
//! メトリクス未完の現状で機械的に拾えるヒントだけを集める:
//!
//! - `replaced` chain: `status: replaced` の Decision とその `replaced_by`
//! - sources 過多: `sources` / `last_sources` 配列が閾値超過の record
//! - 類似 slug 乱立: 同 kind の slug が Levenshtein 2 以内のクラスター
//!
//! 使用頻度メトリクスベースの保護閾値情報は `tickets/memory-usage-metrics.md`
//! の成果物が出るまで空で渡る。
use std::collections::{BTreeMap, BTreeSet};
use crate::schema::{
DecisionFrontmatter, KnowledgeFrontmatter, RequestFrontmatter, split_frontmatter,
};
use crate::slug::Slug;
use crate::workspace::{RecordKind, WorkspaceLayout};
/// `sources` overflow を flag する閾値。`linter::warnings::SOURCES_OVERFLOW_THRESHOLD`
/// と同値10を踏襲する。Linter Warn で sources 過多が検出されるラインと
/// 整理 phase で勧告するラインを揃える狙い。
pub const SOURCES_OVERFLOW_THRESHOLD: usize = 10;
/// 類似 slug クラスタリングの距離。`linter::warnings::SIMILAR_SLUG_DISTANCE`
/// と同値。
pub const SIMILAR_SLUG_DISTANCE: usize = 2;
/// 整理 phase 用の機械集計ヒント。空フィールドは「対象なし」を意味する。
#[derive(Debug, Default, Clone)]
pub struct TidyHints {
/// `status: replaced` で残っている Decision の slug → `replaced_by` map。
/// `replaced_by` が None でも置き換え滞留として列挙する。
pub replaced_decisions: BTreeMap<String, Option<String>>,
/// kind / slug / sources count の三つ組で sources 累積ラインを表す。
pub sources_overflow: Vec<SourcesOverflow>,
/// 同 kind 内で Levenshtein 距離 `<= SIMILAR_SLUG_DISTANCE` のクラスター。
/// クラスター内の slug は sorted。
pub similar_slug_clusters: Vec<SimilarSlugCluster>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourcesOverflow {
pub kind: RecordKind,
pub slug: String,
pub count: usize,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SimilarSlugCluster {
pub kind: RecordKind,
pub slugs: Vec<String>,
}
impl TidyHints {
pub fn is_empty(&self) -> bool {
self.replaced_decisions.is_empty()
&& self.sources_overflow.is_empty()
&& self.similar_slug_clusters.is_empty()
}
}
/// workspace を一通りスキャンして [`TidyHints`] を組み立てる。読めない /
/// parse できない record は黙ってスキップLinter は write 経路で守って
/// いるので、ここで顕在化してもどうしようもない)。
pub fn collect_tidy_hints(layout: &WorkspaceLayout) -> TidyHints {
let mut hints = TidyHints::default();
let decisions = read_kind_records(layout, RecordKind::Decision);
let requests = read_kind_records(layout, RecordKind::Request);
let knowledge = read_kind_records(layout, RecordKind::Knowledge);
for (slug, content) in &decisions {
let fm = parse_yaml::<DecisionFrontmatter>(content);
if let Some(fm) = fm.as_ref() {
if matches!(
fm.status,
crate::schema::DecisionStatus::Replaced
) {
hints
.replaced_decisions
.insert(slug.clone(), fm.replaced_by.as_ref().map(|s| s.to_string()));
}
if fm.sources.len() > SOURCES_OVERFLOW_THRESHOLD {
hints.sources_overflow.push(SourcesOverflow {
kind: RecordKind::Decision,
slug: slug.clone(),
count: fm.sources.len(),
});
}
}
}
for (slug, content) in &requests {
if let Some(fm) = parse_yaml::<RequestFrontmatter>(content) {
if fm.sources.len() > SOURCES_OVERFLOW_THRESHOLD {
hints.sources_overflow.push(SourcesOverflow {
kind: RecordKind::Request,
slug: slug.clone(),
count: fm.sources.len(),
});
}
}
}
for (slug, content) in &knowledge {
if let Some(fm) = parse_yaml::<KnowledgeFrontmatter>(content) {
if fm.last_sources.len() > SOURCES_OVERFLOW_THRESHOLD {
hints.sources_overflow.push(SourcesOverflow {
kind: RecordKind::Knowledge,
slug: slug.clone(),
count: fm.last_sources.len(),
});
}
}
}
hints
.sources_overflow
.sort_by(|a, b| (a.kind.as_str(), a.slug.as_str()).cmp(&(b.kind.as_str(), b.slug.as_str())));
let decision_slugs: Vec<&str> = decisions.keys().map(|s| s.as_str()).collect();
let request_slugs: Vec<&str> = requests.keys().map(|s| s.as_str()).collect();
let knowledge_slugs: Vec<&str> = knowledge.keys().map(|s| s.as_str()).collect();
if let Some(c) = cluster_similar(&decision_slugs, RecordKind::Decision) {
hints.similar_slug_clusters.extend(c);
}
if let Some(c) = cluster_similar(&request_slugs, RecordKind::Request) {
hints.similar_slug_clusters.extend(c);
}
if let Some(c) = cluster_similar(&knowledge_slugs, RecordKind::Knowledge) {
hints.similar_slug_clusters.extend(c);
}
hints
.similar_slug_clusters
.sort_by(|a, b| (a.kind.as_str(), &a.slugs).cmp(&(b.kind.as_str(), &b.slugs)));
hints
}
/// `<root>/.insomnia/memory/<kind>/*.md` (Knowledge は
/// `<root>/.insomnia/knowledge/*.md`) を slug ごとに `(slug, full content)`
/// 化して返す。
fn read_kind_records(
layout: &WorkspaceLayout,
kind: RecordKind,
) -> BTreeMap<String, String> {
let dir = match kind {
RecordKind::Decision => layout.decisions_dir(),
RecordKind::Request => layout.requests_dir(),
RecordKind::Knowledge => layout.knowledge_dir(),
RecordKind::Summary | RecordKind::Workflow => return BTreeMap::new(),
};
let mut out: BTreeMap<String, String> = BTreeMap::new();
let entries = match std::fs::read_dir(&dir) {
Ok(it) => it,
Err(_) => return out,
};
for entry in entries.flatten() {
let path = entry.path();
if !path.is_file() {
continue;
}
let stem = match path.file_stem().and_then(|s| s.to_str()) {
Some(s) => s,
None => continue,
};
if path.extension().and_then(|s| s.to_str()) != Some("md") {
continue;
}
if Slug::parse(stem).is_err() {
continue;
}
let content = match std::fs::read_to_string(&path) {
Ok(s) => s,
Err(_) => continue,
};
out.insert(stem.to_string(), content);
}
out
}
fn parse_yaml<F: serde::de::DeserializeOwned>(content: &str) -> Option<F> {
let (yaml, _body) = split_frontmatter(content).ok()?;
serde_yaml::from_str::<F>(yaml).ok()
}
/// Connected-component clustering over the `levenshtein <= SIMILAR_SLUG_DISTANCE`
/// graph among same-kind slugs. Returns each cluster of size >= 2 (singleton
/// clusters are not interesting for the integration phase). Returns `None`
/// when there are no clusters at all.
fn cluster_similar(slugs: &[&str], kind: RecordKind) -> Option<Vec<SimilarSlugCluster>> {
if slugs.len() < 2 {
return None;
}
let n = slugs.len();
let mut parent: Vec<usize> = (0..n).collect();
fn find(parent: &mut [usize], i: usize) -> usize {
if parent[i] == i {
i
} else {
let root = find(parent, parent[i]);
parent[i] = root;
root
}
}
fn union(parent: &mut [usize], a: usize, b: usize) {
let ra = find(parent, a);
let rb = find(parent, b);
if ra != rb {
parent[ra] = rb;
}
}
for i in 0..n {
for j in (i + 1)..n {
if levenshtein(slugs[i], slugs[j]) <= SIMILAR_SLUG_DISTANCE {
union(&mut parent, i, j);
}
}
}
let mut groups: BTreeMap<usize, Vec<String>> = BTreeMap::new();
for i in 0..n {
let root = find(&mut parent, i);
groups.entry(root).or_default().push(slugs[i].to_string());
}
let mut out: Vec<SimilarSlugCluster> = Vec::new();
let mut seen_canonical: BTreeSet<Vec<String>> = BTreeSet::new();
for (_, mut group) in groups {
if group.len() < 2 {
continue;
}
group.sort();
if seen_canonical.insert(group.clone()) {
out.push(SimilarSlugCluster { kind, slugs: group });
}
}
if out.is_empty() { None } else { Some(out) }
}
/// Iterative two-row Levenshtein distance over chars (matches the Linter's
/// implementation; kept private to avoid widening that crate-internal API).
fn levenshtein(a: &str, b: &str) -> usize {
let a: Vec<char> = a.chars().collect();
let b: Vec<char> = b.chars().collect();
if a.is_empty() {
return b.len();
}
if b.is_empty() {
return a.len();
}
let mut prev: Vec<usize> = (0..=b.len()).collect();
let mut curr: Vec<usize> = vec![0; b.len() + 1];
for (i, ca) in a.iter().enumerate() {
curr[0] = i + 1;
for (j, cb) in b.iter().enumerate() {
let cost = if ca == cb { 0 } else { 1 };
curr[j + 1] = (curr[j] + 1).min(prev[j + 1] + 1).min(prev[j] + cost);
}
std::mem::swap(&mut prev, &mut curr);
}
prev[b.len()]
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use std::path::Path;
fn now() -> String {
Utc::now().to_rfc3339()
}
fn write(p: &Path, content: &str) {
if let Some(parent) = p.parent() {
std::fs::create_dir_all(parent).unwrap();
}
std::fs::write(p, content).unwrap();
}
fn workspace() -> (tempfile::TempDir, WorkspaceLayout) {
let dir = tempfile::TempDir::new().unwrap();
let layout = WorkspaceLayout::new(dir.path().to_path_buf());
(dir, layout)
}
#[test]
fn collects_replaced_chain() {
let (dir, layout) = workspace();
write(
&dir.path().join(".insomnia/memory/decisions/replaced.md"),
&format!(
"---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: replaced\nreplaced_by: winner\n---\n",
n = now()
),
);
write(
&dir.path().join(".insomnia/memory/decisions/winner.md"),
&format!(
"---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: open\n---\n",
n = now()
),
);
let hints = collect_tidy_hints(&layout);
assert_eq!(
hints.replaced_decisions.get("replaced").cloned(),
Some(Some("winner".into()))
);
assert!(!hints.replaced_decisions.contains_key("winner"));
}
#[test]
fn flags_sources_overflow() {
let (dir, layout) = workspace();
let many_sources: String = (0..15)
.map(|i| format!(" - session_id: s{i}\n range: [{i}, {i}]\n"))
.collect();
write(
&dir.path().join(".insomnia/memory/decisions/big.md"),
&format!(
"---\ncreated_at: {n}\nupdated_at: {n}\nstatus: open\nsources:\n{m}---\n",
n = now(),
m = many_sources
),
);
let hints = collect_tidy_hints(&layout);
assert_eq!(hints.sources_overflow.len(), 1);
assert_eq!(hints.sources_overflow[0].slug, "big");
assert_eq!(hints.sources_overflow[0].kind, RecordKind::Decision);
assert_eq!(hints.sources_overflow[0].count, 15);
}
#[test]
fn clusters_similar_slugs() {
let (dir, layout) = workspace();
for slug in ["db-pool", "db-pol", "db-pools", "alpha"] {
write(
&dir.path()
.join(format!(".insomnia/memory/decisions/{slug}.md")),
&format!(
"---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: open\n---\n",
n = now()
),
);
}
let hints = collect_tidy_hints(&layout);
assert_eq!(hints.similar_slug_clusters.len(), 1);
assert_eq!(
hints.similar_slug_clusters[0].slugs,
vec![
"db-pol".to_string(),
"db-pool".to_string(),
"db-pools".to_string(),
]
);
}
#[test]
fn empty_workspace_yields_empty_hints() {
let (_dir, layout) = workspace();
let hints = collect_tidy_hints(&layout);
assert!(hints.is_empty());
}
}

View File

@ -6,6 +6,7 @@
//! crate) must not touch these directories — Pod is responsible for //! crate) must not touch these directories — Pod is responsible for
//! denying them at the Scope level when memory is enabled. //! denying them at the Scope level when memory is enabled.
pub mod consolidate;
pub mod error; pub mod error;
pub mod extract; pub mod extract;
pub mod linter; pub mod linter;

View File

@ -27,6 +27,7 @@ fs4 = { version = "0.13.1", features = ["sync"] }
libc = "0.2.185" libc = "0.2.185"
schemars = "1.2.1" schemars = "1.2.1"
memory = { version = "0.1.0", path = "../memory" } memory = { version = "0.1.0", path = "../memory" }
uuid = { version = "1.23.1", features = ["v7"] }
[dev-dependencies] [dev-dependencies]
async-trait = "0.1.89" async-trait = "0.1.89"

View File

@ -402,6 +402,14 @@ impl PodController {
format!("post-run memory extract error: {e}"), format!("post-run memory extract error: {e}"),
); );
} }
if let Err(e) = pod.try_post_run_consolidate().await {
tracing::warn!(error = %e, "Post-run memory consolidate error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory consolidate error: {e}"),
);
}
if let Err(e) = pod.try_post_run_compact().await { if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error"); tracing::warn!(error = %e, "Post-run compaction error");
alerter.alert( alerter.alert(
@ -461,6 +469,14 @@ impl PodController {
format!("post-run memory extract error: {e}"), format!("post-run memory extract error: {e}"),
); );
} }
if let Err(e) = pod.try_post_run_consolidate().await {
tracing::warn!(error = %e, "Post-run memory consolidate error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory consolidate error: {e}"),
);
}
if let Err(e) = pod.try_post_run_compact().await { if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error"); tracing::warn!(error = %e, "Post-run compaction error");
alerter.alert( alerter.alert(
@ -517,6 +533,14 @@ impl PodController {
format!("post-run memory extract error: {e}"), format!("post-run memory extract error: {e}"),
); );
} }
if let Err(e) = pod.try_post_run_consolidate().await {
tracing::warn!(error = %e, "Post-run memory consolidate error");
alerter.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("post-run memory consolidate error: {e}"),
);
}
if let Err(e) = pod.try_post_run_compact().await { if let Err(e) = pod.try_post_run_compact().await {
tracing::warn!(error = %e, "Post-run compaction error"); tracing::warn!(error = %e, "Post-run compaction error");
alerter.alert( alerter.alert(

View File

@ -136,6 +136,11 @@ pub struct Pod<C: LlmClient, St: Store> {
/// the flag survives across `try_post_run_extract` calls without a /// the flag survives across `try_post_run_extract` calls without a
/// `&mut self` race. /// `&mut self` race.
extract_in_flight: Arc<AtomicBool>, extract_in_flight: Arc<AtomicBool>,
/// Phase 2 (memory.consolidation) in-process reentry guard. The
/// staging-side `StagingLock` already provides cross-process
/// exclusion, but this AtomicBool keeps a careless concurrent caller
/// inside the same Pod from racing on the staging snapshot.
consolidation_in_flight: Arc<AtomicBool>,
/// Last completed Phase 1 boundary. `None` means no extract has /// Last completed Phase 1 boundary. `None` means no extract has
/// run yet on this session — next extract starts from entry 0. /// run yet on this session — next extract starts from entry 0.
/// Restored from `RestoredState.extensions` on `restore`, updated /// Restored from `RestoredState.extensions` on `restore`, updated
@ -197,6 +202,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
prompts, prompts,
inject_resident_knowledge: true, inject_resident_knowledge: true,
extract_in_flight: Arc::new(AtomicBool::new(false)), extract_in_flight: Arc::new(AtomicBool::new(false)),
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
extract_pointer: Mutex::new(None), extract_pointer: Mutex::new(None),
user_segments: Vec::new(), user_segments: Vec::new(),
}; };
@ -1490,6 +1496,173 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
Ok(ExtractDecision::Completed) Ok(ExtractDecision::Completed)
} }
/// Build the LlmClient for the Phase 2 (memory.consolidation) Worker.
///
/// Uses `memory.consolidation_model` from manifest if set, otherwise
/// clones the main client. Mirrors [`build_extractor_client`].
fn build_consolidator_client(
&self,
memory_cfg: &manifest::MemoryConfig,
) -> Result<Box<dyn LlmClient>, PodError> {
if let Some(ref m) = memory_cfg.consolidation_model {
let client = provider::build_client(m)?;
return Ok(client);
}
let worker = self.worker.as_ref().expect("worker taken during run");
Ok(worker.client().clone_boxed())
}
/// Phase 2 (memory.consolidation) post-run trigger.
///
/// Called by the Controller **after** [`try_post_run_extract`] and
/// **before** [`try_post_run_compact`]: extract feeds staging, compact
/// rewrites history. Phase 2 must consume staging before compact
/// reshapes the session.
///
/// Behaviour follows `docs/plan/memory.md` §Phase 2 / §並走防止:
/// the staging-side `StagingLock` enforces cross-process exclusion;
/// `consolidation_in_flight` keeps in-process callers honest. On
/// success, the lock is released *with* consumed-id cleanup; on
/// worker failure, only the lock file is unlinked so the staging
/// entries remain for a future retry.
pub async fn try_post_run_consolidate(&mut self) -> Result<(), PodError> {
let Some(memory_cfg) = self.manifest.memory.clone() else {
return Ok(());
};
let files_threshold = memory_cfg.consolidation_threshold_files;
let bytes_threshold = memory_cfg.consolidation_threshold_bytes;
if files_threshold.is_none() && bytes_threshold.is_none() {
return Ok(());
}
loop {
if self
.consolidation_in_flight
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return Ok(());
}
let result = self
.run_consolidate_once(&memory_cfg, files_threshold, bytes_threshold)
.await;
self.consolidation_in_flight.store(false, Ordering::Release);
match result {
Ok(ConsolidateDecision::Skipped) => return Ok(()),
Ok(ConsolidateDecision::Completed) => continue,
Err(e) => {
tracing::warn!(error = %e, "Phase 2 consolidation failed");
self.alert(
AlertLevel::Warn,
AlertSource::Pod,
format!("memory Phase 2 consolidation failed: {e}"),
);
return Ok(());
}
}
}
}
/// Single consolidation iteration: snapshot staging, decide whether to
/// fire, run the worker if so, release the lock and clean up consumed
/// IDs.
async fn run_consolidate_once(
&mut self,
memory_cfg: &manifest::MemoryConfig,
files_threshold: Option<usize>,
bytes_threshold: Option<u64>,
) -> Result<ConsolidateDecision, PodError> {
use memory::consolidate;
let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd);
let entries = consolidate::list_staging_entries(&layout);
if entries.is_empty() {
return Ok(ConsolidateDecision::Skipped);
}
let total_files = entries.len();
let total_bytes: u64 = entries.iter().map(|e| e.bytes).sum();
let files_hit = files_threshold.is_some_and(|n| total_files >= n);
let bytes_hit = bytes_threshold.is_some_and(|n| total_bytes >= n);
if !files_hit && !bytes_hit {
return Ok(ConsolidateDecision::Skipped);
}
let consumed_ids: Vec<uuid::Uuid> = entries.iter().map(|e| e.id).collect();
let lock = match consolidate::StagingLock::acquire(
&layout,
std::process::id(),
self.manifest.pod.name.clone(),
consumed_ids,
) {
Ok(l) => l,
Err(memory::consolidate::LockError::InUse { .. }) => {
return Ok(ConsolidateDecision::Skipped);
}
Err(e) => return Err(PodError::ConsolidationLock(e)),
};
let cap = memory_cfg
.consolidation_worker_max_input_tokens
.unwrap_or(manifest::defaults::MEMORY_CONSOLIDATION_WORKER_MAX_INPUT_TOKENS);
let client = match self.build_consolidator_client(memory_cfg) {
Ok(c) => c,
Err(e) => {
lock.release_only();
return Err(e);
}
};
let mut worker =
Worker::new(client).system_prompt(consolidate::CONSOLIDATION_SYSTEM_PROMPT);
let input_so_far = Arc::new(std::sync::atomic::AtomicU64::new(0));
{
let acc = input_so_far.clone();
worker.on_usage(move |event| {
if let Some(tokens) = event.input_tokens {
acc.fetch_add(tokens, Ordering::Relaxed);
}
});
}
worker.set_interceptor(MemoryConsolidationWorkerInterceptor {
input_so_far: input_so_far.clone(),
max_input_tokens: cap,
});
// Memory tools are self-contained — they bypass ScopedFs and write
// directly under the workspace via WorkspaceLayout. Resident
// knowledge injection (`Pod::set_resident_knowledge_injection`) is
// a Pod-level concern; this disposable Worker is built without it
// by construction, in keeping with `docs/plan/memory.md` §Phase 2
// のKnowledgeアクセス (agent pulls knowledge through the search
// tool instead of via system-prompt residency).
let query_cfg = memory::tool::QueryConfig::from(memory_cfg);
worker.register_tool(memory::tool::read_tool(layout.clone()));
worker.register_tool(memory::tool::write_tool(layout.clone()));
worker.register_tool(memory::tool::edit_tool(layout.clone()));
worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg));
worker.register_tool(memory::tool::knowledge_query_tool(layout.clone(), query_cfg));
let tidy = consolidate::collect_tidy_hints(&layout);
let candidates = consolidate::KnowledgeCandidateReport::empty();
let input_text =
consolidate::build_consolidate_input(&layout, &entries, &tidy, &candidates);
let run_result = worker.run(input_text).await;
match run_result {
Ok(_) => {
lock.release_with_cleanup(&layout);
Ok(ConsolidateDecision::Completed)
}
Err(e) => {
lock.release_only();
Err(PodError::Worker(e))
}
}
}
} }
/// Outcome of a single Phase 1 extract iteration. Internal to /// Outcome of a single Phase 1 extract iteration. Internal to
@ -1526,6 +1699,40 @@ impl llm_worker::interceptor::Interceptor for MemoryExtractWorkerInterceptor {
} }
} }
/// Outcome of a single Phase 2 consolidation iteration. Internal to
/// `try_post_run_consolidate` / `run_consolidate_once`.
enum ConsolidateDecision {
/// Either threshold not met, no staging, or another Pod holds the lock.
Skipped,
/// Consolidation ran. Caller re-evaluates threshold against any
/// staging entries that arrived during the run (Coalesce).
Completed,
}
/// Pre-request interceptor for the Phase 2 consolidation worker. Same
/// shape as the extract interceptor; kept separate so the abort message
/// names the right subsystem.
struct MemoryConsolidationWorkerInterceptor {
input_so_far: Arc<std::sync::atomic::AtomicU64>,
max_input_tokens: u64,
}
#[async_trait]
impl llm_worker::interceptor::Interceptor for MemoryConsolidationWorkerInterceptor {
async fn pre_llm_request(
&self,
_context: &mut Vec<Item>,
) -> llm_worker::interceptor::PreRequestAction {
if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens {
return llm_worker::interceptor::PreRequestAction::Cancel(format!(
"Phase 2 consolidation worker input exceeded {} tokens",
self.max_input_tokens
));
}
llm_worker::interceptor::PreRequestAction::Continue
}
}
impl<St: Store> Pod<Box<dyn LlmClient>, St> { impl<St: Store> Pod<Box<dyn LlmClient>, St> {
/// Create a Pod entirely from a validated manifest. /// Create a Pod entirely from a validated manifest.
/// ///
@ -1596,6 +1803,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
prompts: common.prompts, prompts: common.prompts,
inject_resident_knowledge: true, inject_resident_knowledge: true,
extract_in_flight: Arc::new(AtomicBool::new(false)), extract_in_flight: Arc::new(AtomicBool::new(false)),
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
extract_pointer: Mutex::new(None), extract_pointer: Mutex::new(None),
user_segments: Vec::new(), user_segments: Vec::new(),
}; };
@ -1653,6 +1861,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
prompts: common.prompts, prompts: common.prompts,
inject_resident_knowledge: true, inject_resident_knowledge: true,
extract_in_flight: Arc::new(AtomicBool::new(false)), extract_in_flight: Arc::new(AtomicBool::new(false)),
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
extract_pointer: Mutex::new(None), extract_pointer: Mutex::new(None),
user_segments: Vec::new(), user_segments: Vec::new(),
}; };
@ -1762,6 +1971,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
prompts: common.prompts, prompts: common.prompts,
inject_resident_knowledge: true, inject_resident_knowledge: true,
extract_in_flight: Arc::new(AtomicBool::new(false)), extract_in_flight: Arc::new(AtomicBool::new(false)),
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
extract_pointer: Mutex::new(extract_pointer), extract_pointer: Mutex::new(extract_pointer),
user_segments: state.user_segments, user_segments: state.user_segments,
}; };
@ -1963,6 +2173,9 @@ pub enum PodError {
#[error("memory Phase 1 staging write failed: {0}")] #[error("memory Phase 1 staging write failed: {0}")]
ExtractStaging(#[source] memory::extract::StagingError), ExtractStaging(#[source] memory::extract::StagingError),
#[error("memory Phase 2 lock acquisition failed: {0}")]
ConsolidationLock(#[source] memory::consolidate::LockError),
#[error("session {session_id} has no entries to restore")] #[error("session {session_id} has no entries to restore")]
SessionEmpty { session_id: SessionId }, SessionEmpty { session_id: SessionId },
} }

View File

@ -0,0 +1,277 @@
//! Phase 2 (memory.consolidation) post-run trigger.
//!
//! Covers the gating, lock and cleanup behaviour without exercising the
//! full sub-worker tool loop:
//!
//! - no `[memory]` section → no-op
//! - `[memory]` present but no thresholds → no-op
//! - staging empty → skip
//! - staging below thresholds → skip + lock not acquired
//! - staging above threshold → sub-worker runs, consumed entries removed
//! - existing live lock → skip without error
//!
//! The sub-worker is fed a no-op LLM response (plain text) so it returns
//! immediately. The post-run path then exercises lock acquisition,
//! cleanup, and the empty-payload fast path.
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use futures::Stream;
use llm_worker::Worker;
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_worker::llm_client::{ClientError, LlmClient, Request};
use memory::WorkspaceLayout;
use memory::extract::{ExtractedPayload, write_staging};
use memory::schema::SourceRef;
use session_store::FsStore;
use pod::Pod;
#[derive(Clone)]
struct MockClient {
responses: Arc<Vec<Vec<LlmEvent>>>,
call_count: Arc<AtomicUsize>,
}
impl MockClient {
fn new(responses: Vec<Vec<LlmEvent>>) -> Self {
Self {
responses: Arc::new(responses),
call_count: Arc::new(AtomicUsize::new(0)),
}
}
}
#[async_trait]
impl LlmClient for MockClient {
fn clone_boxed(&self) -> Box<dyn LlmClient> {
Box::new(self.clone())
}
async fn stream(
&self,
_request: Request,
) -> Result<Pin<Box<dyn Stream<Item = Result<LlmEvent, ClientError>> + Send>>, ClientError>
{
let count = self.call_count.fetch_add(1, Ordering::SeqCst);
if count >= self.responses.len() {
return Err(ClientError::Config("mock client exhausted".into()));
}
let events = self.responses[count].clone();
let stream = futures::stream::iter(events.into_iter().map(Ok));
Ok(Box::pin(stream))
}
}
fn done(text: &str) -> Vec<LlmEvent> {
vec![
LlmEvent::text_block_start(0),
LlmEvent::text_delta(0, text),
LlmEvent::text_block_stop(0, None),
LlmEvent::Status(StatusEvent {
status: ResponseStatus::Completed,
}),
]
}
const NO_MEMORY_TOML: &str = r#"
[pod]
name = "test-pod"
[model]
scheme = "anthropic"
model_id = "test-model"
[worker]
max_tokens = 100
[[scope.allow]]
target = "./"
permission = "write"
"#;
const MEMORY_NO_THRESHOLDS_TOML: &str = r#"
[pod]
name = "test-pod"
[model]
scheme = "anthropic"
model_id = "test-model"
[worker]
max_tokens = 100
[memory]
[[scope.allow]]
target = "./"
permission = "write"
"#;
const FILES_THRESHOLD_TOML: &str = r#"
[pod]
name = "test-pod"
[model]
scheme = "anthropic"
model_id = "test-model"
[worker]
max_tokens = 100
[memory]
consolidation_threshold_files = 2
[[scope.allow]]
target = "./"
permission = "write"
"#;
async fn make_pod_with(
manifest_toml: &str,
pwd: std::path::PathBuf,
client: MockClient,
) -> Pod<MockClient, FsStore> {
let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap();
let store_tmp = tempfile::tempdir().unwrap();
let store = FsStore::new(store_tmp.path()).await.unwrap();
std::mem::forget(store_tmp);
let scope = pod::Scope::writable(&pwd).unwrap();
let worker = Worker::new(client);
Pod::new(manifest, worker, store, pwd, scope).await.unwrap()
}
fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec<uuid::Uuid> {
let mut ids = Vec::new();
for i in 0..n {
let (id, _) = write_staging(
layout,
SourceRef {
session_id: format!("s-{i}"),
range: [i as u64, i as u64],
},
ExtractedPayload::default(),
)
.unwrap();
ids.push(id);
}
ids
}
#[tokio::test]
async fn no_memory_section_is_a_noop() {
let pwd = tempfile::tempdir().unwrap();
let client = MockClient::new(vec![]);
let mut pod = make_pod_with(NO_MEMORY_TOML, pwd.path().to_path_buf(), client).await;
pod.try_post_run_consolidate()
.await
.expect("missing memory section must skip cleanly");
}
#[tokio::test]
async fn no_thresholds_is_a_noop() {
let pwd = tempfile::tempdir().unwrap();
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
write_n_staging(&layout, 5);
let client = MockClient::new(vec![]);
let mut pod = make_pod_with(MEMORY_NO_THRESHOLDS_TOML, pwd.path().to_path_buf(), client).await;
pod.try_post_run_consolidate()
.await
.expect("phase 2 disabled when both thresholds are None");
// No staging entries removed.
assert_eq!(
memory::consolidate::list_staging_entries(&layout).len(),
5
);
}
#[tokio::test]
async fn empty_staging_skips() {
let pwd = tempfile::tempdir().unwrap();
let client = MockClient::new(vec![]);
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
pod.try_post_run_consolidate().await.unwrap();
// No mock calls expected.
}
#[tokio::test]
async fn below_threshold_skips_and_does_not_take_lock() {
let pwd = tempfile::tempdir().unwrap();
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
write_n_staging(&layout, 1); // threshold is 2
let client = MockClient::new(vec![]);
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
pod.try_post_run_consolidate().await.unwrap();
// Staging untouched.
assert_eq!(
memory::consolidate::list_staging_entries(&layout).len(),
1
);
// Lock file must not exist.
let lock_path = layout.staging_dir().join(".consolidation.lock");
assert!(!lock_path.exists(), "lock file should not be created");
}
#[tokio::test]
async fn fires_on_threshold_and_cleans_up_consumed_entries() {
let pwd = tempfile::tempdir().unwrap();
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
write_n_staging(&layout, 2); // threshold is 2 — fires.
// Sub-worker is given a single text-only response. The Phase 2 prompt
// tells it to call memory tools; the mock skips those, but `Worker::run`
// returns Ok regardless once the LLM closes with a final text.
let client = MockClient::new(vec![done("ok")]);
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
pod.try_post_run_consolidate().await.unwrap();
// Consumed entries removed.
assert!(
memory::consolidate::list_staging_entries(&layout).is_empty(),
"consumed staging entries must be cleaned up"
);
// Lock removed too.
let lock_path = layout.staging_dir().join(".consolidation.lock");
assert!(
!lock_path.exists(),
"lock file must be removed on success"
);
}
#[tokio::test]
async fn live_lock_held_by_other_pod_skips() {
let pwd = tempfile::tempdir().unwrap();
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
write_n_staging(&layout, 3);
// Pre-acquire lock with this test's PID — definitely alive — and
// *don't* release it. The Phase 2 path must skip without error.
let _live_lock = memory::consolidate::StagingLock::acquire(
&layout,
std::process::id(),
"other-pod",
Vec::new(),
)
.unwrap();
let client = MockClient::new(vec![]);
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
pod.try_post_run_consolidate()
.await
.expect("InUse lock must surface as graceful skip");
// Staging untouched: lock holder owns the snapshot, not us.
assert_eq!(
memory::consolidate::list_staging_entries(&layout).len(),
3
);
}

View File

@ -0,0 +1,73 @@
# 内部 Worker / 内部 Pod の Workflow 化
## 背景
INSOMNIA が内部で固定 prompt を持って disposable Worker / 専用 Pod を立ち上げている経路がいくつかある:
- Phase 1 活動抽出(`crates/memory/src/extract/prompt.rs::EXTRACT_SYSTEM_PROMPT`
- Phase 2 統合 + 整理(`tickets/memory-phase2-consolidation.md`、本チケット時点では実装中 / 直前)
- Compact`PromptCatalog::compact_system`
これらは実装内 `&str` 定数や `PromptCatalog` の overlay で管理されており、prompt の調整や運用カスタマイズが「コード変更 + 再ビルド」を要する。一方、ユーザー向け `/<slug>` Workflow`tickets/workflow.md`)は `<workspace_root>/.insomnia/memory/workflow/<slug>.md` に住み、frontmatter + Markdown 本文 + `requires` Knowledge inject を持つ宣言形式で運用できる。
両者を寄せ、内部 Worker / 内部 Pod の prompt + ツール surface + Knowledge 依存を **Workflow と同一仕様で記述** できる経路を用意する。これにより:
- 内部 prompt の運用調整が workspace 側でできる(コード変更不要)
- Phase 2 の prompt 案 (`docs/plan/memory-prompts.md`) を workspace に直接 ingest できる
- 将来 consolidation を独立 Pod に引き上げる際も、Workflow を submit する形に揃えられる
## 要件
### Workflow の役割拡張
`tickets/workflow.md` の Workflow 仕様は「ユーザーが `/<slug>` で submit する制約付き作業」だが、本チケットでは **内部トリガーPod 内部の状態遷移)から呼び出される Workflow** を一級扱いに広げる。
- 同じファイル形式(`memory/workflow/<slug>.md`)、同じ frontmatter / Linter
- `user_invocable: false``/<slug>` 経路から見えなくする
- `auto_invoke` は通常 Pod 用の system prompt 注入仕様のまま(内部 Workflow は通常 OFF
- 内部 Workflow を識別するキー(例: `internal_role`)と、必要なツール surface を表明する手段を frontmatter に追加する。具体 schema は実装で詰める
### 内部呼び出し経路
Pod 側の既存トリガーPhase 1 post-run / Phase 2 staging 閾値 / Compact 閾値 等)は固定 `&str` の代わりに Workflow loader 経由で:
1. 内部識別キーで該当 Workflow を解決(衝突時は workspace 上書き優先、なければ insomnia bundled default
2. `requires` Knowledge を本文の前に inject
3. Workflow 本文を sub-Worker / sub-Pod の prompt として渡すsystem prompt 扱いか初回 submit 扱いかは内部用途で固定し、role ごとに揃える)
4. 既存のツール登録ロジックは Workflow が表明したツール surface に従う
### Bundled defaults
ユーザー workspace に該当 Workflow が無い場合に備え、insomnia 同梱の default Workflow を読む層を `PromptCatalog` の overlay と整合する形で持つ。
- 既存の Pod prompt 4 層 overlaybuiltin / user / workspace / pod-packと同じ優先順
- bundled default の物理配置は実装で決める
### 関連チケットとの順序
- `tickets/workflow.md`(ユーザー向け Workflow 本体)が先行する。本チケットはその仕様を前提に「内部呼び出し経路」を追加する側
- `tickets/memory-phase2-consolidation.md` は当面 `&str` 定数で実装してよい。本チケット完了時に Workflow 化に乗り換える
- Phase 1 / Compact も同様に role ごとに段階移行
## 範囲外
- Workflow 仕様自体の本体実装(`tickets/workflow.md`
- 内部 Workflow の自動生成consolidation の offer 等。`docs/plan/memory.md` §Offer 経路 / 将来検討)
- 既存 `&str` 定数の物理削除タイミング(移行が完了した role ごとに削除する運用)
- `auto_invoke` 注入予算の最適化(既存 Knowledge 常駐注入予算と合算する規約は `docs/plan/memory.md` 側)
## 完了条件
- 各内部 Worker / 内部 Pod少なくとも Phase 1 / Phase 2 / Compact のうち、本チケット着手時点で実装済みのもの)が内部識別キー付き Workflow を解決して prompt とツール surface を組み立てる
- workspace で `memory/workflow/<slug>.md` を上書きすれば内部 Worker の prompt が変わる
- workspace に該当 Workflow が無い場合、bundled default が使われる
- `user_invocable: false` の内部 Workflow は `/<slug>` 候補から除外され、ユーザーからは呼べない
- 内部 Workflow も consolidation の自動書き込み禁止対象のままLinter で構造的担保、`workflow.md` と整合)
- 単体テストで bundled default / workspace overlay / ツール surface 表明 + 解決 + 適用がカバーされる
## 参照
- 前提: `tickets/workflow.md`
- 最初の利用者: `tickets/memory-phase2-consolidation.md`
- 関連: `tickets/agent-skills.md`(外部 SKILL ingest 経路。本チケットの内部呼び出し経路とは別軸)
- 設計: `docs/plan/workflow.md`、`docs/plan/memory.md`、`docs/plan/memory-prompts.md`