From 3d04f793de748f641b601983970242c26d29c765 Mon Sep 17 00:00:00 2001 From: Hare Date: Tue, 28 Apr 2026 12:58:33 +0900 Subject: [PATCH] =?UTF-8?q?memory=E3=82=92=E6=8A=BD=E5=87=BA=E3=81=99?= =?UTF-8?q?=E3=82=8B=E4=BB=95=E7=B5=84=E3=81=BF=E3=81=AE=E5=AE=9F=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .insomnia/manifest.toml | 2 + Cargo.lock | 1 + crates/manifest/src/config.rs | 5 + crates/manifest/src/defaults.rs | 5 + crates/manifest/src/lib.rs | 21 ++ crates/memory/Cargo.toml | 1 + crates/memory/src/extract/input.rs | 72 +++++++ crates/memory/src/extract/mod.rs | 36 ++++ crates/memory/src/extract/payload.rs | 88 ++++++++ crates/memory/src/extract/pointer.rs | 91 ++++++++ crates/memory/src/extract/prompt.rs | 32 +++ crates/memory/src/extract/staging.rs | 110 ++++++++++ crates/memory/src/extract/tool.rs | 163 ++++++++++++++ crates/memory/src/lib.rs | 2 + crates/pod/src/controller.rs | 32 +++ crates/pod/src/pod.rs | 274 ++++++++++++++++++++++++ crates/session-store/src/lib.rs | 2 +- crates/session-store/src/session.rs | 25 +++ crates/session-store/src/session_log.rs | 82 +++++++ docs/plan/memory.md | 6 +- tickets/memory-phase1-extract.md | 20 +- 21 files changed, 1065 insertions(+), 5 deletions(-) create mode 100644 .insomnia/manifest.toml create mode 100644 crates/memory/src/extract/input.rs create mode 100644 crates/memory/src/extract/mod.rs create mode 100644 crates/memory/src/extract/payload.rs create mode 100644 crates/memory/src/extract/pointer.rs create mode 100644 crates/memory/src/extract/prompt.rs create mode 100644 crates/memory/src/extract/staging.rs create mode 100644 crates/memory/src/extract/tool.rs diff --git a/.insomnia/manifest.toml b/.insomnia/manifest.toml new file mode 100644 index 00000000..82f04c9d --- /dev/null +++ b/.insomnia/manifest.toml @@ -0,0 +1,2 @@ +[memory] +extract_threshold = 1000 diff --git a/Cargo.lock b/Cargo.lock index d0cf04fd..cc0c11fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1767,6 +1767,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tracing", + "uuid", ] [[package]] diff --git a/crates/manifest/src/config.rs b/crates/manifest/src/config.rs index 87c2a8c5..3945e552 100644 --- a/crates/manifest/src/config.rs +++ b/crates/manifest/src/config.rs @@ -212,6 +212,11 @@ impl MemoryConfig { workspace_root: upper.workspace_root.or(self.workspace_root), query_result_limit: upper.query_result_limit.or(self.query_result_limit), query_excerpt_lines: upper.query_excerpt_lines.or(self.query_excerpt_lines), + extract_model: upper.extract_model.or(self.extract_model), + extract_threshold: upper.extract_threshold.or(self.extract_threshold), + extract_worker_max_input_tokens: upper + .extract_worker_max_input_tokens + .or(self.extract_worker_max_input_tokens), } } } diff --git a/crates/manifest/src/defaults.rs b/crates/manifest/src/defaults.rs index 5317c269..102a0832 100644 --- a/crates/manifest/src/defaults.rs +++ b/crates/manifest/src/defaults.rs @@ -45,3 +45,8 @@ pub const COMPACT_WORKER_MAX_INPUT_TOKENS: u64 = 50_000; /// Number of recently-touched files fed to the compact worker as /// default references. pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5; + +/// Cumulative input-token cap for the memory Phase 1 (extract) worker's +/// own LLM calls. Exceeding this aborts the extract run. +/// See [`crate::MemoryConfig::extract_worker_max_input_tokens`]. +pub const MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS: u64 = 30_000; diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index 92630709..ca3f7bbf 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -49,6 +49,10 @@ pub struct PodManifest { /// Memory subsystem configuration. Presence in the manifest enables /// memory; the workspace root defaults to the Pod's pwd unless an /// explicit override is given. +/// +/// All fields are `Option`; defaults are applied at the consumer +/// (`.unwrap_or(defaults::...)`). This keeps cascade `merge` simple +/// (`upper.x.or(self.x)`) without a separate partial/resolved split. #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct MemoryConfig { /// Override for the workspace root. When `None`, the Pod's pwd @@ -64,6 +68,23 @@ pub struct MemoryConfig { /// Ignored when the request omits `query`. `None` ⇒ tool default (3). #[serde(default)] pub query_excerpt_lines: Option, + /// Optional model for the Phase 1 (extract) worker. When `None`, + /// the main pod model is cloned via `clone_boxed()`. Lightweight + /// reasoning-capable models (Haiku / 4o-mini / Flash class) are + /// recommended. + #[serde(default)] + pub extract_model: Option, + /// Cumulative input-token threshold (since the last extract pointer) + /// that triggers a Phase 1 extract. `None` disables Phase 1 + /// entirely; memory tools and resident injection still work, only + /// the auto-extract trigger is dormant. + #[serde(default)] + pub extract_threshold: Option, + /// Cumulative input-token cap for the extract worker's own LLM + /// calls. Exceeding this aborts the extract run. `None` ⇒ + /// [`defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS`]. + #[serde(default)] + pub extract_worker_max_input_tokens: Option, } /// Pod metadata. diff --git a/crates/memory/Cargo.toml b/crates/memory/Cargo.toml index 98f7bf8d..a896e63b 100644 --- a/crates/memory/Cargo.toml +++ b/crates/memory/Cargo.toml @@ -15,6 +15,7 @@ serde_json = "1.0.149" serde_yaml = "0.9.34" thiserror = "2.0.18" tracing = "0.1.44" +uuid = { version = "1.23.1", features = ["v7", "serde"] } [dev-dependencies] tempfile = "3.27.0" diff --git a/crates/memory/src/extract/input.rs b/crates/memory/src/extract/input.rs new file mode 100644 index 00000000..142c8fa8 --- /dev/null +++ b/crates/memory/src/extract/input.rs @@ -0,0 +1,72 @@ +//! Phase 1 sub-Worker への入力テキスト組み立て。 +//! +//! `crates/pod/src/pod.rs::build_summary_prompt` と同じ方針で +//! Item 列を flat な行に落とす(reasoning は省く、tool call は名前のみ、 +//! tool result は summary のみ)。conversation 全体を Markdown の単一 +//! セクションとして渡し、抽出指示は system prompt 側に寄せる。 + +use llm_worker::Item; + +/// 与えられた `items` を Phase 1 sub-Worker の最初の user 入力に整形する。 +pub fn build_extract_input(items: &[Item]) -> String { + let mut out = String::new(); + out.push_str( + "Extract activity logs from the conversation slice below. \ + Follow the system prompt's schema strictly and call `write_extracted` once.\n\n", + ); + out.push_str("## Conversation slice\n"); + out.push_str(&render_items(items)); + out.push_str("\n\nWhen you are done, call `write_extracted` and end the turn."); + out +} + +fn render_items(items: &[Item]) -> String { + let mut lines: Vec = Vec::new(); + for item in items { + match item { + Item::Message { role, content, .. } => { + let role_label = match role { + llm_worker::Role::User => "User", + llm_worker::Role::Assistant => "Assistant", + llm_worker::Role::System => "System", + }; + let text: String = content + .iter() + .map(|p| p.as_text()) + .collect::>() + .join(""); + lines.push(format!("[{role_label}] {text}")); + } + Item::ToolCall { name, .. } => { + lines.push(format!("[ToolCall] {name}")); + } + Item::ToolResult { summary, .. } => { + lines.push(format!("[ToolResult] {summary}")); + } + Item::Reasoning { .. } => {} + } + } + lines.join("\n\n") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn renders_user_assistant_pair_and_tool_calls() { + let items = vec![ + Item::user_message("hello"), + Item::assistant_message("hi"), + Item::tool_call("c1", "read_file", "{}"), + Item::tool_result("c1", "ok"), + Item::reasoning("internal scratch — should be skipped"), + ]; + let s = build_extract_input(&items); + assert!(s.contains("[User] hello")); + assert!(s.contains("[Assistant] hi")); + assert!(s.contains("[ToolCall] read_file")); + assert!(s.contains("[ToolResult] ok")); + assert!(!s.contains("scratch")); + } +} diff --git a/crates/memory/src/extract/mod.rs b/crates/memory/src/extract/mod.rs new file mode 100644 index 00000000..31517874 --- /dev/null +++ b/crates/memory/src/extract/mod.rs @@ -0,0 +1,36 @@ +//! Phase 1: 活動抽出。 +//! +//! 通常 Pod の post-run hook で発火する disposable Worker と、その +//! 出力を `/.insomnia/memory/_staging/.json` に書き出す +//! ヘルパーを提供する。Pod 側はこのモジュールから: +//! +//! - [`EXTRACT_SYSTEM_PROMPT`] を sub-Worker の system prompt に +//! - [`build_extract_input`] を sub-Worker の最初の user 入力に +//! - [`write_extracted_tool`] を唯一のツールとして +//! - [`write_staging`] で受け取った JSON を staging に書き出し +//! +//! の順で組み立てる。pointer 永続化(session-store の +//! `LogEntry::Extension`、domain `"memory.extract"`)は Pod 側が責務を持つ。 +//! +//! 出力 JSON の wrap は [`write_staging`] が `source: { session_id, range }` +//! を機械付与する形で担当し、LLM には source を推論させない。 + +mod input; +mod payload; +mod pointer; +mod prompt; +mod staging; +mod tool; + +pub use input::build_extract_input; +pub use payload::{ + AttemptEntry, DecisionEntry, DiscussionEntry, ExtractedPayload, RequestEntry, StagingRecord, +}; +pub use pointer::{ExtractPointerPayload, fold_pointer}; +pub use prompt::EXTRACT_SYSTEM_PROMPT; +pub use staging::{StagingError, write_staging}; +pub use tool::{ExtractWorkerContext, write_extracted_tool}; + +/// session-store `LogEntry::Extension` で使う domain 名。 +/// pointer の永続化と読み出しはこの定数を使う側が一致している必要がある。 +pub const EXTRACT_DOMAIN: &str = "memory.extract"; diff --git a/crates/memory/src/extract/payload.rs b/crates/memory/src/extract/payload.rs new file mode 100644 index 00000000..8e0e7d5a --- /dev/null +++ b/crates/memory/src/extract/payload.rs @@ -0,0 +1,88 @@ +//! Phase 1 抽出の出力 schema。 +//! +//! LLM は [`ExtractedPayload`] そのもの(source 抜き)を返し、Pod 側 +//! ラッパーが [`StagingRecord`] に組み立てて staging へ書き出す。 +//! source は機械付与する契約 (`docs/plan/memory.md` §Phase 1)。 + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::schema::SourceRef; + +/// LLM が返す活動ログ候補の集合。すべて optional(空配列は許容)。 +#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)] +pub struct ExtractedPayload { + #[serde(default)] + pub decisions: Vec, + #[serde(default)] + pub discussions: Vec, + #[serde(default)] + pub attempts: Vec, + #[serde(default)] + pub requests: Vec, +} + +impl ExtractedPayload { + /// すべての配列が空であれば true。空ペイロードは + /// "Nothing to save" 扱いで staging への書き込みを省いてよい。 + pub fn is_empty(&self) -> bool { + self.decisions.is_empty() + && self.discussions.is_empty() + && self.attempts.is_empty() + && self.requests.is_empty() + } +} + +/// 判断したこと(選択肢 + 選んだ + 根拠)。 +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct DecisionEntry { + /// 検討された選択肢の列挙。 + pub options: Vec, + /// 採用された選択肢。 + pub chosen: String, + /// 採用理由 / 根拠。 + pub rationale: String, +} + +/// 議論したこと(トピック + 論点)。結論が出ていなくてもよい。 +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct DiscussionEntry { + /// 議論の主題。 + pub topic: String, + /// 主題の中で挙がった論点 / 観点。 + pub points: Vec, +} + +/// 試したこと(試行 + 結果 + 成否)。 +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct AttemptEntry { + /// 何を試したか。 + pub action: String, + /// 試した結果。 + pub result: String, + /// 試行が目的に対して成功したか。失敗 / 部分成功も含めて bool で表現する。 + pub succeeded: bool, +} + +/// ユーザー submit の構造化要約。 +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +pub struct RequestEntry { + /// ユーザーの意図 / ゴール。 + pub intent: String, + /// 対象ファイル / モジュール / 機能(任意)。 + #[serde(default, skip_serializing_if = "Option::is_none")] + pub target: Option, + /// 一文サマリ。 + pub summary: String, +} + +/// staging に書き出される 1 ファイル分のレコード。 +/// +/// `source` は Pod 側ラッパーが session_id と log entry range を +/// 機械付与する。LLM はこのフィールドを見ない / 推論しない。 +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StagingRecord { + pub source: SourceRef, + #[serde(flatten)] + pub payload: ExtractedPayload, +} diff --git a/crates/memory/src/extract/pointer.rs b/crates/memory/src/extract/pointer.rs new file mode 100644 index 00000000..ebe30b74 --- /dev/null +++ b/crates/memory/src/extract/pointer.rs @@ -0,0 +1,91 @@ +//! `LogEntry::Extension { domain: "memory.extract", payload }` の payload 形式と +//! restore 時の fold ヘルパー。memory crate がドメインを所有するので、 +//! session-store / Pod は payload 構造を知らない。 + +use serde::{Deserialize, Serialize}; + +use super::EXTRACT_DOMAIN; + +/// Phase 1 完了境界の永続化 payload。session log の Extension entry +/// として 1 回ずつ書かれ、最新の 1 件が現行 pointer として有効になる。 +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ExtractPointerPayload { + /// 直近 extract が処理した最後の session-store HashedEntry の index。 + /// 次回の `source.range.start` はこの値 + 1。 + pub processed_through_entry: usize, + /// 直近 extract 時点の `history.len()`。次回入力は + /// `history[processed_through_history_len..]` を切り出す。 + pub processed_through_history_len: usize, + /// 書き出した staging file の UUIDv7 文字列。LLM が空 payload を返した + /// 場合は staging file を作らず空文字列で記録する(pointer は前進する)。 + pub staging_id: String, +} + +/// `RestoredState.extensions` から最新の Phase 1 pointer を取り出す。 +/// 未抽出セッションでは `None`。 +pub fn fold_pointer( + extensions: &[(String, serde_json::Value)], +) -> Option { + extensions + .iter() + .rev() + .find(|(domain, _)| domain == EXTRACT_DOMAIN) + .and_then(|(_, value)| serde_json::from_value(value.clone()).ok()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn fold_returns_latest_when_multiple_present() { + let exts = vec![ + ( + EXTRACT_DOMAIN.to_string(), + serde_json::json!({ + "processed_through_entry": 5, + "processed_through_history_len": 4, + "staging_id": "old" + }), + ), + ( + "other.domain".to_string(), + serde_json::json!({ "x": 1 }), + ), + ( + EXTRACT_DOMAIN.to_string(), + serde_json::json!({ + "processed_through_entry": 11, + "processed_through_history_len": 8, + "staging_id": "new" + }), + ), + ]; + let p = fold_pointer(&exts).unwrap(); + assert_eq!(p.processed_through_entry, 11); + assert_eq!(p.processed_through_history_len, 8); + assert_eq!(p.staging_id, "new"); + } + + #[test] + fn fold_returns_none_when_absent() { + let exts = vec![( + "other.domain".to_string(), + serde_json::json!({ "x": 1 }), + )]; + assert!(fold_pointer(&exts).is_none()); + } + + #[test] + fn fold_skips_malformed_entries() { + let exts = vec![ + ( + EXTRACT_DOMAIN.to_string(), + serde_json::json!({ "wrong_shape": true }), + ), + ]; + // 現状は最新を取り出して JSON 不一致なら None。古いものに fallback + // しないのは、壊れた最新を黙って無視すると意図しない再抽出を招くため。 + assert!(fold_pointer(&exts).is_none()); + } +} diff --git a/crates/memory/src/extract/prompt.rs b/crates/memory/src/extract/prompt.rs new file mode 100644 index 00000000..767bdd11 --- /dev/null +++ b/crates/memory/src/extract/prompt.rs @@ -0,0 +1,32 @@ +//! Phase 1 sub-Worker の system prompt。 +//! +//! 内容は `docs/plan/memory-prompts.md` §共通原則 / §Phase 1 を縮約。 +//! 「派生物を作らず、起きたことを抽出する」段階に縛り、JSON schema +//! 準拠以外の自由文を許さない。 + +pub const EXTRACT_SYSTEM_PROMPT: &str = r#"You are the Phase 1 activity extractor for an INSOMNIA memory subsystem. + +Your single job: read the supplied conversation slice and emit a structured JSON record of "what happened" via the `write_extracted` tool. You are not consolidating, summarising, or generating knowledge — that is a later phase's job. + +# Hard rules + +- Call `write_extracted` exactly once. Do not narrate, ask questions, or send any other tool output. +- The argument is an object with four arrays: `decisions`, `discussions`, `attempts`, `requests`. Any of them may be empty. If nothing in the slice is worth recording, call `write_extracted({"decisions": [], "discussions": [], "attempts": [], "requests": []})` and stop. +- Do NOT include `source`, `session_id`, entry indices, timestamps, or any provenance metadata. The wrapper attaches them mechanically. +- Do NOT add free-form commentary, summaries, or explanatory prose outside the schema fields. + +# Extraction guidance + +- `decisions`: judgements made during the slice. Each entry needs `options` (the alternatives considered), `chosen` (what was picked), and `rationale` (why). +- `discussions`: topics that were debated. `topic` plus `points` (the considerations raised). Open / unresolved discussions are valid. +- `attempts`: things that were tried. `action`, `result`, and a `succeeded` boolean. Partial success is `false` with the result text describing the partial outcome. +- `requests`: structured summaries of user submissions. `intent` (what the user wants), optional `target` (file / module / feature), and a one-line `summary`. + +# Quality bar + +- Drop one-off chit-chat, shallow questions, and turn-by-turn progress noise. Keep entries with long-term reference value. +- Do not duplicate content already captured by static project docs (AGENTS.md, plan documents) — those are not "what happened in this slice". +- Prefer concise, fact-shaped strings. Do not pad rationale or summary fields. + +When you have produced the JSON, call `write_extracted` and end the turn. No follow-up text. +"#; diff --git a/crates/memory/src/extract/staging.rs b/crates/memory/src/extract/staging.rs new file mode 100644 index 00000000..16595daf --- /dev/null +++ b/crates/memory/src/extract/staging.rs @@ -0,0 +1,110 @@ +//! `/.insomnia/memory/_staging/.json` への書き出しヘルパー。 +//! +//! 1 件 1 ファイル、UUIDv7 命名(短命なので衝突回避と順序を兼ねる)。 +//! `source` を機械付与した [`StagingRecord`] 形式で保存する。 + +use std::fs; +use std::path::PathBuf; + +use uuid::Uuid; + +use crate::extract::payload::{ExtractedPayload, StagingRecord}; +use crate::schema::SourceRef; +use crate::workspace::WorkspaceLayout; + +/// staging 書き出し時のエラー。 +#[derive(Debug, thiserror::Error)] +pub enum StagingError { + #[error("failed to create staging dir {}: {source}", .path.display())] + CreateDir { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("failed to write staging file {}: {source}", .path.display())] + Write { + path: PathBuf, + #[source] + source: std::io::Error, + }, + #[error("failed to serialize staging record: {0}")] + Serialize(#[from] serde_json::Error), +} + +/// `payload` を `source` で wrap して staging に書き出す。 +/// +/// 戻り値は割り当てられた staging file の (id, path)。`payload` が +/// 完全に空の場合は呼び出し側が事前に `is_empty()` で skip 推奨だが、 +/// この関数は空でも正規に書き出す(仕様 §Phase 1 で空配列許容と +/// 明記されており、書く / 書かないの判断は呼び出し側に委ねる)。 +pub fn write_staging( + layout: &WorkspaceLayout, + source: SourceRef, + payload: ExtractedPayload, +) -> Result<(Uuid, PathBuf), StagingError> { + let staging_dir = layout.staging_dir(); + fs::create_dir_all(&staging_dir).map_err(|source| StagingError::CreateDir { + path: staging_dir.clone(), + source, + })?; + + let id = Uuid::now_v7(); + let path = staging_dir.join(format!("{id}.json")); + let record = StagingRecord { source, payload }; + let json = serde_json::to_string_pretty(&record)?; + fs::write(&path, json).map_err(|source| StagingError::Write { + path: path.clone(), + source, + })?; + Ok((id, path)) +} + +#[cfg(test)] +mod tests { + use super::*; + + use crate::extract::payload::{DecisionEntry, ExtractedPayload}; + + #[test] + fn writes_record_with_machine_attached_source() { + let tmp = tempfile::TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(tmp.path().to_path_buf()); + + let source = SourceRef { + session_id: "sess-1".into(), + range: [3, 7], + }; + let payload = ExtractedPayload { + decisions: vec![DecisionEntry { + options: vec!["a".into(), "b".into()], + chosen: "a".into(), + rationale: "shorter".into(), + }], + ..Default::default() + }; + let (id, path) = write_staging(&layout, source.clone(), payload).unwrap(); + assert_eq!(path.parent().unwrap(), layout.staging_dir()); + assert!(path.file_name().unwrap().to_string_lossy().contains(&id.to_string())); + + let written: StagingRecord = + serde_json::from_str(&fs::read_to_string(&path).unwrap()).unwrap(); + assert_eq!(written.source.session_id, "sess-1"); + assert_eq!(written.source.range, [3, 7]); + assert_eq!(written.payload.decisions.len(), 1); + } + + #[test] + fn empty_payload_is_written_verbatim() { + let tmp = tempfile::TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(tmp.path().to_path_buf()); + let source = SourceRef { + session_id: "sess".into(), + range: [0, 0], + }; + let (_, path) = + write_staging(&layout, source, ExtractedPayload::default()).unwrap(); + let written: StagingRecord = + serde_json::from_str(&fs::read_to_string(&path).unwrap()).unwrap(); + assert!(written.payload.is_empty()); + } +} diff --git a/crates/memory/src/extract/tool.rs b/crates/memory/src/extract/tool.rs new file mode 100644 index 00000000..48a9456f --- /dev/null +++ b/crates/memory/src/extract/tool.rs @@ -0,0 +1,163 @@ +//! `write_extracted` ツール実装と sub-Worker 用 context。 +//! +//! sub-Worker からは extract worker が出した [`ExtractedPayload`] を +//! 受け取って `Mutex` 越しに [`ExtractWorkerContext`] に置くだけ。 +//! Pod 側はランループ完了後に `take_payload()` で取り出して +//! [`super::staging::write_staging`] に渡す。 + +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; + +use crate::extract::payload::ExtractedPayload; + +const WRITE_EXTRACTED_DESCRIPTION: &str = "Submit the final activity-log JSON for this slice. \ +Pass an object with `decisions`, `discussions`, `attempts`, and `requests` arrays (any may be empty). \ +Call this exactly once and end the turn. Do not include `source`, session metadata, or free-form prose — \ +the wrapper attaches provenance mechanically."; + +/// Phase 1 sub-Worker の出力受け口。`ExtractedPayload` 1 件をホストする。 +#[derive(Debug, Default)] +pub struct ExtractWorkerContext { + payload: Mutex>, + /// `write_extracted` が複数回呼ばれた回数(debug 用)。 + /// 後勝ちで上書きするが、Pod 側で warn を出したい場合に参照する。 + call_count: Mutex, +} + +impl ExtractWorkerContext { + pub fn new() -> Self { + Self::default() + } + + /// sub-Worker 終了後に Pod が呼んで payload を取り出す。 + /// 一度も `write_extracted` が呼ばれなければ `None`。 + pub fn take_payload(&self) -> Option { + self.payload + .lock() + .expect("extract worker payload poisoned") + .take() + } + + pub fn call_count(&self) -> usize { + *self + .call_count + .lock() + .expect("extract worker call_count poisoned") + } +} + +struct WriteExtractedTool { + ctx: Arc, +} + +#[async_trait] +impl Tool for WriteExtractedTool { + async fn execute(&self, input_json: &str) -> Result { + let payload: ExtractedPayload = serde_json::from_str(input_json).map_err(|e| { + ToolError::InvalidArgument(format!("invalid write_extracted input: {e}")) + })?; + let summary = format!( + "Recorded activity log: decisions={} discussions={} attempts={} requests={}", + payload.decisions.len(), + payload.discussions.len(), + payload.attempts.len(), + payload.requests.len(), + ); + { + let mut guard = self + .ctx + .payload + .lock() + .expect("extract worker payload poisoned"); + *guard = Some(payload); + } + { + let mut count = self + .ctx + .call_count + .lock() + .expect("extract worker call_count poisoned"); + *count += 1; + } + Ok(ToolOutput { + summary, + content: None, + }) + } +} + +/// sub-Worker に register する `write_extracted` ツール定義を返す。 +pub fn write_extracted_tool(ctx: Arc) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(ExtractedPayload); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("write_extracted") + .description(WRITE_EXTRACTED_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(WriteExtractedTool { ctx: ctx.clone() }); + (meta, tool) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + use llm_worker::tool::Tool; + + #[tokio::test] + async fn write_extracted_records_payload() { + let ctx = Arc::new(ExtractWorkerContext::new()); + let tool: Arc = Arc::new(WriteExtractedTool { ctx: ctx.clone() }); + let input = serde_json::json!({ + "decisions": [{ + "options": ["a", "b"], + "chosen": "a", + "rationale": "test" + }], + "discussions": [], + "attempts": [], + "requests": [] + }) + .to_string(); + let out = tool.execute(&input).await.unwrap(); + assert!(out.summary.contains("decisions=1")); + let payload = ctx.take_payload().unwrap(); + assert_eq!(payload.decisions.len(), 1); + assert_eq!(ctx.call_count(), 1); + } + + #[tokio::test] + async fn last_call_wins_on_multiple_invocations() { + let ctx = Arc::new(ExtractWorkerContext::new()); + let tool: Arc = Arc::new(WriteExtractedTool { ctx: ctx.clone() }); + + let first = serde_json::json!({"decisions": [], "discussions": [], "attempts": [], "requests": []}) + .to_string(); + tool.execute(&first).await.unwrap(); + + let second = serde_json::json!({ + "decisions": [], + "discussions": [], + "attempts": [{"action": "x", "result": "ok", "succeeded": true}], + "requests": [] + }) + .to_string(); + tool.execute(&second).await.unwrap(); + + let payload = ctx.take_payload().unwrap(); + assert_eq!(payload.attempts.len(), 1); + assert_eq!(ctx.call_count(), 2); + } + + #[tokio::test] + async fn invalid_json_returns_invalid_argument() { + let ctx = Arc::new(ExtractWorkerContext::new()); + let tool: Arc = Arc::new(WriteExtractedTool { ctx: ctx.clone() }); + let res = tool.execute("not json").await; + assert!(matches!(res, Err(ToolError::InvalidArgument(_)))); + assert!(ctx.take_payload().is_none()); + } +} diff --git a/crates/memory/src/lib.rs b/crates/memory/src/lib.rs index e3b8147c..94c69dd8 100644 --- a/crates/memory/src/lib.rs +++ b/crates/memory/src/lib.rs @@ -7,6 +7,7 @@ //! denying them at the Scope level when memory is enabled. pub mod error; +pub mod extract; pub mod linter; pub mod resident; pub mod schema; @@ -16,6 +17,7 @@ pub mod tool; pub mod workspace; pub use error::{LintError, LintWarning, MemoryError}; +pub use extract::ExtractPointerPayload; pub use linter::{LintReport, Linter}; pub use resident::{ResidentKnowledgeEntry, collect_resident_knowledge}; pub use scope::deny_write_rules; diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 7c335fb8..7ae5637c 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -332,6 +332,14 @@ impl PodController { .await; if new_status == PodStatus::Idle { + if let Err(e) = pod.try_post_run_extract().await { + tracing::warn!(error = %e, "Post-run memory extract error"); + alerter.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("post-run memory extract error: {e}"), + ); + } if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); alerter.alert( @@ -382,6 +390,14 @@ impl PodController { .await; if new_status == PodStatus::Idle { + if let Err(e) = pod.try_post_run_extract().await { + tracing::warn!(error = %e, "Post-run memory extract error"); + alerter.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("post-run memory extract error: {e}"), + ); + } if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); alerter.alert( @@ -429,6 +445,14 @@ impl PodController { .await; if new_status == PodStatus::Idle { + if let Err(e) = pod.try_post_run_extract().await { + tracing::warn!(error = %e, "Post-run memory extract error"); + alerter.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("post-run memory extract error: {e}"), + ); + } if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); alerter.alert( @@ -518,6 +542,14 @@ impl PodController { .await; if new_status == PodStatus::Idle { + if let Err(e) = pod.try_post_run_extract().await { + tracing::warn!(error = %e, "Post-run memory extract error"); + alerter.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("post-run memory extract error: {e}"), + ); + } if let Err(e) = pod.try_post_run_compact().await { tracing::warn!(error = %e, "Post-run compaction error"); alerter.alert( diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 696a2153..9e0ea3f4 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -1,4 +1,5 @@ use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use llm_worker::Item; @@ -124,6 +125,17 @@ pub struct Pod { /// Phase 2 (consolidation) workers set this to false so the /// agentic worker pulls knowledge through the search tools instead. inject_resident_knowledge: bool, + /// Phase 1 (memory.extract) reentry guard. `true` while an extract + /// worker is running; subsequent triggers are skipped per spec + /// (`docs/plan/memory.md` §Phase 1 並走防止). `Arc` so + /// the flag survives across `try_post_run_extract` calls without a + /// `&mut self` race. + extract_in_flight: Arc, + /// Last completed Phase 1 boundary. `None` means no extract has + /// run yet on this session — next extract starts from entry 0. + /// Restored from `RestoredState.extensions` on `restore`, updated + /// after each successful extract via `save_extension`. + extract_pointer: Mutex>, } impl Pod { @@ -171,6 +183,8 @@ impl Pod { callback_socket: None, prompts, inject_resident_knowledge: true, + extract_in_flight: Arc::new(AtomicBool::new(false)), + extract_pointer: Mutex::new(None), }; pod.apply_prune_from_manifest(); Ok(pod) @@ -237,6 +251,7 @@ impl Pod { } let prompts = PromptCatalog::builtins_only()?; + let extract_pointer = memory::extract::fold_pointer(&state.extensions); let mut pod = Self { manifest, worker: Some(worker), @@ -259,6 +274,8 @@ impl Pod { callback_socket: None, prompts, inject_resident_knowledge: true, + extract_in_flight: Arc::new(AtomicBool::new(false)), + extract_pointer: Mutex::new(extract_pointer), }; pod.apply_prune_from_manifest(); Ok(pod) @@ -1213,6 +1230,256 @@ impl Pod { let worker = self.worker.as_ref().expect("worker taken during run"); Ok(worker.client().clone_boxed()) } + + /// Build the LlmClient for the Phase 1 (memory.extract) Worker. + /// + /// Uses `memory.extract_model` from manifest if set, otherwise clones + /// the main client. + fn build_extractor_client( + &self, + memory_cfg: &manifest::MemoryConfig, + ) -> Result, PodError> { + if let Some(ref m) = memory_cfg.extract_model { + let client = provider::build_client(m)?; + return Ok(client); + } + let worker = self.worker.as_ref().expect("worker taken during run"); + Ok(worker.client().clone_boxed()) + } + + /// Cumulative `input_total_tokens` of usage records added after the + /// item-count boundary `history_len_pointer`. Used by Phase 1 trigger. + /// + /// `history_len_pointer == 0` means "everything so far". + fn cumulative_input_tokens_since(&self, history_len_pointer: usize) -> u64 { + self.usage_history + .lock() + .expect("usage_history poisoned") + .iter() + .filter(|r| r.history_len > history_len_pointer) + .map(|r| r.input_total_tokens) + .sum() + } + + /// Phase 1 (memory.extract) post-run trigger. + /// + /// Called by the Controller **before** [`try_post_run_compact`] so + /// the extract worker sees a stable session-log entry range + /// (compact rewrites history). Best-effort: failures are logged but + /// not propagated. + /// + /// Behaviour follows `docs/plan/memory.md` §Phase 1 並走防止: + /// in-flight 中の trigger は skip し、完了時点で閾値再評価する + /// (the loop below). Pending state is not retained — the + /// re-evaluation happens naturally because the in-memory pointer + /// has advanced. + pub async fn try_post_run_extract(&mut self) -> Result<(), PodError> { + let Some(memory_cfg) = self.manifest.memory.clone() else { + return Ok(()); + }; + let Some(threshold) = memory_cfg.extract_threshold else { + return Ok(()); + }; + + loop { + // CAS the in-flight flag. If another task is already running + // an extract for this Pod, skip per spec. + if self + .extract_in_flight + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_err() + { + return Ok(()); + } + let result = self.run_extract_once(&memory_cfg, threshold).await; + self.extract_in_flight.store(false, Ordering::Release); + + match result { + Ok(ExtractDecision::Skipped) => return Ok(()), + Ok(ExtractDecision::Completed) => { + // Re-evaluate threshold against the newly advanced + // pointer. In the current synchronous architecture + // this normally exits via Skipped on the next pass, + // but the loop is forward-looking for the case + // where new activity piles up while extract runs. + continue; + } + Err(e) => { + tracing::warn!(error = %e, "Phase 1 extract failed"); + self.alert( + AlertLevel::Warn, + AlertSource::Pod, + format!("memory Phase 1 extract failed: {e}"), + ); + return Ok(()); + } + } + } + } + + /// Single extract iteration: snapshot pointer, decide whether to + /// fire, run the worker if so, persist results and the new pointer. + async fn run_extract_once( + &mut self, + memory_cfg: &manifest::MemoryConfig, + threshold: u64, + ) -> Result { + use memory::extract; + + let pointer_snapshot = self + .extract_pointer + .lock() + .expect("extract_pointer poisoned") + .clone(); + let processed_history_len = pointer_snapshot + .as_ref() + .map(|p| p.processed_through_history_len) + .unwrap_or(0); + + let tokens_since = self.cumulative_input_tokens_since(processed_history_len); + if tokens_since < threshold { + return Ok(ExtractDecision::Skipped); + } + + let current_history_len = self.worker.as_ref().expect("worker present").history().len(); + if current_history_len <= processed_history_len { + return Ok(ExtractDecision::Skipped); + } + + // Read the session log to get the current entry count. This is + // the boundary for the source.range end_entry. Called once per + // extract, on a small local file. + let entries_now = self.store.read_all(self.session_id).await?.len(); + if entries_now == 0 { + return Ok(ExtractDecision::Skipped); + } + let end_entry = entries_now - 1; + let start_entry = pointer_snapshot + .as_ref() + .map(|p| p.processed_through_entry + 1) + .unwrap_or(0); + if start_entry > end_entry { + return Ok(ExtractDecision::Skipped); + } + + let items_to_extract = self + .worker + .as_ref() + .expect("worker present") + .history()[processed_history_len..current_history_len] + .to_vec(); + + let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd); + let cap = memory_cfg + .extract_worker_max_input_tokens + .unwrap_or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS); + + let client = self.build_extractor_client(memory_cfg)?; + let mut extract_worker = Worker::new(client) + .system_prompt(extract::EXTRACT_SYSTEM_PROMPT) + .temperature(0.0); + extract_worker.set_max_tokens(4096); + + // Cumulative input-token meter + interceptor (mirror of + // CompactWorkerInterceptor). Aborts the extract worker if its + // own input usage crosses the cap. + let input_so_far = Arc::new(std::sync::atomic::AtomicU64::new(0)); + { + let acc = input_so_far.clone(); + extract_worker.on_usage(move |event| { + if let Some(tokens) = event.input_tokens { + acc.fetch_add(tokens, Ordering::Relaxed); + } + }); + } + extract_worker.set_interceptor(MemoryExtractWorkerInterceptor { + input_so_far: input_so_far.clone(), + max_input_tokens: cap, + }); + + let ctx = Arc::new(extract::ExtractWorkerContext::new()); + extract_worker.register_tool(extract::write_extracted_tool(ctx.clone())); + + let input_text = extract::build_extract_input(&items_to_extract); + extract_worker.run(input_text).await.map_err(PodError::Worker)?; + + let payload = ctx.take_payload().unwrap_or_else(|| { + tracing::warn!( + "Phase 1 extract worker did not call write_extracted; \ + advancing pointer with empty payload" + ); + extract::ExtractedPayload::default() + }); + + let staging_id = if payload.is_empty() { + String::new() + } else { + let source = memory::schema::SourceRef { + session_id: self.session_id.to_string(), + range: [start_entry as u64, end_entry as u64], + }; + let (id, _) = extract::write_staging(&layout, source, payload) + .map_err(PodError::ExtractStaging)?; + id.to_string() + }; + + let pointer_payload = extract::ExtractPointerPayload { + processed_through_entry: end_entry, + processed_through_history_len: current_history_len, + staging_id, + }; + let payload_value = serde_json::to_value(&pointer_payload) + .expect("ExtractPointerPayload is always JSON-serializable"); + session_store::save_extension( + &self.store, + self.session_id, + &mut self.head_hash, + extract::EXTRACT_DOMAIN, + payload_value, + ) + .await?; + + *self + .extract_pointer + .lock() + .expect("extract_pointer poisoned") = Some(pointer_payload); + + Ok(ExtractDecision::Completed) + } +} + +/// Outcome of a single Phase 1 extract iteration. Internal to +/// `try_post_run_extract` / `run_extract_once`. +enum ExtractDecision { + /// Threshold not reached, or no items to extract. + Skipped, + /// Extract ran and pointer advanced. Caller re-evaluates threshold. + Completed, +} + +/// Pre-request interceptor for the Phase 1 extract worker. Aborts when +/// cumulative input tokens cross `max_input_tokens`. Mirror of +/// `compact::worker::CompactWorkerInterceptor`; kept separate so each +/// subsystem can tune its own message and budget. +struct MemoryExtractWorkerInterceptor { + input_so_far: Arc, + max_input_tokens: u64, +} + +#[async_trait] +impl llm_worker::interceptor::Interceptor for MemoryExtractWorkerInterceptor { + async fn pre_llm_request( + &self, + _context: &mut Vec, + ) -> llm_worker::interceptor::PreRequestAction { + if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens { + return llm_worker::interceptor::PreRequestAction::Cancel(format!( + "Phase 1 extract worker input exceeded {} tokens", + self.max_input_tokens + )); + } + llm_worker::interceptor::PreRequestAction::Continue + } } impl Pod, St> { @@ -1296,6 +1563,8 @@ impl Pod, St> { callback_socket: None, prompts, inject_resident_knowledge: true, + extract_in_flight: Arc::new(AtomicBool::new(false)), + extract_pointer: Mutex::new(None), }; pod.apply_prune_from_manifest(); Ok(pod) @@ -1360,6 +1629,8 @@ impl Pod, St> { callback_socket: Some(callback_socket), prompts, inject_resident_knowledge: true, + extract_in_flight: Arc::new(AtomicBool::new(false)), + extract_pointer: Mutex::new(None), }; pod.apply_prune_from_manifest(); Ok(pod) @@ -1555,6 +1826,9 @@ pub enum PodError { #[error(transparent)] PromptCatalog(#[from] CatalogError), + + #[error("memory Phase 1 staging write failed: {0}")] + ExtractStaging(#[source] memory::extract::StagingError), } /// Build the Pod's runtime [`Scope`] from the manifest, layering the diff --git a/crates/session-store/src/lib.rs b/crates/session-store/src/lib.rs index d3cef025..e0372afd 100644 --- a/crates/session-store/src/lib.rs +++ b/crates/session-store/src/lib.rs @@ -37,7 +37,7 @@ pub use fs_store::FsStore; pub use session::{ SessionStartState, create_compacted_session, create_session, create_session_with_id, ensure_head_or_fork, fork, fork_at, restore, save_cache_locked, save_cache_unlocked, - save_config_changed, save_delta, save_outcome, save_turn_end, save_usage, + save_config_changed, save_delta, save_extension, save_outcome, save_turn_end, save_usage, }; pub use session_log::{ EntryHash, HashedEntry, LogEntry, Outcome, RestoredState, SessionOrigin, UsageRecord, diff --git a/crates/session-store/src/session.rs b/crates/session-store/src/session.rs index a142e0ce..9fb183f7 100644 --- a/crates/session-store/src/session.rs +++ b/crates/session-store/src/session.rs @@ -326,6 +326,31 @@ pub async fn save_cache_unlocked( .await } +/// Log an `Extension` entry — domain-tagged opaque payload. +/// +/// session-store treats `payload` as an unstructured `serde_json::Value`. +/// Each domain is responsible for serializing into and folding out of it. +/// Use `RestoredState.extensions` to read entries back at restore time. +pub async fn save_extension( + store: &impl Store, + session_id: SessionId, + head_hash: &mut Option, + domain: impl Into, + payload: serde_json::Value, +) -> Result<(), StoreError> { + append_entry( + store, + session_id, + head_hash, + LogEntry::Extension { + ts: session_log::now_millis(), + domain: domain.into(), + payload, + }, + ) + .await +} + /// Log a `ConfigChanged` entry. pub async fn save_config_changed( store: &impl Store, diff --git a/crates/session-store/src/session_log.rs b/crates/session-store/src/session_log.rs index ccfbf6df..056109d6 100644 --- a/crates/session-store/src/session_log.rs +++ b/crates/session-store/src/session_log.rs @@ -163,6 +163,20 @@ pub enum LogEntry { cache_write_tokens: u64, output_tokens: u64, }, + + /// 汎用拡張点。ドメイン名で名前空間を切って任意 JSON を載せる。 + /// session-store は payload を不透明扱いし、replay 時は + /// `RestoredState.extensions` に `(domain, payload)` を順に積むだけ。 + /// 各ドメイン側が自前で fold して最新値を取り出す前提。 + /// + /// 想定用途: memory subsystem の Phase 1 処理境界 pointer 等、 + /// 「session 寿命に縛りたいが session-store の型を汚したくない」 + /// メタデータ。 + Extension { + ts: u64, + domain: String, + payload: serde_json::Value, + }, } /// Provenance reference to a parent session. @@ -204,6 +218,9 @@ pub struct RestoredState { /// `LogEntry::LlmUsage` を replay して時系列順に積まれる。 /// 任意位置のトークン数推定に使う。 pub usage_history: Vec, + /// `LogEntry::Extension` を replay 順に積んだもの。`(domain, payload)`。 + /// session-store は domain を不透明扱いし、各ドメインが自前で fold する。 + pub extensions: Vec<(String, serde_json::Value)>, } /// LLM リクエスト送信時点での占有量スナップショット。 @@ -234,6 +251,7 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState { last_run_interrupted: false, head_hash: None, usage_history: Vec::new(), + extensions: Vec::new(), }; for hashed in entries { @@ -295,6 +313,11 @@ pub fn collect_state(entries: &[HashedEntry]) -> RestoredState { output_tokens: *output_tokens, }); } + LogEntry::Extension { + domain, payload, .. + } => { + state.extensions.push((domain.clone(), payload.clone())); + } } } @@ -618,6 +641,65 @@ mod tests { } } + #[test] + fn replay_extension_collects_domain_payload_pairs() { + let entries = build_chain(&[ + LogEntry::SessionStart { + ts: 1000, + system_prompt: None, + config: RequestConfig::default(), + history: vec![], + forked_from: None, + compacted_from: None, + }, + LogEntry::Extension { + ts: 2000, + domain: "memory.extract".to_string(), + payload: serde_json::json!({ "processed_through_entry": 7 }), + }, + LogEntry::Extension { + ts: 3000, + domain: "memory.extract".to_string(), + payload: serde_json::json!({ "processed_through_entry": 12 }), + }, + LogEntry::Extension { + ts: 4000, + domain: "other.domain".to_string(), + payload: serde_json::json!({ "x": 1 }), + }, + ]); + let state = collect_state(&entries); + // 順序保持で全件積まれる。fold は呼び出し側の責務。 + assert_eq!(state.extensions.len(), 3); + assert_eq!(state.extensions[0].0, "memory.extract"); + assert_eq!(state.extensions[1].1["processed_through_entry"], 12); + assert_eq!(state.extensions[2].0, "other.domain"); + } + + #[test] + fn extension_entry_round_trip_via_json() { + let entry = LogEntry::Extension { + ts: 9999, + domain: "memory.extract".to_string(), + payload: serde_json::json!({ "a": 1, "b": "two" }), + }; + let json = serde_json::to_string(&entry).unwrap(); + let parsed: LogEntry = serde_json::from_str(&json).unwrap(); + match parsed { + LogEntry::Extension { + ts, + domain, + payload, + } => { + assert_eq!(ts, 9999); + assert_eq!(domain, "memory.extract"); + assert_eq!(payload["a"], 1); + assert_eq!(payload["b"], "two"); + } + other => panic!("expected Extension, got {:?}", other), + } + } + #[test] fn hash_hex_round_trip() { let entry = LogEntry::SessionStart { diff --git a/docs/plan/memory.md b/docs/plan/memory.md index ded4d6e5..9d97b07f 100644 --- a/docs/plan/memory.md +++ b/docs/plan/memory.md @@ -120,9 +120,9 @@ Workflow 保護は専用 tool schema のトリックではなく Linter ルー #### Phase 1: 活動抽出 -- **Trigger**: activity tokens の累積閾値。tool call カウントは不採用(ツールカスタマイズ非依存・大小重みづけのため) +- **Trigger**: activity tokens の累積閾値(cumulative input tokens since last pointer)。tool call カウントは不採用(ツールカスタマイズ非依存・大小重みづけのため) - **実行主体**: 既存 compact と同じ Worker spawn 機構を再利用。Pod は立てない -- **入力**: 前回 Phase 1 以降の session log 範囲。処理済み境界の pointer は session 側に保持(寿命を session と揃える) +- **入力**: 前回 Phase 1 以降の session log 範囲。処理済み境界の pointer は session log 側に保持し、寿命を session と揃える。session-store のドメイン純度を保つため、汎用拡張点 `LogEntry::Extension { domain, payload }`(domain = `"memory.extract"`)に寄せ、session-store は memory ドメインを知らない - **出力**: JSON schema で**活動ログ**の候補配列を返す。Knowledge 等の派生物は Phase 2 が活動ログから導出するので、Phase 1 では純粋な「起きたこと」に絞る - `decisions`: 判断したこと(選択肢 + 選んだ + 根拠) - `discussions`: 議論したこと(トピック + 論点) @@ -132,6 +132,8 @@ Workflow 保護は専用 tool schema のトリックではなく Linter ルー - **書き込み先**: `memory/_staging/.json` - LLM 出力(活動ログ JSON)は pod 側ラッパーが `source: { session_id, range: [start_entry, end_entry] }` を**機械付与**して wrap。LLM には source を推論させない - **モデル**: `memory.extract_model`。軽量だが文脈理解できる中堅クラス(Haiku / 4o-mini / Flash 相当)を想定 +- **Compact との順序**: 同一 turn 完了後の post-run チェックで Phase 1 を **compact より前** に走らせる。compact は history を組み替えるので、extract の入力範囲(session log 上の entry index)は compact 前のほうが安定する +- **並走防止 (Phase 1 同士)**: Pod 上の `extract_in_flight` フラグで in-flight 中の新規 trigger を skip。完了時点で閾値超過していれば直ちに次回を発火し、新 pointer 以降の最大範囲を回収する(pending 状態は保持しない=完了時の閾値再評価で coalesce 相当の挙動を成立させる) #### Phase 2: 永続化への統合 diff --git a/tickets/memory-phase1-extract.md b/tickets/memory-phase1-extract.md index 294d500e..e7af4f8c 100644 --- a/tickets/memory-phase1-extract.md +++ b/tickets/memory-phase1-extract.md @@ -10,16 +10,30 @@ Pod を立てずに既存 compact と同じ Worker spawn 機構を再利用す ### Trigger -- activity tokens 累積閾値(設定ファイルで tune) +- activity tokens 累積閾値(設定ファイルで tune)。input tokens cumulative since last pointer を使う - tool call カウントは不採用(ツールカスタマイズ非依存・大小重みづけのため) +- 発火点は Pod の post-run hook で、**compact より前** に走らせる(compact は history を組み替えるため、extract の入力範囲を安定させたい) ### 実行主体と入出力 - 既存 compact の Worker spawn 機構を再利用、Pod は立てない -- 入力: 前回 Phase 1 以降の session log 範囲(処理済み境界 pointer は session 側に保持、寿命を session と揃える) +- 入力: 前回 Phase 1 以降の session log 範囲 - 出力 JSON schema: `decisions`, `discussions`, `attempts`, `requests` の候補配列。抽出対象なしは空配列 - 出力に自由文の補足説明を入れさせない(schema 準拠のみ) +### 処理境界の pointer 永続化 + +- pointer は session log に書き、寿命を session と揃える +- session-store のドメイン純度を保つため、汎用拡張点 `LogEntry::Extension { domain: String, payload: serde_json::Value }` を **本チケットで session-store に新設**し、`domain = "memory.extract"` で payload に `{ processed_through_entry: usize, staging_id: String }` を載せる +- `RestoredState` には `extensions: Vec<(String, serde_json::Value)>` 形で raw 集積し、memory crate 側が `domain` で fold して最新 pointer を取り出す(session-store は memory のことを知らない) + +### 並走防止 (Phase 1 同士) + +- Pod 上の `extract_in_flight: AtomicBool` で in-flight 中の新規 trigger を skip +- 完了時点で閾値再評価し、超過していれば直ちに次回を発火(新 pointer 以降の最大範囲を回収) +- pending 状態は別途保持しない(完了時の再評価で coalesce 相当が自然に成立) +- Phase 2 の進行状況ファイルとは別物(こちらは別チケット範囲外) + ### 書き込み - 書き込み先: `memory/_staging/.json`(1 件 1 ファイル、UUIDv7 可) @@ -29,6 +43,7 @@ Pod を立てずに既存 compact と同じ Worker spawn 機構を再利用す ### モデル - 設定 key `memory.extract_model`(軽量だが文脈理解できる中堅クラス想定) +- 副次設定: `memory.extract_threshold`(input tokens 累積閾値、未設定で disable)、`memory.extract_worker_max_input_tokens`(extract worker 自身の input cap) ### prompt @@ -39,6 +54,7 @@ Pod を立てずに既存 compact と同じ Worker spawn 機構を再利用す - Phase 2 による staging の消費・クリーンアップ(別チケット) - staging の cleanup 戦略の詳細(Phase 2 で完了時に消す、実行中追加分は残す、という契約だけ本チケットで守る) - compact Worker spawn 機構自体の拡張(既存をそのまま使う。共通化が必要になったら別途) +- Phase 2 並走防止ファイル(別チケット) ## 完了条件