compact: compact worker をツール駆動マルチターンに再設計

段階 4〜9 を一括で実装:
- mark_read_required / add_reference / write_summary + read_file の 4 ツールで
  compact worker を駆動。結果は CompactWorkerContext に集約
- 新セッションの先頭を [summary, ...auto-read, references, ...retained] で構築
- デフォルトリファレンスは tracker.recent_files(5) から
- auto-read は compact_auto_read_budget で総量制限。超過は即エラー
- compact worker 自身は compact_worker_max_input_tokens で累計入力を制限
- 5 セクション要約フォーマットに system prompt を更新
- write_summary 未呼び出し / auto-read 空のときは 1 回追加プロンプトで促す
This commit is contained in:
Keisuke Hirata 2026-04-19 09:26:55 +09:00
parent db2dd8a3c0
commit da021103e4
7 changed files with 688 additions and 41 deletions

View File

@ -88,6 +88,10 @@ pub struct CompactionConfigPartial {
#[serde(default)]
pub compact_retained_tokens: Option<u64>,
#[serde(default)]
pub compact_auto_read_budget: Option<u64>,
#[serde(default)]
pub compact_worker_max_input_tokens: Option<u64>,
#[serde(default)]
pub provider: Option<ProviderConfigPartial>,
}
@ -244,6 +248,12 @@ impl CompactionConfigPartial {
compact_retained_tokens: upper
.compact_retained_tokens
.or(self.compact_retained_tokens),
compact_auto_read_budget: upper
.compact_auto_read_budget
.or(self.compact_auto_read_budget),
compact_worker_max_input_tokens: upper
.compact_worker_max_input_tokens
.or(self.compact_worker_max_input_tokens),
provider: merge_option(self.provider, upper.provider, ProviderConfigPartial::merge),
}
}
@ -374,6 +384,12 @@ impl TryFrom<PodManifestConfig> for PodManifest {
compact_retained_tokens: c
.compact_retained_tokens
.unwrap_or(defaults::COMPACT_RETAINED_TOKENS),
compact_auto_read_budget: c
.compact_auto_read_budget
.unwrap_or(defaults::COMPACT_AUTO_READ_BUDGET),
compact_worker_max_input_tokens: c
.compact_worker_max_input_tokens
.unwrap_or(defaults::COMPACT_WORKER_MAX_INPUT_TOKENS),
provider: comp_provider,
})
})

View File

@ -28,3 +28,20 @@ pub const COMPACT_RETAINED_TOKENS: u64 = 8000;
/// is omitted. See the `PromptLoader` prefix addressing scheme for the
/// `$insomnia/` / `$user/` / `$workspace/` namespaces.
pub const DEFAULT_INSTRUCTION: &str = "$insomnia/default";
/// Token budget for auto-read file contents injected into the new
/// session after compaction. Limits how much raw file text the
/// compact worker can pull into the compacted context via
/// `mark_read_required`. See
/// [`crate::CompactionConfig::compact_auto_read_budget`].
///
/// The budget is an aggregate across all marks made in one compact
/// run. The `mark_read_required` tool only enforces it when the
/// configured value is greater than zero, so `0` disables the check.
pub const COMPACT_AUTO_READ_BUDGET: u64 = 8000;
/// Cumulative input-token cap for the compact worker's own LLM
/// calls. Exceeding this aborts the compact run (circuit-breaker
/// path). See
/// [`crate::CompactionConfig::compact_worker_max_input_tokens`].
///
/// The cap is compared against the running total before each LLM
/// request, and only a total strictly greater than the cap cancels
/// the request.
pub const COMPACT_WORKER_MAX_INPUT_TOKENS: u64 = 50_000;
/// Number of recently-touched files fed to the compact worker as
/// default references.
///
/// Passed as the count argument to the session tracker's
/// `recent_files` lookup when compaction starts.
pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5;

View File

@ -201,6 +201,16 @@ pub struct CompactionConfig {
#[serde(default = "default_compact_retained_tokens")]
pub compact_retained_tokens: u64,
/// Aggregate token budget for auto-read file contents injected into
/// the compacted session by the compact worker.
#[serde(default = "default_compact_auto_read_budget")]
pub compact_auto_read_budget: u64,
/// Cumulative input-token cap for the compact worker's own LLM
/// calls. Exceeding this aborts the compact run.
#[serde(default = "default_compact_worker_max_input_tokens")]
pub compact_worker_max_input_tokens: u64,
/// Optional provider for the compactor (summary) LLM.
/// If omitted, the main provider is cloned via `clone_boxed()`.
#[serde(default)]
@ -216,6 +226,12 @@ fn default_prune_min_savings() -> u64 {
fn default_compact_retained_tokens() -> u64 {
defaults::COMPACT_RETAINED_TOKENS
}
/// Serde default for `CompactionConfig::compact_auto_read_budget`;
/// forwards to [`defaults::COMPACT_AUTO_READ_BUDGET`].
fn default_compact_auto_read_budget() -> u64 {
defaults::COMPACT_AUTO_READ_BUDGET
}
/// Serde default for `CompactionConfig::compact_worker_max_input_tokens`;
/// forwards to [`defaults::COMPACT_WORKER_MAX_INPUT_TOKENS`].
fn default_compact_worker_max_input_tokens() -> u64 {
defaults::COMPACT_WORKER_MAX_INPUT_TOKENS
}
impl Default for CompactionConfig {
fn default() -> Self {
@ -225,6 +241,8 @@ impl Default for CompactionConfig {
compact_threshold: None,
compact_request_threshold: None,
compact_retained_tokens: default_compact_retained_tokens(),
compact_auto_read_budget: default_compact_auto_read_budget(),
compact_worker_max_input_tokens: default_compact_worker_max_input_tokens(),
provider: None,
}
}

View File

@ -0,0 +1,384 @@
//! Compact worker state and the four tools that drive it.
//!
//! The compact worker is a disposable `Worker` instance spun up by
//! [`Pod::compact`]. It receives the history to summarise plus a list of
//! default reference files (from the session-lifetime `Tracker`) and runs
//! a tool-driven LLM loop. The tools here let it:
//!
//! - `read_file` — inspect referenced files (reuses `tools::read_tool`)
//! - `mark_read_required(path, offset?, limit?)` — nominate a file whose
//! contents should be injected into the compacted context as an
//! auto-read system message
//! - `add_reference(path)` — nominate a file the next session should
//! know about by name only (contents not included)
//! - `write_summary(text)` — deliver (or overwrite) the structured summary
//!
//! Everything the worker decides ends up in [`CompactWorkerContext`],
//! which `Pod::compact` drains after the loop and turns into the
//! compacted session's opening system messages.
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use llm_worker::Item;
use llm_worker::interceptor::{Interceptor, PreRequestAction};
use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput};
use serde::Deserialize;
use tools::ScopedFs;
/// One file (or line range of a file) that the compact worker has
/// nominated for automatic injection into the post-compaction session.
#[derive(Debug, Clone)]
pub(crate) struct ReadRequirement {
    /// Path of the nominated file, exactly as the worker supplied it.
    pub path: PathBuf,
    /// 0-based index of the first line to include. `None` starts at the
    /// beginning of the file.
    pub offset: Option<usize>,
    /// Upper bound on the number of lines to include. `None` runs to the
    /// end of the file.
    pub limit: Option<usize>,
}

/// Everything a compact worker run has decided so far, in one place.
///
/// The compact tools mutate this through a shared `Arc<Mutex<_>>`; the
/// caller reads it back once the worker loop has finished.
#[derive(Debug, Default, Clone)]
pub(crate) struct CompactWorkerContext {
    /// Files (or slices) to inject as auto-read content.
    pub read_required: Vec<ReadRequirement>,
    /// Files to mention by path only.
    pub references: Vec<PathBuf>,
    /// Latest summary text, if `write_summary` has been called.
    pub summary: Option<String>,
    /// Estimated tokens already charged by `mark_read_required` calls.
    pub auto_read_consumed: u64,
    /// Aggregate cap on `auto_read_consumed`. `0` disables the cap.
    pub auto_read_budget: u64,
}

impl CompactWorkerContext {
    /// Create an empty context whose auto-read cap is `auto_read_budget`.
    pub(crate) fn with_budget(auto_read_budget: u64) -> Self {
        Self {
            read_required: Vec::new(),
            references: Vec::new(),
            summary: None,
            auto_read_consumed: 0,
            auto_read_budget,
        }
    }

    /// Tokens still available under the cap, clamped at zero.
    fn remaining_budget(&self) -> u64 {
        let Self {
            auto_read_budget,
            auto_read_consumed,
            ..
        } = self;
        auto_read_budget.saturating_sub(*auto_read_consumed)
    }
}
// NOTE(review): this struct derives `schemars::JsonSchema`, which by default
// copies `///` doc comments into the generated schema's descriptions. The
// `///` text below is therefore probably model-facing — confirm before
// rewording it.
/// Input to `mark_read_required`.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct MarkParams {
/// Absolute path to the file.
pub file_path: PathBuf,
/// 0-based line offset.
// Optional in the JSON input; a missing key deserializes to `None`.
#[serde(default)]
pub offset: Option<usize>,
/// Maximum number of lines to inject.
// Optional in the JSON input; a missing key deserializes to `None`.
#[serde(default)]
pub limit: Option<usize>,
}
// NOTE(review): `schemars::JsonSchema` normally turns the `///` comments
// below into schema descriptions, so they are probably model-facing —
// confirm before rewording them.
/// Input to `add_reference`.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct ReferenceParams {
/// Absolute path to the file.
pub file_path: PathBuf,
}
// NOTE(review): `schemars::JsonSchema` normally turns the `///` comments
// below into schema descriptions, so they are probably model-facing —
// confirm before rewording them.
/// Input to `write_summary`.
#[derive(Debug, Deserialize, schemars::JsonSchema)]
struct SummaryParams {
/// Full structured summary text (overwrites any previous call).
pub text: String,
}
/// Description attached to the `mark_read_required` tool's metadata.
/// This is runtime text, so wording changes alter what the tool advertises.
const MARK_DESCRIPTION: &str = "Inject a file's contents into the compacted context so the \
next session starts with it already read. Use this for files the next task needs in full. \
Optionally specify `offset` (0-based line) and `limit` (line count) to inject only a slice. \
Counts against `auto_read_budget`; overflow returns an error and the mark is not recorded. \
Paths must be absolute.";
/// Description attached to the `add_reference` tool's metadata.
/// This is runtime text, so wording changes alter what the tool advertises.
const REFERENCE_DESCRIPTION: &str = "Record a file path as a named reference in the compacted \
context without injecting its contents. Use for files that are contextually relevant but \
whose current content the next session can fetch on demand.";
/// Description attached to the `write_summary` tool's metadata.
/// This is runtime text, so wording changes alter what the tool advertises.
const SUMMARY_DESCRIPTION: &str = "Provide the final structured summary text. Subsequent calls \
replace the previous content; only the last call is used. Must be called before the compact run \
ends or compaction fails.";
/// Tool backing `mark_read_required`.
///
/// Checks a nominated file slice against the auto-read budget and, when it
/// fits, records it in the shared [`CompactWorkerContext`].
struct MarkReadRequiredTool {
    /// Filesystem accessor used to read the nominated file.
    fs: ScopedFs,
    /// Shared compact-worker state this tool appends to.
    ctx: Arc<Mutex<CompactWorkerContext>>,
}

#[async_trait]
impl Tool for MarkReadRequiredTool {
    /// Handle one `mark_read_required` call.
    ///
    /// Parses `input_json` as [`MarkParams`], reads the file, estimates the
    /// token cost of the requested line slice and records a
    /// [`ReadRequirement`] if the slice fits in the remaining budget.
    /// Re-marking a slice that is already recorded (same path, same
    /// effective offset, same limit) is a no-op that reports success, so
    /// the budget is never charged twice for identical content.
    ///
    /// # Errors
    ///
    /// - [`ToolError::InvalidArgument`] when the input does not deserialize.
    /// - [`ToolError::ExecutionFailed`] when the file cannot be read, or
    ///   when a non-zero budget would be exceeded. In the latter case
    ///   nothing is recorded and nothing is charged.
    ///
    /// # Panics
    ///
    /// Panics if the shared context mutex is poisoned.
    async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
        let params: MarkParams = serde_json::from_str(input_json).map_err(|e| {
            ToolError::InvalidArgument(format!("invalid mark_read_required input: {e}"))
        })?;

        // Read the file through the shared ScopedFs so scope and I/O
        // errors surface the same way the regular `read_file` tool does.
        let bytes = self
            .fs
            .read_bytes(&params.file_path)
            .map_err(|e| ToolError::ExecutionFailed(format!("read failed: {e}")))?;
        let text = String::from_utf8_lossy(&bytes);
        let offset = params.offset.unwrap_or(0);
        let slice = slice_lines(&text, offset, params.limit);
        let estimated_tokens = estimate_tokens(slice.len());

        let mut guard = self.ctx.lock().expect("compact worker context poisoned");
        let budget = guard.auto_read_budget;

        // An identical slice is already queued: charging it again would
        // waste budget and inject the same text twice.
        let already_marked = guard.read_required.iter().any(|req| {
            req.path == params.file_path
                && req.offset.unwrap_or(0) == offset
                && req.limit == params.limit
        });
        if already_marked {
            return Ok(ToolOutput {
                summary: format!(
                    "{} is already marked for auto-read with the same range; nothing was added.",
                    params.file_path.display()
                ),
                content: None,
            });
        }

        let would_consume = guard.auto_read_consumed.saturating_add(estimated_tokens);
        // A budget of 0 means "no cap".
        if budget > 0 && would_consume > budget {
            let remaining = guard.remaining_budget();
            // Only suggest recovery paths that actually exist: there is no
            // tool for removing a mark.
            return Err(ToolError::ExecutionFailed(format!(
                "auto-read budget exceeded: this slice needs ≈{estimated_tokens} tokens but \
                 only {remaining} of {budget} remain. Mark a smaller slice with \
                 `offset`/`limit` or use add_reference instead."
            )));
        }

        guard.read_required.push(ReadRequirement {
            path: params.file_path.clone(),
            offset: params.offset,
            limit: params.limit,
        });
        guard.auto_read_consumed = would_consume;
        let remaining = guard.remaining_budget();
        drop(guard);

        let mut summary = format!(
            "Marked {} for auto-read (≈{estimated_tokens} tokens).",
            params.file_path.display()
        );
        // Budget figures are only meaningful when a cap is configured.
        if budget > 0 {
            summary.push_str(&format!(
                " Budget: {remaining}/{budget} tokens remaining."
            ));
            // Same threshold as `remaining * 2 <= budget`, without the
            // possibility of overflowing on very large budgets.
            if remaining <= budget / 2 {
                summary.push_str(
                    "\nNote: auto-read budget is at least half consumed. \
                     Consider calling write_summary and finishing up soon.",
                );
            }
        }

        Ok(ToolOutput {
            summary,
            content: None,
        })
    }
}
struct AddReferenceTool {
ctx: Arc<Mutex<CompactWorkerContext>>,
}
#[async_trait]
impl Tool for AddReferenceTool {
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let params: ReferenceParams = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid add_reference input: {e}")))?;
let mut guard = self.ctx.lock().expect("compact worker context poisoned");
if !guard
.references
.iter()
.any(|p| p.as_path() == params.file_path.as_path())
{
guard.references.push(params.file_path.clone());
}
Ok(ToolOutput {
summary: format!("Added reference {}", params.file_path.display()),
content: None,
})
}
}
struct WriteSummaryTool {
ctx: Arc<Mutex<CompactWorkerContext>>,
}
#[async_trait]
impl Tool for WriteSummaryTool {
async fn execute(&self, input_json: &str) -> Result<ToolOutput, ToolError> {
let params: SummaryParams = serde_json::from_str(input_json)
.map_err(|e| ToolError::InvalidArgument(format!("invalid write_summary input: {e}")))?;
let mut guard = self.ctx.lock().expect("compact worker context poisoned");
let overwritten = guard.summary.is_some();
guard.summary = Some(params.text);
drop(guard);
let note = if overwritten {
"Summary replaced."
} else {
"Summary recorded."
};
Ok(ToolOutput {
summary: note.to_string(),
content: None,
})
}
}
/// Build the [`ToolDefinition`] for `mark_read_required`.
///
/// Each invocation of the returned factory yields fresh metadata (name,
/// description, JSON schema derived from [`MarkParams`]) and a tool
/// instance sharing `fs` and `ctx`. If the schema cannot be converted to a
/// JSON value, an empty object is used instead.
pub(crate) fn mark_read_required_tool(
    fs: ScopedFs,
    ctx: Arc<Mutex<CompactWorkerContext>>,
) -> ToolDefinition {
    Arc::new(move || {
        let input_schema = serde_json::to_value(schemars::schema_for!(MarkParams))
            .unwrap_or_else(|_| serde_json::json!({}));
        let handler = MarkReadRequiredTool {
            fs: fs.clone(),
            ctx: Arc::clone(&ctx),
        };
        let tool: Arc<dyn Tool> = Arc::new(handler);
        let meta = ToolMeta::new("mark_read_required")
            .description(MARK_DESCRIPTION)
            .input_schema(input_schema);
        (meta, tool)
    })
}
/// Build the [`ToolDefinition`] for `add_reference`.
///
/// Each invocation of the returned factory yields fresh metadata (name,
/// description, JSON schema derived from [`ReferenceParams`]) and a tool
/// instance sharing `ctx`. If the schema cannot be converted to a JSON
/// value, an empty object is used instead.
pub(crate) fn add_reference_tool(ctx: Arc<Mutex<CompactWorkerContext>>) -> ToolDefinition {
    Arc::new(move || {
        let input_schema = serde_json::to_value(schemars::schema_for!(ReferenceParams))
            .unwrap_or_else(|_| serde_json::json!({}));
        let handler = AddReferenceTool {
            ctx: Arc::clone(&ctx),
        };
        let tool: Arc<dyn Tool> = Arc::new(handler);
        let meta = ToolMeta::new("add_reference")
            .description(REFERENCE_DESCRIPTION)
            .input_schema(input_schema);
        (meta, tool)
    })
}
/// Build the [`ToolDefinition`] for `write_summary`.
///
/// Each invocation of the returned factory yields fresh metadata (name,
/// description, JSON schema derived from [`SummaryParams`]) and a tool
/// instance sharing `ctx`. If the schema cannot be converted to a JSON
/// value, an empty object is used instead.
pub(crate) fn write_summary_tool(ctx: Arc<Mutex<CompactWorkerContext>>) -> ToolDefinition {
    Arc::new(move || {
        let input_schema = serde_json::to_value(schemars::schema_for!(SummaryParams))
            .unwrap_or_else(|_| serde_json::json!({}));
        let handler = WriteSummaryTool {
            ctx: Arc::clone(&ctx),
        };
        let tool: Arc<dyn Tool> = Arc::new(handler);
        let meta = ToolMeta::new("write_summary")
            .description(SUMMARY_DESCRIPTION)
            .input_schema(input_schema);
        (meta, tool)
    })
}
/// Circuit breaker for the compact worker's own LLM usage.
///
/// Before every LLM request it compares the shared `input_so_far` counter
/// with `max_input_tokens` and cancels the request once the counter is
/// strictly greater. The interceptor never writes the counter itself; the
/// `on_usage` callback registered by `Pod::compact` is what accumulates it.
pub(crate) struct CompactWorkerInterceptor {
    /// Running total of input tokens, shared with the usage callback.
    pub input_so_far: Arc<AtomicU64>,
    /// Cap on `input_so_far`; exceeding it cancels the next request.
    pub max_input_tokens: u64,
}

#[async_trait]
impl Interceptor for CompactWorkerInterceptor {
    /// Let the request through while the running total is at or below the
    /// cap; otherwise cancel with a message naming the cap.
    async fn pre_llm_request(&self, _context: &mut Vec<Item>) -> PreRequestAction {
        let consumed = self.input_so_far.load(Ordering::Relaxed);
        let limit = self.max_input_tokens;
        if consumed <= limit {
            return PreRequestAction::Continue;
        }
        PreRequestAction::Cancel(format!("compact worker input exceeded {limit} tokens"))
    }
}
/// Rough token estimate for `bytes` bytes of text: one token per four
/// bytes, rounded up. Precise enough for budget bookkeeping only.
fn estimate_tokens(bytes: usize) -> u64 {
    const BYTES_PER_TOKEN: u64 = 4;
    let total_bytes = bytes as u64;
    total_bytes.div_ceil(BYTES_PER_TOKEN)
}
/// Extract a line range from `text`.
///
/// Skips the first `offset` lines, keeps at most `limit` lines (all
/// remaining lines when `limit` is `None`), and joins them with `\n`.
/// Lines are split as by [`str::lines`], so `\r\n` endings are normalised
/// to `\n` and no trailing newline is emitted. An `offset` at or past the
/// end yields an empty string.
pub(crate) fn slice_lines(text: &str, offset: usize, limit: Option<usize>) -> String {
    let wanted = text
        .lines()
        .skip(offset)
        .take(limit.unwrap_or(usize::MAX));

    let mut out = String::new();
    for (idx, line) in wanted.enumerate() {
        if idx > 0 {
            out.push('\n');
        }
        out.push_str(line);
    }
    out
}
// Unit tests for the compact-worker tools and the line-slicing helper.
#[cfg(test)]
mod tests {
use super::*;
use manifest::Scope;
// Build a `ScopedFs` whose writable scope and working directory are both `tmp`.
fn make_fs(tmp: &std::path::Path) -> ScopedFs {
let scope = Scope::writable(tmp.to_path_buf()).unwrap();
ScopedFs::new(scope, tmp.to_path_buf())
}
// A small file under a generous budget is recorded and charged.
#[tokio::test]
async fn mark_read_required_records_and_deducts_budget() {
let tmp = tempfile::TempDir::new().unwrap();
let path = tmp.path().join("hello.txt");
std::fs::write(&path, "hello world\n").unwrap();
let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(1_000)));
let tool: Arc<dyn Tool> = Arc::new(MarkReadRequiredTool {
fs: make_fs(tmp.path()),
ctx: ctx.clone(),
});
let input = serde_json::json!({ "file_path": path.to_str().unwrap() }).to_string();
let out = tool.execute(&input).await.unwrap();
assert!(out.summary.starts_with("Marked"));
let guard = ctx.lock().unwrap();
assert_eq!(guard.read_required.len(), 1);
assert!(guard.auto_read_consumed > 0);
assert!(guard.auto_read_consumed <= 1_000);
}
// A file larger than the budget is rejected and leaves the context untouched.
#[tokio::test]
async fn mark_read_required_rejects_over_budget() {
let tmp = tempfile::TempDir::new().unwrap();
let path = tmp.path().join("big.txt");
std::fs::write(&path, "x".repeat(4_096)).unwrap(); // ≈1024 tokens
let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(100)));
let tool: Arc<dyn Tool> = Arc::new(MarkReadRequiredTool {
fs: make_fs(tmp.path()),
ctx: ctx.clone(),
});
let input = serde_json::json!({ "file_path": path.to_str().unwrap() }).to_string();
let res = tool.execute(&input).await;
assert!(matches!(res, Err(ToolError::ExecutionFailed(_))));
let guard = ctx.lock().unwrap();
assert!(guard.read_required.is_empty());
assert_eq!(guard.auto_read_consumed, 0);
}
// The second `write_summary` call wins and reports that it replaced the first.
#[tokio::test]
async fn write_summary_overwrites_previous_call() {
let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(0)));
let tool: Arc<dyn Tool> = Arc::new(WriteSummaryTool { ctx: ctx.clone() });
let first = serde_json::json!({ "text": "first" }).to_string();
let out1 = tool.execute(&first).await.unwrap();
assert!(out1.summary.contains("recorded"));
let second = serde_json::json!({ "text": "second" }).to_string();
let out2 = tool.execute(&second).await.unwrap();
assert!(out2.summary.contains("replaced"));
assert_eq!(ctx.lock().unwrap().summary.as_deref(), Some("second"));
}
// Adding the same path twice keeps a single entry.
#[tokio::test]
async fn add_reference_deduplicates() {
let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(0)));
let tool: Arc<dyn Tool> = Arc::new(AddReferenceTool { ctx: ctx.clone() });
let p = "/abs/path.rs";
let input = serde_json::json!({ "file_path": p }).to_string();
tool.execute(&input).await.unwrap();
tool.execute(&input).await.unwrap();
let guard = ctx.lock().unwrap();
assert_eq!(guard.references.len(), 1);
assert_eq!(guard.references[0], PathBuf::from(p));
}
// Covers the whole-file case, an interior window, and an offset past the end.
#[test]
fn slice_lines_handles_offset_and_limit() {
let text = "a\nb\nc\nd";
assert_eq!(slice_lines(text, 0, None), "a\nb\nc\nd");
assert_eq!(slice_lines(text, 1, Some(2)), "b\nc");
assert_eq!(slice_lines(text, 10, None), "");
}
}

View File

@ -12,6 +12,7 @@ pub mod spawned_pod_registry;
mod agents_md;
mod compact_state;
mod compact_worker;
mod factory;
mod notification_buffer;
mod pod;

View File

@ -47,18 +47,35 @@ impl Hook<PreLlmRequest> for UsageTrackingHook {
}
const SUMMARY_SYSTEM_PROMPT: &str = "\
You are a context compaction assistant. \
Summarise the conversation below into a structured summary. \
Preserve concrete details: file paths, function names, error messages, decisions made. \
Use the following format:\n\n\
## Original Task\n\
(the user's original request)\n\n\
## Completed Work\n\
- (what was done, with specifics)\n\n\
## Key Discoveries\n\
- (facts, constraints, errors found)\n\n\
## Current State\n\
- (files changed, remaining work)";
You are a context compaction assistant. Your job is to hand the next session a \
structured summary plus pointers to the files it actually needs.\n\n\
Tools you can call:\n\
- `read_file(file_path, offset?, limit?)` inspect referenced files before deciding.\n\
- `mark_read_required(file_path, offset?, limit?)` inject a file's contents into the \
next session as an auto-read system message. Counts against `auto_read_budget`.\n\
- `add_reference(file_path)` record a file path the next session should know about \
without embedding its contents.\n\
- `write_summary(text)` deliver the final structured summary. May be called multiple \
times; only the last call is kept.\n\n\
Always finish by calling `write_summary`. Produce the summary in this exact format:\n\n\
## Completed Tasks\n\
### (task name)\n\
- what was done (use concrete type / file / function names)\n\
- gotchas or facts that came up\n\n\
## Active Task\n\
### (task name)\n\
- goal\n\
- current state (what is done / not done)\n\
- next step\n\n\
## Key Decisions\n\
- (decision) (reason)\n\n\
## User Directives\n\
- \"verbatim user line\" — only include directives whose wording the next session \
should not lose.\n\n\
## Current Work\n\
(23 lines on what was happening just before compaction).\n\n\
Keep code snippets and raw tool output OUT of the summary that is what auto-read \
and references are for. Target 10002000 tokens.";
/// An independent agent execution unit.
///
@ -792,6 +809,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
///
/// Returns the new session ID.
pub async fn compact(&mut self, retained_tokens: u64) -> Result<SessionId, PodError> {
use std::sync::atomic::{AtomicU64, Ordering};
use crate::compact_worker::{
CompactWorkerContext, CompactWorkerInterceptor, add_reference_tool,
mark_read_required_tool, slice_lines, write_summary_tool,
};
// Decide the cut point by projecting the UsageRecord timeline onto
// the current history: keep the tail whose estimated token count is
// within `retained_tokens`. Item-granular, turn boundaries ignored.
@ -801,41 +825,179 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let history = worker.history();
let retain_from = cut.index.min(history.len());
let retained_items = history[retain_from..].to_vec();
let items_to_summarise = &history[..retain_from];
let items_to_summarise = history[..retain_from].to_vec();
// Build summary prompt.
let summary_prompt = build_summary_prompt(items_to_summarise);
// Compaction-related knobs. Fall through to manifest defaults when
// `[compaction]` is omitted entirely.
let (auto_read_budget, compact_worker_max_input_tokens) = self
.manifest
.compaction
.as_ref()
.map(|c| (c.compact_auto_read_budget, c.compact_worker_max_input_tokens))
.unwrap_or((
manifest::defaults::COMPACT_AUTO_READ_BUDGET,
manifest::defaults::COMPACT_WORKER_MAX_INPUT_TOKENS,
));
// Create a disposable summary Worker.
// Default references: the N most-recently-touched files in the
// session, surfaced so the compact worker can inspect them and
// decide which (if any) the next session needs.
let default_refs: Vec<PathBuf> = self
.tracker
.as_ref()
.map(|t| t.recent_files(manifest::defaults::COMPACT_DEFAULT_REFERENCE_COUNT))
.unwrap_or_default();
// Input text fed to the compact worker. Includes the default
// references and the (pruned) conversation text.
let summary_input = build_summary_input(&items_to_summarise, &default_refs);
// Worker-side state collected by the compact worker's tool calls.
let ctx = Arc::new(std::sync::Mutex::new(CompactWorkerContext::with_budget(
auto_read_budget,
)));
// Build an independent compact worker. Scope and pwd are shared
// with the main Pod (reads go through the same policy) but the
// Tracker is fresh — compact-time reads must not pollute the
// main session's recency list, which feeds `default_refs` above.
let scoped_fs = tools::ScopedFs::new(self.scope.clone(), self.pwd.clone());
let summary_tracker = tools::Tracker::new();
let summary_client: Box<dyn LlmClient> = self.build_compactor_client()?;
let mut summary_worker = Worker::new(summary_client)
.system_prompt(SUMMARY_SYSTEM_PROMPT)
.temperature(0.0);
summary_worker.set_max_tokens(2048);
summary_worker.set_max_tokens(4096);
// Cumulative input-token meter + interceptor. The meter is bumped
// from the on_usage callback and read on every pre_llm_request.
let input_so_far = Arc::new(AtomicU64::new(0));
{
let acc = input_so_far.clone();
summary_worker.on_usage(move |event| {
if let Some(tokens) = event.input_tokens {
acc.fetch_add(tokens, Ordering::Relaxed);
}
});
}
summary_worker.set_interceptor(CompactWorkerInterceptor {
input_so_far: input_so_far.clone(),
max_input_tokens: compact_worker_max_input_tokens,
});
// Tools: read_file (shared scope, fresh tracker) + the three
// compact-specific tools that populate `ctx`.
summary_worker.register_tool(tools::read_tool(scoped_fs.clone(), summary_tracker));
summary_worker
.register_tool(mark_read_required_tool(scoped_fs.clone(), ctx.clone()));
summary_worker.register_tool(add_reference_tool(ctx.clone()));
summary_worker.register_tool(write_summary_tool(ctx.clone()));
let out = summary_worker
.run(summary_prompt)
.run(summary_input)
.await
.map_err(PodError::Worker)?;
let summary_text = out
.worker
.history()
.iter()
.filter_map(|item| {
if item.is_assistant_message() {
item.as_text().map(String::from)
} else {
None
}
})
.collect::<Vec<_>>()
.join("\n");
let mut locked_worker = out.worker;
// Build new history: [summary as user message, ...retained].
let mut new_history = Vec::with_capacity(retained_items.len() + 1);
// Guard: nudge the worker once more if the expected outputs
// (summary, and any auto-read nominations when default refs
// existed) were not produced on the first pass. `write_summary`
// is idempotent-by-overwrite so a second call is safe.
let nudge = {
let snapshot = ctx.lock().expect("compact ctx poisoned").clone();
if snapshot.summary.is_none() {
Some(
"You have not called `write_summary` yet. Deliver the structured \
summary now (Completed Tasks / Active Task / Key Decisions / \
User Directives / Current Work) and nominate any files the next \
session needs with `mark_read_required`."
.to_string(),
)
} else if snapshot.read_required.is_empty() && !default_refs.is_empty() {
Some(
"Summary received. If any of the referenced files are required \
for the next session to continue the task, call \
`mark_read_required` on them now. Otherwise reply briefly to \
close out."
.to_string(),
)
} else {
None
}
};
if let Some(prompt) = nudge {
let _ = locked_worker
.run(prompt)
.await
.map_err(PodError::Worker)?;
}
let final_ctx = ctx.lock().expect("compact ctx poisoned").clone();
let summary_text = final_ctx
.summary
.clone()
.ok_or(PodError::CompactSummaryMissing)?;
// Re-read each auto-read target through ScopedFs and render the
// requested slice. Errors are logged and skipped rather than
// aborting compaction — a missing / moved file should not fail
// the whole compact.
let mut auto_read_messages = Vec::new();
for req in &final_ctx.read_required {
match scoped_fs.read_bytes(&req.path) {
Ok(bytes) => {
let text = String::from_utf8_lossy(&bytes).into_owned();
let body = slice_lines(&text, req.offset.unwrap_or(0), req.limit);
let range = match (req.offset, req.limit) {
(None, None) => String::new(),
(Some(off), None) => format!(":{}-", off + 1),
(None, Some(lim)) => format!(":1-{lim}"),
(Some(off), Some(lim)) => {
format!(":{}-{}", off + 1, off.saturating_add(lim))
}
};
auto_read_messages.push(Item::system_message(format!(
"[Auto-read file: {}{range}]\n{body}",
req.path.display()
)));
}
Err(e) => {
warn!(
path = %req.path.display(),
error = %e,
"auto-read target could not be read; skipping",
);
}
}
}
// Reference list as a single system message; omitted when empty.
let reference_message = (!final_ctx.references.is_empty()).then(|| {
let list = final_ctx
.references
.iter()
.map(|p| format!("- {}", p.display()))
.collect::<Vec<_>>()
.join("\n");
Item::system_message(format!(
"[Referenced files — read before compaction, contents not included]\n\
{list}\n\
Use read_file to access current contents if needed."
))
});
// Build new history: [summary, ...auto-read, references, ...retained].
let mut new_history = Vec::with_capacity(
1 + auto_read_messages.len() + reference_message.is_some() as usize
+ retained_items.len(),
);
new_history.push(Item::system_message(format!(
"[Compacted context summary]\n\n{summary_text}"
)));
new_history.extend(auto_read_messages);
if let Some(msg) = reference_message {
new_history.push(msg);
}
new_history.extend(retained_items);
// Persist as a new compacted session.
@ -1087,6 +1249,37 @@ impl From<WorkerResult> for PodRunResult {
}
}
/// Assemble the first user message handed to the compact worker.
///
/// Layout, in order: a one-line task statement; when `default_refs` is
/// non-empty, instructions plus a `## Referenced files` bullet list of
/// those paths; a `## Conversation` section containing the output of
/// [`build_summary_prompt`] for `items`; and a closing reminder to call
/// `write_summary`.
fn build_summary_input(items: &[Item], default_refs: &[PathBuf]) -> String {
    let mut prompt = String::from(
        "Summarise the conversation below into a structured summary and nominate \
         files the next session needs.\n\n",
    );

    if !default_refs.is_empty() {
        prompt.push_str(
            "These files were touched recently in this session. Use `read_file` \
             on them as needed, then call `mark_read_required` for any whose \
             contents the next session must have, and `add_reference` for files \
             it should know about by name only.\n\n## Referenced files\n",
        );
        // One "- <path>" bullet per default reference.
        let bullets: String = default_refs
            .iter()
            .map(|path| format!("- {}\n", path.display()))
            .collect();
        prompt.push_str(&bullets);
        prompt.push('\n');
    }

    let conversation = build_summary_prompt(items);
    prompt.push_str("## Conversation\n");
    prompt.push_str(&conversation);
    prompt.push_str("\n\nWhen you are done, call `write_summary` with the final 5-section text.");
    prompt
}
/// Format conversation items into a text prompt for the summary Worker.
///
/// The summary should capture decisions and user intent, not recreate code.
@ -1158,6 +1351,9 @@ pub enum PodError {
#[error("compaction thrash: context still exceeds threshold immediately after compact")]
CompactThrash,
#[error("compact worker did not produce a summary (write_summary was never called)")]
CompactSummaryMissing,
#[error("invalid system prompt template: {source}")]
InvalidSystemPromptTemplate {
#[source]

View File

@ -61,6 +61,19 @@ fn single_text_events(text: &str) -> Vec<LlmEvent> {
]
}
/// Script one mock LLM response consisting of a single
/// `write_summary` tool call whose JSON input is `{"text": text}`,
/// followed by a `Completed` status event.
fn write_summary_tool_use_events(call_id: &str, text: &str) -> Vec<LlmEvent> {
    let payload = serde_json::json!({ "text": text }).to_string();
    let completed = LlmEvent::Status(StatusEvent {
        status: ResponseStatus::Completed,
    });

    let mut events = Vec::with_capacity(4);
    events.push(LlmEvent::tool_use_start(0, call_id, "write_summary"));
    events.push(LlmEvent::tool_input_delta(0, payload));
    events.push(LlmEvent::tool_use_stop(0));
    events.push(completed);
    events
}
const MINIMAL_MANIFEST_TOML: &str = r#"
[pod]
name = "test-pod"
@ -233,10 +246,11 @@ async fn agents_md_absent_omits_trailing_section() {
#[tokio::test]
async fn agents_md_not_reread_after_compact() {
let client = MockClient::new(vec![
single_text_events("a"),
single_text_events("b"),
single_text_events("summary"),
single_text_events("c"),
single_text_events("a"), // pod.run("first")
single_text_events("b"), // pod.run("second")
write_summary_tool_use_events("call-1", "compacted summary"), // compact worker: tool_use
single_text_events("done"), // compact worker: close
single_text_events("c"), // pod.run("third")
]);
let (mut pod, pwd) = make_pod_with_body("BODY", client).await.unwrap();
let agents_path = pwd.join("AGENTS.md");
@ -264,10 +278,11 @@ async fn agents_md_not_reread_after_compact() {
#[tokio::test]
async fn compact_preserves_system_prompt() {
let client = MockClient::new(vec![
single_text_events("a"),
single_text_events("b"),
single_text_events("summary"),
single_text_events("c"),
single_text_events("a"), // pod.run("first")
single_text_events("b"), // pod.run("second")
write_summary_tool_use_events("call-1", "compacted summary"), // compact worker: tool_use
single_text_events("done"), // compact worker: close
single_text_events("c"), // pod.run("third")
]);
let (mut pod, _pwd) = make_pod_with_body("SP cwd={{ cwd }}", client)
.await