diff --git a/crates/manifest/src/config.rs b/crates/manifest/src/config.rs index ca447e19..9b3c9e57 100644 --- a/crates/manifest/src/config.rs +++ b/crates/manifest/src/config.rs @@ -88,6 +88,10 @@ pub struct CompactionConfigPartial { #[serde(default)] pub compact_retained_tokens: Option, #[serde(default)] + pub compact_auto_read_budget: Option, + #[serde(default)] + pub compact_worker_max_input_tokens: Option, + #[serde(default)] pub provider: Option, } @@ -244,6 +248,12 @@ impl CompactionConfigPartial { compact_retained_tokens: upper .compact_retained_tokens .or(self.compact_retained_tokens), + compact_auto_read_budget: upper + .compact_auto_read_budget + .or(self.compact_auto_read_budget), + compact_worker_max_input_tokens: upper + .compact_worker_max_input_tokens + .or(self.compact_worker_max_input_tokens), provider: merge_option(self.provider, upper.provider, ProviderConfigPartial::merge), } } @@ -374,6 +384,12 @@ impl TryFrom for PodManifest { compact_retained_tokens: c .compact_retained_tokens .unwrap_or(defaults::COMPACT_RETAINED_TOKENS), + compact_auto_read_budget: c + .compact_auto_read_budget + .unwrap_or(defaults::COMPACT_AUTO_READ_BUDGET), + compact_worker_max_input_tokens: c + .compact_worker_max_input_tokens + .unwrap_or(defaults::COMPACT_WORKER_MAX_INPUT_TOKENS), provider: comp_provider, }) }) diff --git a/crates/manifest/src/defaults.rs b/crates/manifest/src/defaults.rs index cca50e1f..5317c269 100644 --- a/crates/manifest/src/defaults.rs +++ b/crates/manifest/src/defaults.rs @@ -28,3 +28,20 @@ pub const COMPACT_RETAINED_TOKENS: u64 = 8000; /// is omitted. See the `PromptLoader` prefix addressing scheme for the /// `$insomnia/` / `$user/` / `$workspace/` namespaces. pub const DEFAULT_INSTRUCTION: &str = "$insomnia/default"; + +/// Token budget for auto-read file contents injected into the new +/// session after compaction. Limits how much raw file text the +/// compact worker can pull into the compacted context via +/// `mark_read_required`. See +/// [`crate::CompactionConfig::compact_auto_read_budget`]. +pub const COMPACT_AUTO_READ_BUDGET: u64 = 8000; + +/// Cumulative input-token cap for the compact worker's own LLM +/// calls. Exceeding this aborts the compact run (circuit-breaker +/// path). See +/// [`crate::CompactionConfig::compact_worker_max_input_tokens`]. +pub const COMPACT_WORKER_MAX_INPUT_TOKENS: u64 = 50_000; + +/// Number of recently-touched files fed to the compact worker as +/// default references. +pub const COMPACT_DEFAULT_REFERENCE_COUNT: usize = 5; diff --git a/crates/manifest/src/lib.rs b/crates/manifest/src/lib.rs index d387b619..e50c9893 100644 --- a/crates/manifest/src/lib.rs +++ b/crates/manifest/src/lib.rs @@ -201,6 +201,16 @@ pub struct CompactionConfig { #[serde(default = "default_compact_retained_tokens")] pub compact_retained_tokens: u64, + /// Aggregate token budget for auto-read file contents injected into + /// the compacted session by the compact worker. + #[serde(default = "default_compact_auto_read_budget")] + pub compact_auto_read_budget: u64, + + /// Cumulative input-token cap for the compact worker's own LLM + /// calls. Exceeding this aborts the compact run. + #[serde(default = "default_compact_worker_max_input_tokens")] + pub compact_worker_max_input_tokens: u64, + /// Optional provider for the compactor (summary) LLM. /// If omitted, the main provider is cloned via `clone_boxed()`. #[serde(default)] @@ -216,6 +226,12 @@ fn default_prune_min_savings() -> u64 { fn default_compact_retained_tokens() -> u64 { defaults::COMPACT_RETAINED_TOKENS } +fn default_compact_auto_read_budget() -> u64 { + defaults::COMPACT_AUTO_READ_BUDGET +} +fn default_compact_worker_max_input_tokens() -> u64 { + defaults::COMPACT_WORKER_MAX_INPUT_TOKENS +} impl Default for CompactionConfig { fn default() -> Self { @@ -225,6 +241,8 @@ impl Default for CompactionConfig { compact_threshold: None, compact_request_threshold: None, compact_retained_tokens: default_compact_retained_tokens(), + compact_auto_read_budget: default_compact_auto_read_budget(), + compact_worker_max_input_tokens: default_compact_worker_max_input_tokens(), provider: None, } } diff --git a/crates/pod/src/compact_worker.rs b/crates/pod/src/compact_worker.rs new file mode 100644 index 00000000..ce9ed4c0 --- /dev/null +++ b/crates/pod/src/compact_worker.rs @@ -0,0 +1,384 @@ +//! Compact worker state and the four tools that drive it. +//! +//! The compact worker is a disposable `Worker` instance spun up by +//! [`Pod::compact`]. It receives the history to summarise plus a list of +//! default reference files (from the session-lifetime `Tracker`) and runs +//! a tool-driven LLM loop. The tools here let it: +//! +//! - `read_file` — inspect referenced files (reuses `tools::read_tool`) +//! - `mark_read_required(path, offset?, limit?)` — nominate a file whose +//! contents should be injected into the compacted context as an +//! auto-read system message +//! - `add_reference(path)` — nominate a file the next session should +//! know about by name only (contents not included) +//! - `write_summary(text)` — deliver (or overwrite) the structured summary +//! +//! Everything the worker decides ends up in [`CompactWorkerContext`], +//! which `Pod::compact` drains after the loop and turns into the +//! compacted session's opening system messages. + +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use llm_worker::Item; +use llm_worker::interceptor::{Interceptor, PreRequestAction}; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use serde::Deserialize; +use tools::ScopedFs; + +/// A file the compact worker has marked for auto-read in the new session. +#[derive(Debug, Clone)] +pub(crate) struct ReadRequirement { + pub path: PathBuf, + /// 0-based line offset. `None` means from the start of the file. + pub offset: Option, + /// Maximum number of lines. `None` means to the end of the file. + pub limit: Option, +} + +/// Aggregated output of a compact worker run. +#[derive(Debug, Default, Clone)] +pub(crate) struct CompactWorkerContext { + pub read_required: Vec, + pub references: Vec, + pub summary: Option, + /// Tokens already consumed by `mark_read_required` calls. + pub auto_read_consumed: u64, + /// Aggregate cap. `0` treats the budget as disabled. + pub auto_read_budget: u64, +} + +impl CompactWorkerContext { + pub(crate) fn with_budget(auto_read_budget: u64) -> Self { + Self { + auto_read_budget, + ..Self::default() + } + } + + fn remaining_budget(&self) -> u64 { + self.auto_read_budget.saturating_sub(self.auto_read_consumed) + } +} + +/// Input to `mark_read_required`. +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct MarkParams { + /// Absolute path to the file. + pub file_path: PathBuf, + /// 0-based line offset. + #[serde(default)] + pub offset: Option, + /// Maximum number of lines to inject. + #[serde(default)] + pub limit: Option, +} + +/// Input to `add_reference`. +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct ReferenceParams { + /// Absolute path to the file. + pub file_path: PathBuf, +} + +/// Input to `write_summary`. +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct SummaryParams { + /// Full structured summary text (overwrites any previous call). + pub text: String, +} + +const MARK_DESCRIPTION: &str = "Inject a file's contents into the compacted context so the \ +next session starts with it already read. Use this for files the next task needs in full. \ +Optionally specify `offset` (0-based line) and `limit` (line count) to inject only a slice. \ +Counts against `auto_read_budget`; overflow returns an error and the mark is not recorded. \ +Paths must be absolute."; + +const REFERENCE_DESCRIPTION: &str = "Record a file path as a named reference in the compacted \ +context without injecting its contents. Use for files that are contextually relevant but \ +whose current content the next session can fetch on demand."; + +const SUMMARY_DESCRIPTION: &str = "Provide the final structured summary text. Subsequent calls \ +replace the previous content; only the last call is used. Must be called before the compact run \ +ends or compaction fails."; + +struct MarkReadRequiredTool { + fs: ScopedFs, + ctx: Arc>, +} + +#[async_trait] +impl Tool for MarkReadRequiredTool { + async fn execute(&self, input_json: &str) -> Result { + let params: MarkParams = serde_json::from_str(input_json).map_err(|e| { + ToolError::InvalidArgument(format!("invalid mark_read_required input: {e}")) + })?; + + // Read the file through the shared ScopedFs so scope and I/O + // errors surface the same way the regular `read_file` tool does. + let bytes = self + .fs + .read_bytes(¶ms.file_path) + .map_err(|e| ToolError::ExecutionFailed(format!("read failed: {e}")))?; + let text = String::from_utf8_lossy(&bytes); + let slice = slice_lines(&text, params.offset.unwrap_or(0), params.limit); + let estimated_tokens = estimate_tokens(slice.len()); + + let mut guard = self.ctx.lock().expect("compact worker context poisoned"); + let budget = guard.auto_read_budget; + let would_consume = guard.auto_read_consumed.saturating_add(estimated_tokens); + if budget > 0 && would_consume > budget { + return Err(ToolError::ExecutionFailed(format!( + "auto-read budget exhausted ({budget} tokens). Remove an existing mark or use \ + add_reference instead." + ))); + } + guard.read_required.push(ReadRequirement { + path: params.file_path.clone(), + offset: params.offset, + limit: params.limit, + }); + guard.auto_read_consumed = would_consume; + let remaining = guard.remaining_budget(); + drop(guard); + + let mut summary = format!( + "Marked {} for auto-read (≈{estimated_tokens} tokens). \ + Budget: {remaining}/{budget} tokens remaining.", + params.file_path.display() + ); + if budget > 0 && remaining * 2 <= budget { + summary.push_str( + "\nNote: auto-read budget is at least half consumed. \ + Consider calling write_summary and finishing up soon.", + ); + } + Ok(ToolOutput { + summary, + content: None, + }) + } +} + +struct AddReferenceTool { + ctx: Arc>, +} + +#[async_trait] +impl Tool for AddReferenceTool { + async fn execute(&self, input_json: &str) -> Result { + let params: ReferenceParams = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid add_reference input: {e}")))?; + let mut guard = self.ctx.lock().expect("compact worker context poisoned"); + if !guard + .references + .iter() + .any(|p| p.as_path() == params.file_path.as_path()) + { + guard.references.push(params.file_path.clone()); + } + Ok(ToolOutput { + summary: format!("Added reference {}", params.file_path.display()), + content: None, + }) + } +} + +struct WriteSummaryTool { + ctx: Arc>, +} + +#[async_trait] +impl Tool for WriteSummaryTool { + async fn execute(&self, input_json: &str) -> Result { + let params: SummaryParams = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid write_summary input: {e}")))?; + let mut guard = self.ctx.lock().expect("compact worker context poisoned"); + let overwritten = guard.summary.is_some(); + guard.summary = Some(params.text); + drop(guard); + let note = if overwritten { + "Summary replaced." + } else { + "Summary recorded." + }; + Ok(ToolOutput { + summary: note.to_string(), + content: None, + }) + } +} + +pub(crate) fn mark_read_required_tool( + fs: ScopedFs, + ctx: Arc>, +) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(MarkParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("mark_read_required") + .description(MARK_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(MarkReadRequiredTool { + fs: fs.clone(), + ctx: ctx.clone(), + }); + (meta, tool) + }) +} + +pub(crate) fn add_reference_tool(ctx: Arc>) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(ReferenceParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("add_reference") + .description(REFERENCE_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(AddReferenceTool { ctx: ctx.clone() }); + (meta, tool) + }) +} + +pub(crate) fn write_summary_tool(ctx: Arc>) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(SummaryParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("write_summary") + .description(SUMMARY_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(WriteSummaryTool { ctx: ctx.clone() }); + (meta, tool) + }) +} + +/// Interceptor that aborts the compact worker as soon as its cumulative +/// input-token count crosses `max_input_tokens`. Pairs with the +/// `on_usage` callback registered by `Pod::compact`, which is what +/// actually accumulates `input_so_far`. +pub(crate) struct CompactWorkerInterceptor { + pub input_so_far: Arc, + pub max_input_tokens: u64, +} + +#[async_trait] +impl Interceptor for CompactWorkerInterceptor { + async fn pre_llm_request(&self, _context: &mut Vec) -> PreRequestAction { + if self.input_so_far.load(Ordering::Relaxed) > self.max_input_tokens { + return PreRequestAction::Cancel(format!( + "compact worker input exceeded {} tokens", + self.max_input_tokens + )); + } + PreRequestAction::Continue + } +} + +/// Crude bytes→tokens estimate; good enough for budget accounting. +fn estimate_tokens(bytes: usize) -> u64 { + (bytes as u64).div_ceil(4) +} + +/// Return the slice of `text` covered by `offset` (line index) and +/// optional `limit` (line count), preserving the original newline +/// separation. Returns the whole file when both defaults apply. +pub(crate) fn slice_lines(text: &str, offset: usize, limit: Option) -> String { + let lines: Vec<&str> = text.lines().collect(); + let start = offset.min(lines.len()); + let end = limit + .map(|n| start.saturating_add(n).min(lines.len())) + .unwrap_or(lines.len()); + lines[start..end].join("\n") +} + +#[cfg(test)] +mod tests { + use super::*; + use manifest::Scope; + + fn make_fs(tmp: &std::path::Path) -> ScopedFs { + let scope = Scope::writable(tmp.to_path_buf()).unwrap(); + ScopedFs::new(scope, tmp.to_path_buf()) + } + + #[tokio::test] + async fn mark_read_required_records_and_deducts_budget() { + let tmp = tempfile::TempDir::new().unwrap(); + let path = tmp.path().join("hello.txt"); + std::fs::write(&path, "hello world\n").unwrap(); + + let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(1_000))); + let tool: Arc = Arc::new(MarkReadRequiredTool { + fs: make_fs(tmp.path()), + ctx: ctx.clone(), + }); + let input = serde_json::json!({ "file_path": path.to_str().unwrap() }).to_string(); + let out = tool.execute(&input).await.unwrap(); + + assert!(out.summary.starts_with("Marked")); + let guard = ctx.lock().unwrap(); + assert_eq!(guard.read_required.len(), 1); + assert!(guard.auto_read_consumed > 0); + assert!(guard.auto_read_consumed <= 1_000); + } + + #[tokio::test] + async fn mark_read_required_rejects_over_budget() { + let tmp = tempfile::TempDir::new().unwrap(); + let path = tmp.path().join("big.txt"); + std::fs::write(&path, "x".repeat(4_096)).unwrap(); // ≈1024 tokens + + let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(100))); + let tool: Arc = Arc::new(MarkReadRequiredTool { + fs: make_fs(tmp.path()), + ctx: ctx.clone(), + }); + let input = serde_json::json!({ "file_path": path.to_str().unwrap() }).to_string(); + let res = tool.execute(&input).await; + + assert!(matches!(res, Err(ToolError::ExecutionFailed(_)))); + let guard = ctx.lock().unwrap(); + assert!(guard.read_required.is_empty()); + assert_eq!(guard.auto_read_consumed, 0); + } + + #[tokio::test] + async fn write_summary_overwrites_previous_call() { + let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(0))); + let tool: Arc = Arc::new(WriteSummaryTool { ctx: ctx.clone() }); + + let first = serde_json::json!({ "text": "first" }).to_string(); + let out1 = tool.execute(&first).await.unwrap(); + assert!(out1.summary.contains("recorded")); + + let second = serde_json::json!({ "text": "second" }).to_string(); + let out2 = tool.execute(&second).await.unwrap(); + assert!(out2.summary.contains("replaced")); + + assert_eq!(ctx.lock().unwrap().summary.as_deref(), Some("second")); + } + + #[tokio::test] + async fn add_reference_deduplicates() { + let ctx = Arc::new(Mutex::new(CompactWorkerContext::with_budget(0))); + let tool: Arc = Arc::new(AddReferenceTool { ctx: ctx.clone() }); + + let p = "/abs/path.rs"; + let input = serde_json::json!({ "file_path": p }).to_string(); + tool.execute(&input).await.unwrap(); + tool.execute(&input).await.unwrap(); + + let guard = ctx.lock().unwrap(); + assert_eq!(guard.references.len(), 1); + assert_eq!(guard.references[0], PathBuf::from(p)); + } + + #[test] + fn slice_lines_handles_offset_and_limit() { + let text = "a\nb\nc\nd"; + assert_eq!(slice_lines(text, 0, None), "a\nb\nc\nd"); + assert_eq!(slice_lines(text, 1, Some(2)), "b\nc"); + assert_eq!(slice_lines(text, 10, None), ""); + } +} diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 6897567a..46e89e24 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -12,6 +12,7 @@ pub mod spawned_pod_registry; mod agents_md; mod compact_state; +mod compact_worker; mod factory; mod notification_buffer; mod pod; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 247ece40..bded686c 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -47,18 +47,35 @@ impl Hook for UsageTrackingHook { } const SUMMARY_SYSTEM_PROMPT: &str = "\ -You are a context compaction assistant. \ -Summarise the conversation below into a structured summary. \ -Preserve concrete details: file paths, function names, error messages, decisions made. \ -Use the following format:\n\n\ -## Original Task\n\ -(the user's original request)\n\n\ -## Completed Work\n\ -- (what was done, with specifics)\n\n\ -## Key Discoveries\n\ -- (facts, constraints, errors found)\n\n\ -## Current State\n\ -- (files changed, remaining work)"; +You are a context compaction assistant. Your job is to hand the next session a \ +structured summary plus pointers to the files it actually needs.\n\n\ +Tools you can call:\n\ +- `read_file(file_path, offset?, limit?)` — inspect referenced files before deciding.\n\ +- `mark_read_required(file_path, offset?, limit?)` — inject a file's contents into the \ + next session as an auto-read system message. Counts against `auto_read_budget`.\n\ +- `add_reference(file_path)` — record a file path the next session should know about \ + without embedding its contents.\n\ +- `write_summary(text)` — deliver the final structured summary. May be called multiple \ + times; only the last call is kept.\n\n\ +Always finish by calling `write_summary`. Produce the summary in this exact format:\n\n\ +## Completed Tasks\n\ +### (task name)\n\ +- what was done (use concrete type / file / function names)\n\ +- gotchas or facts that came up\n\n\ +## Active Task\n\ +### (task name)\n\ +- goal\n\ +- current state (what is done / not done)\n\ +- next step\n\n\ +## Key Decisions\n\ +- (decision) — (reason)\n\n\ +## User Directives\n\ +- \"verbatim user line\" — only include directives whose wording the next session \ + should not lose.\n\n\ +## Current Work\n\ +(2–3 lines on what was happening just before compaction).\n\n\ +Keep code snippets and raw tool output OUT of the summary — that is what auto-read \ +and references are for. Target 1000–2000 tokens."; /// An independent agent execution unit. /// @@ -792,6 +809,13 @@ impl Pod { /// /// Returns the new session ID. pub async fn compact(&mut self, retained_tokens: u64) -> Result { + use std::sync::atomic::{AtomicU64, Ordering}; + + use crate::compact_worker::{ + CompactWorkerContext, CompactWorkerInterceptor, add_reference_tool, + mark_read_required_tool, slice_lines, write_summary_tool, + }; + // Decide the cut point by projecting the UsageRecord timeline onto // the current history: keep the tail whose estimated token count is // within `retained_tokens`. Item-granular, turn boundaries ignored. @@ -801,41 +825,179 @@ impl Pod { let history = worker.history(); let retain_from = cut.index.min(history.len()); let retained_items = history[retain_from..].to_vec(); - let items_to_summarise = &history[..retain_from]; + let items_to_summarise = history[..retain_from].to_vec(); - // Build summary prompt. - let summary_prompt = build_summary_prompt(items_to_summarise); + // Compaction-related knobs. Fall through to manifest defaults when + // `[compaction]` is omitted entirely. + let (auto_read_budget, compact_worker_max_input_tokens) = self + .manifest + .compaction + .as_ref() + .map(|c| (c.compact_auto_read_budget, c.compact_worker_max_input_tokens)) + .unwrap_or(( + manifest::defaults::COMPACT_AUTO_READ_BUDGET, + manifest::defaults::COMPACT_WORKER_MAX_INPUT_TOKENS, + )); - // Create a disposable summary Worker. + // Default references: the N most-recently-touched files in the + // session, surfaced so the compact worker can inspect them and + // decide which (if any) the next session needs. + let default_refs: Vec = self + .tracker + .as_ref() + .map(|t| t.recent_files(manifest::defaults::COMPACT_DEFAULT_REFERENCE_COUNT)) + .unwrap_or_default(); + + // Input text fed to the compact worker. Includes the default + // references and the (pruned) conversation text. + let summary_input = build_summary_input(&items_to_summarise, &default_refs); + + // Worker-side state collected by the compact worker's tool calls. + let ctx = Arc::new(std::sync::Mutex::new(CompactWorkerContext::with_budget( + auto_read_budget, + ))); + + // Build an independent compact worker. Scope and pwd are shared + // with the main Pod (reads go through the same policy) but the + // Tracker is fresh — compact-time reads must not pollute the + // main session's recency list, which feeds `default_refs` above. + let scoped_fs = tools::ScopedFs::new(self.scope.clone(), self.pwd.clone()); + let summary_tracker = tools::Tracker::new(); let summary_client: Box = self.build_compactor_client()?; let mut summary_worker = Worker::new(summary_client) .system_prompt(SUMMARY_SYSTEM_PROMPT) .temperature(0.0); - summary_worker.set_max_tokens(2048); + summary_worker.set_max_tokens(4096); + + // Cumulative input-token meter + interceptor. The meter is bumped + // from the on_usage callback and read on every pre_llm_request. + let input_so_far = Arc::new(AtomicU64::new(0)); + { + let acc = input_so_far.clone(); + summary_worker.on_usage(move |event| { + if let Some(tokens) = event.input_tokens { + acc.fetch_add(tokens, Ordering::Relaxed); + } + }); + } + summary_worker.set_interceptor(CompactWorkerInterceptor { + input_so_far: input_so_far.clone(), + max_input_tokens: compact_worker_max_input_tokens, + }); + + // Tools: read_file (shared scope, fresh tracker) + the three + // compact-specific tools that populate `ctx`. + summary_worker.register_tool(tools::read_tool(scoped_fs.clone(), summary_tracker)); + summary_worker + .register_tool(mark_read_required_tool(scoped_fs.clone(), ctx.clone())); + summary_worker.register_tool(add_reference_tool(ctx.clone())); + summary_worker.register_tool(write_summary_tool(ctx.clone())); let out = summary_worker - .run(summary_prompt) + .run(summary_input) .await .map_err(PodError::Worker)?; - let summary_text = out - .worker - .history() - .iter() - .filter_map(|item| { - if item.is_assistant_message() { - item.as_text().map(String::from) - } else { - None - } - }) - .collect::>() - .join("\n"); + let mut locked_worker = out.worker; - // Build new history: [summary as user message, ...retained]. - let mut new_history = Vec::with_capacity(retained_items.len() + 1); + // Guard: nudge the worker once more if the expected outputs + // (summary, and any auto-read nominations when default refs + // existed) were not produced on the first pass. `write_summary` + // is idempotent-by-overwrite so a second call is safe. + let nudge = { + let snapshot = ctx.lock().expect("compact ctx poisoned").clone(); + if snapshot.summary.is_none() { + Some( + "You have not called `write_summary` yet. Deliver the structured \ + summary now (Completed Tasks / Active Task / Key Decisions / \ + User Directives / Current Work) and nominate any files the next \ + session needs with `mark_read_required`." + .to_string(), + ) + } else if snapshot.read_required.is_empty() && !default_refs.is_empty() { + Some( + "Summary received. If any of the referenced files are required \ + for the next session to continue the task, call \ + `mark_read_required` on them now. Otherwise reply briefly to \ + close out." + .to_string(), + ) + } else { + None + } + }; + if let Some(prompt) = nudge { + let _ = locked_worker + .run(prompt) + .await + .map_err(PodError::Worker)?; + } + + let final_ctx = ctx.lock().expect("compact ctx poisoned").clone(); + let summary_text = final_ctx + .summary + .clone() + .ok_or(PodError::CompactSummaryMissing)?; + + // Re-read each auto-read target through ScopedFs and render the + // requested slice. Errors are logged and skipped rather than + // aborting compaction — a missing / moved file should not fail + // the whole compact. + let mut auto_read_messages = Vec::new(); + for req in &final_ctx.read_required { + match scoped_fs.read_bytes(&req.path) { + Ok(bytes) => { + let text = String::from_utf8_lossy(&bytes).into_owned(); + let body = slice_lines(&text, req.offset.unwrap_or(0), req.limit); + let range = match (req.offset, req.limit) { + (None, None) => String::new(), + (Some(off), None) => format!(":{}-", off + 1), + (None, Some(lim)) => format!(":1-{lim}"), + (Some(off), Some(lim)) => { + format!(":{}-{}", off + 1, off.saturating_add(lim)) + } + }; + auto_read_messages.push(Item::system_message(format!( + "[Auto-read file: {}{range}]\n{body}", + req.path.display() + ))); + } + Err(e) => { + warn!( + path = %req.path.display(), + error = %e, + "auto-read target could not be read; skipping", + ); + } + } + } + + // Reference list as a single system message; omitted when empty. + let reference_message = (!final_ctx.references.is_empty()).then(|| { + let list = final_ctx + .references + .iter() + .map(|p| format!("- {}", p.display())) + .collect::>() + .join("\n"); + Item::system_message(format!( + "[Referenced files — read before compaction, contents not included]\n\ + {list}\n\ + Use read_file to access current contents if needed." + )) + }); + + // Build new history: [summary, ...auto-read, references, ...retained]. + let mut new_history = Vec::with_capacity( + 1 + auto_read_messages.len() + reference_message.is_some() as usize + + retained_items.len(), + ); new_history.push(Item::system_message(format!( "[Compacted context summary]\n\n{summary_text}" ))); + new_history.extend(auto_read_messages); + if let Some(msg) = reference_message { + new_history.push(msg); + } new_history.extend(retained_items); // Persist as a new compacted session. @@ -1087,6 +1249,37 @@ impl From for PodRunResult { } } +/// Build the compact worker's input: default-reference instructions, +/// the list of recently-touched files, and the pruned conversation +/// produced by [`build_summary_prompt`]. +fn build_summary_input(items: &[Item], default_refs: &[PathBuf]) -> String { + let mut out = String::new(); + out.push_str( + "Summarise the conversation below into a structured summary and nominate \ + files the next session needs.\n\n", + ); + if !default_refs.is_empty() { + out.push_str( + "These files were touched recently in this session. Use `read_file` \ + on them as needed, then call `mark_read_required` for any whose \ + contents the next session must have, and `add_reference` for files \ + it should know about by name only.\n\n## Referenced files\n", + ); + for p in default_refs { + out.push_str("- "); + out.push_str(&p.display().to_string()); + out.push('\n'); + } + out.push('\n'); + } + out.push_str("## Conversation\n"); + out.push_str(&build_summary_prompt(items)); + out.push_str( + "\n\nWhen you are done, call `write_summary` with the final 5-section text.", + ); + out +} + /// Format conversation items into a text prompt for the summary Worker. /// /// The summary should capture decisions and user intent, not recreate code. @@ -1158,6 +1351,9 @@ pub enum PodError { #[error("compaction thrash: context still exceeds threshold immediately after compact")] CompactThrash, + #[error("compact worker did not produce a summary (write_summary was never called)")] + CompactSummaryMissing, + #[error("invalid system prompt template: {source}")] InvalidSystemPromptTemplate { #[source] diff --git a/crates/pod/tests/system_prompt_template_test.rs b/crates/pod/tests/system_prompt_template_test.rs index 1fe3d1a1..f7d7f3ac 100644 --- a/crates/pod/tests/system_prompt_template_test.rs +++ b/crates/pod/tests/system_prompt_template_test.rs @@ -61,6 +61,19 @@ fn single_text_events(text: &str) -> Vec { ] } +/// Emit a single `write_summary(text=...)` tool call as one LLM response. +fn write_summary_tool_use_events(call_id: &str, text: &str) -> Vec { + let input = serde_json::json!({ "text": text }).to_string(); + vec![ + LlmEvent::tool_use_start(0, call_id, "write_summary"), + LlmEvent::tool_input_delta(0, input), + LlmEvent::tool_use_stop(0), + LlmEvent::Status(StatusEvent { + status: ResponseStatus::Completed, + }), + ] +} + const MINIMAL_MANIFEST_TOML: &str = r#" [pod] name = "test-pod" @@ -233,10 +246,11 @@ async fn agents_md_absent_omits_trailing_section() { #[tokio::test] async fn agents_md_not_reread_after_compact() { let client = MockClient::new(vec![ - single_text_events("a"), - single_text_events("b"), - single_text_events("summary"), - single_text_events("c"), + single_text_events("a"), // pod.run("first") + single_text_events("b"), // pod.run("second") + write_summary_tool_use_events("call-1", "compacted summary"), // compact worker: tool_use + single_text_events("done"), // compact worker: close + single_text_events("c"), // pod.run("third") ]); let (mut pod, pwd) = make_pod_with_body("BODY", client).await.unwrap(); let agents_path = pwd.join("AGENTS.md"); @@ -264,10 +278,11 @@ async fn agents_md_not_reread_after_compact() { #[tokio::test] async fn compact_preserves_system_prompt() { let client = MockClient::new(vec![ - single_text_events("a"), - single_text_events("b"), - single_text_events("summary"), - single_text_events("c"), + single_text_events("a"), // pod.run("first") + single_text_events("b"), // pod.run("second") + write_summary_tool_use_events("call-1", "compacted summary"), // compact worker: tool_use + single_text_events("done"), // compact worker: close + single_text_events("c"), // pod.run("third") ]); let (mut pod, _pwd) = make_pod_with_body("SP cwd={{ cwd }}", client) .await