compact: compact worker をツール駆動マルチターンに再設計
段階 4〜9 を一括で実装:
- mark_read_required / add_reference / write_summary + read_file の 4 ツールで compact worker を駆動。結果は CompactWorkerContext に集約
- 新セッションの先頭を [summary, ...auto-read, references, ...retained] で構築
- デフォルトリファレンスは tracker.recent_files(5) から
- auto-read は compact_auto_read_budget で総量制限。超過は即エラー
- compact worker 自身は compact_worker_max_input_tokens で累計入力を制限
- 5 セクション要約フォーマットに system prompt を更新
- write_summary 未呼び出し / auto-read 空のときは 1 回追加プロンプトで促す
This commit is contained in:
parent
758ced5e7f
commit
34d1e78b40
|
|
@ -88,6 +88,10 @@ pub struct CompactionConfigPartial {
|
|||
#[serde(default)]
|
||||
pub compact_retained_tokens: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub compact_auto_read_budget: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub compact_worker_max_input_tokens: Option<u64>,
|
||||
#[serde(default)]
|
||||
pub provider: Option<ProviderConfigPartial>,
|
||||
}
|
||||
|
||||
|
|
@ -244,6 +248,12 @@ impl CompactionConfigPartial {
|
|||
compact_retained_tokens: upper
|
||||
.compact_retained_tokens
|
||||
.or(self.compact_retained_tokens),
|
||||
compact_auto_read_budget: upper
|
||||
.compact_auto_read_budget
|
||||
.or(self.compact_auto_read_budget),
|
||||
compact_worker_max_input_tokens: upper
|
||||
.compact_worker_max_input_tokens
|
||||
.or(self.compact_worker_max_input_tokens),
|
||||
provider: merge_option(self.provider, upper.provider, ProviderConfigPartial::merge),
|
||||
}
|
||||
}
|
||||
|
|
@ -374,6 +384,12 @@ impl TryFrom<PodManifestConfig> for PodManifest {
|
|||
compact_retained_tokens: c
|
||||
.compact_retained_tokens
|
||||
.unwrap_or(defaults::COMPACT_RETAINED_TOKENS),
|
||||
compact_auto_read_budget: c
|
||||
.compact_auto_read_budget
|
||||
.unwrap_or(defaults::COMPACT_AUTO_READ_BUDGET),
|
||||
compact_worker_max_input_tokens: c
|
||||
.compact_worker_max_input_tokens
|
||||
.unwrap_or(defaults::COMPACT_WORKER_MAX_INPUT_TOKENS),
|
||||
provider: comp_provider,
|
||||
})
|
||||
})
|
||||
|
|
|
|||
|
|
@ -28,3 +28,20 @@ pub const COMPACT_RETAINED_TOKENS: u64 = 8000;
|
|||
/// is omitted. See the `PromptLoader` prefix addressing scheme for the
|
||||
/// `$insomnia/` / `$user/` / `$workspace/` namespaces.
|
||||
pub const DEFAULT_INSTRUCTION: &str = "$insomnia/default";
|
||||
|
||||
/// Token budget for auto-read file contents injected into the new
/// session after compaction. Limits how much raw file text the
/// compact worker can pull into the compacted context via
/// `mark_read_required`. See
/// [`crate::CompactionConfig::compact_auto_read_budget`].
pub const COMPACT_AUTO_READ_BUDGET: u64 = 8000;

/// Cumulative input-token cap for the compact worker's own LLM
/// calls. Exceeding this aborts the compact run (circuit-breaker
/// path). See
/// [`crate::CompactionConfig::compact_worker_max_input_tokens`].
pub const COMPACT_WORKER_MAX_INPUT_TOKENS: u64 = 50_000;

/// Number of recently-touched files fed to the compact worker as
/// default references.
pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5;
|
||||
|
|
|
|||
|
|
@ -201,6 +201,16 @@ pub struct CompactionConfig {
|
|||
#[serde(default = "default_compact_retained_tokens")]
|
||||
pub compact_retained_tokens: u64,
|
||||
|
||||
/// Aggregate token budget for auto-read file contents injected into
|
||||
/// the compacted session by the compact worker.
|
||||
#[serde(default = "default_compact_auto_read_budget")]
|
||||
pub compact_auto_read_budget: u64,
|
||||
|
||||
/// Cumulative input-token cap for the compact worker's own LLM
|
||||
/// calls. Exceeding this aborts the compact run.
|
||||
#[serde(default = "default_compact_worker_max_input_tokens")]
|
||||
pub compact_worker_max_input_tokens: u64,
|
||||
|
||||
/// Optional provider for the compactor (summary) LLM.
|
||||
/// If omitted, the main provider is cloned via `clone_boxed()`.
|
||||
#[serde(default)]
|
||||
|
|
@ -216,6 +226,12 @@ fn default_prune_min_savings() -> u64 {
|
|||
fn default_compact_retained_tokens() -> u64 {
|
||||
defaults::COMPACT_RETAINED_TOKENS
|
||||
}
|
||||
fn default_compact_auto_read_budget() -> u64 {
|
||||
defaults::COMPACT_AUTO_READ_BUDGET
|
||||
}
|
||||
fn default_compact_worker_max_input_tokens() -> u64 {
|
||||
defaults::COMPACT_WORKER_MAX_INPUT_TOKENS
|
||||
}
|
||||
|
||||
impl Default for CompactionConfig {
|
||||
fn default() -> Self {
|
||||
|
|
@ -225,6 +241,8 @@ impl Default for CompactionConfig {
|
|||
compact_threshold: None,
|
||||
compact_request_threshold: None,
|
||||
compact_retained_tokens: default_compact_retained_tokens(),
|
||||
compact_auto_read_budget: default_compact_auto_read_budget(),
|
||||
compact_worker_max_input_tokens: default_compact_worker_max_input_tokens(),
|
||||
provider: None,
|
||||
}
|
||||
}
|
||||
|
|
|
|||
384
crates/pod/src/compact_worker.rs
Normal file
384
crates/pod/src/compact_worker.rs
Normal file
|
|
@ -0,0 +1,384 @@
|
|||
//! Compact worker state and the four tools that drive it.
//!
//! The compact worker is a disposable `Worker` instance spun up by
//! [`Pod::compact`]. It receives the history to summarise plus a list of
//! default reference files (from the session-lifetime `Tracker`) and runs
//! a tool-driven LLM loop. The tools here let it:
//!
//! - `read_file` — inspect referenced files (reuses `tools::read_tool`)
//! - `mark_read_required(path, offset?, limit?)` — nominate a file whose
//!   contents should be injected into the compacted context as an
//!   auto-read system message
//! - `add_reference(path)` — nominate a file the next session should
//!   know about by name only (contents not included)
//! - `write_summary(text)` — deliver (or overwrite) the structured summary
//!
//! Everything the worker decides ends up in [`CompactWorkerContext`],
//! which `Pod::compact` drains after the loop and turns into the
//! compacted session's opening system messages.
|
||||
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use llm_worker::Item;
|
||||
use llm_worker::interceptor::{Interceptor, PreRequestAction};
|
||||
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
|
||||
use serde::Deserialize;
|
||||
use tools::ScopedFs;
|
||||
|
||||
/// A file the compact worker has marked for auto-read in the new session.
#[derive(Debug, Clone)]
pub(crate) struct ReadRequirement {
    /// Path of the file whose contents should be injected, exactly as it
    /// was supplied to `mark_read_required`.
    pub path: PathBuf,
    /// 0-based line offset. `None` means from the start of the file.
    pub offset: Option<usize>,
    /// Maximum number of lines. `None` means to the end of the file.
    pub limit: Option<usize>,
}
|
||||
|
||||
/// Aggregated output of a compact worker run.
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub(crate) struct CompactWorkerContext {
|
||||
pub read_required: Vec<ReadRequirement>,
|
||||
pub references: Vec<PathBuf>,
|
||||
pub summary: Option<String>,
|
||||
/// Tokens already consumed by `mark_read_required` calls.
|
||||
pub auto_read_consumed: u64,
|
||||
/// Aggregate cap. `0` treats the budget as disabled.
|
||||
pub auto_read_budget: u64,
|
||||
}
|
||||
|
||||
impl CompactWorkerContext {
|
||||
pub(crate) fn with_budget(auto_read_budget: u64) -> Self {
|
||||
Self {
|
||||
auto_read_budget,
|
||||
..Self::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn remaining_budget(&self) -> u64 {
|
||||
self.auto_read_budget.saturating_sub(self.auto_read_consumed)
|
||||
}
|
||||
}
|
||||
|
||||
/// Input to `mark_read_required`.
|
||||
#[derive(Debug, Deserialize, schemars::JsonSchema)]
|
||||
struct MarkParams {
|
||||
/// Absolute path to the file.
|
||||
pub file_path: PathBuf,
|
||||
/// 0-based line offset.
|
||||
#[serde(default)]
|
||||
pub offset: Option<usize>,
|
||||
/// Maximum number of lines to inject.
|
||||
#[serde(default)]
|
||||
pub limit: Option<usize>,
|
||||
}
|
||||
|
||||
/// Input to `add_reference`.
|
||||
#[derive(Debug, Deserialize, schemars::JsonSchema)]
|
||||
struct ReferenceParams {
|
||||
/// Absolute path to the file.
|
||||
pub file_path: PathBuf,
|
||||
}
|
||||
|
||||
/// Input to `write_summary`.
|
||||
#[derive(Debug, Deserialize, schemars::JsonSchema)]
|
||||
struct SummaryParams {
|
||||
/// Full structured summary text (overwrites any previous call).
|
||||
pub text: String,
|
||||
}
|
||||
|
||||
/// Tool description advertised to the model for `mark_read_required`.
const MARK_DESCRIPTION: &str = "Inject a file's contents into the compacted context so the \
    next session starts with it already read. Use this for files the next task needs in full. \
    Optionally specify `offset` (0-based line) and `limit` (line count) to inject only a slice. \
    Counts against `auto_read_budget`; overflow returns an error and the mark is not recorded. \
    Paths must be absolute.";

/// Tool description advertised to the model for `add_reference`.
const REFERENCE_DESCRIPTION: &str = "Record a file path as a named reference in the compacted \
    context without injecting its contents. Use for files that are contextually relevant but \
    whose current content the next session can fetch on demand.";

/// Tool description advertised to the model for `write_summary`.
const SUMMARY_DESCRIPTION: &str = "Provide the final structured summary text. Subsequent calls \
    replace the previous content; only the last call is used. Must be called before the compact run \
    ends or compaction fails.";
|
||||
|
||||
struct MarkReadRequiredTool {
|
||||
fs: ScopedFs,
|
||||
ctx: Arc<Mutex<CompactWorkerContext>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for MarkReadRequiredTool {
|
||||
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
|
||||
let params: MarkParams = serde_json::from_str(input_json).map_err(|e| {
|
||||
ToolError::InvalidArgument(format!("invalid mark_read_required input: {e}"))
|
||||
})?;
|
||||
|
||||
// Read the file through the shared ScopedFs so scope and I/O
|
||||
// errors surface the same way the regular `read_file` tool does.
|
||||
let bytes = self
|
||||
.fs
|
||||
.read_bytes(¶ms.file_path)
|
||||
.map_err(|e| ToolError::ExecutionFailed(format!("read failed: {e}")))?;
|
||||
let text = String::from_utf8_lossy(&bytes);
|
||||
let slice = slice_lines(&text, params.offset.unwrap_or(0), params.limit);
|
||||
let estimated_tokens = estimate_tokens(slice.len());
|
||||
|
||||
let mut guard = self.ctx.lock().expect("compact worker context poisoned");
|
||||
let budget = guard.auto_read_budget;
|
||||
let would_consume = guard.auto_read_consumed.saturating_add(estimated_tokens);
|
||||
if budget > 0 && would_consume > budget {
|
||||
return Err(ToolError::ExecutionFailed(format!(
|
||||
"auto-read budget exhausted ({budget} tokens). Remove an existing mark or use \
|
||||
add_reference instead."
|
||||
)));
|
||||
}
|
||||
guard.read_required.push(ReadRequirement {
|
||||
path: params.file_path.clone(),
|
||||
offset: params.offset,
|
||||
limit: params.limit,
|
||||
});
|
||||
guard.auto_read_consumed = would_consume;
|
||||
let remaining = guard.remaining_budget();
|
||||
drop(guard);
|
||||
|
||||
let mut summary = format!(
|
||||
"Marked {} for auto-read (≈{estimated_tokens} tokens). \
|
||||
Budget: {remaining}/{budget} tokens remaining.",
|
||||
params.file_path.display()
|
||||
);
|
||||
if budget > 0 && remaining * 2 <= budget {
|
||||
summary.push_str(
|
||||
"\nNote: auto-read budget is at least half consumed. \
|
||||
Consider calling write_summary and finishing up soon.",
|
||||
);
|
||||
}
|
||||
Ok(ToolOutput {
|
||||
summary,
|
||||
content: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct AddReferenceTool {
|
||||
ctx: Arc<Mutex<CompactWorkerContext>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for AddReferenceTool {
|
||||
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
|
||||
let params: ReferenceParams = serde_json::from_str(input_json)
|
||||
.map_err(|e| ToolError::InvalidArgument(format!("invalid add_reference input: {e}")))?;
|
||||
let mut guard = self.ctx.lock().expect("compact worker context poisoned");
|
||||
if !guard
|
||||
.references
|
||||
.iter()
|
||||
.any(|p| p.as_path() == params.file_path.as_path())
|
||||
{
|
||||
guard.references.push(params.file_path.clone());
|
||||
}
|
||||
Ok(ToolOutput {
|
||||
summary: format!("Added reference {}", params.file_path.display()),
|
||||
content: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct WriteSummaryTool {
|
||||
ctx: Arc<Mutex<CompactWorkerContext>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Tool for WriteSummaryTool {
|
||||
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
|
||||
let params: SummaryParams = serde_json::from_str(input_json)
|
||||
.map_err(|e| ToolError::InvalidArgument(format!("invalid write_summary input: {e}")))?;
|
||||
let mut guard = self.ctx.lock().expect("compact worker context poisoned");
|
||||
let overwritten = guard.summary.is_some();
|
||||
guard.summary = Some(params.text);
|
||||
drop(guard);
|
||||
let note = if overwritten {
|
||||
"Summary replaced."
|
||||
} else {
|
||||
"Summary recorded."
|
||||
};
|
||||
Ok(ToolOutput {
|
||||
summary: note.to_string(),
|
||||
content: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn mark_read_required_tool(
|
||||
fs: ScopedFs,
|
||||
ctx: Arc<Mutex<CompactWorkerContext>>,
|
||||
) -> ToolDefinition {
|
||||
Arc::new(move || {
|
||||
let schema = schemars::schema_for!(MarkParams);
|
||||
let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({}));
|
||||
let meta = ToolMeta::new("mark_read_required")
|
||||
.description(MARK_DESCRIPTION)
|
||||
.input_schema(schema_value);
|
||||
let tool: Arc<dyn Tool> = Arc::new(MarkReadRequiredTool {
|
||||
fs: fs.clone(),
|
||||
ctx: ctx.clone(),
|
||||
});
|
||||
(meta, tool)
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn add_reference_tool(ctx: Arc<Mutex<CompactWorkerContext>>) -> ToolDefinition {
|
||||
Arc::new(move || {
|
||||
let schema = schemars::schema_for!(ReferenceParams);
|
||||
let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({}));
|
||||
let meta = ToolMeta::new("add_reference")
|
||||
.description(REFERENCE_DESCRIPTION)
|
||||
.input_schema(schema_value);
|
||||
let tool: Arc<dyn Tool> = Arc::new(AddReferenceTool { ctx: ctx.clone() });
|
||||
(meta, tool)
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn write_summary_tool(ctx: Arc<Mutex<CompactWorkerContext>>) -> ToolDefinition {
|
||||
Arc::new(move || {
|
||||
let schema = schemars::schema_for!(SummaryParams);
|
||||
let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({}));
|
||||
let meta = ToolMeta::new("write_summary")
|
||||
.description(SUMMARY_DESCRIPTION)
|
||||
.input_schema(schema_value);
|
||||
let tool: Arc<dyn Tool> = Arc::new(WriteSummaryTool { ctx: ctx.clone() });
|
||||
(meta, tool)
|
||||
})
|
||||
}
|
||||
|
||||
/// Interceptor that aborts the compact worker as soon as its cumulative
|
||||
/// input-token count crosses `max_input_tokens`. Pairs with the
|
||||
/// `on_usage` callback registered by `Pod::compact`, which is what
|
||||
/// actually accumulates `input_so_far`.
|
||||
pub(crate) struct CompactWorkerInterceptor {
|
||||
pub input_so_far: Arc<AtomicU64>,
|
||||
pub max_input_tokens: u64,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Interceptor for CompactWorkerInterceptor {
|
||||
async fn pre_llm_request(&self, _context: &mut Vec<Item>) -> PreRequestAction {
|
||||
if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens {
|
||||
return PreRequestAction::Cancel(format!(
|
||||
"compact worker input exceeded {} tokens",
|
||||
self.max_input_tokens
|
||||
));
|
||||
}
|
||||
PreRequestAction::Continue
|
||||
}
|
||||
}
|
||||
|
||||
/// Crude bytes→tokens estimate; good enough for budget accounting.
///
/// Assumes roughly four bytes per token and rounds up, so any non-empty
/// input costs at least one token and empty input costs zero.
fn estimate_tokens(bytes: usize) -> u64 {
    (bytes as u64).div_ceil(4)
}
|
||||
|
||||
/// Return the lines of `text` selected by `offset` (0-based line index) and
/// optional `limit` (line count), joined with `\n`.
///
/// Lines are split with [`str::lines`], so `\r\n` endings are normalised to
/// `\n` and a trailing newline is not reproduced. With `offset == 0` and
/// `limit == None` every line of the file is returned. An `offset` past the
/// last line, or a `limit` of zero, yields an empty string.
pub(crate) fn slice_lines(text: &str, offset: usize, limit: Option<usize>) -> String {
    // `skip`/`take` clamp at the end of the iterator, so no explicit bounds
    // arithmetic is needed and only the selected lines are collected.
    text.lines()
        .skip(offset)
        .take(limit.unwrap_or(usize::MAX))
        .collect::<Vec<_>>()
        .join("\n")
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use manifest::Scope;

    /// Build a `ScopedFs` whose writable scope and working directory are `tmp`.
    fn make_fs(tmp: &std::path::Path) -> ScopedFs {
        let scope = Scope::writable(tmp.to_path_buf()).unwrap();
        ScopedFs::new(scope, tmp.to_path_buf())
    }

    /// A mark that fits the budget is recorded and charged against it.
    #[tokio::test]
    async fn mark_read_required_records_and_deducts_budget() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("hello.txt");
        std::fs::write(&path, "hello world\n").unwrap();

        let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(1_000)));
        let tool: Arc<dyn Tool> = Arc::new(MarkReadRequiredTool {
            fs: make_fs(tmp.path()),
            ctx: ctx.clone(),
        });
        let input = serde_json::json!({ "file_path": path.to_str().unwrap() }).to_string();
        let out = tool.execute(&input).await.unwrap();

        assert!(out.summary.starts_with("Marked"));
        let guard = ctx.lock().unwrap();
        assert_eq!(guard.read_required.len(), 1);
        assert!(guard.auto_read_consumed > 0);
        assert!(guard.auto_read_consumed <= 1_000);
    }

    /// A mark larger than the budget fails and leaves the context untouched.
    #[tokio::test]
    async fn mark_read_required_rejects_over_budget() {
        let tmp = tempfile::TempDir::new().unwrap();
        let path = tmp.path().join("big.txt");
        std::fs::write(&path, "x".repeat(4_096)).unwrap(); // ≈1024 tokens

        let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(100)));
        let tool: Arc<dyn Tool> = Arc::new(MarkReadRequiredTool {
            fs: make_fs(tmp.path()),
            ctx: ctx.clone(),
        });
        let input = serde_json::json!({ "file_path": path.to_str().unwrap() }).to_string();
        let res = tool.execute(&input).await;

        assert!(matches!(res, Err(ToolError::ExecutionFailed(_))));
        let guard = ctx.lock().unwrap();
        assert!(guard.read_required.is_empty());
        assert_eq!(guard.auto_read_consumed, 0);
    }

    /// The second `write_summary` call replaces the first one's text.
    #[tokio::test]
    async fn write_summary_overwrites_previous_call() {
        let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(0)));
        let tool: Arc<dyn Tool> = Arc::new(WriteSummaryTool { ctx: ctx.clone() });

        let first = serde_json::json!({ "text": "first" }).to_string();
        let out1 = tool.execute(&first).await.unwrap();
        assert!(out1.summary.contains("recorded"));

        let second = serde_json::json!({ "text": "second" }).to_string();
        let out2 = tool.execute(&second).await.unwrap();
        assert!(out2.summary.contains("replaced"));

        assert_eq!(ctx.lock().unwrap().summary.as_deref(), Some("second"));
    }

    /// Adding the same path twice keeps a single entry.
    #[tokio::test]
    async fn add_reference_deduplicates() {
        let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(0)));
        let tool: Arc<dyn Tool> = Arc::new(AddReferenceTool { ctx: ctx.clone() });

        let p = "/abs/path.rs";
        let input = serde_json::json!({ "file_path": p }).to_string();
        tool.execute(&input).await.unwrap();
        tool.execute(&input).await.unwrap();

        let guard = ctx.lock().unwrap();
        assert_eq!(guard.references.len(), 1);
        assert_eq!(guard.references[0], PathBuf::from(p));
    }

    /// `slice_lines` clamps out-of-range offsets and honours `limit`.
    #[test]
    fn slice_lines_handles_offset_and_limit() {
        let text = "a\nb\nc\nd";
        assert_eq!(slice_lines(text, 0, None), "a\nb\nc\nd");
        assert_eq!(slice_lines(text, 1, Some(2)), "b\nc");
        assert_eq!(slice_lines(text, 10, None), "");
        assert_eq!(slice_lines(text, 2, Some(usize::MAX)), "c\nd");
        assert_eq!(slice_lines(text, 0, Some(0)), "");
    }
}
|
||||
|
|
@ -12,6 +12,7 @@ pub mod spawned_pod_registry;
|
|||
|
||||
mod agents_md;
|
||||
mod compact_state;
|
||||
mod compact_worker;
|
||||
mod factory;
|
||||
mod notification_buffer;
|
||||
mod pod;
|
||||
|
|
|
|||
|
|
@ -47,18 +47,35 @@ impl Hook<PreLlmRequest> for UsageTrackingHook {
|
|||
}
|
||||
|
||||
const SUMMARY_SYSTEM_PROMPT: &str = "\
|
||||
You are a context compaction assistant. \
|
||||
Summarise the conversation below into a structured summary. \
|
||||
Preserve concrete details: file paths, function names, error messages, decisions made. \
|
||||
Use the following format:\n\n\
|
||||
## Original Task\n\
|
||||
(the user's original request)\n\n\
|
||||
## Completed Work\n\
|
||||
- (what was done, with specifics)\n\n\
|
||||
## Key Discoveries\n\
|
||||
- (facts, constraints, errors found)\n\n\
|
||||
## Current State\n\
|
||||
- (files changed, remaining work)";
|
||||
You are a context compaction assistant. Your job is to hand the next session a \
|
||||
structured summary plus pointers to the files it actually needs.\n\n\
|
||||
Tools you can call:\n\
|
||||
- `read_file(file_path, offset?, limit?)` — inspect referenced files before deciding.\n\
|
||||
- `mark_read_required(file_path, offset?, limit?)` — inject a file's contents into the \
|
||||
next session as an auto-read system message. Counts against `auto_read_budget`.\n\
|
||||
- `add_reference(file_path)` — record a file path the next session should know about \
|
||||
without embedding its contents.\n\
|
||||
- `write_summary(text)` — deliver the final structured summary. May be called multiple \
|
||||
times; only the last call is kept.\n\n\
|
||||
Always finish by calling `write_summary`. Produce the summary in this exact format:\n\n\
|
||||
## Completed Tasks\n\
|
||||
### (task name)\n\
|
||||
- what was done (use concrete type / file / function names)\n\
|
||||
- gotchas or facts that came up\n\n\
|
||||
## Active Task\n\
|
||||
### (task name)\n\
|
||||
- goal\n\
|
||||
- current state (what is done / not done)\n\
|
||||
- next step\n\n\
|
||||
## Key Decisions\n\
|
||||
- (decision) — (reason)\n\n\
|
||||
## User Directives\n\
|
||||
- \"verbatim user line\" — only include directives whose wording the next session \
|
||||
should not lose.\n\n\
|
||||
## Current Work\n\
|
||||
(2–3 lines on what was happening just before compaction).\n\n\
|
||||
Keep code snippets and raw tool output OUT of the summary — that is what auto-read \
|
||||
and references are for. Target 1000–2000 tokens.";
|
||||
|
||||
/// An independent agent execution unit.
|
||||
///
|
||||
|
|
@ -792,6 +809,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
///
|
||||
/// Returns the new session ID.
|
||||
pub async fn compact(&mut self, retained_tokens: u64) -> Result<SessionId, PodError> {
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
|
||||
use crate::compact_worker::{
|
||||
CompactWorkerContext, CompactWorkerInterceptor, add_reference_tool,
|
||||
mark_read_required_tool, slice_lines, write_summary_tool,
|
||||
};
|
||||
|
||||
// Decide the cut point by projecting the UsageRecord timeline onto
|
||||
// the current history: keep the tail whose estimated token count is
|
||||
// within `retained_tokens`. Item-granular, turn boundaries ignored.
|
||||
|
|
@ -801,41 +825,179 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
let history = worker.history();
|
||||
let retain_from = cut.index.min(history.len());
|
||||
let retained_items = history[retain_from..].to_vec();
|
||||
let items_to_summarise = &history[..retain_from];
|
||||
let items_to_summarise = history[..retain_from].to_vec();
|
||||
|
||||
// Build summary prompt.
|
||||
let summary_prompt = build_summary_prompt(items_to_summarise);
|
||||
// Compaction-related knobs. Fall through to manifest defaults when
|
||||
// `[compaction]` is omitted entirely.
|
||||
let (auto_read_budget, compact_worker_max_input_tokens) = self
|
||||
.manifest
|
||||
.compaction
|
||||
.as_ref()
|
||||
.map(|c| (c.compact_auto_read_budget, c.compact_worker_max_input_tokens))
|
||||
.unwrap_or((
|
||||
manifest::defaults::COMPACT_AUTO_READ_BUDGET,
|
||||
manifest::defaults::COMPACT_WORKER_MAX_INPUT_TOKENS,
|
||||
));
|
||||
|
||||
// Create a disposable summary Worker.
|
||||
// Default references: the N most-recently-touched files in the
|
||||
// session, surfaced so the compact worker can inspect them and
|
||||
// decide which (if any) the next session needs.
|
||||
let default_refs: Vec<PathBuf> = self
|
||||
.tracker
|
||||
.as_ref()
|
||||
.map(|t| t.recent_files(manifest::defaults::COMPACT_DEFAULT_REFERENCE_COUNT))
|
||||
.unwrap_or_default();
|
||||
|
||||
// Input text fed to the compact worker. Includes the default
|
||||
// references and the (pruned) conversation text.
|
||||
let summary_input = build_summary_input(&items_to_summarise, &default_refs);
|
||||
|
||||
// Worker-side state collected by the compact worker's tool calls.
|
||||
let ctx = Arc::new(std::sync::Mutex::new(CompactWorkerContext::with_budget(
|
||||
auto_read_budget,
|
||||
)));
|
||||
|
||||
// Build an independent compact worker. Scope and pwd are shared
|
||||
// with the main Pod (reads go through the same policy) but the
|
||||
// Tracker is fresh — compact-time reads must not pollute the
|
||||
// main session's recency list, which feeds `default_refs` above.
|
||||
let scoped_fs = tools::ScopedFs::new(self.scope.clone(), self.pwd.clone());
|
||||
let summary_tracker = tools::Tracker::new();
|
||||
let summary_client: Box<dyn LlmClient> = self.build_compactor_client()?;
|
||||
let mut summary_worker = Worker::new(summary_client)
|
||||
.system_prompt(SUMMARY_SYSTEM_PROMPT)
|
||||
.temperature(0.0);
|
||||
summary_worker.set_max_tokens(2048);
|
||||
summary_worker.set_max_tokens(4096);
|
||||
|
||||
// Cumulative input-token meter + interceptor. The meter is bumped
|
||||
// from the on_usage callback and read on every pre_llm_request.
|
||||
let input_so_far = Arc::new(AtomicU64::new(0));
|
||||
{
|
||||
let acc = input_so_far.clone();
|
||||
summary_worker.on_usage(move |event| {
|
||||
if let Some(tokens) = event.input_tokens {
|
||||
acc.fetch_add(tokens, Ordering::Relaxed);
|
||||
}
|
||||
});
|
||||
}
|
||||
summary_worker.set_interceptor(CompactWorkerInterceptor {
|
||||
input_so_far: input_so_far.clone(),
|
||||
max_input_tokens: compact_worker_max_input_tokens,
|
||||
});
|
||||
|
||||
// Tools: read_file (shared scope, fresh tracker) + the three
|
||||
// compact-specific tools that populate `ctx`.
|
||||
summary_worker.register_tool(tools::read_tool(scoped_fs.clone(), summary_tracker));
|
||||
summary_worker
|
||||
.register_tool(mark_read_required_tool(scoped_fs.clone(), ctx.clone()));
|
||||
summary_worker.register_tool(add_reference_tool(ctx.clone()));
|
||||
summary_worker.register_tool(write_summary_tool(ctx.clone()));
|
||||
|
||||
let out = summary_worker
|
||||
.run(summary_prompt)
|
||||
.run(summary_input)
|
||||
.await
|
||||
.map_err(PodError::Worker)?;
|
||||
let summary_text = out
|
||||
.worker
|
||||
.history()
|
||||
.iter()
|
||||
.filter_map(|item| {
|
||||
if item.is_assistant_message() {
|
||||
item.as_text().map(String::from)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
let mut locked_worker = out.worker;
|
||||
|
||||
// Build new history: [summary as user message, ...retained].
|
||||
let mut new_history = Vec::with_capacity(retained_items.len() + 1);
|
||||
// Guard: nudge the worker once more if the expected outputs
|
||||
// (summary, and any auto-read nominations when default refs
|
||||
// existed) were not produced on the first pass. `write_summary`
|
||||
// is idempotent-by-overwrite so a second call is safe.
|
||||
let nudge = {
|
||||
let snapshot = ctx.lock().expect("compact ctx poisoned").clone();
|
||||
if snapshot.summary.is_none() {
|
||||
Some(
|
||||
"You have not called `write_summary` yet. Deliver the structured \
|
||||
summary now (Completed Tasks / Active Task / Key Decisions / \
|
||||
User Directives / Current Work) and nominate any files the next \
|
||||
session needs with `mark_read_required`."
|
||||
.to_string(),
|
||||
)
|
||||
} else if snapshot.read_required.is_empty() && !default_refs.is_empty() {
|
||||
Some(
|
||||
"Summary received. If any of the referenced files are required \
|
||||
for the next session to continue the task, call \
|
||||
`mark_read_required` on them now. Otherwise reply briefly to \
|
||||
close out."
|
||||
.to_string(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
if let Some(prompt) = nudge {
|
||||
let _ = locked_worker
|
||||
.run(prompt)
|
||||
.await
|
||||
.map_err(PodError::Worker)?;
|
||||
}
|
||||
|
||||
let final_ctx = ctx.lock().expect("compact ctx poisoned").clone();
|
||||
let summary_text = final_ctx
|
||||
.summary
|
||||
.clone()
|
||||
.ok_or(PodError::CompactSummaryMissing)?;
|
||||
|
||||
// Re-read each auto-read target through ScopedFs and render the
|
||||
// requested slice. Errors are logged and skipped rather than
|
||||
// aborting compaction — a missing / moved file should not fail
|
||||
// the whole compact.
|
||||
let mut auto_read_messages = Vec::new();
|
||||
for req in &final_ctx.read_required {
|
||||
match scoped_fs.read_bytes(&req.path) {
|
||||
Ok(bytes) => {
|
||||
let text = String::from_utf8_lossy(&bytes).into_owned();
|
||||
let body = slice_lines(&text, req.offset.unwrap_or(0), req.limit);
|
||||
let range = match (req.offset, req.limit) {
|
||||
(None, None) => String::new(),
|
||||
(Some(off), None) => format!(":{}-", off + 1),
|
||||
(None, Some(lim)) => format!(":1-{lim}"),
|
||||
(Some(off), Some(lim)) => {
|
||||
format!(":{}-{}", off + 1, off.saturating_add(lim))
|
||||
}
|
||||
};
|
||||
auto_read_messages.push(Item::system_message(format!(
|
||||
"[Auto-read file: {}{range}]\n{body}",
|
||||
req.path.display()
|
||||
)));
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
path = %req.path.display(),
|
||||
error = %e,
|
||||
"auto-read target could not be read; skipping",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Reference list as a single system message; omitted when empty.
|
||||
let reference_message = (!final_ctx.references.is_empty()).then(|| {
|
||||
let list = final_ctx
|
||||
.references
|
||||
.iter()
|
||||
.map(|p| format!("- {}", p.display()))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
Item::system_message(format!(
|
||||
"[Referenced files — read before compaction, contents not included]\n\
|
||||
{list}\n\
|
||||
Use read_file to access current contents if needed."
|
||||
))
|
||||
});
|
||||
|
||||
// Build new history: [summary, ...auto-read, references, ...retained].
|
||||
let mut new_history = Vec::with_capacity(
|
||||
1 + auto_read_messages.len() + reference_message.is_some() as usize
|
||||
+ retained_items.len(),
|
||||
);
|
||||
new_history.push(Item::system_message(format!(
|
||||
"[Compacted context summary]\n\n{summary_text}"
|
||||
)));
|
||||
new_history.extend(auto_read_messages);
|
||||
if let Some(msg) = reference_message {
|
||||
new_history.push(msg);
|
||||
}
|
||||
new_history.extend(retained_items);
|
||||
|
||||
// Persist as a new compacted session.
|
||||
|
|
@ -1087,6 +1249,37 @@ impl From<WorkerResult> for PodRunResult {
|
|||
}
|
||||
}
|
||||
|
||||
/// Build the compact worker's input: default-reference instructions,
|
||||
/// the list of recently-touched files, and the pruned conversation
|
||||
/// produced by [`build_summary_prompt`].
|
||||
fn build_summary_input(items: &[Item], default_refs: &[PathBuf]) -> String {
|
||||
let mut out = String::new();
|
||||
out.push_str(
|
||||
"Summarise the conversation below into a structured summary and nominate \
|
||||
files the next session needs.\n\n",
|
||||
);
|
||||
if !default_refs.is_empty() {
|
||||
out.push_str(
|
||||
"These files were touched recently in this session. Use `read_file` \
|
||||
on them as needed, then call `mark_read_required` for any whose \
|
||||
contents the next session must have, and `add_reference` for files \
|
||||
it should know about by name only.\n\n## Referenced files\n",
|
||||
);
|
||||
for p in default_refs {
|
||||
out.push_str("- ");
|
||||
out.push_str(&p.display().to_string());
|
||||
out.push('\n');
|
||||
}
|
||||
out.push('\n');
|
||||
}
|
||||
out.push_str("## Conversation\n");
|
||||
out.push_str(&build_summary_prompt(items));
|
||||
out.push_str(
|
||||
"\n\nWhen you are done, call `write_summary` with the final 5-section text.",
|
||||
);
|
||||
out
|
||||
}
|
||||
|
||||
/// Format conversation items into a text prompt for the summary Worker.
|
||||
///
|
||||
/// The summary should capture decisions and user intent, not recreate code.
|
||||
|
|
@ -1158,6 +1351,9 @@ pub enum PodError {
|
|||
#[error("compaction thrash: context still exceeds threshold immediately after compact")]
|
||||
CompactThrash,
|
||||
|
||||
#[error("compact worker did not produce a summary (write_summary was never called)")]
|
||||
CompactSummaryMissing,
|
||||
|
||||
#[error("invalid system prompt template: {source}")]
|
||||
InvalidSystemPromptTemplate {
|
||||
#[source]
|
||||
|
|
|
|||
|
|
@ -61,6 +61,19 @@ fn single_text_events(text: &str) -> Vec<LlmEvent> {
|
|||
]
|
||||
}
|
||||
|
||||
/// Emit a single `write_summary(text=...)` tool call as one LLM response.
|
||||
fn write_summary_tool_use_events(call_id: &str, text: &str) -> Vec<LlmEvent> {
|
||||
let input = serde_json::json!({ "text": text }).to_string();
|
||||
vec![
|
||||
LlmEvent::tool_use_start(0, call_id, "write_summary"),
|
||||
LlmEvent::tool_input_delta(0, input),
|
||||
LlmEvent::tool_use_stop(0),
|
||||
LlmEvent::Status(StatusEvent {
|
||||
status: ResponseStatus::Completed,
|
||||
}),
|
||||
]
|
||||
}
|
||||
|
||||
const MINIMAL_MANIFEST_TOML: &str = r#"
|
||||
[pod]
|
||||
name = "test-pod"
|
||||
|
|
@ -233,10 +246,11 @@ async fn agents_md_absent_omits_trailing_section() {
|
|||
#[tokio::test]
|
||||
async fn agents_md_not_reread_after_compact() {
|
||||
let client = MockClient::new(vec![
|
||||
single_text_events("a"),
|
||||
single_text_events("b"),
|
||||
single_text_events("summary"),
|
||||
single_text_events("c"),
|
||||
single_text_events("a"), // pod.run("first")
|
||||
single_text_events("b"), // pod.run("second")
|
||||
write_summary_tool_use_events("call-1", "compacted summary"), // compact worker: tool_use
|
||||
single_text_events("done"), // compact worker: close
|
||||
single_text_events("c"), // pod.run("third")
|
||||
]);
|
||||
let (mut pod, pwd) = make_pod_with_body("BODY", client).await.unwrap();
|
||||
let agents_path = pwd.join("AGENTS.md");
|
||||
|
|
@ -264,10 +278,11 @@ async fn agents_md_not_reread_after_compact() {
|
|||
#[tokio::test]
|
||||
async fn compact_preserves_system_prompt() {
|
||||
let client = MockClient::new(vec![
|
||||
single_text_events("a"),
|
||||
single_text_events("b"),
|
||||
single_text_events("summary"),
|
||||
single_text_events("c"),
|
||||
single_text_events("a"), // pod.run("first")
|
||||
single_text_events("b"), // pod.run("second")
|
||||
write_summary_tool_use_events("call-1", "compacted summary"), // compact worker: tool_use
|
||||
single_text_events("done"), // compact worker: close
|
||||
single_text_events("c"), // pod.run("third")
|
||||
]);
|
||||
let (mut pod, _pwd) = make_pod_with_body("SP cwd={{ cwd }}", client)
|
||||
.await
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user