メモリPhase2の実装
This commit is contained in:
parent
ca27d88869
commit
31eeded4a6
6
Cargo.lock
generated
6
Cargo.lock
generated
|
|
@ -1571,9 +1571,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2"
|
|||
|
||||
[[package]]
|
||||
name = "libc"
|
||||
version = "0.2.185"
|
||||
version = "0.2.186"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "52ff2c0fe9bc6cb6b14a0592c2ff4fa9ceb83eea9db979b0487cd054946a2b8f"
|
||||
checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66"
|
||||
|
||||
[[package]]
|
||||
name = "libredox"
|
||||
|
|
@ -1757,6 +1757,7 @@ version = "0.1.0"
|
|||
dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"libc",
|
||||
"llm-worker",
|
||||
"manifest",
|
||||
"schemars",
|
||||
|
|
@ -2140,6 +2141,7 @@ dependencies = [
|
|||
"toml",
|
||||
"tools",
|
||||
"tracing",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
1
TODO.md
1
TODO.md
|
|
@ -1,5 +1,6 @@
|
|||
- [ ] Workflow / Skills
|
||||
- [ ] Workflow 実装 → [tickets/workflow.md](tickets/workflow.md)
|
||||
- [ ] 内部 Worker / 内部 Pod の Workflow 化 → [tickets/internal-worker-workflow.md](tickets/internal-worker-workflow.md)
|
||||
- [ ] Agent Skills を Workflow として ingest → [tickets/agent-skills.md](tickets/agent-skills.md)
|
||||
- [ ] ツール設計
|
||||
- [ ] Bash ツール (Permission 層と統合) → [tickets/bash-tool.md](tickets/bash-tool.md)
|
||||
|
|
|
|||
|
|
@ -217,6 +217,16 @@ impl MemoryConfig {
|
|||
extract_worker_max_input_tokens: upper
|
||||
.extract_worker_max_input_tokens
|
||||
.or(self.extract_worker_max_input_tokens),
|
||||
consolidation_model: upper.consolidation_model.or(self.consolidation_model),
|
||||
consolidation_worker_max_input_tokens: upper
|
||||
.consolidation_worker_max_input_tokens
|
||||
.or(self.consolidation_worker_max_input_tokens),
|
||||
consolidation_threshold_files: upper
|
||||
.consolidation_threshold_files
|
||||
.or(self.consolidation_threshold_files),
|
||||
consolidation_threshold_bytes: upper
|
||||
.consolidation_threshold_bytes
|
||||
.or(self.consolidation_threshold_bytes),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,3 +50,8 @@ pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5;
|
|||
/// own LLM calls. Exceeding this aborts the extract run.
|
||||
/// See [`crate::MemoryConfig::extract_worker_max_input_tokens`].
|
||||
pub const MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS: u64 = 30_000;
|
||||
|
||||
/// Cumulative input-token cap for the memory Phase 2 (consolidation)
|
||||
/// worker's own LLM calls. Exceeding this aborts the consolidation run.
|
||||
/// See [`crate::MemoryConfig::consolidation_worker_max_input_tokens`].
|
||||
pub const MEMORY_CONSOLIDATION_WORKER_MAX_INPUT_TOKENS: u64 = 80_000;
|
||||
|
|
|
|||
|
|
@ -85,6 +85,27 @@ pub struct MemoryConfig {
|
|||
/// [`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`].
|
||||
#[serde(default)]
|
||||
pub extract_worker_max_input_tokens: Option<u64>,
|
||||
/// Optional model for the Phase 2 (consolidation) worker. When
|
||||
/// `None`, the main pod model is cloned via `clone_boxed()`.
|
||||
/// Reasoning-class models are recommended.
|
||||
#[serde(default)]
|
||||
pub consolidation_model: Option<ModelManifest>,
|
||||
/// Cumulative input-token cap for the consolidation worker's own
|
||||
/// LLM calls. Exceeding this aborts the consolidation run. `None` ⇒
|
||||
/// [`defaults::MEMORY_CONSOLIDATION_WORKER_MAX_INPUT_TOKENS`].
|
||||
#[serde(default)]
|
||||
pub consolidation_worker_max_input_tokens: Option<u64>,
|
||||
/// Phase 2 trigger: file-count threshold of `_staging/`. Phase 2
|
||||
/// fires when the staging directory has at least this many entries.
|
||||
/// Either threshold reaching its limit fires Phase 2 (logical OR).
|
||||
/// `None` for both thresholds ⇒ Phase 2 disabled.
|
||||
#[serde(default)]
|
||||
pub consolidation_threshold_files: Option<usize>,
|
||||
/// Phase 2 trigger: byte-size threshold across all `_staging/`
|
||||
/// entries. Either threshold reaching its limit fires Phase 2.
|
||||
/// `None` for both thresholds ⇒ Phase 2 disabled.
|
||||
#[serde(default)]
|
||||
pub consolidation_threshold_bytes: Option<u64>,
|
||||
}
|
||||
|
||||
/// Pod metadata.
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ license.workspace = true
|
|||
[dependencies]
|
||||
async-trait = "0.1.89"
|
||||
chrono = { version = "0.4.44", features = ["serde"] }
|
||||
libc = "0.2.186"
|
||||
llm-worker = { version = "0.2.1", path = "../llm-worker" }
|
||||
manifest = { version = "0.1.0", path = "../manifest" }
|
||||
schemars = "1.2.1"
|
||||
|
|
|
|||
324
crates/memory/src/consolidate/input.rs
Normal file
324
crates/memory/src/consolidate/input.rs
Normal file
|
|
@ -0,0 +1,324 @@
|
|||
//! Phase 2 sub-Worker への最初のユーザー入力を組み立てる。
|
||||
//!
|
||||
//! Phase 1 (`extract::build_extract_input`) と同じ方針で、固定 schema の
|
||||
//! markdown セクション列にしてサブWorker に渡す。`docs/plan/memory.md`
|
||||
//! §Phase 2 入力 / §整理材料 の項目に従い:
|
||||
//!
|
||||
//! 1. consumed staging エントリ全文(`source` 込み)
|
||||
//! 2. 既存 `memory/*` 全文(summary / decisions / requests)
|
||||
//! 3. Knowledge 化候補レポート(メトリクス未完なら空)
|
||||
//! 4. 整理材料(Linter Warn ベース、メトリクス未完なら明示 invoke 頻度なし)
|
||||
//!
|
||||
//! 既存 `knowledge/*` 本文は埋めず、agent に `KnowledgeQuery` 経由で引かせる
|
||||
//! 設計(`docs/plan/memory.md` §retrieval 経路 / §Phase 2 の Knowledge アクセス)。
|
||||
|
||||
use std::fmt::Write;
|
||||
|
||||
use crate::consolidate::staging::StagingEntry;
|
||||
use crate::consolidate::tidy::TidyHints;
|
||||
use crate::workspace::{RecordKind, WorkspaceLayout};
|
||||
|
||||
/// Knowledge-candidate report handed to the Phase 2 worker.
///
/// Until the deliverable of `tickets/memory-usage-metrics.md` exists, callers
/// are expected to pass an empty report (`docs/plan/memory.md`
/// §Knowledge 化候補レポート). With an empty report the consolidation phase
/// creates no new Knowledge and limits itself to decisions / requests /
/// summary / updates of existing Knowledge.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct KnowledgeCandidateReport {
    /// Candidate `(kind, slug, frequency_per_mtoken)` triples.
    /// An empty vector means "no candidates".
    pub entries: Vec<KnowledgeCandidateEntry>,
}

/// One row of a [`KnowledgeCandidateReport`].
#[derive(Debug, Clone, PartialEq)]
pub struct KnowledgeCandidateEntry {
    /// Kind label of the originating record, printed verbatim in the report.
    pub source_kind: &'static str,
    /// Slug of the originating record.
    pub source_slug: String,
    /// Usage frequency; the report prints it with three decimals as
    /// "invokes/Mtoken".
    pub frequency_per_mtoken: f64,
}

impl KnowledgeCandidateReport {
    /// Builds a report with no candidates.
    #[must_use]
    pub fn empty() -> Self {
        Self::default()
    }

    /// Returns `true` when the report carries no candidates.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}
|
||||
|
||||
/// Phase 2 sub-Worker の最初の user 入力。
|
||||
pub fn build_consolidate_input(
|
||||
layout: &WorkspaceLayout,
|
||||
staging: &[StagingEntry],
|
||||
tidy: &TidyHints,
|
||||
candidates: &KnowledgeCandidateReport,
|
||||
) -> String {
|
||||
let mut out = String::new();
|
||||
out.push_str(
|
||||
"Phase 2 consolidation input. Run the consolidation phase first \
|
||||
(fold the staging activity logs into memory and knowledge), then the \
|
||||
tidy phase (clean up existing records). Use the memory tools for \
|
||||
every write — direct file writes are denied by the pod scope.\n\n",
|
||||
);
|
||||
|
||||
out.push_str("## Staging entries (consumed by this run)\n\n");
|
||||
out.push_str(&render_staging_records(staging));
|
||||
out.push('\n');
|
||||
|
||||
out.push_str("## Existing memory records (full content)\n\n");
|
||||
out.push_str(&render_existing_memory_records(layout));
|
||||
out.push('\n');
|
||||
|
||||
out.push_str("## Knowledge candidate report\n\n");
|
||||
out.push_str(&render_candidate_report(candidates));
|
||||
out.push('\n');
|
||||
|
||||
out.push_str("## Tidy hints\n\n");
|
||||
out.push_str(&render_tidy_hints(tidy));
|
||||
out.push('\n');
|
||||
|
||||
out.push_str(
|
||||
"When done, end the turn with a short final assistant message describing \
|
||||
what changed.",
|
||||
);
|
||||
out
|
||||
}
|
||||
|
||||
/// Staging エントリ群を「`### <id>` ヘッダ + 整形 JSON ブロック」で並べる。
|
||||
/// 空配列なら「(none)」と書く。
|
||||
pub fn render_staging_records(entries: &[StagingEntry]) -> String {
|
||||
if entries.is_empty() {
|
||||
return "(none)\n".to_string();
|
||||
}
|
||||
let mut out = String::new();
|
||||
for entry in entries {
|
||||
let _ = writeln!(&mut out, "### {}", entry.id);
|
||||
let json = serde_json::to_string_pretty(&entry.record).unwrap_or_else(|_| "{}".into());
|
||||
out.push_str("```json\n");
|
||||
out.push_str(&json);
|
||||
out.push_str("\n```\n\n");
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
/// `<workspace>/.insomnia/memory/{summary.md,decisions/*,requests/*}` を
|
||||
/// 「`### <kind>:<slug>` ヘッダ + raw markdown ブロック」で全文渡す。
|
||||
pub fn render_existing_memory_records(layout: &WorkspaceLayout) -> String {
|
||||
let mut out = String::new();
|
||||
|
||||
let summary = layout.summary_path();
|
||||
if let Ok(content) = std::fs::read_to_string(&summary) {
|
||||
out.push_str("### summary\n");
|
||||
out.push_str("```markdown\n");
|
||||
out.push_str(content.trim_end_matches('\n'));
|
||||
out.push_str("\n```\n\n");
|
||||
}
|
||||
|
||||
push_kind_records(&mut out, layout, RecordKind::Decision);
|
||||
push_kind_records(&mut out, layout, RecordKind::Request);
|
||||
|
||||
if out.is_empty() {
|
||||
return "(none)\n".to_string();
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
fn push_kind_records(out: &mut String, layout: &WorkspaceLayout, kind: RecordKind) {
|
||||
let dir = match kind {
|
||||
RecordKind::Decision => layout.decisions_dir(),
|
||||
RecordKind::Request => layout.requests_dir(),
|
||||
RecordKind::Knowledge | RecordKind::Summary | RecordKind::Workflow => return,
|
||||
};
|
||||
let entries = match std::fs::read_dir(&dir) {
|
||||
Ok(it) => it,
|
||||
Err(_) => return,
|
||||
};
|
||||
let mut paths: Vec<(String, std::path::PathBuf)> = Vec::new();
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
if !path.is_file() {
|
||||
continue;
|
||||
}
|
||||
let stem = match path.file_stem().and_then(|s| s.to_str()) {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
if path.extension().and_then(|s| s.to_str()) != Some("md") {
|
||||
continue;
|
||||
}
|
||||
paths.push((stem.to_string(), path));
|
||||
}
|
||||
paths.sort();
|
||||
for (slug, path) in paths {
|
||||
let Ok(content) = std::fs::read_to_string(&path) else {
|
||||
continue;
|
||||
};
|
||||
let _ = writeln!(out, "### {}:{}", kind.as_str(), slug);
|
||||
out.push_str("```markdown\n");
|
||||
out.push_str(content.trim_end_matches('\n'));
|
||||
out.push_str("\n```\n\n");
|
||||
}
|
||||
}
|
||||
|
||||
fn render_candidate_report(report: &KnowledgeCandidateReport) -> String {
|
||||
if report.is_empty() {
|
||||
return "(empty — usage metrics pipeline not populated. \
|
||||
Do not create new Knowledge records this run.)\n"
|
||||
.to_string();
|
||||
}
|
||||
let mut out = String::new();
|
||||
for c in &report.entries {
|
||||
let _ = writeln!(
|
||||
&mut out,
|
||||
"- {} `{}` — frequency {:.3} invokes/Mtoken",
|
||||
c.source_kind, c.source_slug, c.frequency_per_mtoken
|
||||
);
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
/// Tidy hints の Markdown 描画。空ヒントなら "(none)" 1 行。
|
||||
pub fn render_tidy_hints(tidy: &TidyHints) -> String {
|
||||
if tidy.is_empty() {
|
||||
return "(none)\n".to_string();
|
||||
}
|
||||
let mut out = String::new();
|
||||
|
||||
if !tidy.replaced_decisions.is_empty() {
|
||||
out.push_str("**Replaced decisions still on disk** — collapse if the chain has settled:\n");
|
||||
for (slug, replaced_by) in &tidy.replaced_decisions {
|
||||
match replaced_by {
|
||||
Some(target) => {
|
||||
let _ = writeln!(&mut out, "- `{slug}` → `{target}`");
|
||||
}
|
||||
None => {
|
||||
let _ = writeln!(&mut out, "- `{slug}` (no `replaced_by` set)");
|
||||
}
|
||||
}
|
||||
}
|
||||
out.push('\n');
|
||||
}
|
||||
|
||||
if !tidy.sources_overflow.is_empty() {
|
||||
out.push_str(
|
||||
"**Sources overflow** — consider trimming to the most recent entries (git log keeps the rest):\n",
|
||||
);
|
||||
for s in &tidy.sources_overflow {
|
||||
let _ = writeln!(&mut out, "- {} `{}` ({} sources)", s.kind.as_str(), s.slug, s.count);
|
||||
}
|
||||
out.push('\n');
|
||||
}
|
||||
|
||||
if !tidy.similar_slug_clusters.is_empty() {
|
||||
out.push_str("**Similar slug clusters** — evaluate for merge / rename:\n");
|
||||
for c in &tidy.similar_slug_clusters {
|
||||
let joined = c
|
||||
.slugs
|
||||
.iter()
|
||||
.map(|s| format!("`{s}`"))
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ");
|
||||
let _ = writeln!(&mut out, "- {}: {}", c.kind.as_str(), joined);
|
||||
}
|
||||
out.push('\n');
|
||||
}
|
||||
|
||||
out.push_str(
|
||||
"Explicit-invoke metrics (protection threshold) are not yet wired up; \
|
||||
skip drop on long-standing records when uncertain.\n",
|
||||
);
|
||||
out
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::consolidate::tidy::{SimilarSlugCluster, SourcesOverflow};
    use crate::extract::{ExtractedPayload, write_staging};
    use crate::schema::SourceRef;
    use chrono::Utc;
    use std::path::Path;

    /// Current time as an RFC 3339 string, for frontmatter timestamps.
    fn now() -> String {
        Utc::now().to_rfc3339()
    }

    /// Writes `content` to `p`, creating parent directories as needed.
    fn write(p: &Path, content: &str) {
        if let Some(parent) = p.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }
        std::fs::write(p, content).unwrap();
    }

    #[test]
    fn build_includes_all_sections_when_populated() {
        let dir = tempfile::TempDir::new().unwrap();
        let layout = WorkspaceLayout::new(dir.path().to_path_buf());

        write(
            &dir.path().join(".insomnia/memory/summary.md"),
            &format!("---\nupdated_at: {n}\n---\nstate of the world\n", n = now()),
        );
        write(
            &dir.path().join(".insomnia/memory/decisions/dec.md"),
            &format!(
                "---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: open\n---\nbody\n",
                n = now()
            ),
        );
        let (_id, _) = write_staging(
            &layout,
            SourceRef {
                session_id: "s".into(),
                range: [0, 1],
            },
            ExtractedPayload::default(),
        )
        .unwrap();
        let staging = crate::consolidate::staging::list_staging_entries(&layout);
        let tidy = TidyHints {
            replaced_decisions: [("old".to_string(), Some("new".to_string()))]
                .into_iter()
                .collect(),
            sources_overflow: vec![SourcesOverflow {
                kind: RecordKind::Decision,
                slug: "dec".into(),
                count: 12,
            }],
            similar_slug_clusters: vec![SimilarSlugCluster {
                kind: RecordKind::Decision,
                slugs: vec!["a".into(), "ab".into()],
            }],
        };
        let report = KnowledgeCandidateReport::empty();

        let out = build_consolidate_input(&layout, &staging, &tidy, &report);
        for needle in [
            "Staging entries",
            "Existing memory records",
            "Knowledge candidate report",
            "Tidy hints",
            "state of the world",
            "decision:dec",
            "Replaced decisions",
            "Sources overflow",
            "Similar slug clusters",
            "usage metrics pipeline not populated",
        ] {
            assert!(out.contains(needle), "missing {needle:?} in:\n{out}");
        }
    }

    #[test]
    fn empty_inputs_render_placeholders() {
        let dir = tempfile::TempDir::new().unwrap();
        let layout = WorkspaceLayout::new(dir.path().to_path_buf());
        let out = build_consolidate_input(
            &layout,
            &[],
            &TidyHints::default(),
            &KnowledgeCandidateReport::empty(),
        );

        // Every section heading is present even when there is nothing to show.
        for heading in [
            "## Staging entries",
            "## Existing memory records",
            "## Knowledge candidate report",
            "## Tidy hints",
        ] {
            assert!(out.contains(heading), "missing {heading:?} in:\n{out}");
        }
        // Staging, existing memory records and tidy hints each render their
        // own "(none)" placeholder; a single `contains` would not notice if
        // two of them went missing.
        assert_eq!(out.matches("(none)\n").count(), 3, "output was:\n{out}");
        // The candidate report uses its own, more explicit placeholder.
        assert!(out.contains("usage metrics pipeline not populated"));
    }
}
|
||||
305
crates/memory/src/consolidate/lock.rs
Normal file
305
crates/memory/src/consolidate/lock.rs
Normal file
|
|
@ -0,0 +1,305 @@
|
|||
//! `_staging/.consolidation.lock` による Phase 2 占有ファイル。
|
||||
//!
|
||||
//! `docs/plan/memory.md` §並走防止 に従い:
|
||||
//!
|
||||
//! - ファイルが存在し、記録された Pod が動作している間、その Pod が排他占有
|
||||
//! - クラッシュで残った stale lock は、所有者 PID が死んでいれば次回 spawn
|
||||
//! 時に上書き取得できる
|
||||
//! - cleanup は consumed ID の staging エントリのみ削除し、実行中に Phase 1
|
||||
//! が追加した分は残す
|
||||
//!
|
||||
//! 占有判定は Linux/macOS の `kill(pid, 0)` 経由で行う(`ESRCH` で死亡判定)。
|
||||
//! Windows は対象外: INSOMNIA は POSIX 環境を前提にしている。
|
||||
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::workspace::WorkspaceLayout;
|
||||
|
||||
const LOCK_FILE: &str = ".consolidation.lock";
|
||||
|
||||
/// 占有ファイルの中身。`pid` で stale 判定し、`pod_name` / `started_at` /
|
||||
/// `consumed_ids` は診断とクラッシュ復旧時の参照に使う。
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LockRecord {
|
||||
pub pid: u32,
|
||||
pub pod_name: String,
|
||||
pub started_at: DateTime<Utc>,
|
||||
/// この Phase 2 run が起動時スナップショットで確定した consumed staging
|
||||
/// entry の UUIDv7 列。完了時はこの列のみ削除し、追加分は残す。
|
||||
pub consumed_ids: Vec<Uuid>,
|
||||
}
|
||||
|
||||
/// 占有取得 / 解放のエラー。
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum LockError {
|
||||
/// 占有ファイルが既にあり、所有者 PID が生きているのでスキップ。
|
||||
#[error("Phase 2 lock held by live pid {pid} (pod {pod_name:?})")]
|
||||
InUse { pid: u32, pod_name: String },
|
||||
#[error("io error at {}: {source}", .path.display())]
|
||||
Io {
|
||||
path: PathBuf,
|
||||
#[source]
|
||||
source: std::io::Error,
|
||||
},
|
||||
#[error("failed to (de)serialize lock record: {0}")]
|
||||
Serde(#[from] serde_json::Error),
|
||||
}
|
||||
|
||||
impl LockError {
|
||||
fn io(path: impl Into<PathBuf>, source: std::io::Error) -> Self {
|
||||
Self::Io {
|
||||
path: path.into(),
|
||||
source,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Phase 2 が走っている間 RAII で持つ占有ハンドル。`Drop` では何もしない —
|
||||
/// 完了時の cleanup は consumed ID 列削除と一緒に行う必要があるため、明示
|
||||
/// 解放 [`StagingLock::release_with_cleanup`] を使う。明示解放しないまま
|
||||
/// drop された場合は占有ファイルがそのまま残り、次回 spawn 時に PID が
|
||||
/// 死んでいれば stale 上書きされる。
|
||||
#[derive(Debug)]
|
||||
pub struct StagingLock {
|
||||
path: PathBuf,
|
||||
record: LockRecord,
|
||||
}
|
||||
|
||||
impl StagingLock {
|
||||
pub fn record(&self) -> &LockRecord {
|
||||
&self.record
|
||||
}
|
||||
|
||||
pub fn path(&self) -> &Path {
|
||||
&self.path
|
||||
}
|
||||
|
||||
/// 占有取得を試みる。既に live な lock があれば
|
||||
/// [`LockError::InUse`]、stale 判定なら上書き取得する。
|
||||
/// staging dir が無ければ作成する。
|
||||
pub fn acquire(
|
||||
layout: &WorkspaceLayout,
|
||||
pid: u32,
|
||||
pod_name: impl Into<String>,
|
||||
consumed_ids: Vec<Uuid>,
|
||||
) -> Result<Self, LockError> {
|
||||
let staging_dir = layout.staging_dir();
|
||||
fs::create_dir_all(&staging_dir).map_err(|e| LockError::io(&staging_dir, e))?;
|
||||
let path = staging_dir.join(LOCK_FILE);
|
||||
|
||||
if path.exists() {
|
||||
let raw = fs::read_to_string(&path).map_err(|e| LockError::io(&path, e))?;
|
||||
// 壊れた lock は stale とみなして上書き許可。
|
||||
if let Ok(existing) = serde_json::from_str::<LockRecord>(&raw) {
|
||||
if pid_is_alive(existing.pid) {
|
||||
return Err(LockError::InUse {
|
||||
pid: existing.pid,
|
||||
pod_name: existing.pod_name,
|
||||
});
|
||||
}
|
||||
tracing::warn!(
|
||||
stale_pid = existing.pid,
|
||||
stale_pod = %existing.pod_name,
|
||||
"Phase 2 stale lock detected, taking over"
|
||||
);
|
||||
} else {
|
||||
tracing::warn!(path = %path.display(), "Phase 2 lock unparseable, treating as stale");
|
||||
}
|
||||
}
|
||||
|
||||
let record = LockRecord {
|
||||
pid,
|
||||
pod_name: pod_name.into(),
|
||||
started_at: Utc::now(),
|
||||
consumed_ids,
|
||||
};
|
||||
let json = serde_json::to_string_pretty(&record)?;
|
||||
fs::write(&path, json).map_err(|e| LockError::io(&path, e))?;
|
||||
Ok(Self { path, record })
|
||||
}
|
||||
|
||||
/// 占有を解放しつつ consumed ID 列の staging エントリを削除する。
|
||||
/// 削除対象が見当たらない場合は黙ってスキップ(既に外部で消えていた等)。
|
||||
/// 占有ファイル自体の削除も best-effort: 失敗時は warn を出すだけで
|
||||
/// エラーは伝播しない(次回 spawn 時に stale 判定で上書きされる)。
|
||||
pub fn release_with_cleanup(self, layout: &WorkspaceLayout) {
|
||||
let staging_dir = layout.staging_dir();
|
||||
for id in &self.record.consumed_ids {
|
||||
let target = staging_dir.join(format!("{id}.json"));
|
||||
match fs::remove_file(&target) {
|
||||
Ok(_) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
path = %target.display(),
|
||||
error = %e,
|
||||
"failed to clean up consumed staging entry"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.unlink_lock_only();
|
||||
}
|
||||
|
||||
/// 占有ファイルだけ削除し、staging エントリには触らない。Phase 2
|
||||
/// sub-Worker が途中で失敗した場合に使う: 入力 staging を残したまま
|
||||
/// 次回再評価で再処理させる(`docs/plan/memory.md` §並走防止 の
|
||||
/// 「重複作成は同一 slug update に自然収束」運用)。
|
||||
pub fn release_only(self) {
|
||||
self.unlink_lock_only();
|
||||
}
|
||||
|
||||
fn unlink_lock_only(&self) {
|
||||
if let Err(e) = fs::remove_file(&self.path) {
|
||||
if e.kind() != std::io::ErrorKind::NotFound {
|
||||
tracing::warn!(
|
||||
path = %self.path.display(),
|
||||
error = %e,
|
||||
"failed to remove Phase 2 lock"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn pid_is_alive(pid: u32) -> bool {
|
||||
// `kill(0, 0)` and `kill(-1, 0)` are POSIX-special (process group / all
|
||||
// signalable processes) and would yield false positives. Reject pids
|
||||
// that don't fit a positive `pid_t` so a corrupted lock file with a
|
||||
// u32::MAX-ish value is treated as stale instead of magically alive.
|
||||
if pid == 0 || pid > i32::MAX as u32 {
|
||||
return false;
|
||||
}
|
||||
// SAFETY: `kill` with sig 0 only probes whether the target pid exists
|
||||
// and the caller has permission to signal it. No signal is delivered.
|
||||
let rc = unsafe { libc::kill(pid as i32, 0) };
|
||||
if rc == 0 {
|
||||
return true;
|
||||
}
|
||||
// EPERM means the process exists but we can't signal it — still alive
|
||||
// for our purposes. ESRCH means it's gone.
|
||||
let errno = std::io::Error::last_os_error()
|
||||
.raw_os_error()
|
||||
.unwrap_or(libc::EINVAL);
|
||||
errno != libc::ESRCH
|
||||
}
|
||||
|
||||
#[cfg(not(unix))]
|
||||
fn pid_is_alive(_pid: u32) -> bool {
|
||||
// Unsupported platforms: assume the lock is live so we never overwrite
|
||||
// someone else's claim. Phase 2 will skip and try again next post-run.
|
||||
true
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::extract::{ExtractedPayload, write_staging};
    use crate::schema::SourceRef;

    /// Temp workspace with the staging directory already created.
    fn make_layout() -> (tempfile::TempDir, WorkspaceLayout) {
        let dir = tempfile::TempDir::new().unwrap();
        let layout = WorkspaceLayout::new(dir.path().to_path_buf());
        std::fs::create_dir_all(layout.staging_dir()).unwrap();
        (dir, layout)
    }

    /// Writes one default staging entry for `source` and returns its id.
    fn stage(layout: &WorkspaceLayout, source: SourceRef) -> Uuid {
        let (id, _) = write_staging(layout, source, ExtractedPayload::default()).unwrap();
        id
    }

    #[test]
    fn acquire_writes_lock_file() {
        let (_dir, layout) = make_layout();
        let lock = StagingLock::acquire(&layout, std::process::id(), "pod", Vec::new()).unwrap();
        let path = layout.staging_dir().join(LOCK_FILE);
        assert!(path.exists());
        assert_eq!(lock.record().pid, std::process::id());
        assert_eq!(lock.record().pod_name, "pod");
    }

    #[test]
    fn acquire_rejects_when_live_pid_holds_lock() {
        let (_dir, layout) = make_layout();
        // Use this test process's pid — it's definitely alive.
        let _first =
            StagingLock::acquire(&layout, std::process::id(), "pod-a", Vec::new()).unwrap();
        let err = StagingLock::acquire(&layout, std::process::id(), "pod-b", Vec::new())
            .expect_err("expected InUse");
        assert!(matches!(err, LockError::InUse { .. }));
    }

    // Stale detection only exists on unix; the fallback `pid_is_alive`
    // reports every pid as alive, so a takeover can never happen there.
    #[cfg(unix)]
    #[test]
    fn acquire_overwrites_stale_lock() {
        let (_dir, layout) = make_layout();
        // u32::MAX does not fit a positive `pid_t`, so `pid_is_alive` rejects
        // it up front without consulting `kill` — a deterministic "dead" pid.
        let stale = LockRecord {
            pid: u32::MAX,
            pod_name: "ghost".into(),
            started_at: Utc::now(),
            consumed_ids: Vec::new(),
        };
        std::fs::write(
            layout.staging_dir().join(LOCK_FILE),
            serde_json::to_string_pretty(&stale).unwrap(),
        )
        .unwrap();

        let lock = StagingLock::acquire(&layout, std::process::id(), "pod", Vec::new())
            .expect("stale lock must be overwritable");
        assert_eq!(lock.record().pid, std::process::id());
    }

    #[test]
    fn release_drops_consumed_entries_and_unlinks_lock() {
        let (_dir, layout) = make_layout();
        let id_a = stage(
            &layout,
            SourceRef {
                session_id: "s".into(),
                range: [0, 0],
            },
        );
        let id_b = stage(
            &layout,
            SourceRef {
                session_id: "s".into(),
                range: [1, 1],
            },
        );

        let lock = StagingLock::acquire(&layout, std::process::id(), "pod", vec![id_a]).unwrap();
        let lock_path = lock.path().to_path_buf();
        lock.release_with_cleanup(&layout);

        assert!(!lock_path.exists(), "lock file must be removed");
        assert!(
            !layout.staging_dir().join(format!("{id_a}.json")).exists(),
            "consumed entry must be deleted"
        );
        assert!(
            layout.staging_dir().join(format!("{id_b}.json")).exists(),
            "non-consumed entry must remain"
        );
    }

    #[test]
    fn release_is_resilient_to_missing_consumed_entries() {
        let (_dir, layout) = make_layout();
        let phantom = uuid::Uuid::now_v7();
        let lock =
            StagingLock::acquire(&layout, std::process::id(), "pod", vec![phantom]).unwrap();
        let lock_path = lock.path().to_path_buf();
        // No file at <staging>/<phantom>.json — release must not panic.
        lock.release_with_cleanup(&layout);
        assert!(!lock_path.exists());
    }

    #[test]
    fn release_only_keeps_consumed_entries() {
        let (_dir, layout) = make_layout();
        let id = stage(
            &layout,
            SourceRef {
                session_id: "s".into(),
                range: [0, 0],
            },
        );

        let lock = StagingLock::acquire(&layout, std::process::id(), "pod", vec![id]).unwrap();
        let lock_path = lock.path().to_path_buf();
        lock.release_only();

        assert!(!lock_path.exists(), "lock file must be removed");
        assert!(
            layout.staging_dir().join(format!("{id}.json")).exists(),
            "staging input must survive a failed run"
        );
    }
}
|
||||
32
crates/memory/src/consolidate/mod.rs
Normal file
32
crates/memory/src/consolidate/mod.rs
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
//! Phase 2: 統合 + 整理。
|
||||
//!
|
||||
//! Phase 1 が staging に残した活動ログを `memory/*` / `knowledge/*` に
|
||||
//! 統合し、続けて既存 record を `outdated | superseded | unused | noisy`
|
||||
//! の観点で整理する disposable Worker を、Pod 側が組み立てるための
|
||||
//! ヘルパー群を提供する。Pod は次の手順で sub-Worker を構築する:
|
||||
//!
|
||||
//! - [`CONSOLIDATION_SYSTEM_PROMPT`] を sub-Worker の system prompt に
|
||||
//! - [`build_consolidate_input`] を sub-Worker の最初の user 入力に
|
||||
//! - memory 専用 Tool (read / write / edit) と Knowledge / memory 検索ツールを登録
|
||||
//! - [`StagingLock::acquire`] で並走防止 + consumed ID 確定
|
||||
//! - sub-Worker run 完了後、[`StagingLock::release_with_cleanup`] で
|
||||
//! consumed ID 分の staging のみ削除し、占有ファイルを解放
|
||||
//!
|
||||
//! Knowledge 化候補レポートと使用頻度メトリクスは別チケットで供給される
|
||||
//! 想定。本モジュール時点では空入力として扱い、prompt 側の説明だけ
|
||||
//! 残しておく(`docs/plan/memory.md` §Phase 2 / 整理材料)。
|
||||
|
||||
mod input;
|
||||
mod lock;
|
||||
mod prompt;
|
||||
mod staging;
|
||||
mod tidy;
|
||||
|
||||
pub use input::{
|
||||
KnowledgeCandidateReport, build_consolidate_input, render_existing_memory_records,
|
||||
render_staging_records, render_tidy_hints,
|
||||
};
|
||||
pub use lock::{LockError, LockRecord, StagingLock};
|
||||
pub use prompt::CONSOLIDATION_SYSTEM_PROMPT;
|
||||
pub use staging::{StagingEntry, list_staging_entries};
|
||||
pub use tidy::{TidyHints, collect_tidy_hints};
|
||||
69
crates/memory/src/consolidate/prompt.rs
Normal file
69
crates/memory/src/consolidate/prompt.rs
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
//! Phase 2 sub-Worker の system prompt。
|
||||
//!
|
||||
//! 内容は `docs/plan/memory-prompts.md` §共通原則 / §Phase 2 統合 + 整理 /
|
||||
//! §Phase 2 Knowledge 書き込み を縮約。統合 phase / 整理 phase は同じ
|
||||
//! prompt 1 本で順に進める縛り(agent から見ると 1 セッション内のフェーズ
|
||||
//! 進行、別 trigger / 別 Worker は持たない、`docs/plan/memory.md` §整理
|
||||
//! の扱い)。
|
||||
|
||||
pub const CONSOLIDATION_SYSTEM_PROMPT: &str = r#"You are the Phase 2 consolidation worker for an INSOMNIA memory subsystem.
|
||||
|
||||
Your job is to take Phase 1 activity-log staging entries together with the workspace's current `memory/*` / `knowledge/*` records, then run two phases back-to-back in this single session:
|
||||
|
||||
1. **Consolidation phase** — fold staging into memory and knowledge.
|
||||
2. **Tidy phase** — clean up the existing records that the consolidation phase didn't already touch.
|
||||
|
||||
You have:
|
||||
- `MemoryRead`, `MemoryWrite`, `MemoryEdit` for memory and knowledge records.
|
||||
- `MemoryQuery` for memory-side records (summary / decisions / requests).
|
||||
- `KnowledgeQuery` for knowledge records — use it to find existing slugs before creating new ones.
|
||||
|
||||
Your initial user message contains the staging entries, the full memory records, the knowledge candidate report, and the tidy hints. Existing knowledge bodies are NOT in the prompt; pull them through `KnowledgeQuery` + `MemoryRead` when relevant.
|
||||
|
||||
# Common rules (both phases)
|
||||
|
||||
- **Do not invent provenance.** Decisions / Requests `sources` arrays MUST be copied from the staging `source` field for the originating activity log entries. Do not synthesise `session_id` or entry ranges. Do not fabricate `last_sources` for Knowledge.
|
||||
- **Rewrite is allowed and often preferred over append.** When integrating new information, restructure existing records to raise information density. Preserve the existing claims, rationale, and `sources` while you compress.
|
||||
- **Update over create.** If an existing slug fits, edit it. Only create a new slug when no existing record fits and you can articulate why.
|
||||
- **`replaced` over delete.** When a Decision is superseded by a different one, mark the old one `status: replaced` with `replaced_by: <new-slug>`. Do not silently drop it.
|
||||
- **Don't duplicate static docs.** Skip content that already lives in `AGENTS.md`, `docs/plan/*`, or other fixed project documents.
|
||||
- **Empty output is fine.** If a staging entry doesn't justify a memory write, skip it.
|
||||
- **Slug rules.** Slugs are kebab-case, short, recognisable, and must be unique within their kind. Same-slug create is a linter error — use Edit instead.
|
||||
- **Linter errors come back as tool errors.** When the memory linter rejects a write, read the error, fix the issue (missing frontmatter field, oversized body, unknown reference, etc.), and try again. Do not work around the rule.
|
||||
|
||||
# Consolidation phase
|
||||
|
||||
Walk every staging entry in the input. For each one:
|
||||
|
||||
- Add or update `decisions` / `requests` records as appropriate. Copy `sources` verbatim from the staging entry.
|
||||
- Update existing knowledge records when the staging activity refines them. Use `KnowledgeQuery` to find candidates before creating anything new.
|
||||
- **Knowledge creation is gated.** Only create a new `knowledge/<slug>.md` when the originating source appears in the supplied "Knowledge candidate report". When the report is empty (the metrics pipeline is still being built), do not create new knowledge — fold the activity into decisions / requests / summary or update existing knowledge instead.
|
||||
- Rewrite `memory/summary.md` only when needed. Aim for 1–5k tokens. Preserve the high-level shape (current focus, recent decisions, stable facts) while pruning stale items.
|
||||
|
||||
# Tidy phase
|
||||
|
||||
Once the consolidation phase is done, evaluate every existing memory and knowledge record against four categories:
|
||||
|
||||
- `outdated`: was correct, no longer matches the current implementation / policy / operation.
|
||||
- `superseded`: another record is now the de-facto authoritative one; this one is mostly redundant.
|
||||
- `unused`: not wrong, but rarely referenced — noise rather than signal.
|
||||
- `noisy`: useful content but bad shape (overlap, sources accumulation, fractured slugs that should merge).
|
||||
|
||||
A single record may fall into more than one category. Choose one of `drop / merge / split / trim / rewrite`:
|
||||
|
||||
- Prefer `merge` and `trim` over `drop` for anything you'd flag as `unused` or `noisy` — git can reverse you, but a confidently-wrong drop hurts discovery.
|
||||
- `drop` is allowed for `outdated` / `superseded` records you can justify in the diff.
|
||||
- `replaced` markers (`status: replaced`) and chains pointed at by the tidy hints should be collapsed in this phase.
|
||||
|
||||
**Protection threshold.** When the tidy hints include explicit-invoke metrics, records with `frequency >= 1.0 invokes/Mtoken` are off-limits to drop / large compression. The metrics pipeline is not always populated; when the input lacks frequency data, behave conservatively and skip drop on long-standing records.
|
||||
|
||||
# Closing the turn
|
||||
|
||||
When both phases are done, write a short final assistant message stating:
|
||||
|
||||
- which staging entries you folded in (by short summary, not by ID),
|
||||
- which existing records you touched (slug + operation),
|
||||
- anything you intentionally left alone and why.
|
||||
|
||||
Then end the turn. Do not ask questions — there is no human in the loop for this run.
|
||||
"#;
|
||||
139
crates/memory/src/consolidate/staging.rs
Normal file
139
crates/memory/src/consolidate/staging.rs
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
//! `_staging/*.json` を列挙して [`StagingRecord`] に展開する読み込みヘルパー。
|
||||
//!
|
||||
//! Phase 2 起動時のスナップショット(consumed ID list 確定)と、整理 phase
|
||||
//! が終わった後の cleanup の双方で使う。`.consolidation.lock` のような
|
||||
//! 占有ファイルは UUIDv7 として parse できないので自然に除外される。
|
||||
//!
|
||||
//! [`StagingRecord`] のスキーマは Phase 1 が書き出す側 (`crate::extract`)
|
||||
//! と単一の真実源 — ここでは読み出す側だけを担当する。
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::extract::StagingRecord;
|
||||
use crate::workspace::WorkspaceLayout;
|
||||
|
||||
/// staging に積まれている 1 件分のエントリ。`id` は UUIDv7 で、ファイル名
|
||||
/// `<id>.json` を逆引きしたもの。
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StagingEntry {
|
||||
pub id: Uuid,
|
||||
pub path: PathBuf,
|
||||
pub record: StagingRecord,
|
||||
/// このファイルのバイト長。閾値判定 (`consolidation_threshold_bytes`)
|
||||
/// に使う。
|
||||
pub bytes: u64,
|
||||
}
|
||||
|
||||
/// `<staging_dir>/*.json` を読んで UUIDv7 順に並べた [`StagingEntry`]
|
||||
/// 配列を返す。staging_dir が存在しなければ空配列。読めないファイルや
|
||||
/// JSON parse 失敗は `tracing::warn!` してスキップ(壊れた個別ファイルが
|
||||
/// Phase 2 全体を止めないように)。
|
||||
pub fn list_staging_entries(layout: &WorkspaceLayout) -> Vec<StagingEntry> {
|
||||
let dir = layout.staging_dir();
|
||||
let entries = match std::fs::read_dir(&dir) {
|
||||
Ok(it) => it,
|
||||
Err(_) => return Vec::new(),
|
||||
};
|
||||
|
||||
let mut out: Vec<StagingEntry> = Vec::new();
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
if !path.is_file() {
|
||||
continue;
|
||||
}
|
||||
let stem = match path.file_stem().and_then(|s| s.to_str()) {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
let ext = path.extension().and_then(|s| s.to_str()).unwrap_or("");
|
||||
if ext != "json" {
|
||||
continue;
|
||||
}
|
||||
let id = match Uuid::parse_str(stem) {
|
||||
Ok(u) => u,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let bytes = match std::fs::metadata(&path) {
|
||||
Ok(m) => m.len(),
|
||||
Err(_) => 0,
|
||||
};
|
||||
let raw = match std::fs::read_to_string(&path) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
tracing::warn!(path = %path.display(), error = %e, "failed to read staging entry");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let record = match serde_json::from_str::<StagingRecord>(&raw) {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
tracing::warn!(path = %path.display(), error = %e, "failed to parse staging entry");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
out.push(StagingEntry {
|
||||
id,
|
||||
path,
|
||||
record,
|
||||
bytes,
|
||||
});
|
||||
}
|
||||
out.sort_by_key(|e| e.id);
|
||||
out
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::extract::{ExtractedPayload, write_staging};
    use crate::schema::SourceRef;

    /// Fresh temporary workspace. The `TempDir` is returned so it outlives the layout.
    fn workspace() -> (tempfile::TempDir, WorkspaceLayout) {
        let tmp = tempfile::TempDir::new().unwrap();
        let layout = WorkspaceLayout::new(tmp.path().to_path_buf());
        (tmp, layout)
    }

    /// Writes one staging entry with an empty payload and returns its id.
    fn stage(layout: &WorkspaceLayout, range: [u64; 2]) -> Uuid {
        let source = SourceRef {
            session_id: "s".into(),
            range,
        };
        let (id, _) = write_staging(layout, source, ExtractedPayload::default()).unwrap();
        id
    }

    #[test]
    fn lists_in_uuidv7_order() {
        let (_tmp, layout) = workspace();

        let mut written: Vec<Uuid> = Vec::new();
        for range in [[0, 1], [2, 3], [4, 5]] {
            written.push(stage(&layout, range));
        }

        let listed: Vec<Uuid> = list_staging_entries(&layout)
            .into_iter()
            .map(|e| e.id)
            .collect();
        assert_eq!(listed, written);
    }

    #[test]
    fn skips_lock_file_and_garbage() {
        let (_tmp, layout) = workspace();
        stage(&layout, [0, 1]);

        // Drop a non-UUID json file and a bare lock file alongside.
        let staging = layout.staging_dir();
        std::fs::write(staging.join("not-a-uuid.json"), "{}").unwrap();
        std::fs::write(staging.join(".consolidation.lock"), "{}").unwrap();

        assert_eq!(list_staging_entries(&layout).len(), 1);
    }

    #[test]
    fn missing_dir_returns_empty() {
        // No staging dir at all.
        let (_tmp, layout) = workspace();
        assert!(list_staging_entries(&layout).is_empty());
    }
}
|
||||
362
crates/memory/src/consolidate/tidy.rs
Normal file
362
crates/memory/src/consolidate/tidy.rs
Normal file
|
|
@ -0,0 +1,362 @@
|
|||
//! 整理 phase が prompt 入力に乗せる「整理材料」スキャナ。
|
||||
//!
|
||||
//! `docs/plan/memory.md` §整理(GC 相当)の扱い と
|
||||
//! `tickets/memory-phase2-consolidation.md` の整理材料リストに従い、
|
||||
//! メトリクス未完の現状で機械的に拾えるヒントだけを集める:
|
||||
//!
|
||||
//! - `replaced` chain: `status: replaced` の Decision とその `replaced_by`
|
||||
//! - sources 過多: `sources` / `last_sources` 配列が閾値超過の record
|
||||
//! - 類似 slug 乱立: 同 kind の slug が Levenshtein 2 以内のクラスター
|
||||
//!
|
||||
//! 使用頻度メトリクスベースの保護閾値情報は `tickets/memory-usage-metrics.md`
|
||||
//! の成果物が出るまで空で渡る。
|
||||
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
|
||||
use crate::schema::{
|
||||
DecisionFrontmatter, KnowledgeFrontmatter, RequestFrontmatter, split_frontmatter,
|
||||
};
|
||||
use crate::slug::Slug;
|
||||
use crate::workspace::{RecordKind, WorkspaceLayout};
|
||||
|
||||
/// Number of sources above which a record is flagged as "sources overflow".
///
/// Kept equal to `linter::warnings::SOURCES_OVERFLOW_THRESHOLD` (10) so the
/// line at which the Linter warns and the line at which the tidy phase
/// recommends cleanup stay aligned.
pub const SOURCES_OVERFLOW_THRESHOLD: usize = 10;
/// Maximum Levenshtein distance at which two same-kind slugs are clustered
/// as "similar". Kept equal to `linter::warnings::SIMILAR_SLUG_DISTANCE`.
pub const SIMILAR_SLUG_DISTANCE: usize = 2;
|
||||
|
||||
/// 整理 phase 用の機械集計ヒント。空フィールドは「対象なし」を意味する。
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct TidyHints {
|
||||
/// `status: replaced` で残っている Decision の slug → `replaced_by` map。
|
||||
/// `replaced_by` が None でも置き換え滞留として列挙する。
|
||||
pub replaced_decisions: BTreeMap<String, Option<String>>,
|
||||
/// kind / slug / sources count の三つ組で sources 累積ラインを表す。
|
||||
pub sources_overflow: Vec<SourcesOverflow>,
|
||||
/// 同 kind 内で Levenshtein 距離 `<= SIMILAR_SLUG_DISTANCE` のクラスター。
|
||||
/// クラスター内の slug は sorted。
|
||||
pub similar_slug_clusters: Vec<SimilarSlugCluster>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct SourcesOverflow {
|
||||
pub kind: RecordKind,
|
||||
pub slug: String,
|
||||
pub count: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct SimilarSlugCluster {
|
||||
pub kind: RecordKind,
|
||||
pub slugs: Vec<String>,
|
||||
}
|
||||
|
||||
impl TidyHints {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.replaced_decisions.is_empty()
|
||||
&& self.sources_overflow.is_empty()
|
||||
&& self.similar_slug_clusters.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// workspace を一通りスキャンして [`TidyHints`] を組み立てる。読めない /
|
||||
/// parse できない record は黙ってスキップ(Linter は write 経路で守って
|
||||
/// いるので、ここで顕在化してもどうしようもない)。
|
||||
pub fn collect_tidy_hints(layout: &WorkspaceLayout) -> TidyHints {
|
||||
let mut hints = TidyHints::default();
|
||||
|
||||
let decisions = read_kind_records(layout, RecordKind::Decision);
|
||||
let requests = read_kind_records(layout, RecordKind::Request);
|
||||
let knowledge = read_kind_records(layout, RecordKind::Knowledge);
|
||||
|
||||
for (slug, content) in &decisions {
|
||||
let fm = parse_yaml::<DecisionFrontmatter>(content);
|
||||
if let Some(fm) = fm.as_ref() {
|
||||
if matches!(
|
||||
fm.status,
|
||||
crate::schema::DecisionStatus::Replaced
|
||||
) {
|
||||
hints
|
||||
.replaced_decisions
|
||||
.insert(slug.clone(), fm.replaced_by.as_ref().map(|s| s.to_string()));
|
||||
}
|
||||
if fm.sources.len() > SOURCES_OVERFLOW_THRESHOLD {
|
||||
hints.sources_overflow.push(SourcesOverflow {
|
||||
kind: RecordKind::Decision,
|
||||
slug: slug.clone(),
|
||||
count: fm.sources.len(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
for (slug, content) in &requests {
|
||||
if let Some(fm) = parse_yaml::<RequestFrontmatter>(content) {
|
||||
if fm.sources.len() > SOURCES_OVERFLOW_THRESHOLD {
|
||||
hints.sources_overflow.push(SourcesOverflow {
|
||||
kind: RecordKind::Request,
|
||||
slug: slug.clone(),
|
||||
count: fm.sources.len(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
for (slug, content) in &knowledge {
|
||||
if let Some(fm) = parse_yaml::<KnowledgeFrontmatter>(content) {
|
||||
if fm.last_sources.len() > SOURCES_OVERFLOW_THRESHOLD {
|
||||
hints.sources_overflow.push(SourcesOverflow {
|
||||
kind: RecordKind::Knowledge,
|
||||
slug: slug.clone(),
|
||||
count: fm.last_sources.len(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
hints
|
||||
.sources_overflow
|
||||
.sort_by(|a, b| (a.kind.as_str(), a.slug.as_str()).cmp(&(b.kind.as_str(), b.slug.as_str())));
|
||||
|
||||
let decision_slugs: Vec<&str> = decisions.keys().map(|s| s.as_str()).collect();
|
||||
let request_slugs: Vec<&str> = requests.keys().map(|s| s.as_str()).collect();
|
||||
let knowledge_slugs: Vec<&str> = knowledge.keys().map(|s| s.as_str()).collect();
|
||||
if let Some(c) = cluster_similar(&decision_slugs, RecordKind::Decision) {
|
||||
hints.similar_slug_clusters.extend(c);
|
||||
}
|
||||
if let Some(c) = cluster_similar(&request_slugs, RecordKind::Request) {
|
||||
hints.similar_slug_clusters.extend(c);
|
||||
}
|
||||
if let Some(c) = cluster_similar(&knowledge_slugs, RecordKind::Knowledge) {
|
||||
hints.similar_slug_clusters.extend(c);
|
||||
}
|
||||
hints
|
||||
.similar_slug_clusters
|
||||
.sort_by(|a, b| (a.kind.as_str(), &a.slugs).cmp(&(b.kind.as_str(), &b.slugs)));
|
||||
|
||||
hints
|
||||
}
|
||||
|
||||
/// `<root>/.insomnia/memory/<kind>/*.md` (Knowledge は
|
||||
/// `<root>/.insomnia/knowledge/*.md`) を slug ごとに `(slug, full content)`
|
||||
/// 化して返す。
|
||||
fn read_kind_records(
|
||||
layout: &WorkspaceLayout,
|
||||
kind: RecordKind,
|
||||
) -> BTreeMap<String, String> {
|
||||
let dir = match kind {
|
||||
RecordKind::Decision => layout.decisions_dir(),
|
||||
RecordKind::Request => layout.requests_dir(),
|
||||
RecordKind::Knowledge => layout.knowledge_dir(),
|
||||
RecordKind::Summary | RecordKind::Workflow => return BTreeMap::new(),
|
||||
};
|
||||
let mut out: BTreeMap<String, String> = BTreeMap::new();
|
||||
let entries = match std::fs::read_dir(&dir) {
|
||||
Ok(it) => it,
|
||||
Err(_) => return out,
|
||||
};
|
||||
for entry in entries.flatten() {
|
||||
let path = entry.path();
|
||||
if !path.is_file() {
|
||||
continue;
|
||||
}
|
||||
let stem = match path.file_stem().and_then(|s| s.to_str()) {
|
||||
Some(s) => s,
|
||||
None => continue,
|
||||
};
|
||||
if path.extension().and_then(|s| s.to_str()) != Some("md") {
|
||||
continue;
|
||||
}
|
||||
if Slug::parse(stem).is_err() {
|
||||
continue;
|
||||
}
|
||||
let content = match std::fs::read_to_string(&path) {
|
||||
Ok(s) => s,
|
||||
Err(_) => continue,
|
||||
};
|
||||
out.insert(stem.to_string(), content);
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
fn parse_yaml<F: serde::de::DeserializeOwned>(content: &str) -> Option<F> {
|
||||
let (yaml, _body) = split_frontmatter(content).ok()?;
|
||||
serde_yaml::from_str::<F>(yaml).ok()
|
||||
}
|
||||
|
||||
/// Connected-component clustering over the `levenshtein <= SIMILAR_SLUG_DISTANCE`
|
||||
/// graph among same-kind slugs. Returns each cluster of size >= 2 (singleton
|
||||
/// clusters are not interesting for the integration phase). Returns `None`
|
||||
/// when there are no clusters at all.
|
||||
fn cluster_similar(slugs: &[&str], kind: RecordKind) -> Option<Vec<SimilarSlugCluster>> {
|
||||
if slugs.len() < 2 {
|
||||
return None;
|
||||
}
|
||||
let n = slugs.len();
|
||||
let mut parent: Vec<usize> = (0..n).collect();
|
||||
fn find(parent: &mut [usize], i: usize) -> usize {
|
||||
if parent[i] == i {
|
||||
i
|
||||
} else {
|
||||
let root = find(parent, parent[i]);
|
||||
parent[i] = root;
|
||||
root
|
||||
}
|
||||
}
|
||||
fn union(parent: &mut [usize], a: usize, b: usize) {
|
||||
let ra = find(parent, a);
|
||||
let rb = find(parent, b);
|
||||
if ra != rb {
|
||||
parent[ra] = rb;
|
||||
}
|
||||
}
|
||||
for i in 0..n {
|
||||
for j in (i + 1)..n {
|
||||
if levenshtein(slugs[i], slugs[j]) <= SIMILAR_SLUG_DISTANCE {
|
||||
union(&mut parent, i, j);
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut groups: BTreeMap<usize, Vec<String>> = BTreeMap::new();
|
||||
for i in 0..n {
|
||||
let root = find(&mut parent, i);
|
||||
groups.entry(root).or_default().push(slugs[i].to_string());
|
||||
}
|
||||
let mut out: Vec<SimilarSlugCluster> = Vec::new();
|
||||
let mut seen_canonical: BTreeSet<Vec<String>> = BTreeSet::new();
|
||||
for (_, mut group) in groups {
|
||||
if group.len() < 2 {
|
||||
continue;
|
||||
}
|
||||
group.sort();
|
||||
if seen_canonical.insert(group.clone()) {
|
||||
out.push(SimilarSlugCluster { kind, slugs: group });
|
||||
}
|
||||
}
|
||||
if out.is_empty() { None } else { Some(out) }
|
||||
}
|
||||
|
||||
/// Levenshtein edit distance between `a` and `b`, counted in `char`s.
///
/// Uses the iterative two-row dynamic-programming formulation, so memory is
/// proportional to the length of `b`. Kept private (the Linter has its own
/// copy) to avoid widening that crate-internal API.
fn levenshtein(a: &str, b: &str) -> usize {
    let left: Vec<char> = a.chars().collect();
    let right: Vec<char> = b.chars().collect();
    match (left.is_empty(), right.is_empty()) {
        (true, _) => return right.len(),
        (_, true) => return left.len(),
        _ => {}
    }

    let width = right.len();
    // `above` is the previous DP row, `row` the one being filled in.
    let mut above: Vec<usize> = (0..=width).collect();
    let mut row: Vec<usize> = vec![0; width + 1];
    for (i, lc) in left.iter().enumerate() {
        row[0] = i + 1;
        for j in 0..width {
            let insert = row[j] + 1;
            let delete = above[j + 1] + 1;
            let substitute = above[j] + usize::from(*lc != right[j]);
            row[j + 1] = insert.min(delete).min(substitute);
        }
        std::mem::swap(&mut above, &mut row);
    }
    // After the final swap the finished row lives in `above`.
    above[width]
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use std::path::Path;

    /// Current time as an RFC 3339 string, used for the timestamp fields.
    fn now() -> String {
        Utc::now().to_rfc3339()
    }

    /// Writes `content` to `p`, creating parent directories as needed.
    fn write(p: &Path, content: &str) {
        if let Some(parent) = p.parent() {
            std::fs::create_dir_all(parent).unwrap();
        }
        std::fs::write(p, content).unwrap();
    }

    /// Fresh temporary workspace. The `TempDir` is returned so it outlives the layout.
    fn workspace() -> (tempfile::TempDir, WorkspaceLayout) {
        let dir = tempfile::TempDir::new().unwrap();
        let layout = WorkspaceLayout::new(dir.path().to_path_buf());
        (dir, layout)
    }

    #[test]
    fn collects_replaced_chain() {
        let (dir, layout) = workspace();
        write(
            &dir.path().join(".insomnia/memory/decisions/replaced.md"),
            &format!(
                "---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: replaced\nreplaced_by: winner\n---\n",
                n = now()
            ),
        );
        write(
            &dir.path().join(".insomnia/memory/decisions/winner.md"),
            &format!(
                "---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: open\n---\n",
                n = now()
            ),
        );
        let hints = collect_tidy_hints(&layout);
        assert_eq!(
            hints.replaced_decisions.get("replaced").cloned(),
            Some(Some("winner".into()))
        );
        assert!(!hints.replaced_decisions.contains_key("winner"));
    }

    #[test]
    fn flags_sources_overflow() {
        let (dir, layout) = workspace();
        // Each item is a block-sequence entry nested under `sources:`. The
        // `range` key must be indented to line up with `session_id`,
        // otherwise the YAML is not a valid sequence of mappings and the
        // frontmatter fails to parse.
        let many_sources: String = (0..15)
            .map(|i| format!("  - session_id: s{i}\n    range: [{i}, {i}]\n"))
            .collect();
        write(
            &dir.path().join(".insomnia/memory/decisions/big.md"),
            &format!(
                "---\ncreated_at: {n}\nupdated_at: {n}\nstatus: open\nsources:\n{m}---\n",
                n = now(),
                m = many_sources
            ),
        );
        let hints = collect_tidy_hints(&layout);
        assert_eq!(hints.sources_overflow.len(), 1);
        assert_eq!(hints.sources_overflow[0].slug, "big");
        assert_eq!(hints.sources_overflow[0].kind, RecordKind::Decision);
        assert_eq!(hints.sources_overflow[0].count, 15);
    }

    #[test]
    fn clusters_similar_slugs() {
        let (dir, layout) = workspace();
        for slug in ["db-pool", "db-pol", "db-pools", "alpha"] {
            write(
                &dir.path()
                    .join(format!(".insomnia/memory/decisions/{slug}.md")),
                &format!(
                    "---\ncreated_at: {n}\nupdated_at: {n}\nsources: []\nstatus: open\n---\n",
                    n = now()
                ),
            );
        }
        let hints = collect_tidy_hints(&layout);
        // `alpha` is far from every other slug and must not form a cluster.
        assert_eq!(hints.similar_slug_clusters.len(), 1);
        assert_eq!(
            hints.similar_slug_clusters[0].slugs,
            vec![
                "db-pol".to_string(),
                "db-pool".to_string(),
                "db-pools".to_string(),
            ]
        );
    }

    #[test]
    fn empty_workspace_yields_empty_hints() {
        let (_dir, layout) = workspace();
        let hints = collect_tidy_hints(&layout);
        assert!(hints.is_empty());
    }
}
|
||||
|
|
@ -6,6 +6,7 @@
|
|||
//! crate) must not touch these directories — Pod is responsible for
|
||||
//! denying them at the Scope level when memory is enabled.
|
||||
|
||||
pub mod consolidate;
|
||||
pub mod error;
|
||||
pub mod extract;
|
||||
pub mod linter;
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ fs4 = { version = "0.13.1", features = ["sync"] }
|
|||
libc = "0.2.185"
|
||||
schemars = "1.2.1"
|
||||
memory = { version = "0.1.0", path = "../memory" }
|
||||
uuid = { version = "1.23.1", features = ["v7"] }
|
||||
|
||||
[dev-dependencies]
|
||||
async-trait = "0.1.89"
|
||||
|
|
|
|||
|
|
@ -402,6 +402,14 @@ impl PodController {
|
|||
format!("post-run memory extract error: {e}"),
|
||||
);
|
||||
}
|
||||
if let Err(e) = pod.try_post_run_consolidate().await {
|
||||
tracing::warn!(error = %e, "Post-run memory consolidate error");
|
||||
alerter.alert(
|
||||
AlertLevel::Warn,
|
||||
AlertSource::Pod,
|
||||
format!("post-run memory consolidate error: {e}"),
|
||||
);
|
||||
}
|
||||
if let Err(e) = pod.try_post_run_compact().await {
|
||||
tracing::warn!(error = %e, "Post-run compaction error");
|
||||
alerter.alert(
|
||||
|
|
@ -461,6 +469,14 @@ impl PodController {
|
|||
format!("post-run memory extract error: {e}"),
|
||||
);
|
||||
}
|
||||
if let Err(e) = pod.try_post_run_consolidate().await {
|
||||
tracing::warn!(error = %e, "Post-run memory consolidate error");
|
||||
alerter.alert(
|
||||
AlertLevel::Warn,
|
||||
AlertSource::Pod,
|
||||
format!("post-run memory consolidate error: {e}"),
|
||||
);
|
||||
}
|
||||
if let Err(e) = pod.try_post_run_compact().await {
|
||||
tracing::warn!(error = %e, "Post-run compaction error");
|
||||
alerter.alert(
|
||||
|
|
@ -517,6 +533,14 @@ impl PodController {
|
|||
format!("post-run memory extract error: {e}"),
|
||||
);
|
||||
}
|
||||
if let Err(e) = pod.try_post_run_consolidate().await {
|
||||
tracing::warn!(error = %e, "Post-run memory consolidate error");
|
||||
alerter.alert(
|
||||
AlertLevel::Warn,
|
||||
AlertSource::Pod,
|
||||
format!("post-run memory consolidate error: {e}"),
|
||||
);
|
||||
}
|
||||
if let Err(e) = pod.try_post_run_compact().await {
|
||||
tracing::warn!(error = %e, "Post-run compaction error");
|
||||
alerter.alert(
|
||||
|
|
|
|||
|
|
@ -136,6 +136,11 @@ pub struct Pod<C: LlmClient, St: Store> {
|
|||
/// the flag survives across `try_post_run_extract` calls without a
|
||||
/// `&mut self` race.
|
||||
extract_in_flight: Arc<AtomicBool>,
|
||||
/// Phase 2 (memory.consolidation) in-process reentry guard. The
|
||||
/// staging-side `StagingLock` already provides cross-process
|
||||
/// exclusion, but this AtomicBool keeps a careless concurrent caller
|
||||
/// inside the same Pod from racing on the staging snapshot.
|
||||
consolidation_in_flight: Arc<AtomicBool>,
|
||||
/// Last completed Phase 1 boundary. `None` means no extract has
|
||||
/// run yet on this session — next extract starts from entry 0.
|
||||
/// Restored from `RestoredState.extensions` on `restore`, updated
|
||||
|
|
@ -197,6 +202,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
prompts,
|
||||
inject_resident_knowledge: true,
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(None),
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
|
|
@ -1490,6 +1496,173 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
|
||||
Ok(ExtractDecision::Completed)
|
||||
}
|
||||
|
||||
/// Build the LlmClient for the Phase 2 (memory.consolidation) Worker.
|
||||
///
|
||||
/// Uses `memory.consolidation_model` from manifest if set, otherwise
|
||||
/// clones the main client. Mirrors [`build_extractor_client`].
|
||||
fn build_consolidator_client(
|
||||
&self,
|
||||
memory_cfg: &manifest::MemoryConfig,
|
||||
) -> Result<Box<dyn LlmClient>, PodError> {
|
||||
if let Some(ref m) = memory_cfg.consolidation_model {
|
||||
let client = provider::build_client(m)?;
|
||||
return Ok(client);
|
||||
}
|
||||
let worker = self.worker.as_ref().expect("worker taken during run");
|
||||
Ok(worker.client().clone_boxed())
|
||||
}
|
||||
|
||||
/// Phase 2 (memory.consolidation) post-run trigger.
|
||||
///
|
||||
/// Called by the Controller **after** [`try_post_run_extract`] and
|
||||
/// **before** [`try_post_run_compact`]: extract feeds staging, compact
|
||||
/// rewrites history. Phase 2 must consume staging before compact
|
||||
/// reshapes the session.
|
||||
///
|
||||
/// Behaviour follows `docs/plan/memory.md` §Phase 2 / §並走防止:
|
||||
/// the staging-side `StagingLock` enforces cross-process exclusion;
|
||||
/// `consolidation_in_flight` keeps in-process callers honest. On
|
||||
/// success, the lock is released *with* consumed-id cleanup; on
|
||||
/// worker failure, only the lock file is unlinked so the staging
|
||||
/// entries remain for a future retry.
|
||||
pub async fn try_post_run_consolidate(&mut self) -> Result<(), PodError> {
|
||||
let Some(memory_cfg) = self.manifest.memory.clone() else {
|
||||
return Ok(());
|
||||
};
|
||||
let files_threshold = memory_cfg.consolidation_threshold_files;
|
||||
let bytes_threshold = memory_cfg.consolidation_threshold_bytes;
|
||||
if files_threshold.is_none() && bytes_threshold.is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
loop {
|
||||
if self
|
||||
.consolidation_in_flight
|
||||
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
|
||||
.is_err()
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
let result = self
|
||||
.run_consolidate_once(&memory_cfg, files_threshold, bytes_threshold)
|
||||
.await;
|
||||
self.consolidation_in_flight.store(false, Ordering::Release);
|
||||
|
||||
match result {
|
||||
Ok(ConsolidateDecision::Skipped) => return Ok(()),
|
||||
Ok(ConsolidateDecision::Completed) => continue,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "Phase 2 consolidation failed");
|
||||
self.alert(
|
||||
AlertLevel::Warn,
|
||||
AlertSource::Pod,
|
||||
format!("memory Phase 2 consolidation failed: {e}"),
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Single consolidation iteration: snapshot staging, decide whether to
|
||||
/// fire, run the worker if so, release the lock and clean up consumed
|
||||
/// IDs.
|
||||
async fn run_consolidate_once(
|
||||
&mut self,
|
||||
memory_cfg: &manifest::MemoryConfig,
|
||||
files_threshold: Option<usize>,
|
||||
bytes_threshold: Option<u64>,
|
||||
) -> Result<ConsolidateDecision, PodError> {
|
||||
use memory::consolidate;
|
||||
|
||||
let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd);
|
||||
|
||||
let entries = consolidate::list_staging_entries(&layout);
|
||||
if entries.is_empty() {
|
||||
return Ok(ConsolidateDecision::Skipped);
|
||||
}
|
||||
|
||||
let total_files = entries.len();
|
||||
let total_bytes: u64 = entries.iter().map(|e| e.bytes).sum();
|
||||
let files_hit = files_threshold.is_some_and(|n| total_files >= n);
|
||||
let bytes_hit = bytes_threshold.is_some_and(|n| total_bytes >= n);
|
||||
if !files_hit && !bytes_hit {
|
||||
return Ok(ConsolidateDecision::Skipped);
|
||||
}
|
||||
|
||||
let consumed_ids: Vec<uuid::Uuid> = entries.iter().map(|e| e.id).collect();
|
||||
let lock = match consolidate::StagingLock::acquire(
|
||||
&layout,
|
||||
std::process::id(),
|
||||
self.manifest.pod.name.clone(),
|
||||
consumed_ids,
|
||||
) {
|
||||
Ok(l) => l,
|
||||
Err(memory::consolidate::LockError::InUse { .. }) => {
|
||||
return Ok(ConsolidateDecision::Skipped);
|
||||
}
|
||||
Err(e) => return Err(PodError::ConsolidationLock(e)),
|
||||
};
|
||||
|
||||
let cap = memory_cfg
|
||||
.consolidation_worker_max_input_tokens
|
||||
.unwrap_or(manifest::defaults::MEMORY_CONSOLIDATION_WORKER_MAX_INPUT_TOKENS);
|
||||
let client = match self.build_consolidator_client(memory_cfg) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
lock.release_only();
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
let mut worker =
|
||||
Worker::new(client).system_prompt(consolidate::CONSOLIDATION_SYSTEM_PROMPT);
|
||||
|
||||
let input_so_far = Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||
{
|
||||
let acc = input_so_far.clone();
|
||||
worker.on_usage(move |event| {
|
||||
if let Some(tokens) = event.input_tokens {
|
||||
acc.fetch_add(tokens, Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
}
|
||||
worker.set_interceptor(MemoryConsolidationWorkerInterceptor {
|
||||
input_so_far: input_so_far.clone(),
|
||||
max_input_tokens: cap,
|
||||
});
|
||||
|
||||
// Memory tools are self-contained — they bypass ScopedFs and write
|
||||
// directly under the workspace via WorkspaceLayout. Resident
|
||||
// knowledge injection (`Pod::set_resident_knowledge_injection`) is
|
||||
// a Pod-level concern; this disposable Worker is built without it
|
||||
// by construction, in keeping with `docs/plan/memory.md` §Phase 2
|
||||
// のKnowledgeアクセス (agent pulls knowledge through the search
|
||||
// tool instead of via system-prompt residency).
|
||||
let query_cfg = memory::tool::QueryConfig::from(memory_cfg);
|
||||
worker.register_tool(memory::tool::read_tool(layout.clone()));
|
||||
worker.register_tool(memory::tool::write_tool(layout.clone()));
|
||||
worker.register_tool(memory::tool::edit_tool(layout.clone()));
|
||||
worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg));
|
||||
worker.register_tool(memory::tool::knowledge_query_tool(layout.clone(), query_cfg));
|
||||
|
||||
let tidy = consolidate::collect_tidy_hints(&layout);
|
||||
let candidates = consolidate::KnowledgeCandidateReport::empty();
|
||||
let input_text =
|
||||
consolidate::build_consolidate_input(&layout, &entries, &tidy, &candidates);
|
||||
|
||||
let run_result = worker.run(input_text).await;
|
||||
match run_result {
|
||||
Ok(_) => {
|
||||
lock.release_with_cleanup(&layout);
|
||||
Ok(ConsolidateDecision::Completed)
|
||||
}
|
||||
Err(e) => {
|
||||
lock.release_only();
|
||||
Err(PodError::Worker(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Outcome of a single Phase 1 extract iteration. Internal to
|
||||
|
|
@ -1526,6 +1699,40 @@ impl llm_worker::interceptor::Interceptor for MemoryExtractWorkerInterceptor {
|
|||
}
|
||||
}
|
||||
|
||||
/// Result of one Phase 2 consolidation iteration. Only used between
/// `try_post_run_consolidate` and `run_consolidate_once`.
enum ConsolidateDecision {
    /// Nothing was run: staging was empty, no threshold was reached, or
    /// another holder owns the staging lock.
    Skipped,
    /// A consolidation run finished. The caller loops and re-checks the
    /// thresholds against entries staged while it was running (coalescing).
    Completed,
}
|
||||
|
||||
/// Pre-request interceptor for the Phase 2 consolidation worker.
///
/// Has the same shape as the extract interceptor but is a separate type so
/// that the cancellation message names the right subsystem.
struct MemoryConsolidationWorkerInterceptor {
    /// Running total of input tokens reported so far for this worker.
    input_so_far: Arc<std::sync::atomic::AtomicU64>,
    /// Budget; once the running total exceeds it, further requests are cancelled.
    max_input_tokens: u64,
}
|
||||
|
||||
#[async_trait]
|
||||
impl llm_worker::interceptor::Interceptor for MemoryConsolidationWorkerInterceptor {
|
||||
async fn pre_llm_request(
|
||||
&self,
|
||||
_context: &mut Vec<Item>,
|
||||
) -> llm_worker::interceptor::PreRequestAction {
|
||||
if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens {
|
||||
return llm_worker::interceptor::PreRequestAction::Cancel(format!(
|
||||
"Phase 2 consolidation worker input exceeded {} tokens",
|
||||
self.max_input_tokens
|
||||
));
|
||||
}
|
||||
llm_worker::interceptor::PreRequestAction::Continue
|
||||
}
|
||||
}
|
||||
|
||||
impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
||||
/// Create a Pod entirely from a validated manifest.
|
||||
///
|
||||
|
|
@ -1596,6 +1803,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
prompts: common.prompts,
|
||||
inject_resident_knowledge: true,
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(None),
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
|
|
@ -1653,6 +1861,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
prompts: common.prompts,
|
||||
inject_resident_knowledge: true,
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(None),
|
||||
user_segments: Vec::new(),
|
||||
};
|
||||
|
|
@ -1762,6 +1971,7 @@ impl<St: Store> Pod<Box<dyn LlmClient>, St> {
|
|||
prompts: common.prompts,
|
||||
inject_resident_knowledge: true,
|
||||
extract_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
consolidation_in_flight: Arc::new(AtomicBool::new(false)),
|
||||
extract_pointer: Mutex::new(extract_pointer),
|
||||
user_segments: state.user_segments,
|
||||
};
|
||||
|
|
@ -1963,6 +2173,9 @@ pub enum PodError {
|
|||
#[error("memory Phase 1 staging write failed: {0}")]
|
||||
ExtractStaging(#[source] memory::extract::StagingError),
|
||||
|
||||
#[error("memory Phase 2 lock acquisition failed: {0}")]
|
||||
ConsolidationLock(#[source] memory::consolidate::LockError),
|
||||
|
||||
#[error("session {session_id} has no entries to restore")]
|
||||
SessionEmpty { session_id: SessionId },
|
||||
}
|
||||
|
|
|
|||
277
crates/pod/tests/consolidation_test.rs
Normal file
277
crates/pod/tests/consolidation_test.rs
Normal file
|
|
@ -0,0 +1,277 @@
|
|||
//! Phase 2 (memory.consolidation) post-run trigger.
|
||||
//!
|
||||
//! Covers the gating, lock and cleanup behaviour without exercising the
|
||||
//! full sub-worker tool loop:
|
||||
//!
|
||||
//! - no `[memory]` section → no-op
|
||||
//! - `[memory]` present but no thresholds → no-op
|
||||
//! - staging empty → skip
|
||||
//! - staging below thresholds → skip + lock not acquired
|
||||
//! - staging above threshold → sub-worker runs, consumed entries removed
|
||||
//! - existing live lock → skip without error
|
||||
//!
|
||||
//! The sub-worker is fed a no-op LLM response (plain text) so it returns
|
||||
//! immediately. The post-run path then exercises lock acquisition,
|
||||
//! cleanup, and the empty-payload fast path.
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::Stream;
|
||||
use llm_worker::Worker;
|
||||
use llm_worker::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
|
||||
use llm_worker::llm_client::{ClientError, LlmClient, Request};
|
||||
use memory::WorkspaceLayout;
|
||||
use memory::extract::{ExtractedPayload, write_staging};
|
||||
use memory::schema::SourceRef;
|
||||
use session_store::FsStore;
|
||||
|
||||
use pod::Pod;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct MockClient {
|
||||
responses: Arc<Vec<Vec<LlmEvent>>>,
|
||||
call_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl MockClient {
|
||||
fn new(responses: Vec<Vec<LlmEvent>>) -> Self {
|
||||
Self {
|
||||
responses: Arc::new(responses),
|
||||
call_count: Arc::new(AtomicUsize::new(0)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LlmClient for MockClient {
|
||||
fn clone_boxed(&self) -> Box<dyn LlmClient> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
|
||||
async fn stream(
|
||||
&self,
|
||||
_request: Request,
|
||||
) -> Result<Pin<Box<dyn Stream<Item = Result<LlmEvent, ClientError>> + Send>>, ClientError>
|
||||
{
|
||||
let count = self.call_count.fetch_add(1, Ordering::SeqCst);
|
||||
if count >= self.responses.len() {
|
||||
return Err(ClientError::Config("mock client exhausted".into()));
|
||||
}
|
||||
let events = self.responses[count].clone();
|
||||
let stream = futures::stream::iter(events.into_iter().map(Ok));
|
||||
Ok(Box::pin(stream))
|
||||
}
|
||||
}
|
||||
|
||||
fn done(text: &str) -> Vec<LlmEvent> {
|
||||
vec![
|
||||
LlmEvent::text_block_start(0),
|
||||
LlmEvent::text_delta(0, text),
|
||||
LlmEvent::text_block_stop(0, None),
|
||||
LlmEvent::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}),
|
||||
]
|
||||
}
|
||||
|
||||
/// Pod manifest with `[pod]`, `[model]`, `[worker]` and one writable scope
/// entry, and deliberately no `[memory]` section.
// The literal previously carried `|` / `||||` table residue between its
// lines, which is not valid TOML; the clean manifest text is restored here.
const NO_MEMORY_TOML: &str = r#"
[pod]
name = "test-pod"

[model]
scheme = "anthropic"
model_id = "test-model"

[worker]
max_tokens = 100

[[scope.allow]]
target = "./"
permission = "write"
"#;
|
||||
|
||||
/// Pod manifest whose `[memory]` section is present but empty, so no
/// consolidation threshold key is set.
// The literal previously carried `|` / `||||` table residue between its
// lines, which is not valid TOML; the clean manifest text is restored here.
const MEMORY_NO_THRESHOLDS_TOML: &str = r#"
[pod]
name = "test-pod"

[model]
scheme = "anthropic"
model_id = "test-model"

[worker]
max_tokens = 100

[memory]

[[scope.allow]]
target = "./"
permission = "write"
"#;
|
||||
|
||||
/// Pod manifest whose `[memory]` section sets
/// `consolidation_threshold_files = 2` and nothing else.
// The literal previously carried `|` / `||||` table residue between its
// lines, which is not valid TOML; the clean manifest text is restored here.
const FILES_THRESHOLD_TOML: &str = r#"
[pod]
name = "test-pod"

[model]
scheme = "anthropic"
model_id = "test-model"

[worker]
max_tokens = 100

[memory]
consolidation_threshold_files = 2

[[scope.allow]]
target = "./"
permission = "write"
"#;
|
||||
|
||||
async fn make_pod_with(
|
||||
manifest_toml: &str,
|
||||
pwd: std::path::PathBuf,
|
||||
client: MockClient,
|
||||
) -> Pod<MockClient, FsStore> {
|
||||
let manifest = pod::PodManifest::from_toml(manifest_toml).unwrap();
|
||||
|
||||
let store_tmp = tempfile::tempdir().unwrap();
|
||||
let store = FsStore::new(store_tmp.path()).await.unwrap();
|
||||
std::mem::forget(store_tmp);
|
||||
|
||||
let scope = pod::Scope::writable(&pwd).unwrap();
|
||||
let worker = Worker::new(client);
|
||||
Pod::new(manifest, worker, store, pwd, scope).await.unwrap()
|
||||
}
|
||||
|
||||
fn write_n_staging(layout: &WorkspaceLayout, n: usize) -> Vec<uuid::Uuid> {
|
||||
let mut ids = Vec::new();
|
||||
for i in 0..n {
|
||||
let (id, _) = write_staging(
|
||||
layout,
|
||||
SourceRef {
|
||||
session_id: format!("s-{i}"),
|
||||
range: [i as u64, i as u64],
|
||||
},
|
||||
ExtractedPayload::default(),
|
||||
)
|
||||
.unwrap();
|
||||
ids.push(id);
|
||||
}
|
||||
ids
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn no_memory_section_is_a_noop() {
|
||||
let pwd = tempfile::tempdir().unwrap();
|
||||
let client = MockClient::new(vec![]);
|
||||
let mut pod = make_pod_with(NO_MEMORY_TOML, pwd.path().to_path_buf(), client).await;
|
||||
pod.try_post_run_consolidate()
|
||||
.await
|
||||
.expect("missing memory section must skip cleanly");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn no_thresholds_is_a_noop() {
|
||||
let pwd = tempfile::tempdir().unwrap();
|
||||
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
||||
write_n_staging(&layout, 5);
|
||||
|
||||
let client = MockClient::new(vec![]);
|
||||
let mut pod = make_pod_with(MEMORY_NO_THRESHOLDS_TOML, pwd.path().to_path_buf(), client).await;
|
||||
pod.try_post_run_consolidate()
|
||||
.await
|
||||
.expect("phase 2 disabled when both thresholds are None");
|
||||
|
||||
// No staging entries removed.
|
||||
assert_eq!(
|
||||
memory::consolidate::list_staging_entries(&layout).len(),
|
||||
5
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_staging_skips() {
|
||||
let pwd = tempfile::tempdir().unwrap();
|
||||
let client = MockClient::new(vec![]);
|
||||
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
||||
pod.try_post_run_consolidate().await.unwrap();
|
||||
// No mock calls expected.
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn below_threshold_skips_and_does_not_take_lock() {
|
||||
let pwd = tempfile::tempdir().unwrap();
|
||||
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
||||
write_n_staging(&layout, 1); // threshold is 2
|
||||
|
||||
let client = MockClient::new(vec![]);
|
||||
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
||||
pod.try_post_run_consolidate().await.unwrap();
|
||||
|
||||
// Staging untouched.
|
||||
assert_eq!(
|
||||
memory::consolidate::list_staging_entries(&layout).len(),
|
||||
1
|
||||
);
|
||||
// Lock file must not exist.
|
||||
let lock_path = layout.staging_dir().join(".consolidation.lock");
|
||||
assert!(!lock_path.exists(), "lock file should not be created");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fires_on_threshold_and_cleans_up_consumed_entries() {
|
||||
let pwd = tempfile::tempdir().unwrap();
|
||||
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
||||
write_n_staging(&layout, 2); // threshold is 2 — fires.
|
||||
|
||||
// Sub-worker is given a single text-only response. The Phase 2 prompt
|
||||
// tells it to call memory tools; the mock skips those, but `Worker::run`
|
||||
// returns Ok regardless once the LLM closes with a final text.
|
||||
let client = MockClient::new(vec![done("ok")]);
|
||||
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
||||
pod.try_post_run_consolidate().await.unwrap();
|
||||
|
||||
// Consumed entries removed.
|
||||
assert!(
|
||||
memory::consolidate::list_staging_entries(&layout).is_empty(),
|
||||
"consumed staging entries must be cleaned up"
|
||||
);
|
||||
// Lock removed too.
|
||||
let lock_path = layout.staging_dir().join(".consolidation.lock");
|
||||
assert!(
|
||||
!lock_path.exists(),
|
||||
"lock file must be removed on success"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn live_lock_held_by_other_pod_skips() {
|
||||
let pwd = tempfile::tempdir().unwrap();
|
||||
let layout = WorkspaceLayout::new(pwd.path().to_path_buf());
|
||||
write_n_staging(&layout, 3);
|
||||
|
||||
// Pre-acquire lock with this test's PID — definitely alive — and
|
||||
// *don't* release it. The Phase 2 path must skip without error.
|
||||
let _live_lock = memory::consolidate::StagingLock::acquire(
|
||||
&layout,
|
||||
std::process::id(),
|
||||
"other-pod",
|
||||
Vec::new(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let client = MockClient::new(vec![]);
|
||||
let mut pod = make_pod_with(FILES_THRESHOLD_TOML, pwd.path().to_path_buf(), client).await;
|
||||
pod.try_post_run_consolidate()
|
||||
.await
|
||||
.expect("InUse lock must surface as graceful skip");
|
||||
|
||||
// Staging untouched: lock holder owns the snapshot, not us.
|
||||
assert_eq!(
|
||||
memory::consolidate::list_staging_entries(&layout).len(),
|
||||
3
|
||||
);
|
||||
}
|
||||
73
tickets/internal-worker-workflow.md
Normal file
73
tickets/internal-worker-workflow.md
Normal file
|
|
@ -0,0 +1,73 @@
|
|||
# 内部 Worker / 内部 Pod の Workflow 化
|
||||
|
||||
## 背景
|
||||
|
||||
INSOMNIA が内部で固定 prompt を持って disposable Worker / 専用 Pod を立ち上げている経路がいくつかある:
|
||||
|
||||
- Phase 1 活動抽出(`crates/memory/src/extract/prompt.rs::EXTRACT_SYSTEM_PROMPT`)
|
||||
- Phase 2 統合 + 整理(`tickets/memory-phase2-consolidation.md`、本チケット時点では実装中 / 直前)
|
||||
- Compact(`PromptCatalog::compact_system`)
|
||||
|
||||
これらは実装内 `&str` 定数や `PromptCatalog` の overlay で管理されており、prompt の調整や運用カスタマイズが「コード変更 + 再ビルド」を要する。一方、ユーザー向け `/<slug>` Workflow(`tickets/workflow.md`)は `<workspace_root>/.insomnia/memory/workflow/<slug>.md` に住み、frontmatter + Markdown 本文 + `requires` Knowledge inject を持つ宣言形式で運用できる。
|
||||
|
||||
両者を寄せ、内部 Worker / 内部 Pod の prompt + ツール surface + Knowledge 依存を **Workflow と同一仕様で記述** できる経路を用意する。これにより:
|
||||
|
||||
- 内部 prompt の運用調整が workspace 側でできる(コード変更不要)
|
||||
- Phase 2 の prompt 案 (`docs/plan/memory-prompts.md`) を workspace に直接 ingest できる
|
||||
- 将来 consolidation を独立 Pod に引き上げる際も、Workflow を submit する形に揃えられる
|
||||
|
||||
## 要件
|
||||
|
||||
### Workflow の役割拡張
|
||||
|
||||
`tickets/workflow.md` の Workflow 仕様は「ユーザーが `/<slug>` で submit する制約付き作業」だが、本チケットでは **内部トリガー(Pod 内部の状態遷移)から呼び出される Workflow** を一級扱いに広げる。
|
||||
|
||||
- 同じファイル形式(`memory/workflow/<slug>.md`)、同じ frontmatter / Linter
|
||||
- `user_invocable: false` で `/<slug>` 経路から見えなくする
|
||||
- `auto_invoke` は通常 Pod 用の system prompt 注入仕様のまま(内部 Workflow は通常 OFF)
|
||||
- 内部 Workflow を識別するキー(例: `internal_role`)と、必要なツール surface を表明する手段を frontmatter に追加する。具体 schema は実装で詰める
|
||||
|
||||
### 内部呼び出し経路
|
||||
|
||||
Pod 側の既存トリガー(Phase 1 post-run / Phase 2 staging 閾値 / Compact 閾値 等)は固定 `&str` の代わりに Workflow loader 経由で:
|
||||
|
||||
1. 内部識別キーで該当 Workflow を解決(衝突時は workspace 上書き優先、なければ insomnia bundled default)
|
||||
2. `requires` Knowledge を本文の前に inject
|
||||
3. Workflow 本文を sub-Worker / sub-Pod の prompt として渡す(system prompt 扱いか初回 submit 扱いかは内部用途で固定し、role ごとに揃える)
|
||||
4. 既存のツール登録ロジックは Workflow が表明したツール surface に従う
|
||||
|
||||
### Bundled defaults
|
||||
|
||||
ユーザー workspace に該当 Workflow が無い場合に備え、insomnia 同梱の default Workflow を読む層を `PromptCatalog` の overlay と整合する形で持つ。
|
||||
|
||||
- 既存の Pod prompt 4 層 overlay(builtin / user / workspace / pod-pack)と同じ優先順
|
||||
- bundled default の物理配置は実装で決める
|
||||
|
||||
### 関連チケットとの順序
|
||||
|
||||
- `tickets/workflow.md`(ユーザー向け Workflow 本体)が先行する。本チケットはその仕様を前提に「内部呼び出し経路」を追加する側
|
||||
- `tickets/memory-phase2-consolidation.md` は当面 `&str` 定数で実装してよい。本チケット完了時に Workflow 化に乗り換える
|
||||
- Phase 1 / Compact も同様に role ごとに段階移行
|
||||
|
||||
## 範囲外
|
||||
|
||||
- Workflow 仕様自体の本体実装(`tickets/workflow.md`)
|
||||
- 内部 Workflow の自動生成(consolidation の offer 等。`docs/plan/memory.md` §Offer 経路 / 将来検討)
|
||||
- 既存 `&str` 定数の物理削除タイミング(移行が完了した role ごとに削除する運用)
|
||||
- `auto_invoke` 注入予算の最適化(既存 Knowledge 常駐注入予算と合算する規約は `docs/plan/memory.md` 側)
|
||||
|
||||
## 完了条件
|
||||
|
||||
- 各内部 Worker / 内部 Pod(少なくとも Phase 1 / Phase 2 / Compact のうち、本チケット着手時点で実装済みのもの)が内部識別キー付き Workflow を解決して prompt とツール surface を組み立てる
|
||||
- workspace で `memory/workflow/<slug>.md` を上書きすれば内部 Worker の prompt が変わる
|
||||
- workspace に該当 Workflow が無い場合、bundled default が使われる
|
||||
- `user_invocable: false` の内部 Workflow は `/<slug>` 候補から除外され、ユーザーからは呼べない
|
||||
- 内部 Workflow も consolidation の自動書き込み禁止対象のまま(Linter で構造的担保、`workflow.md` と整合)
|
||||
- 単体テストで bundled default / workspace overlay / ツール surface 表明 + 解決 + 適用がカバーされる
|
||||
|
||||
## 参照
|
||||
|
||||
- 前提: `tickets/workflow.md`
|
||||
- 最初の利用者: `tickets/memory-phase2-consolidation.md`
|
||||
- 関連: `tickets/agent-skills.md`(外部 SKILL ingest 経路。本チケットの内部呼び出し経路とは別軸)
|
||||
- 設計: `docs/plan/workflow.md`、`docs/plan/memory.md`、`docs/plan/memory-prompts.md`
|
||||
Loading…
Reference in New Issue
Block a user