use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::Mutex as AsyncMutex;

use llm_worker::Item;
use llm_worker::llm_client::RequestConfig;
use llm_worker::llm_client::client::LlmClient;
use llm_worker::state::Mutable;
use llm_worker::{Role, ToolOutputLimits, UsageRecord, Worker, WorkerError, WorkerResult};
use session_store::{EntryHash, PodScopeSnapshot, SessionId, SessionStartState, Store, StoreError};
use tracing::{info, warn};

use manifest::{
    Permission, PodManifest, PodManifestConfig, ResolveError, Scope, ScopeConfig, ScopeError,
    ScopeRule, SharedScope, WorkerManifest,
};

use crate::compact::state::CompactState;
use crate::compact::usage_tracker::UsageTracker;
use crate::hook::{
    Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest,
    PreRequestInfo, PreToolCall,
};
use crate::ipc::alerter::Alerter;
use crate::ipc::interceptor::PodInterceptor;
use crate::ipc::notify_buffer::NotifyBuffer;
use crate::prompt::agents_md::read_agents_md;
use crate::prompt::catalog::{CatalogError, PromptCatalog};
use crate::prompt::loader::PromptLoader;
use crate::prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTemplate};
use crate::runtime::dir;
use crate::runtime::pod_registry::{self, ScopeAllocationGuard, ScopeLockError};
use crate::workflow::WorkflowResolveError;
use async_trait::async_trait;
use llm_worker::interceptor::PreRequestAction;
use protocol::{AlertLevel, AlertSource, Event, Segment};
use tokio::sync::broadcast;
use tokio::task::JoinHandle;

/// Session id paired with the head hash that the `session_store` append
/// functions advance in place (they receive `&mut head_hash`).
///
/// `head_hash` starts as `None`; presumably it stays `None` until the
/// session has been materialised in the store — TODO confirm.
// NOTE(review): `Option` below has lost its type argument (extraction
// artefact, probably `EntryHash` — confirm against version control).
struct SessionHead {
    session_id: SessionId,
    head_hash: Option,
}

/// Pre-LLM-request hook that records `history.len()` at send time into a
/// shared `UsageTracker`. The on_usage callback later pairs this with the
/// aggregated UsageEvent to produce one `UsageRecord` per LLM call.
struct UsageTrackingHook { tracker: Arc, } #[async_trait] impl Hook for UsageTrackingHook { async fn call(&self, info: &PreRequestInfo) -> PreRequestAction { self.tracker.note_request(info.item_count); PreRequestAction::Continue } } /// An independent agent execution unit. /// /// Holds a [`Worker`] directly and persists session state via /// `session-store` functions after each turn. pub struct Pod { manifest: PodManifest, /// Always `Some` outside of `run()`/`resume()`. worker: Option>, store: St, session_id: SessionId, session_head: Arc>, /// Absolute working directory of the Pod. pwd: PathBuf, /// Shared, atomically-swappable view of the Pod's resolved scope. /// Cloned out to `ScopedFs` instances (builtin tools, fs_view, /// compact worker) so scope updates propagate to every consumer /// at the next permission check. scope: SharedScope, hook_builder: HookRegistryBuilder, interceptor_installed: bool, /// Shared compaction state (present when compact_threshold is configured). compact_state: Option>, /// Per-LLM-request Usage tracker. Always present after construction. /// Captures `(history_len, UsageEvent)` pairs during a run; drained /// in `persist_turn` and persisted as `LogEntry::LlmUsage` entries. usage_tracker: Arc, /// Sync-side buffer for `Metric` values queued from inside Worker /// callbacks (currently the prune observer). Drained in `persist_turn` /// and written via `session_metrics::record_metric` alongside /// `LogEntry::LlmUsage`. Always present after construction. metrics_tracker: Arc, /// Cumulative Usage measurement timeline, one entry per LLM call. /// Restored from session log on `restore`, appended on each persist. /// Read by token-accounting APIs (`Pod::total_tokens`, etc.). /// /// Wrapped in `Arc` so that callbacks injected into the /// Worker (e.g. the savings estimator used by the prune projection) /// can share the same view via [`Pod::usage_history_handle`]. 
usage_history: Arc>>, /// Session-lifetime file-operation tracker from the builtin `tools` /// crate. Populated by the Controller when it registers the builtin /// tools so that Pod-owned operations (e.g. compaction) can consult /// the recency of touched files. tracker: Option, /// Session-lifetime task store from the builtin `tools` crate. Shared by /// TaskCreate / TaskUpdate / TaskList / TaskGet and preserved across /// compaction by keeping the same handle while the Worker history is /// replaced. Restored Pods reconstruct it by replaying Task* tool calls. task_store: tools::TaskStore, /// Parsed system-prompt template awaiting first-turn materialisation. /// `Some` until `ensure_system_prompt_materialized` renders it once, /// then `None` forever — including after compaction. system_prompt_template: Option, /// User-facing notification sink attached by the Controller at /// spawn time. `None` in tests / direct `Pod::new` usage. alerter: Option, /// Broadcast sender for typed lifecycle `Event`s (compact progress, /// etc.). Attached by the Controller alongside `alerter`. Unlike /// notifications, events sent here are NOT replayed to clients that /// connect after the fact — they are fire-and-forget broadcasts. event_tx: Option>, /// Queue of pending `Method::Notify` notifications awaiting /// injection into the next LLM request. Shared with the /// PodInterceptor installed in `ensure_interceptor_installed`. pending_notifies: NotifyBuffer, /// Submit-scoped stash for resolver-produced system messages /// (currently `@` file content). `Pod::run` fills this /// before handing off to the worker; `PodInterceptor::on_prompt_submit` /// drains it and returns `ContinueWith` so the items land in /// history right after the user message that referenced them. pending_attachments: Arc>>, /// Scope allocation in the machine-wide lock file. 
`Some` for /// Pods built via `from_manifest` / `from_manifest_spawned` / /// `restore_from_manifest` (production paths); `None` for the /// low-level `Pod::new` constructor used in tests, which bypasses /// the registry. Kept purely for its `Drop` impl, which releases /// the allocation when the Pod is dropped. #[allow(dead_code)] scope_allocation: Option, /// Socket path of the spawning Pod. `Some` only for Pods built via /// `from_manifest_spawned`. Consumed by the controller to fire /// `Method::PodEvent` reports upward (turn end, error, shutdown, /// scope sub-delegation). callback_socket: Option, /// Central catalog of Pod-level prompt strings (compaction system /// prompt, notification wrapper, interrupt notes, trailing system /// sections, ...). Built from the 4-layer overlay in /// [`Self::from_manifest`], or defaults to the builtin pack when a /// Pod is constructed through lower-level paths that have no loader. prompts: Arc, /// Registry loaded from `/.insomnia/workflow/*.md` when /// memory is enabled. Missing memory config keeps this empty. workflow_registry: workflow_crate::WorkflowRegistry, /// Memory workspace layout used by the workflow resolver to load required /// Knowledge records by exact slug. memory_layout: Option, /// When true (default), the system-prompt assembler walks /// `/knowledge/*` and appends a `## Resident knowledge` /// section listing records with `model_invokation: true`. /// consolidation workers set this to false so the /// agentic worker pulls knowledge through the search tools instead. inject_resident_knowledge: bool, /// Latest runtime scope snapshot queued by dynamic scope changes. /// Drained into the session log before the next turn result is /// persisted, so resume never silently reclaims delegated writes. pending_scope_snapshot: Arc>>, /// extract (memory.extract) reentry guard. `true` while an extract /// worker is running; subsequent triggers are skipped per spec /// (`docs/plan/memory.md` §Extract 並走防止). 
`Arc` so /// the flag survives across `try_post_run_extract` calls without a /// `&mut self` race. extract_in_flight: Arc, /// consolidation (memory.consolidation) in-process reentry guard. The /// staging-side `StagingLock` already provides cross-process /// exclusion, but this AtomicBool keeps a careless concurrent caller /// inside the same Pod from racing on the staging snapshot. consolidation_in_flight: Arc, /// Last completed extract boundary. `None` means no extract has /// run yet on this session — next extract starts from entry 0. /// Restored from `RestoredState.extensions` on `restore`, updated /// after each successful extract via `save_extension`. extract_pointer: Arc>>, /// extract/consolidation memory job running outside the controller method loop. /// The task owns the extract/consolidate worker execution and is joined /// at shutdown. A single slot is enough: extract/consolidation implementations loop /// until thresholds fall below their trigger points, and concurrent /// triggers are coalesced by skipping when this handle is still active. memory_task: Option>, /// Typed user submissions in submit order. K-th entry corresponds to /// the K-th `Item::user_message` in `worker.history()` (modulo seed /// history loaded via `SessionStart.history`, whose original segments /// are not preserved). Populated from log on `restore_from_manifest`, /// appended after `save_user_input` on each `run`. Mirrored to /// `PodSharedState` by the controller for `Event::History` use. 
user_segments: Vec>, } impl Pod { pub async fn wait_for_memory_jobs(&mut self) { if let Some(handle) = self.memory_task.take() && let Err(e) = handle.await { tracing::warn!(error = %e, "Post-run memory task join failed"); } } } impl Pod { fn clone_for_memory_task(&self) -> Self { // The cloned Pod's worker exists only as a snapshot for the memory // task: `run_extract_once` reads `worker.history()`, and the // extract/consolidate workers are built fresh inside their own // methods using `worker.client()` as fallback when no override // model is configured. system_prompt / request_config / cache_key // are unused on this path, so we deliberately skip copying them. let source_worker = self.worker.as_ref().expect("worker present"); let mut worker = Worker::new(source_worker.client().clone()); worker.set_history(source_worker.history().to_vec()); Self { manifest: self.manifest.clone(), worker: Some(worker), store: self.store.clone(), session_id: self.session_id, session_head: self.session_head.clone(), pwd: self.pwd.clone(), scope: self.scope.clone(), hook_builder: HookRegistryBuilder::new(), interceptor_installed: false, compact_state: None, usage_tracker: Arc::new(UsageTracker::new()), metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), usage_history: self.usage_history.clone(), tracker: None, task_store: self.task_store.clone(), system_prompt_template: None, alerter: self.alerter.clone(), event_tx: self.event_tx.clone(), pending_notifies: NotifyBuffer::new(), pending_attachments: Arc::new(Mutex::new(Vec::new())), scope_allocation: None, callback_socket: None, prompts: self.prompts.clone(), workflow_registry: self.workflow_registry.clone(), memory_layout: self.memory_layout.clone(), inject_resident_knowledge: self.inject_resident_knowledge, pending_scope_snapshot: self.pending_scope_snapshot.clone(), extract_in_flight: self.extract_in_flight.clone(), consolidation_in_flight: self.consolidation_in_flight.clone(), extract_pointer: 
self.extract_pointer.clone(), memory_task: None, user_segments: self.user_segments.clone(), } } pub fn spawn_post_run_memory_jobs(&mut self) { // Drop a finished prior handle so we can spawn a fresh task. // If the prior task is still running, coalesce by skipping — // extract/consolidation implementations re-evaluate thresholds on completion. self.cleanup_finished_memory_task(); if self.memory_task.is_some() { return; } let mut pod = self.clone_for_memory_task(); self.memory_task = Some(tokio::spawn(async move { if let Err(e) = pod.try_post_run_extract().await { tracing::warn!(error = %e, "Post-run memory extract task error"); } if let Err(e) = pod.try_post_run_consolidate().await { tracing::warn!(error = %e, "Post-run memory consolidate task error"); } })); } } impl Pod { /// Create a new Pod from a pre-built Worker and store. /// /// Callers must pre-resolve `pwd` (absolute) and build a [`Scope`] /// — typically via [`Scope::from_config`] when coming from a /// manifest, or [`Scope::writable`] in tests. /// /// Note: this constructor does **not** parse `manifest.worker.system_prompt` /// as a template. `Pod::from_manifest` is the production path for /// templated prompts; callers of `Pod::new` that want a template /// should parse it themselves and call [`set_system_prompt_template`]. pub async fn new( manifest: PodManifest, worker: Worker, store: St, pwd: PathBuf, scope: Scope, ) -> Result { // Session creation is deferred to `ensure_session_head` at first // run so a later-installed system-prompt template (see // `set_system_prompt_template`) can be captured by `SessionStart`. 
let session_id = session_store::new_session_id(); let prompts = PromptCatalog::builtins_only()?; let mut pod = Self { manifest, worker: Some(worker), store, session_id, session_head: Arc::new(AsyncMutex::new(SessionHead { session_id, head_hash: None, })), pwd, scope: SharedScope::new(scope), hook_builder: HookRegistryBuilder::new(), interceptor_installed: false, compact_state: None, usage_tracker: Arc::new(UsageTracker::new()), metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), usage_history: Arc::new(Mutex::new(Vec::::new())), tracker: None, task_store: tools::TaskStore::new(), system_prompt_template: None, alerter: None, event_tx: None, pending_notifies: NotifyBuffer::new(), pending_attachments: Arc::new(Mutex::new(Vec::new())), scope_allocation: None, callback_socket: None, prompts, workflow_registry: workflow_crate::WorkflowRegistry::empty(), memory_layout: None, inject_resident_knowledge: true, pending_scope_snapshot: Arc::new(Mutex::new(None)), extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), extract_pointer: Arc::new(Mutex::new(None)), memory_task: None, user_segments: Vec::new(), }; pod.apply_permissions_from_manifest(); pod.apply_prune_from_manifest(); Ok(pod) } /// Install a parsed system-prompt template that will be rendered /// exactly once, immediately before the first LLM turn. Mirrors the /// path used by `Pod::from_manifest` and is exposed for tests and /// other callers that build a Pod without going through a manifest. pub fn set_system_prompt_template(&mut self, template: SystemPromptTemplate) { self.system_prompt_template = Some(template); } /// Toggle the resident-knowledge section of the system prompt. /// /// Default `true`: when memory is enabled in the manifest, the /// assembler walks `/knowledge/*` and lists records with /// `model_invokation: true`. 
consolidation workers and /// other agentic memory paths set this to `false` so the worker /// pulls knowledge through the search tools instead of riding on /// the resident system-prompt budget. Idempotent if called multiple /// times before the first turn; ineffective once the system prompt /// has been materialised. pub fn set_resident_knowledge_injection(&mut self, enabled: bool) { self.inject_resident_knowledge = enabled; } /// Shared handle to the prompt catalog. Cheap to clone (`Arc`). pub fn prompts(&self) -> &Arc { &self.prompts } /// The session ID used for persistence. pub fn session_id(&self) -> SessionId { self.session_id } /// The Pod's manifest. pub fn manifest(&self) -> &PodManifest { &self.manifest } /// The Pod's working directory. pub fn pwd(&self) -> &Path { &self.pwd } /// The Pod's directory scope, as a shared atomically-swappable /// handle. Clone it to share scope state with another consumer /// (e.g. a tool that needs to mutate scope dynamically). pub fn scope(&self) -> &SharedScope { &self.scope } /// Snapshot the current scope as an owned `Arc`. Subsequent /// scope mutations do not affect the returned snapshot. pub fn scope_snapshot(&self) -> Arc { self.scope.snapshot() } /// Apply `extra_allow` to the Pod's runtime scope. Future tool /// permission checks (read/write/glob/grep) reflect the broadened /// scope; in-flight tool calls keep the snapshot they captured at /// invocation time. pub fn add_scope_rules( &self, extra_allow: impl IntoIterator, ) -> Result<(), ScopeError> { let extra: Vec = extra_allow.into_iter().collect(); self.scope .update(|cur| cur.with_added_allow_rules(extra.clone())) } /// Strip `revoke` rules from the Pod's runtime scope by adding /// matching deny rules. A `Permission::Write` revoke caps effective /// access at `Read` (mirroring the pod-registry `effective_write` /// semantics — Write is the only permission tracked across Pods). /// A `Permission::Read` revoke removes access entirely. 
pub fn revoke_scope_rules(
        &self,
        revoke: impl IntoIterator,
    ) -> Result<(), ScopeError> {
        // NOTE(review): `impl IntoIterator`, `Vec`, `Arc` and `&Worker` in
        // this region have lost their generic arguments (extraction
        // artefact). Tokens are reproduced as found.
        let denied: Vec = revoke.into_iter().collect();
        self.scope
            .update(|current| current.with_added_deny_rules(denied.clone()))
    }

    /// Snapshot the current runtime scope in the session log. The entry
    /// is intentionally appended as soon as a session head exists: if the
    /// process later exits while children keep their allocations, resume
    /// can restore the narrowed scope instead of reclaiming delegated
    /// writes.
    ///
    /// Returns `Ok(())` without writing anything while the head hash is
    /// still `None`.
    pub async fn persist_scope_snapshot(&mut self) -> Result<(), StoreError> {
        let mut head = self.session_head.lock().await;
        if head.head_hash.is_none() {
            return Ok(());
        }

        let current = self.scope.snapshot();
        let snapshot = PodScopeSnapshot {
            allow: current.allow_rules(),
            deny: current.deny_rules(),
        };
        session_store::save_pod_scope(&self.store, head.session_id, &mut head.head_hash, &snapshot)
            .await
    }

    /// Cloneable callback handed to dynamic-scope tools. It cannot append
    /// directly to the async store from a sync tool callback, so it records
    /// the latest snapshot and the controller flushes it after the tool
    /// turn completes.
    pub fn scope_change_sink(&self) -> Arc {
        let slot = self.pending_scope_snapshot.clone();
        Arc::new(move |snapshot| {
            // Last writer wins: only the most recent snapshot is kept.
            let mut pending = slot.lock().expect("pending_scope_snapshot poisoned");
            *pending = Some(snapshot);
        })
    }

    /// Write the queued scope snapshot (if any) to the session log and
    /// clear the queue. The snapshot is removed from the queue before the
    /// write, so it is not re-queued when the write fails.
    async fn flush_pending_scope_snapshot(&mut self) -> Result<(), StoreError> {
        // The std mutex guard is a temporary of this statement, so it is
        // released before the `.await` points below.
        let queued = self
            .pending_scope_snapshot
            .lock()
            .expect("pending_scope_snapshot poisoned")
            .take();
        let Some(snapshot) = queued else {
            return Ok(());
        };

        let mut head = self.session_head.lock().await;
        session_store::save_pod_scope(
            &self.store,
            head.session_id,
            &mut head.head_hash,
            &snapshot,
        )
        .await?;
        Ok(())
    }

    /// Direct access to the underlying Worker.
    ///
    /// Panics if the worker slot is currently empty.
    pub fn worker(&self) -> &Worker {
        self.worker.as_ref().expect("worker taken during run")
    }

    /// Mutable access to the underlying Worker.
    ///
    /// Use this to register tools, hooks, or subscribers before calling
    /// [`run`](Self::run).
pub fn worker_mut(&mut self) -> &mut Worker {
        // NOTE(review): `&mut Worker`, `Vec`, `&[Vec]` and `Option` in this
        // region have lost their generic arguments (extraction artefact).
        // Tokens are reproduced as found.
        self.worker.as_mut().expect("worker taken during run")
    }

    /// Reference to the store.
    pub fn store(&self) -> &St {
        &self.store
    }

    /// Current history items held by the underlying Worker.
    pub fn history(&self) -> &[Item] {
        self.worker().history()
    }

    /// Snapshot of the cumulative LLM Usage measurement timeline.
    ///
    /// One entry per LLM call. Restored on `restore` and appended in
    /// `persist_turn`. Used by token-accounting APIs in [`token_counter`].
    /// Returns a clone since the underlying vector is shared with hooks
    /// running on the Worker.
    pub fn usage_history(&self) -> Vec {
        self.usage_history
            .lock()
            .expect("usage_history poisoned")
            .clone()
    }

    /// Borrowed view of the typed user segments tracked alongside worker
    /// history. The K-th entry corresponds to the K-th `Item::user_message`
    /// derived from `LogEntry::UserInput` entries (post-compaction); seed
    /// history loaded via `SessionStart.history` does not contribute,
    /// which is acceptable because the original segments are unrecoverable.
    pub fn user_segments(&self) -> &[Vec] {
        &self.user_segments
    }

    /// Snapshot of the extract (memory.extract) boundary pointer.
    ///
    /// `None` means no extract has run yet on the current session — the
    /// next extract will start from entry 0. Updated by
    /// [`try_post_run_extract`](Self::try_post_run_extract) on success
    /// and reset by [`compact`](Self::compact) (the new compacted
    /// session has a fresh log with no `LogEntry::Extension` entries).
    /// Returned by value: the stored `Option` is cloned under the lock.
    pub fn extract_pointer(&self) -> Option {
        self.extract_pointer
            .lock()
            .expect("extract_pointer poisoned")
            .clone()
    }

    /// Test/diagnostic handle to the consolidation in-flight guard. Production
    /// callers do not need this; tests use it to assert that the reentry
    /// guard skips an in-progress consolidation without losing data.
#[doc(hidden)] pub fn consolidation_in_flight_handle(&self) -> Arc { self.consolidation_in_flight.clone() } /// Shared handle to the cumulative Usage history. /// /// Callbacks that need live access to the latest measurements (e.g. /// the savings estimator that `attach_prune` installs on the Worker) /// clone this `Arc` and read it at request time. The handle outlives /// any individual run. /// /// **Locking contract:** the inner `Mutex` is held only for a short /// clone (`lock().unwrap().clone()`) and released immediately. /// Callers must not hold the guard across `.await` points, I/O, or /// long computations — the guard is implicitly assumed to be /// non-contended at every Pod lifecycle event. pub fn usage_history_handle(&self) -> Arc>> { self.usage_history.clone() } /// Handle to the per-LLM-request `UsageTracker`. /// /// Sibling modules (e.g. the prune observer) clone this `Arc` to stash /// per-request side state (e.g. a `correlation_id`) that pairs with /// the next `LlmUsage`. pub(crate) fn usage_tracker_handle(&self) -> Arc { self.usage_tracker.clone() } /// Handle to the synchronous `MetricsTracker` buffer. /// /// Worker callbacks (e.g. the prune observer) clone this `Arc` and /// `.push(metric)` into it; Pod drains it in `persist_turn` and /// writes each metric via `session_metrics::record_metric`. pub(crate) fn metrics_tracker_handle( &self, ) -> Arc { self.metrics_tracker.clone() } /// Attach the session-scoped file-operation tracker from the builtin /// `tools` crate. Called by the Controller immediately after it /// registers the builtin tools on the Worker. Overwrites any /// previously attached tracker. pub fn attach_tracker(&mut self, tracker: tools::Tracker) { self.tracker = Some(tracker); } /// Attach the session-scoped TaskStore from the builtin `tools` crate. /// Called by the Controller before registering builtin tools so the Pod /// and Worker share one store. 
pub fn attach_task_store(&mut self, task_store: tools::TaskStore) {
        self.task_store = task_store;
    }

    /// Shared TaskStore handle (a clone of the Pod's own handle).
    pub fn task_store(&self) -> tools::TaskStore {
        self.task_store.clone()
    }

    /// The attached session-scoped file-operation tracker, if any.
    pub fn tracker(&self) -> Option<&tools::Tracker> {
        self.tracker.as_ref()
    }

    /// Attach a user-facing notification sink.
    ///
    /// Called by the Controller immediately after spawning so that
    /// Pod-internal operations (compaction failures, AGENTS.md
    /// ingestion warnings) can surface messages to connected clients.
    pub fn attach_alerter(&mut self, alerter: Alerter) {
        self.alerter = Some(alerter);
    }

    /// Attach the broadcast sender used for typed lifecycle `Event`s.
    ///
    /// The Controller wires this alongside [`attach_alerter`] so that
    /// Pod-internal operations (currently: compaction) can surface
    /// progress to connected clients.
    // NOTE(review): `broadcast::Sender` has lost its type argument
    // (extraction artefact). Token reproduced as found.
    pub fn attach_event_tx(&mut self, event_tx: broadcast::Sender) {
        self.event_tx = Some(event_tx);
    }

    /// Forward an alert to the attached alerter; no-op when none is attached.
    fn alert(&self, level: AlertLevel, source: AlertSource, message: String) {
        if let Some(sink) = &self.alerter {
            sink.alert(level, source, message);
        }
    }

    /// Append a metric, swallowing errors so observability writes never
    /// fail the surrounding turn. On failure the head hash stays put
    /// (the entry is dropped) and a `Warn` alert + `tracing::warn!` are
    /// emitted so the failure isn't completely silent.
    async fn try_record_metric(&mut self, metric: &session_metrics::Metric) {
        let mut head = self.session_head.lock().await;
        let outcome = session_metrics::record_metric(
            &self.store,
            head.session_id,
            &mut head.head_hash,
            metric,
        )
        .await;

        if let Err(err) = outcome {
            warn!(name = %metric.name, error = %err, "failed to record session metric; dropping");
            self.alert(
                AlertLevel::Warn,
                AlertSource::Pod,
                format!("failed to record metric `{}`: {}", metric.name, err),
            );
        }
    }

    /// Broadcast a typed `Event` to connected clients.
No-op when no /// `event_tx` is attached (tests / direct `Pod::new` usage) or when /// no clients are currently subscribed. fn send_event(&self, event: Event) { if let Some(tx) = self.event_tx.as_ref() { let _ = tx.send(event); } } fn broadcast_system_message_item(&self, item: &Item) { if !matches!( item, Item::Message { role: Role::System, .. } ) { return; } let value = serde_json::to_value(item).expect("Item is Serialize"); self.send_event(Event::SystemMessage { item: value }); } /// Push a `Method::Notify` (or rendered `Method::PodEvent`) entry /// onto the pending buffer. /// /// The notification will be appended to `worker.history` as an /// `Item::system_message` just before the next LLM request, via /// `PodInterceptor::pending_history_appends`. See [`NotifyBuffer`] /// for overflow behaviour and the lane-of-record rationale. pub fn push_notify(&self, message: String) { self.pending_notifies.push(message); } /// Shared handle to the pending notification buffer. /// /// The Controller holds a clone so that `Method::Notify` arriving /// while `pod.run()` is in flight can still reach the interceptor. pub fn notify_buffer_handle(&self) -> NotifyBuffer { self.pending_notifies.clone() } /// Parent callback socket set by `from_manifest_spawned`. /// /// Consumed by the Controller to fire `Method::PodEvent` upward on /// lifecycle transitions. `None` for top-level Pods, in which case /// the Controller silently skips the send. pub fn callback_socket(&self) -> Option<&PathBuf> { self.callback_socket.as_ref() } // --- Hook registration --- fn assert_hooks_open(&self) { assert!( !self.interceptor_installed, "cannot add hooks after run() or resume() has been called" ); } /// Register a hook that runs after receiving user input. pub fn add_on_prompt_submit_hook(&mut self, hook: impl Hook + 'static) { self.assert_hooks_open(); self.hook_builder.add_on_prompt_submit(hook); } /// Register a hook that runs before each LLM request. 
pub fn add_pre_llm_request_hook(&mut self, hook: impl Hook + 'static) { self.assert_hooks_open(); self.hook_builder.add_pre_llm_request(hook); } /// Register a hook that runs before each tool call. pub fn add_pre_tool_call_hook(&mut self, hook: impl Hook + 'static) { self.assert_hooks_open(); self.hook_builder.add_pre_tool_call(hook); } /// Register a hook that runs after each tool call. pub fn add_post_tool_call_hook(&mut self, hook: impl Hook + 'static) { self.assert_hooks_open(); self.hook_builder.add_post_tool_call(hook); } /// Register a hook that runs at the end of a turn. pub fn add_on_turn_end_hook(&mut self, hook: impl Hook + 'static) { self.assert_hooks_open(); self.hook_builder.add_on_turn_end(hook); } /// Register a hook that runs when execution is aborted. pub fn add_on_abort_hook(&mut self, hook: impl Hook + 'static) { self.assert_hooks_open(); self.hook_builder.add_on_abort(hook); } /// Install the hook-based interceptor on the Worker if not already done. /// /// When either compaction threshold (`compact_threshold` or /// `compact_request_threshold`) is configured in the manifest, allocates /// a shared [`CompactState`] and wires the interceptor to read current /// occupancy through the `UsageRecord` timeline. fn ensure_interceptor_installed(&mut self) { if !self.interceptor_installed { // Pre-LLM-request hook: record the item count at send time // so the on_usage callback can pair it with the measured // input_tokens. 
self.hook_builder.add_pre_llm_request(UsageTrackingHook { tracker: self.usage_tracker.clone(), }); let builder = std::mem::take(&mut self.hook_builder); let registry = Arc::new(builder.build()); let (post_run_threshold, request_threshold, retained) = self .manifest .compaction .as_ref() .map(|c| { ( c.compact_threshold, c.compact_request_threshold, c.compact_retained_tokens, ) }) .unwrap_or((None, None, manifest::defaults::COMPACT_RETAINED_TOKENS)); let tracker_for_usage = self.usage_tracker.clone(); self.worker_mut().on_usage(move |event| { tracker_for_usage.record_usage(event); }); let compact_state = if post_run_threshold.is_some() || request_threshold.is_some() { if let (Some(post), Some(req)) = (post_run_threshold, request_threshold) { if post > req { warn!( post_run_threshold = post, request_threshold = req, "compact_threshold > compact_request_threshold; \ proactive check will never fire before the safety net" ); } } let state = Arc::new(CompactState::new( post_run_threshold, request_threshold, retained, )); self.compact_state = Some(state.clone()); Some(state) } else { None }; let usage_history_handle = compact_state.as_ref().map(|_| self.usage_history.clone()); let interceptor = PodInterceptor::new( registry, compact_state, usage_history_handle, self.pending_notifies.clone(), self.pending_attachments.clone(), self.prompts.clone(), ); self.worker_mut().set_interceptor(interceptor); self.interceptor_installed = true; } } /// Render the manifest-supplied instruction template exactly once, /// just before the first LLM turn, append the fixed trailing /// section (scope summary + optional AGENTS.md), and hand the /// resulting string to the Worker via `set_system_prompt`. /// Subsequent invocations are no-ops: the template field is /// consumed with `Option::take()`, so the materialised value /// persists across all later turns and compaction. 
fn ensure_system_prompt_materialized(&mut self) -> Result<(), PodError> { let Some(template) = self.system_prompt_template.take() else { return Ok(()); }; let alerter = self.alerter.clone(); let tool_names: Vec = { let worker = self.worker.as_mut().expect("worker present"); // Materialise any pending tool factories so the template sees the // full list of tool names. Redundant with the flush inside // `Worker::lock()`; safe because `flush_pending` is idempotent. worker.tool_server_handle().flush_pending(); worker .tool_server_handle() .tool_definitions_sorted() .into_iter() .map(|d| d.name) .collect() }; let agents_md_read = read_agents_md(&self.pwd); for warning in agents_md_read.warnings { if let Some(n) = alerter.as_ref() { n.alert(AlertLevel::Warn, AlertSource::AgentsMd, warning); } } // Resident-injection collection: only when memory is enabled in // the manifest AND this Pod opts in (consolidation workers opt out). // Owned `Vec` lives for the duration of `render` below; the // context borrows a slice into it. 
let resident: Vec = if self.inject_resident_knowledge { self.memory_layout .as_ref() .map(memory::collect_resident_knowledge) .unwrap_or_default() } else { Vec::new() }; let resident_slice: Option<&[memory::ResidentKnowledgeEntry]> = if self.inject_resident_knowledge && self.memory_layout.is_some() { Some(&resident) } else { None }; let resident_workflows: Vec = if self.inject_resident_knowledge && self.memory_layout.is_some() { self.workflow_registry.resident_entries() } else { Vec::new() }; let resident_workflow_slice: Option<&[workflow_crate::ResidentWorkflowEntry]> = if self.inject_resident_knowledge && self.memory_layout.is_some() { Some(&resident_workflows) } else { None }; let resident_exposure_snapshots = self.resident_exposure_snapshots(&resident, &resident_workflows); let worker_language = worker_language(&self.manifest.worker); let scope_snapshot = self.scope.snapshot(); let ctx = SystemPromptContext { now: chrono::Utc::now(), cwd: &self.pwd, language: worker_language, scope: &scope_snapshot, tool_names, agents_md: agents_md_read.body, resident_knowledge: resident_slice, resident_workflows: resident_workflow_slice, prompts: &self.prompts, }; let rendered = template .render(&ctx) .map_err(|source| PodError::SystemPromptRender { source })?; self.worker .as_mut() .expect("worker present") .set_system_prompt(rendered); self.append_resident_exposure_event(resident_exposure_snapshots); Ok(()) } /// Convenience: run with a single `Segment::Text`. /// /// Equivalent to `run(vec![Segment::text(s)])`. The dumb-client /// counterpart of [`protocol::Method::run_text`]; primarily for /// tests and tools that have only a string in hand. pub async fn run_text(&mut self, s: impl Into) -> Result { self.run(vec![Segment::text(s)]).await } /// Drop the prior memory_task handle if it has finished. Keep it if /// still running so callers can decide whether to wait or coalesce. 
fn cleanup_finished_memory_task(&mut self) {
    // Only a handle that reports completion is discarded; a running (or
    // absent) handle is left untouched.
    let finished = match self.memory_task.as_ref() {
        Some(handle) => handle.is_finished(),
        None => false,
    };
    if finished {
        self.memory_task = None;
    }
}

/// Await the in-flight memory task, if there is one. Called before
/// compact rewrites history, because extract reads that same history.
/// A join error is logged and otherwise ignored.
async fn join_memory_task(&mut self) {
    let Some(handle) = self.memory_task.take() else {
        return;
    };
    if let Err(e) = handle.await {
        tracing::warn!(error = %e, "Memory task join failed");
    }
}

/// Reports whether `try_pre_run_compact` would actually compact.
/// `try_pre_run_compact` repeats the same check defensively; this
/// copy exists to decide whether the memory task must be joined
/// before the compact runs.
fn should_pre_run_compact(&self) -> bool {
    let Some(state) = self.compact_state.as_ref() else {
        return false;
    };
    if state.is_disabled() || state.just_compacted() {
        return false;
    }
    state.exceeds_post_run(self.total_tokens().tokens)
}

/// Common prelude for `run` / `run_for_notification` / `resume`.
/// Installs the interceptor, materializes the system prompt, makes
/// sure the session exists on the store, and runs pre-run compact —
/// joining any in-flight memory task first so extract sees a stable
/// history range.
async fn prepare_for_run(&mut self) -> Result<(), PodError> {
    self.ensure_interceptor_installed();
    self.ensure_system_prompt_materialized()?;
    self.cleanup_finished_memory_task();
    self.ensure_session_head().await?;

    let compact_is_due = self.should_pre_run_compact();
    if compact_is_due {
        self.join_memory_task().await;
    }
    self.try_pre_run_compact().await;
    Ok(())
}

/// Send user input and run until the LLM turn completes.
///
/// `input` is a typed segment list (see [`protocol::Segment`]). The
/// Pod flattens it into a single user-message string for the
/// underlying Worker, expanding paste content inline, resolving file refs
/// into adjacent attachments where possible, and surfacing alerts for
/// unresolved refs / unsupported segment kinds.
///
/// If the between-turns compaction threshold is exceeded mid-run,
/// the Worker is aborted, history is compacted, and execution resumes
/// automatically.
pub async fn run(&mut self, input: Vec) -> Result {
    // Hooks, system prompt, session head and pre-run compaction.
    self.prepare_for_run().await?;
    // Persist the user input as typed segments before the worker
    // pushes its flattened copy into history. save_delta deliberately
    // skips the resulting `is_user_message()` item to avoid double-write.
    {
        // Scoped so the session-head lock is released before the run.
        let mut head = self.session_head.lock().await;
        self.session_id = head.session_id;
        session_store::save_user_input(
            &self.store,
            head.session_id,
            &mut head.head_hash,
            input.clone(),
        )
        .await?;
    }
    // In-memory mirror of the typed input; `compact` later trims this
    // list to match the user messages that survive compaction.
    self.user_segments.push(input.clone());
    // Resolve `@` refs, `#` Knowledge refs, and `/`
    // workflow invocations to system messages stashed for the
    // PodInterceptor to attach right after the user message. File and
    // Knowledge failures are non-fatal alerts; explicit workflow invocation
    // failures abort before the Worker sees the turn.
    //
    // NOTE(review): a workflow-resolution error returns here after the
    // input was already persisted and pushed to `user_segments` above —
    // confirm that callers always pre-validate via
    // `validate_workflow_invocations`, or that this is intended.
    let mut attachments = self.resolve_file_refs(&input);
    attachments.extend(self.resolve_knowledge_refs(&input));
    attachments.extend(self.resolve_workflow_invocations(&input)?);
    if !attachments.is_empty() {
        // Overwrites (does not append to) whatever was pending.
        *self
            .pending_attachments
            .lock()
            .expect("pending_attachments poisoned") = attachments;
    }
    let flattened = self.flatten_segments(&input);
    // Index of the first history item this run will add; used by
    // `handle_worker_result` to persist only the delta.
    let history_before = self.worker.as_ref().unwrap().history().len();
    // lock → run → unlock
    let worker = self.worker.take().expect("worker taken during run");
    let mut locked = worker.lock();
    let result = locked.run(flattened).await;
    // Put the worker back before handling the result so `self.worker`
    // is `Some` again for persistence / compaction.
    self.worker = Some(locked.unlock());
    self.handle_worker_result(result, history_before).await
}

/// Resolve every `Segment::FileRef` in `segments` to a `[File: ]`
/// or shallow `[Dir: ]` system message via `PodFsView`. Resolution
/// failures (out-of-scope, not-found, binary, I/O, unsupported symlink
/// directory) surface as `AlertLevel::Warn` Alerts and are skipped — the
/// unresolved placeholder stays in the flattened user message so the LLM
/// still sees the intent.
fn resolve_file_refs(&self, segments: &[Segment]) -> Vec { let view = crate::fs_view::PodFsView::new(tools::ScopedFs::with_shared_scope( self.scope.clone(), self.pwd.clone(), )); let mut out = Vec::new(); for seg in segments { let Segment::FileRef { path } = seg else { continue; }; match view.resolve_file_ref(path, self.manifest.worker.file_upload.max_bytes) { Ok(item) => out.push(item), Err(e) => { self.alert( AlertLevel::Warn, AlertSource::Pod, format!("file ref @{path} could not be resolved: {e}"), ); } } } out } fn resolve_knowledge_refs(&self, segments: &[Segment]) -> Vec { let Some(layout) = self.memory_layout.as_ref() else { return Vec::new(); }; let mut out = Vec::new(); for seg in segments { let Segment::KnowledgeRef { slug } = seg else { continue; }; let parsed = match memory::Slug::parse(slug.clone()) { Ok(slug) => slug, Err(e) => { self.alert( AlertLevel::Warn, AlertSource::Pod, format!("knowledge ref #{slug} has invalid slug: {e}"), ); continue; } }; let path = layout.knowledge_path(&parsed); let bytes = match std::fs::read(&path) { Ok(bytes) => bytes, Err(e) => { self.alert( AlertLevel::Warn, AlertSource::Pod, format!("knowledge ref #{slug} could not be read: {e}"), ); continue; } }; let raw = String::from_utf8_lossy(&bytes).into_owned(); let body = match memory::schema::split_frontmatter(&raw) { Ok((_yaml, body)) => body, Err(e) => { self.alert( AlertLevel::Warn, AlertSource::Pod, format!("knowledge ref #{slug} has invalid frontmatter: {e}"), ); continue; } }; let snapshot = memory::snapshot_record_from_bytes( memory::workspace::RecordKind::Knowledge, slug.clone(), &bytes, ); self.append_memory_use_event(memory::UsageSource::KnowledgeRef, vec![snapshot]); out.push(Item::system_message(format!( "[Knowledge #{}]\n{}", slug, body.trim_end() ))); } out } fn resident_exposure_snapshots( &self, knowledge: &[memory::ResidentKnowledgeEntry], workflows: &[workflow_crate::ResidentWorkflowEntry], ) -> Vec { let Some(layout) = self.memory_layout.as_ref() else { 
return Vec::new(); }; let mut snapshots = Vec::new(); for entry in knowledge { match memory::snapshot_record_from_layout( layout, memory::workspace::RecordKind::Knowledge, &entry.slug, ) { Ok(snapshot) => snapshots.push(snapshot), Err(err) => { warn!(knowledge = %entry.slug, error = %err, "failed to snapshot resident knowledge exposure") } } } for entry in workflows { match memory::snapshot_record_from_layout( layout, memory::workspace::RecordKind::Workflow, &entry.slug, ) { Ok(snapshot) => snapshots.push(snapshot), Err(err) => { warn!(workflow = %entry.slug, error = %err, "failed to snapshot resident workflow exposure") } } } snapshots } fn append_memory_use_event( &self, source: memory::UsageSource, records: Vec, ) { let Some(layout) = self.memory_layout.as_ref() else { return; }; if let Err(err) = memory::append_use_event(layout, self.session_id.to_string(), source, records) { warn!(error = %err, "failed to append memory usage event"); } } fn append_resident_exposure_event(&self, records: Vec) { let Some(layout) = self.memory_layout.as_ref() else { return; }; if let Err(err) = memory::append_resident_exposure_event(layout, self.session_id.to_string(), records) { warn!(error = %err, "failed to append resident exposure event"); } } fn resolve_workflow_invocations( &self, segments: &[Segment], ) -> Result, WorkflowResolveError> { let Some(layout) = self.memory_layout.as_ref() else { if let Some(slug) = segments.iter().find_map(|seg| match seg { Segment::WorkflowInvoke { slug } => Some(slug.clone()), _ => None, }) { return Err(WorkflowResolveError::NotFound { slug }); } return Ok(Vec::new()); }; let mut out = Vec::new(); for seg in segments { let Segment::WorkflowInvoke { slug } = seg else { continue; }; let items = crate::workflow::resolve_workflow_invocation( &self.workflow_registry, layout, slug, )?; match memory::snapshot_record_from_layout( layout, memory::workspace::RecordKind::Workflow, slug, ) { Ok(snapshot) => { self.append_memory_use_event( 
memory::UsageSource::WorkflowInvoke, vec![snapshot], ); } Err(err) => { warn!(workflow = %slug, error = %err, "failed to snapshot workflow usage"); } } out.extend(items); } Ok(out) } /// Validate explicit workflow invocations without reading dependency /// bodies. Used by the controller before broadcasting `UserMessage` so /// user-invocation errors are returned immediately and never reach the /// Worker or client history. pub fn validate_workflow_invocations( &self, segments: &[Segment], ) -> Result<(), WorkflowResolveError> { for seg in segments { let Segment::WorkflowInvoke { slug } = seg else { continue; }; let parsed = workflow_crate::Slug::parse(slug.clone()) .map_err(|source| WorkflowResolveError::InvalidSlug(source.into()))?; let record = self .workflow_registry .get(&parsed) .ok_or_else(|| WorkflowResolveError::NotFound { slug: slug.clone() })?; if !record.user_invocable { return Err(WorkflowResolveError::NotUserInvocable { slug: slug.clone() }); } } Ok(()) } pub fn workflow_completions(&self) -> Vec { self.workflow_registry.list_user_invocable("") } pub fn knowledge_completions(&self) -> Vec { self.memory_layout .as_ref() .map(memory::list_knowledge_slugs) .unwrap_or_default() } /// Flatten a typed segment list into the single string the Worker /// receives as the user message, and emit user-facing alerts for /// segments that fall through to placeholder (knowledge / workflow /// refs without a resolver, or unknown variants from a newer client). /// `FileRef` is handled separately by `resolve_file_refs`. The text /// reconstruction itself comes from `Segment::flatten_to_text`, /// shared with replay paths that should not re-alert. fn flatten_segments(&self, segments: &[Segment]) -> String { for seg in segments { match seg { Segment::Text { .. } | Segment::Paste { .. } | Segment::FileRef { .. 
} => {} Segment::KnowledgeRef { slug } => { if self.memory_layout.is_none() { self.alert( AlertLevel::Warn, AlertSource::Pod, format!( "knowledge ref #{slug} cannot be resolved \ because memory is disabled; passed to LLM as placeholder" ), ); } } Segment::WorkflowInvoke { .. } => {} Segment::Unknown => { self.alert( AlertLevel::Warn, AlertSource::Pod, "received unknown segment kind from a newer client; \ passed to LLM as placeholder" .into(), ); } } } Segment::flatten_to_text(segments) } /// Run a turn triggered by `Method::Notify` while the Pod is idle. /// /// Unlike [`run`](Self::run), no user message is appended to /// history. The `PodInterceptor::pre_llm_request` drains the /// pending-notification buffer and injects each entry as an /// `Item::system_message` into the per-request context, then the /// Worker's resume path issues the LLM request without a new /// user turn. pub async fn run_for_notification(&mut self) -> Result { self.prepare_for_run().await?; let history_before = self.worker.as_ref().unwrap().history().len(); let worker = self.worker.take().expect("worker taken during run"); let mut locked = worker.lock(); let result = locked.resume().await; self.worker = Some(locked.unlock()); self.handle_worker_result(result, history_before).await } /// Resume from a paused state. pub async fn resume(&mut self) -> Result { self.prepare_for_run().await?; let history_before = self.worker.as_ref().unwrap().history().len(); // lock → resume → unlock let worker = self.worker.take().expect("worker taken during run"); let mut locked = worker.lock(); let result = locked.resume().await; self.worker = Some(locked.unlock()); self.handle_worker_result(result, history_before).await } /// Ensure the session exists and its head still matches ours. 
///
/// On the first call for a Pod built via `from_manifest`, the session
/// has not been written to the store yet — this is when we append the
/// initial `SessionStart` entry, carrying the system prompt that
/// `ensure_system_prompt_materialized` has just rendered. Subsequent
/// calls fall through to `ensure_head_or_fork`, which auto-forks when
/// another writer has advanced the store head behind our back.
///
/// # Errors
///
/// Propagates (via `?`) failures from session creation, the scope
/// snapshot write, `ensure_head_or_fork`, and the pod-registry update.
async fn ensure_session_head(&mut self) -> Result<(), PodError> {
    // Snapshot of the worker's current prompt / config / history, used
    // either as the initial SessionStart or as the fork seed.
    let w = self.worker.as_ref().unwrap();
    let state = SessionStartState {
        system_prompt: w.get_system_prompt(),
        config: w.request_config(),
        history: w.history(),
    };
    let mut head = self.session_head.lock().await;
    // No head hash yet: the session has never been written.
    if head.head_hash.is_none() {
        let hash =
            session_store::create_session_with_id(&self.store, head.session_id, state).await?;
        head.head_hash = Some(hash);
        // Release the head lock before persisting the scope snapshot.
        drop(head);
        self.persist_scope_snapshot().await?;
        return Ok(());
    }
    // Work on copies so the guard is only updated after the store call
    // has succeeded.
    let prev_session_id = head.session_id;
    let mut session_id = head.session_id;
    let mut head_hash = head.head_hash.clone();
    session_store::ensure_head_or_fork(&self.store, &mut session_id, &mut head_hash, state)
        .await?;
    head.session_id = session_id;
    head.head_hash = head_hash;
    self.session_id = session_id;
    // ensure_head_or_fork mints a fresh session_id when it auto-
    // forks. Sync that to pods.json so a concurrent
    // restore_from_manifest can't see "no live writer" for the new
    // session and grab it.
    if session_id != prev_session_id && self.scope_allocation.is_some() {
        pod_registry::update_session(&self.manifest.pod.name, session_id)?;
    }
    Ok(())
}

/// Handle Worker result: always persist the turn first, then if
/// `Yielded`, perform compaction and resume.
///
/// Persisting before compaction ensures that if compact fails, the
/// turn is fully recorded in the old session (interrupted, outcome
/// `Yielded`), so restore remains consistent.
async fn handle_worker_result( &mut self, result: Result, history_before: usize, ) -> Result { self.persist_turn(history_before, &result).await?; if matches!(result, Ok(WorkerResult::Yielded)) { return self.do_compact_and_resume().await; } if result.is_ok() { if let Some(ref state) = self.compact_state { state.set_just_compacted(false); } } result.map(PodRunResult::from).map_err(PodError::Worker) } /// Perform compaction after a `compact_needed` abort and resume execution. /// /// Uses `Box::pin` for the recursive `resume()` call to break the /// async layout cycle (`run → handle_worker_result → do_compact_and_resume → resume`). fn do_compact_and_resume( &mut self, ) -> std::pin::Pin< Box> + Send + '_>, > { Box::pin(async move { // Thrash detection: if we just compacted and hit the threshold again, // something is wrong. if let Some(ref state) = self.compact_state { if state.just_compacted() { state.set_just_compacted(false); return Err(PodError::CompactThrash); } } let retained = self .compact_state .as_ref() .map(|s| s.retained_tokens()) .unwrap_or(manifest::defaults::COMPACT_RETAINED_TOKENS); self.send_event(Event::CompactStart); match self.compact(retained).await { Ok(new_session_id) => { info!( new_session_id = %new_session_id, "Compaction succeeded, resuming execution" ); self.send_event(Event::CompactDone { new_session_id }); if let Some(ref state) = self.compact_state { state.record_compact_success(); } self.resume().await } Err(e) => { warn!(error = %e, "Compaction failed during run"); self.send_event(Event::CompactFailed { error: e.to_string(), }); self.alert( AlertLevel::Error, AlertSource::Compactor, format!("mid-run compaction failed: {e}"), ); if let Some(ref state) = self.compact_state { state.record_compact_failure(); } Err(e) } } }) } /// Attempt proactive compaction at the beginning of a controller Run. /// /// This used to run in the controller's post-run path. 
/// Keeping it here
/// preserves the ordering requirement that the next turn starts with a
/// compacted history, without introducing a separate Busy controller state.
/// Best-effort: failures are logged and surfaced, but do not abort the
/// user turn that triggered the check.
pub async fn try_pre_run_compact(&mut self) {
    // Bail out unless compaction is configured, enabled, and did not
    // just run. The state is cloned so it stays usable across the
    // `&mut self` calls below.
    let state = match self.compact_state.as_ref() {
        Some(s) if !s.is_disabled() && !s.just_compacted() => s.clone(),
        _ => return,
    };
    let current_tokens = self.total_tokens().tokens;
    if !state.exceeds_post_run(current_tokens) {
        return;
    }
    let retained = state.retained_tokens();
    self.send_event(Event::CompactStart);
    match self.compact(retained).await {
        Ok(new_session_id) => {
            info!(
                new_session_id = %new_session_id,
                "Proactive pre-run compaction succeeded"
            );
            self.send_event(Event::CompactDone { new_session_id });
            state.record_compact_success();
        }
        Err(e) => {
            // Failure is reported three ways (log, event, alert) but
            // deliberately not returned.
            warn!(error = %e, "Proactive pre-run compaction failed");
            self.send_event(Event::CompactFailed {
                error: e.to_string(),
            });
            self.alert(
                AlertLevel::Warn,
                AlertSource::Compactor,
                format!("pre-run compaction failed: {e}"),
            );
            state.record_compact_failure();
        }
    }
}

/// Persist delta + turn end + outcome after a run/resume.
///
/// Write order: history delta, pending scope snapshot, turn end,
/// buffered metrics, per-call LLM usage records, then the run outcome
/// (completed or errored). Any store failure aborts with `StoreError`;
/// metric writes go through `try_record_metric` and do not.
async fn persist_turn(
    &mut self,
    history_before: usize,
    result: &Result,
) -> Result<(), StoreError> {
    // Use direct field access for split borrows (worker immutable,
    // head_hash mutable).
    let w = self.worker.as_ref().unwrap();
    // Items appended to history since the run started.
    let new_items = &w.history()[history_before..];
    let mut head = self.session_head.lock().await;
    self.session_id = head.session_id;
    session_store::save_delta(&self.store, head.session_id, &mut head.head_hash, new_items)
        .await?;
    // The head lock is released between steps rather than held for the
    // whole function.
    drop(head);
    self.flush_pending_scope_snapshot().await?;
    let turn_count = self.worker.as_ref().unwrap().turn_count();
    let mut head = self.session_head.lock().await;
    session_store::save_turn_end(
        &self.store,
        head.session_id,
        &mut head.head_hash,
        turn_count,
    )
    .await?;
    drop(head);
    // Flush any sync-buffered metrics from this run first
    // (currently `prune.fire` / `prune.skip` from the prune observer).
    // Ordered before LlmUsage so that a `prune.fire` and the
    // `prune.post_request` derived from the matching usage record
    // appear in the log close together.
    //
    // Metric writes are intentionally non-fatal: a failure here
    // surfaces as a `Warn` alert + `tracing::warn!` and the loop
    // continues. Metrics are observability data, not load-bearing
    // for run correctness, so a transient FS error must not poison
    // the turn record (`save_delta` / `save_turn_end` already landed
    // by this point, and `save_run_completed` still needs to land).
    let pending_metrics = self.metrics_tracker.drain();
    for metric in pending_metrics {
        self.try_record_metric(&metric).await;
    }
    // Persist any LLM Usage measurements collected during this run.
    // One LogEntry::LlmUsage per LLM call (the tool loop may have run
    // many calls within a single Pod::run). Each is also appended to
    // the in-memory `usage_history` so token-accounting APIs see it
    // before the next run. Records carrying a `correlation_id` (set
    // by an upstream observer such as the prune projection) also get
    // a paired `prune.post_request` metric so cache_read/write can be
    // joined back to the originating event.
    let usage_records = self.usage_tracker.drain();
    for recorded in usage_records {
        let crate::compact::usage_tracker::RecordedUsage {
            record,
            correlation_id,
        } = recorded;
        let mut head = self.session_head.lock().await;
        session_store::save_usage(
            &self.store,
            head.session_id,
            &mut head.head_hash,
            record.history_len,
            record.input_total_tokens,
            record.cache_read_tokens,
            record.cache_write_tokens,
            record.output_tokens,
        )
        .await?;
        drop(head);
        if let Some(id) = correlation_id {
            let metric = session_metrics::Metric::now("prune.post_request")
                .with_correlation_id(&id)
                .with_value(record.cache_read_tokens as f64)
                .with_dimension("cache_write_tokens", record.cache_write_tokens.to_string())
                .with_dimension("history_len", record.history_len.to_string());
            self.try_record_metric(&metric).await;
        }
        // Only pushed once the store write above has succeeded.
        self.usage_history
            .lock()
            .expect("usage_history poisoned")
            .push(record);
    }
    let interrupted = self.worker.as_ref().unwrap().last_run_interrupted();
    // Final entry: the run outcome, success or error.
    match result {
        Ok(r) => {
            let mut head = self.session_head.lock().await;
            session_store::save_run_completed(
                &self.store,
                head.session_id,
                &mut head.head_hash,
                r.clone(),
                interrupted,
            )
            .await?;
        }
        Err(e) => {
            let mut head = self.session_head.lock().await;
            session_store::save_run_errored(
                &self.store,
                head.session_id,
                &mut head.head_hash,
                e.to_string(),
                interrupted,
            )
            .await?;
        }
    }
    Ok(())
}

/// Compact the current session by summarising history via a
/// disposable Worker, then replacing history with
/// `[summary, ...recent_turns]` and creating a new session.
///
/// The summary Worker uses:
/// - `compaction.model` from the manifest if configured, or
/// - a clone of the main LlmClient via `clone_boxed()`.
///
/// Returns the new session ID.
pub async fn compact(&mut self, retained_tokens: u64) -> Result {
    use crate::compact::worker::{
        CompactWorkerContext, CompactWorkerInterceptor, add_reference_tool,
        mark_read_required_tool, write_summary_tool,
    };
    use crate::fs_view::PodFsView;
    // Decide the cut point by projecting the UsageRecord timeline onto
    // the current history: keep the tail whose estimated token count is
    // within `retained_tokens`. Item-granular, turn boundaries ignored.
    let cut = self.split_for_retained(retained_tokens);
    let worker = self.worker.as_ref().expect("worker taken during run");
    let history = worker.history();
    // Clamp so the slices below cannot go out of bounds.
    let retain_from = cut.index.min(history.len());
    let retained_items = history[retain_from..].to_vec();
    let items_to_summarise = history[..retain_from].to_vec();
    // Compaction-related knobs. Fall through to manifest defaults when
    // `[compaction]` is omitted entirely.
    let (auto_read_budget, compact_worker_max_input_tokens, compact_worker_max_turns) = self
        .manifest
        .compaction
        .as_ref()
        .map(|c| {
            (
                c.compact_auto_read_budget,
                c.compact_worker_max_input_tokens,
                c.compact_worker_max_turns,
            )
        })
        .unwrap_or((
            manifest::defaults::COMPACT_AUTO_READ_BUDGET,
            manifest::defaults::COMPACT_WORKER_MAX_INPUT_TOKENS,
            manifest::defaults::COMPACT_WORKER_MAX_TURNS,
        ));
    // Default references: the N most-recently-touched files in the
    // session, surfaced so the compact worker can inspect them and
    // decide which (if any) the next session needs.
    let default_refs: Vec = self
        .tracker
        .as_ref()
        .map(|t| t.recent_files(manifest::defaults::COMPACT_DEFAULT_REFERENCE_COUNT))
        .unwrap_or_default();
    // Input text fed to the compact worker. Includes the default
    // references, current TaskStore snapshot, and the (pruned)
    // conversation text.
    let task_snapshot_text = self.task_store.snapshot_text();
    let summary_input = build_summary_input(
        &items_to_summarise,
        &default_refs,
        Some(task_snapshot_text.as_str()),
    );
    // Worker-side state collected by the compact worker's tool calls.
    let ctx = Arc::new(std::sync::Mutex::new(CompactWorkerContext::with_budget(
        auto_read_budget,
    )));
    // Build an independent compact worker. Scope and pwd are shared
    // with the main Pod (reads go through the same policy) but the
    // Tracker is fresh — compact-time reads must not pollute the
    // main session's recency list, which feeds `default_refs` above.
    let scoped_fs = tools::ScopedFs::with_shared_scope(self.scope.clone(), self.pwd.clone());
    let summary_tracker = tools::Tracker::new();
    let summary_client: Box = self.build_compactor_client()?;
    let summary_system_prompt = self
        .prompts
        .compact_system()
        .map_err(PodError::PromptCatalog)?;
    let mut summary_worker = Worker::new(summary_client).system_prompt(summary_system_prompt);
    summary_worker.set_cache_key(Some(self.session_id.to_string()));
    // Occupancy-based input-token meter + interceptor. The tracker pairs
    // each pre-request history length with the following UsageEvent, then
    // the interceptor projects current prompt occupancy with the same
    // UsageRecord counter used by the main Pod thresholds.
    let summary_usage_tracker = Arc::new(UsageTracker::new());
    {
        let tracker = summary_usage_tracker.clone();
        summary_worker.on_usage(move |event| {
            tracker.record_usage(event);
        });
    }
    summary_worker.set_interceptor(CompactWorkerInterceptor {
        usage_tracker: summary_usage_tracker,
        max_input_tokens: compact_worker_max_input_tokens,
    });
    summary_worker.set_max_turns(compact_worker_max_turns);
    // Tools: read_file (shared scope, fresh tracker) + the three
    // compact-specific tools that populate `ctx`.
    summary_worker.register_tool(tools::read_tool(scoped_fs.clone(), summary_tracker));
    summary_worker.register_tool(mark_read_required_tool(scoped_fs.clone(), ctx.clone()));
    summary_worker.register_tool(add_reference_tool(ctx.clone()));
    summary_worker.register_tool(write_summary_tool(ctx.clone()));
    let out = summary_worker
        .run(summary_input)
        .await
        .map_err(PodError::Worker)?;
    let mut locked_worker = out.worker;
    // Guard: nudge the worker once more if the expected outputs
    // (summary, and any auto-read nominations when default refs
    // existed) were not produced on the first pass. `write_summary`
    // is idempotent-by-overwrite so a second call is safe.
    let nudge = {
        // Clone the context so the std mutex guard is released before
        // the `.await` below.
        let snapshot = ctx.lock().expect("compact ctx poisoned").clone();
        if snapshot.summary.is_none() {
            Some(
                "You have not called `write_summary` yet. Deliver the structured \
                 summary now (Completed Tasks / Active Task / Key Decisions / \
                 User Directives / Current Work) and nominate any files the next \
                 session needs with `mark_read_required`."
                    .to_string(),
            )
        } else if snapshot.read_required.is_empty() && !default_refs.is_empty() {
            Some(
                "Summary received. If any of the referenced files are required \
                 for the next session to continue the task, call \
                 `mark_read_required` on them now. Otherwise reply briefly to \
                 close out."
                    .to_string(),
            )
        } else {
            None
        }
    };
    if let Some(prompt) = nudge {
        // The result value is discarded; only errors matter here.
        let _ = locked_worker.run(prompt).await.map_err(PodError::Worker)?;
    }
    let final_ctx = ctx.lock().expect("compact ctx poisoned").clone();
    // Still no summary after the nudge: compaction fails.
    let summary_text = final_ctx
        .summary
        .clone()
        .ok_or(PodError::CompactSummaryMissing)?;
    // Re-read each auto-read target via the Pod FS view. Errors are
    // logged and skipped inside `render_auto_read` rather than
    // aborting compaction — a missing / moved file should not fail
    // the whole compact.
    let auto_read_messages =
        PodFsView::new(scoped_fs.clone()).render_auto_read(&final_ctx.read_required);
    // Reference list as a single system message; omitted when empty.
    let reference_message = (!final_ctx.references.is_empty()).then(|| {
        let list = final_ctx
            .references
            .iter()
            .map(|p| format!("- {}", p.display()))
            .collect::>()
            .join("\n");
        Item::system_message(format!(
            "[Referenced files — read before compaction, contents not included]\n\
             {list}\n\
             Use read_file to access current contents if needed."
        ))
    });
    // Count surviving user_messages before consuming `retained_items`
    // — needed to align `self.user_segments` after the swap below.
    let retained_user_msgs = retained_items
        .iter()
        .filter(|i| i.is_user_message())
        .count();
    // Build new history: [summary, ...auto-read, references, ...retained, task snapshot, TaskList synthetic call/result].
    // The TaskStore snapshot trails the retained items so that, on resume,
    // `replay_history` walks any pre-compact Task* calls preserved verbatim
    // in retained_items first and the trailing snapshot's `replace_with`
    // is the final word — pre-compact `TaskCreate` calls cannot leak as
    // duplicate entries.
    //
    // Capacity: 1 summary + auto-reads + 3 trailing items (task snapshot,
    // synthetic tool call, tool result) + optional reference + retained.
    let mut new_history = Vec::with_capacity(
        1 + auto_read_messages.len()
            + 3
            + reference_message.is_some() as usize
            + retained_items.len(),
    );
    // System messages introduced by this compaction; broadcast to
    // clients after the history swap below.
    let mut compact_introduced_system_messages =
        Vec::with_capacity(2 + auto_read_messages.len() + reference_message.is_some() as usize);
    let summary_message =
        Item::system_message(format!("[Compacted context summary]\n\n{summary_text}"));
    compact_introduced_system_messages.push(summary_message.clone());
    compact_introduced_system_messages.extend(auto_read_messages.iter().cloned());
    if let Some(msg) = reference_message.as_ref() {
        compact_introduced_system_messages.push(msg.clone());
    }
    let task_snapshot_message = Item::system_message(format!(
        "[Session TaskStore snapshot]\n\n{task_snapshot_text}\n\n\
         This is the complete session task list preserved across compaction. \
         The following TaskList tool result presents the same state through the tool lane."
    ));
    compact_introduced_system_messages.push(task_snapshot_message.clone());
    new_history.push(summary_message);
    new_history.extend(auto_read_messages);
    if let Some(msg) = reference_message {
        new_history.push(msg);
    }
    new_history.extend(retained_items);
    new_history.push(task_snapshot_message);
    new_history.push(Item::tool_call("compact-tasklist", "TaskList", "{}"));
    new_history.push(Item::tool_result_with_content(
        "compact-tasklist",
        tools::task::snapshot_overview(&self.task_store.list()),
        task_snapshot_text.clone(),
    ));
    // Persist as a new compacted session.
    let mut head = self.session_head.lock().await;
    let old_session_id = head.session_id;
    let old_head_hash = head
        .head_hash
        .clone()
        .expect("head_hash should be set after at least one entry");
    let w = self.worker.as_ref().unwrap();
    let state = SessionStartState {
        system_prompt: w.get_system_prompt(),
        config: w.request_config(),
        history: &new_history,
    };
    let (new_session_id, new_head_hash) = session_store::create_compacted_session(
        &self.store,
        state,
        old_session_id,
        old_head_hash,
    )
    .await?;
    // Swap in the new session state. usage_history belongs to the old
    // session — the new compacted session starts with no measurements
    // until its first LLM call.
    self.session_id = new_session_id;
    head.session_id = new_session_id;
    head.head_hash = Some(new_head_hash);
    // Keep pods.json pointing at the live session_id. Without this
    // a concurrent `restore_from_manifest(new_session_id)` would
    // see no live writer and grab the session this Pod just moved
    // into, causing two writers to race on the same jsonl. Skipped
    // when no allocation is installed (e.g. compact under
    // `Pod::new` in tests).
    //
    // NOTE(review): an error here returns after the session head was
    // already swapped but before `set_history` below, leaving the
    // worker on the pre-compact history — confirm callers tolerate that.
    if self.scope_allocation.is_some() {
        pod_registry::update_session(&self.manifest.pod.name, new_session_id)?;
    }
    drop(head);
    // Align user_segments with the post-compaction history. Items
    // before `retain_from` (now folded into the summary) lose their
    // segments; only the user_messages surviving in retained_items
    // keep them. They are always the trailing K entries of
    // `self.user_segments` because submissions are appended in order.
    let drop_n = self.user_segments.len().saturating_sub(retained_user_msgs);
    if drop_n > 0 {
        self.user_segments.drain(..drop_n);
    }
    self.worker.as_mut().unwrap().set_history(new_history);
    for item in &compact_introduced_system_messages {
        self.broadcast_system_message_item(item);
    }
    let worker = self.worker.as_mut().unwrap();
    // Anchor the prompt cache at the summary item so that Anthropic
    // can place a durable `cache_control` breakpoint there — our
    // compact layout guarantees history[0] is the summary.
    worker.set_cache_anchor(Some(0));
    // Re-key the OpenAI Responses prompt cache namespace to the new
    // session_id so post-compact turns share a key with extract /
    // consolidate workers running in the same session.
    worker.set_cache_key(Some(new_session_id.to_string()));
    self.usage_history
        .lock()
        .expect("usage_history poisoned")
        .clear();
    self.persist_scope_snapshot().await?;
    // Reset extract pointer alongside usage_history: the compacted
    // session has a fresh log with no `LogEntry::Extension` entries
    // yet, so a cold restore here would set extract_pointer to None
    // via fold_pointer. The in-memory pointer must match — otherwise
    // `tokens_added_since(old_history_len)` would treat the new
    // (shorter) history as if it had already been processed, and
    // extract would stop firing for the rest of the process's
    // lifetime.
    *self
        .extract_pointer
        .lock()
        .expect("extract_pointer poisoned") = None;
    Ok(new_session_id)
}

/// Build the LlmClient for the compactor Worker.
///
/// Uses `compaction.model` from manifest if set, otherwise clones
/// the main client.
fn build_compactor_client(&self) -> Result, PodError> {
    // A model configured under `compaction.model` takes precedence over
    // the main Worker's client.
    let dedicated_model = self
        .manifest
        .compaction
        .as_ref()
        .and_then(|compaction| compaction.model.as_ref());
    match dedicated_model {
        Some(model_config) => Ok(provider::build_client(model_config)?),
        None => {
            let main_worker = self.worker.as_ref().expect("worker taken during run");
            Ok(main_worker.client().clone_boxed())
        }
    }
}

/// Produce the LlmClient that drives the extract (memory.extract) Worker.
///
/// A client is built from `memory.extract_model` when the manifest sets
/// one; otherwise the main Worker's client is cloned.
fn build_extractor_client(
    &self,
    memory_cfg: &manifest::MemoryConfig,
) -> Result, PodError> {
    match memory_cfg.extract_model.as_ref() {
        Some(model) => Ok(provider::build_client(model)?),
        None => {
            let main_worker = self.worker.as_ref().expect("worker taken during run");
            Ok(main_worker.client().clone_boxed())
        }
    }
}

/// Estimate of how much the total prompt length has grown since the
/// pointer. The extract trigger uses it for its threshold check.
///
/// Computed as `total_tokens_at(now) - total_tokens_at(pointer)`, so it
/// rides on the same accounting (measured / interpolated / extrapolated)
/// as compact. `history_len_pointer == 0` is treated as "nothing
/// extracted yet", in which case the full current prompt length is
/// returned.
///
/// A naive sum of `usage_history.input_total_tokens` is deliberately not
/// used: `input_total_tokens` is the **full prompt-prefix length at send
/// time**, not an increment, so consecutive LLM calls inside one long
/// turn would add overlapping supersets over and over and inflate the
/// figure to several times the real consumption.
fn tokens_added_since(&self, history_len_pointer: usize) -> u64 {
    let history_len_now = self.history().len();
    let prompt_now = self.total_tokens_at(history_len_now).tokens;
    let prompt_at_pointer = self.total_tokens_at(history_len_pointer).tokens;
    prompt_now.saturating_sub(prompt_at_pointer)
}

/// extract (memory.extract) post-run trigger.
///
/// Called by the Controller before spawning the background memory task so
/// the extract worker sees a stable session-log entry range while compact
/// is deferred until the next turn starts. Best-effort: failures are
/// logged but not propagated.
///
/// Behaviour follows `docs/plan/memory.md` §Extract 並走防止
/// ("concurrent-run prevention"): a trigger that arrives while an
/// extract is in flight is skipped, and the threshold is re-evaluated
/// once it completes
/// (the loop below).
Pending state is not retained — the /// re-evaluation happens naturally because the in-memory pointer /// has advanced. pub async fn try_post_run_extract(&mut self) -> Result<(), PodError> { let Some(memory_cfg) = self.manifest.memory.clone() else { return Ok(()); }; // `Some(0)` means disabled, same as `None`. Otherwise the // `tokens_since >= 0` comparison would fire on every post-run. let Some(threshold) = memory_cfg.extract_threshold.filter(|n| *n > 0) else { return Ok(()); }; loop { // CAS the in-flight flag. If another task is already running // an extract for this Pod, skip per spec. if self .extract_in_flight .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) .is_err() { return Ok(()); } let result = self.run_extract_once(&memory_cfg, threshold).await; self.extract_in_flight.store(false, Ordering::Release); match result { Ok(ExtractDecision::Skipped) => return Ok(()), Ok(ExtractDecision::Completed) => { // Re-evaluate threshold against the newly advanced // pointer. In the current synchronous architecture // this normally exits via Skipped on the next pass, // but the loop is forward-looking for the case // where new activity piles up while extract runs. continue; } Err(e) => { tracing::warn!(error = %e, "extract failed"); self.alert( AlertLevel::Warn, AlertSource::Pod, format!("memory extract failed: {e}"), ); return Ok(()); } } } } /// Single extract iteration: snapshot pointer, decide whether to /// fire, run the worker if so, persist results and the new pointer. 
async fn run_extract_once(
    &mut self,
    memory_cfg: &manifest::MemoryConfig,
    threshold: u64,
) -> Result {
    use memory::extract;

    // Clone the pointer out of the std `Mutex`; the guard is a temporary
    // and is released at the end of this statement, before any `.await`.
    let pointer_snapshot = self
        .extract_pointer
        .lock()
        .expect("extract_pointer poisoned")
        .clone();
    // No pointer yet means nothing has been processed: start from 0.
    let processed_history_len = pointer_snapshot
        .as_ref()
        .map(|p| p.processed_through_history_len)
        .unwrap_or(0);

    // Gate 1: not enough new prompt tokens since the pointer.
    let tokens_since = self.tokens_added_since(processed_history_len);
    if tokens_since < threshold {
        return Ok(ExtractDecision::Skipped);
    }

    // Gate 2: no history items beyond the pointer. This also keeps the
    // slice taken further down in bounds.
    let current_history_len = self
        .worker
        .as_ref()
        .expect("worker present")
        .history()
        .len();
    if current_history_len <= processed_history_len {
        return Ok(ExtractDecision::Skipped);
    }

    // Read the session log to get the current entry count. This is
    // the boundary for the source.range end_entry. Called once per
    // extract, on a small local file.
    let entries_now = self.store.read_all(self.session_id).await?.len();
    if entries_now == 0 {
        return Ok(ExtractDecision::Skipped);
    }
    // Inclusive entry range [start_entry, end_entry] covered by this pass.
    let end_entry = entries_now - 1;
    let start_entry = pointer_snapshot
        .as_ref()
        .map(|p| p.processed_through_entry + 1)
        .unwrap_or(0);
    if start_entry > end_entry {
        return Ok(ExtractDecision::Skipped);
    }

    // Copy the unprocessed tail of the history for the extract worker.
    let items_to_extract = self.worker.as_ref().expect("worker present").history()
        [processed_history_len..current_history_len]
        .to_vec();

    let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd);
    // Manifest values win; otherwise fall back to the crate defaults.
    let cap = memory_cfg
        .extract_worker_max_input_tokens
        .unwrap_or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_INPUT_TOKENS);
    let extract_worker_max_turns = memory_cfg
        .extract_worker_max_turns
        .or(manifest::defaults::MEMORY_EXTRACT_WORKER_MAX_TURNS);

    let client = self.build_extractor_client(memory_cfg)?;
    let memory_language = memory_language(memory_cfg);
    let extract_system_prompt = self
        .prompts
        .memory_extract_system(memory_language)
        .map_err(PodError::PromptCatalog)?;
    let mut extract_worker = Worker::new(client).system_prompt(extract_system_prompt);
    extract_worker.set_cache_key(Some(self.session_id.to_string()));

    // Occupancy-based input-token meter + interceptor. The tracker pairs
    // each pre-request history length with the following UsageEvent, then
    // the interceptor projects current prompt occupancy with the same
    // UsageRecord counter used by the main Pod thresholds.
    let extract_usage_tracker = Arc::new(UsageTracker::new());
    {
        let tracker = extract_usage_tracker.clone();
        extract_worker.on_usage(move |event| {
            tracker.record_usage(event);
        });
    }
    extract_worker.set_interceptor(MemoryExtractWorkerInterceptor {
        usage_tracker: extract_usage_tracker,
        max_input_tokens: cap,
    });
    extract_worker.set_max_turns(extract_worker_max_turns);

    // `ctx` is shared with the `write_extracted` tool; the payload is
    // taken back out of it once the run has finished.
    let ctx = Arc::new(extract::ExtractWorkerContext::new());
    extract_worker.register_tool(extract::write_extracted_tool(ctx.clone()));

    let input_text = extract::build_extract_input(&items_to_extract);
    extract_worker
        .run(input_text)
        .await
        .map_err(PodError::Worker)?;

    // A run that never produced a payload still advances the pointer,
    // using an empty default payload.
    let payload = ctx.take_payload().unwrap_or_else(|| {
        tracing::warn!(
            "extract worker did not call write_extracted; \
             advancing pointer with empty payload"
        );
        extract::ExtractedPayload::default()
    });

    let source_session_id = self.session_head.lock().await.session_id;

    // Nothing is written to staging for an empty payload; the pointer
    // then records an empty `staging_id`.
    let staging_id = if payload.is_empty() {
        String::new()
    } else {
        let source = memory::schema::SourceRef {
            session_id: source_session_id.to_string(),
            range: [start_entry as u64, end_entry as u64],
        };
        let (id, _) = extract::write_staging(&layout, source, payload)
            .map_err(PodError::ExtractStaging)?;
        id.to_string()
    };

    let pointer_payload = extract::ExtractPointerPayload {
        processed_through_entry: end_entry,
        processed_through_history_len: current_history_len,
        staging_id,
    };
    let payload_value = serde_json::to_value(&pointer_payload)
        .expect("ExtractPointerPayload is always JSON-serializable");

    // Persist the new pointer as an extension entry while holding the
    // session-head lock, then re-sync the Pod's cached session id.
    {
        let mut head = self.session_head.lock().await;
        session_store::save_extension(
            &self.store,
            head.session_id,
            &mut head.head_hash,
            extract::EXTRACT_DOMAIN,
            payload_value,
        )
        .await?;
        self.session_id = head.session_id;
    }

    // Only after the pointer is persisted is the in-memory copy updated.
    *self
        .extract_pointer
        .lock()
        .expect("extract_pointer poisoned") = Some(pointer_payload);

    Ok(ExtractDecision::Completed)
}

/// Build the LlmClient for the consolidation (memory.consolidation) Worker.
///
/// Uses `memory.consolidation_model` from manifest if set, otherwise
/// clones the main client. Mirrors [`build_extractor_client`].
fn build_consolidator_client(
    &self,
    memory_cfg: &manifest::MemoryConfig,
) -> Result, PodError> {
    if let Some(ref m) = memory_cfg.consolidation_model {
        let client = provider::build_client(m)?;
        return Ok(client);
    }
    let worker = self.worker.as_ref().expect("worker taken during run");
    Ok(worker.client().clone_boxed())
}

/// consolidation (memory.consolidation) trigger.
///
/// Intended to run from a background memory task after extract may have
/// added staging entries. Compact is deferred until the next turn starts,
/// so consolidation no longer blocks the controller's post-run path.
///
/// Behaviour follows `docs/plan/memory.md` §Consolidation / §並走防止
/// ("concurrent-run prevention"):
/// the staging-side `StagingLock` enforces cross-process exclusion;
/// `consolidation_in_flight` keeps in-process callers honest. On
/// success, the lock is released *with* consumed-id cleanup; on
/// worker failure, only the lock file is unlinked so the staging
/// entries remain for a future retry.
///
/// Always returns `Ok(())`: a failed iteration is logged, raised as a
/// warning alert, and then swallowed.
pub async fn try_post_run_consolidate(&mut self) -> Result<(), PodError> {
    let Some(memory_cfg) = self.manifest.memory.clone() else {
        return Ok(());
    };
    // `Some(0)` collapses to `None` — staging count / bytes always
    // satisfies `>= 0`, which would fire consolidation on every post-run.
    // Treating zero as disabled lines up with `extract_threshold` and
    // matches the "no threshold ⇒ consolidation off" invariant in the
    // ticket's §Trigger.
    let files_threshold = memory_cfg.consolidation_threshold_files.filter(|n| *n > 0);
    let bytes_threshold = memory_cfg.consolidation_threshold_bytes.filter(|n| *n > 0);
    if files_threshold.is_none() && bytes_threshold.is_none() {
        return Ok(());
    }

    loop {
        // Claim the in-process flag; if it is already set, another
        // caller in this process is consolidating, so skip.
        if self
            .consolidation_in_flight
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return Ok(());
        }

        let result = self
            .run_consolidate_once(&memory_cfg, files_threshold, bytes_threshold)
            .await;
        // Clear the flag before branching so every exit path releases it.
        self.consolidation_in_flight.store(false, Ordering::Release);

        match result {
            Ok(ConsolidateDecision::Skipped) => return Ok(()),
            // Re-evaluate against whatever is in staging now.
            Ok(ConsolidateDecision::Completed) => continue,
            Err(e) => {
                tracing::warn!(error = %e, "consolidation failed");
                self.alert(
                    AlertLevel::Warn,
                    AlertSource::Pod,
                    format!("memory consolidation failed: {e}"),
                );
                return Ok(());
            }
        }
    }
}

/// Single consolidation iteration: snapshot staging, decide whether to
/// fire, run the worker if so, release the lock and clean up consumed
/// IDs.
async fn run_consolidate_once(
    &mut self,
    memory_cfg: &manifest::MemoryConfig,
    files_threshold: Option,
    bytes_threshold: Option,
) -> Result {
    use memory::consolidate;

    let layout = memory::WorkspaceLayout::resolve(memory_cfg, &self.pwd);
    let entries = consolidate::list_staging_entries(&layout);
    if entries.is_empty() {
        return Ok(ConsolidateDecision::Skipped);
    }

    // Either configured threshold being reached is enough to fire; an
    // unset threshold never counts as reached.
    let total_files = entries.len();
    let total_bytes: u64 = entries.iter().map(|e| e.bytes).sum();
    let files_hit = files_threshold.is_some_and(|n| total_files >= n);
    let bytes_hit = bytes_threshold.is_some_and(|n| total_bytes >= n);
    if !files_hit && !bytes_hit {
        return Ok(ConsolidateDecision::Skipped);
    }

    // The ids snapshotted here are handed to the lock at acquisition.
    let consumed_ids: Vec = entries.iter().map(|e| e.id).collect();
    let lock = match consolidate::StagingLock::acquire(
        &layout,
        std::process::id(),
        self.manifest.pod.name.clone(),
        consumed_ids,
    ) {
        Ok(l) => l,
        // A lock that is already in use is not an error: skip this pass.
        Err(memory::consolidate::LockError::InUse { .. }) => {
            return Ok(ConsolidateDecision::Skipped);
        }
        Err(e) => return Err(PodError::ConsolidationLock(e)),
    };

    // From here on, every early return must release the lock first.
    let client = match self.build_consolidator_client(memory_cfg) {
        Ok(c) => c,
        Err(e) => {
            lock.release_only();
            return Err(e);
        }
    };
    let memory_language = memory_language(memory_cfg);
    let consolidation_system_prompt =
        match self.prompts.memory_consolidation_system(memory_language) {
            Ok(p) => p,
            Err(e) => {
                lock.release_only();
                return Err(PodError::PromptCatalog(e));
            }
        };
    let mut worker = Worker::new(client).system_prompt(consolidation_system_prompt);
    worker.set_cache_key(Some(self.session_id.to_string()));

    // Memory tools are self-contained — they bypass ScopedFs and write
    // directly under the workspace via WorkspaceLayout. Resident
    // knowledge injection (`Pod::set_resident_knowledge_injection`) is
    // a Pod-level concern; this disposable Worker is built without it
    // by construction, in keeping with `docs/plan/memory.md` §Consolidation
    // のKnowledgeアクセス ("Knowledge access during consolidation": the
    // agent pulls knowledge through the search
    // tool instead of via system-prompt residency).
    let query_cfg = memory::tool::QueryConfig::from(memory_cfg);
    worker.register_tool(memory::tool::read_tool_with_usage(
        layout.clone(),
        self.session_id.to_string(),
    ));
    worker.register_tool(memory::tool::write_tool(layout.clone()));
    worker.register_tool(memory::tool::edit_tool(layout.clone()));
    worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg));
    worker.register_tool(memory::tool::knowledge_query_tool(
        layout.clone(),
        query_cfg,
    ));

    let tidy = consolidate::collect_tidy_hints(&layout);
    // A failed usage report is not fatal: log it and continue with an
    // empty report.
    let usage_report = match memory::build_usage_report(&layout) {
        Ok(report) => report,
        Err(err) => {
            warn!(error = %err, "failed to build memory usage report for consolidation");
            memory::UsageReport::empty()
        }
    };
    let input_text =
        consolidate::build_consolidate_input(&layout, &entries, &tidy, &usage_report);

    let run_result = worker.run(input_text).await;

    match run_result {
        // Success: release the lock together with its cleanup step.
        Ok(_) => {
            lock.release_with_cleanup(&layout);
            Ok(ConsolidateDecision::Completed)
        }
        // Failure: release the lock only, with no cleanup.
        Err(e) => {
            lock.release_only();
            Err(PodError::Worker(e))
        }
    }
}
}

/// Language used for memory prompts: the trimmed `cfg.language` when it
/// is set and non-empty, otherwise `manifest::defaults::MEMORY_LANGUAGE`.
fn memory_language(cfg: &manifest::MemoryConfig) -> &str {
    cfg.language
        .as_deref()
        .map(str::trim)
        .filter(|language| !language.is_empty())
        .unwrap_or(manifest::defaults::MEMORY_LANGUAGE)
}

/// Language for the worker: the trimmed `cfg.language`, or
/// `manifest::defaults::WORKER_LANGUAGE` when that is empty.
fn worker_language(cfg: &manifest::WorkerManifest) -> &str {
    let language = cfg.language.trim();
    if language.is_empty() {
        manifest::defaults::WORKER_LANGUAGE
    } else {
        language
    }
}

/// Outcome of a single extract iteration. Internal to
/// `try_post_run_extract` / `run_extract_once`.
enum ExtractDecision {
    /// Threshold not reached, or no items to extract.
    Skipped,
    /// Extract ran and pointer advanced. Caller re-evaluates threshold.
    Completed,
}

/// Pre-request interceptor for the extract worker. Aborts when current
/// prompt occupancy crosses `max_input_tokens`. Uses the same
/// `UsageRecord` + `llm_worker::token_counter::total_tokens` projection
/// as the main Pod compaction thresholds, so prompt-cache hits are not
/// counted cumulatively across turns.
Kept separate from /// `compact::worker::CompactWorkerInterceptor` so each subsystem can /// tune its own cancel message and budget. struct MemoryExtractWorkerInterceptor { usage_tracker: Arc, max_input_tokens: u64, } #[async_trait] impl llm_worker::interceptor::Interceptor for MemoryExtractWorkerInterceptor { async fn pre_llm_request( &self, context: &mut Vec, ) -> llm_worker::interceptor::PreRequestAction { let records = self.usage_tracker.records(); let estimate = llm_worker::token_counter::total_tokens(context, &records); if estimate.tokens > self.max_input_tokens { return llm_worker::interceptor::PreRequestAction::Cancel(format!( "extract worker input occupancy exceeded {} tokens", self.max_input_tokens )); } self.usage_tracker.note_request(context.len()); llm_worker::interceptor::PreRequestAction::Continue } } /// Outcome of a single consolidation iteration. Internal to /// `try_post_run_consolidate` / `run_consolidate_once`. enum ConsolidateDecision { /// Either threshold not met, no staging, or another Pod holds the lock. Skipped, /// Consolidation ran. Caller re-evaluates threshold against any /// staging entries that arrived during the run (Coalesce). Completed, } impl Pod, St> { /// Create a Pod entirely from a validated manifest. /// /// The Pod's working directory is captured once here from the /// process's `std::env::current_dir()` — callers that want a /// different cwd must `cd` before constructing the Pod (e.g. the /// `SpawnPod` tool sets `Command::current_dir` on the child). The /// captured pwd is canonicalised and validated against /// `manifest.scope`. /// /// `loader` is installed into the system-prompt template /// environment so that `{% include "name" %}` / /// `{% import "name" %}` references resolve against the three-layer /// prompt asset library. 
pub async fn from_manifest(
    manifest: PodManifest,
    store: St,
    loader: PromptLoader,
) -> Result {
    let mut common = prepare_pod_common(&manifest, &loader, /* parse_template */ true)?;
    // Take the shadow events out now; the other `common` fields are moved
    // into the Pod below and the events are drained once the Pod exists.
    let skill_shadows = std::mem::take(&mut common.skill_shadows);

    // Session creation is deferred to the first run (see
    // `ensure_session_head`) so the SessionStart entry can capture
    // the rendered system prompt, not the raw template source. The
    // session_id is allocated here so the pod-registry registration
    // can record it from the start.
    let session_id = session_store::new_session_id();

    // Register this Pod in the machine-wide pod-registry
    // before building anything else, so a spawn that conflicts on
    // scope fails fast.
    let socket_path = dir::default_base()
        .map_err(ScopeLockError::from)?
        .join(&manifest.pod.name)
        .join("sock");
    let scope_allocation = pod_registry::install_top_level(
        manifest.pod.name.clone(),
        std::process::id(),
        socket_path,
        common.scope.allow_rules(),
        session_id,
    )?;

    let mut worker = Worker::new(common.client);
    apply_worker_manifest(&mut worker, &manifest.worker);
    // The prompt-cache key is the session id.
    worker.set_cache_key(Some(session_id.to_string()));

    let mut pod = Self {
        manifest,
        worker: Some(worker),
        store,
        session_id,
        // No entry has been written yet, so there is no head hash.
        session_head: Arc::new(AsyncMutex::new(SessionHead {
            session_id,
            head_hash: None,
        })),
        pwd: common.pwd,
        scope: SharedScope::new(common.scope),
        hook_builder: HookRegistryBuilder::new(),
        interceptor_installed: false,
        compact_state: None,
        usage_tracker: Arc::new(UsageTracker::new()),
        metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
        usage_history: Arc::new(Mutex::new(Vec::new())),
        tracker: None,
        task_store: tools::TaskStore::new(),
        system_prompt_template: common.system_prompt_template,
        alerter: None,
        event_tx: None,
        pending_notifies: NotifyBuffer::new(),
        pending_attachments: Arc::new(Mutex::new(Vec::new())),
        scope_allocation: Some(scope_allocation),
        // Top-level Pods have no spawner to call back.
        callback_socket: None,
        prompts: common.prompts,
        workflow_registry: common.workflow_registry,
        memory_layout: common.memory_layout,
        inject_resident_knowledge: true,
        pending_scope_snapshot: Arc::new(Mutex::new(None)),
        extract_in_flight: Arc::new(AtomicBool::new(false)),
        consolidation_in_flight: Arc::new(AtomicBool::new(false)),
        extract_pointer: Arc::new(Mutex::new(None)),
        memory_task: None,
        user_segments: Vec::new(),
    };
    pod.apply_permissions_from_manifest();
    pod.apply_prune_from_manifest();
    drain_skill_shadows(&pod, skill_shadows);
    Ok(pod)
}

/// Build a Pod spawned by another Pod (sibling process).
///
/// Behaves like [`Pod::from_manifest`] but claims the scope
/// allocation that the spawner pre-registered via
/// [`pod_registry::delegate_scope`], rather than installing a new
/// top-level entry. `callback_socket` carries the spawner's
/// Unix-socket path so the spawned Pod can send `Method::Notify`
/// back to the spawner.
pub async fn from_manifest_spawned(
    manifest: PodManifest,
    store: St,
    loader: PromptLoader,
    callback_socket: PathBuf,
) -> Result {
    let mut common = prepare_pod_common(&manifest, &loader, /* parse_template */ true)?;
    // Take the shadow events out now; they are drained once the Pod exists.
    let skill_shadows = std::mem::take(&mut common.skill_shadows);

    let session_id = session_store::new_session_id();

    // Adopt the allocation for this Pod name instead of installing a
    // top-level one.
    let scope_allocation = pod_registry::adopt_allocation(
        manifest.pod.name.clone(),
        std::process::id(),
        session_id,
    )?;

    let mut worker = Worker::new(common.client);
    apply_worker_manifest(&mut worker, &manifest.worker);
    worker.set_cache_key(Some(session_id.to_string()));

    let mut pod = Self {
        manifest,
        worker: Some(worker),
        store,
        session_id,
        session_head: Arc::new(AsyncMutex::new(SessionHead {
            session_id,
            head_hash: None,
        })),
        pwd: common.pwd,
        scope: SharedScope::new(common.scope),
        hook_builder: HookRegistryBuilder::new(),
        interceptor_installed: false,
        compact_state: None,
        usage_tracker: Arc::new(UsageTracker::new()),
        metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
        usage_history: Arc::new(Mutex::new(Vec::new())),
        tracker: None,
        task_store: tools::TaskStore::new(),
        system_prompt_template: common.system_prompt_template,
        alerter: None,
        event_tx: None,
        pending_notifies: NotifyBuffer::new(),
        pending_attachments: Arc::new(Mutex::new(Vec::new())),
        scope_allocation: Some(scope_allocation),
        // The only field that differs from `from_manifest`: the
        // spawner's socket path is recorded.
        callback_socket: Some(callback_socket),
        prompts: common.prompts,
        workflow_registry: common.workflow_registry,
        memory_layout: common.memory_layout,
        inject_resident_knowledge: true,
        pending_scope_snapshot: Arc::new(Mutex::new(None)),
        extract_in_flight: Arc::new(AtomicBool::new(false)),
        consolidation_in_flight: Arc::new(AtomicBool::new(false)),
        extract_pointer: Arc::new(Mutex::new(None)),
        memory_task: None,
        user_segments: Vec::new(),
    };
    pod.apply_permissions_from_manifest();
    pod.apply_prune_from_manifest();
    drain_skill_shadows(&pod, skill_shadows);
    Ok(pod)
}

/// Restore a Pod from an existing session log.
///
/// Resolves the manifest cascade exactly like [`Self::from_manifest`]
/// (pwd / scope / pod-registry / client / prompt catalog), seeds a
/// fresh Worker from the source session's `RestoredState`, and
/// reuses the same `session_id` so subsequent turns append to the
/// source jsonl as a continuation of the same conversation.
///
/// Concurrent writers are prevented by the pod-registry:
/// the registration carries `session_id`, and this constructor
/// refuses to start when `pod_registry::lookup_session` already finds
/// a live Pod writing to `session_id`. So there is no need to fork —
/// resume is "the same session, a different process owning it".
///
/// `system_prompt` is replayed verbatim from the session log —
/// templates are not re-rendered on restore so a long-running
/// session keeps a stable cache prefix even when the manifest's
/// instruction template would render differently today.
pub async fn restore_from_manifest(
    session_id: SessionId,
    manifest: PodManifest,
    store: St,
    loader: PromptLoader,
) -> Result {
    let state = session_store::restore(&store, session_id).await?;
    // A session without a head hash has nothing to resume.
    if state.head_hash.is_none() {
        return Err(PodError::SessionEmpty { session_id });
    }
    // The scope comes from the snapshot persisted in the session log;
    // without one, resume is refused.
    let scope_snapshot = state
        .pod_scope
        .clone()
        .ok_or(PodError::SessionScopeMissing { session_id })?;

    let mut common = prepare_pod_common_with_scope(
        &manifest,
        &loader,
        /* parse_template */ false,
        ScopeConfig {
            allow: scope_snapshot.allow,
            deny: scope_snapshot.deny,
        },
    )?;
    // Take the shadow events out now; they are drained once the Pod exists.
    let skill_shadows = std::mem::take(&mut common.skill_shadows);

    // Atomic: register_pod inside install_top_level rejects when
    // another live allocation already holds `session_id`. Wrapping
    // the lookup + install inside a single `LockFileGuard` is what
    // makes "no two live Pods write to the same session log"
    // actually structural rather than a hopeful pre-check.
    let socket_path = dir::default_base()
        .map_err(ScopeLockError::from)?
        .join(&manifest.pod.name)
        .join("sock");
    let scope_allocation = pod_registry::install_top_level_with_deny(
        manifest.pod.name.clone(),
        std::process::id(),
        socket_path,
        common.scope.allow_rules(),
        common.scope.deny_rules(),
        session_id,
    )?;

    // Build the worker and apply the manifest defaults first, then
    // overwrite the pieces the session log is authoritative for.
    let mut worker = Worker::new(common.client);
    apply_worker_manifest(&mut worker, &manifest.worker);
    worker.set_cache_key(Some(session_id.to_string()));
    if let Some(ref prompt) = state.system_prompt {
        worker.set_system_prompt(prompt);
    }
    // A leading `Role::System` item can only come from `compact`
    // (the Pod's one and only write path that prepends a summary at
    // history[0]). Restoring the anchor lets Anthropic re-use a
    // stable cache prefix for long-lived restored sessions.
    let anchored_on_summary = matches!(
        state.history.first(),
        Some(Item::Message {
            role: llm_worker::Role::System,
            ..
        })
    );
    worker.set_history(state.history.clone());
    worker.set_request_config(state.config.clone());
    worker.set_turn_count(state.turn_count);
    worker.set_last_run_interrupted(state.last_run_interrupted);
    if anchored_on_summary {
        worker.set_cache_anchor(Some(0));
    }

    // Rebuild derived in-memory state from the restored log: the extract
    // pointer from the extension entries, the task store from the history.
    let extract_pointer = memory::extract::fold_pointer(&state.extensions);
    let task_store = tools::TaskStore::from_history(&state.history);

    let mut pod = Self {
        manifest,
        worker: Some(worker),
        store,
        session_id,
        // Continue from the restored head rather than starting a new chain.
        session_head: Arc::new(AsyncMutex::new(SessionHead {
            session_id,
            head_hash: state.head_hash,
        })),
        pwd: common.pwd,
        scope: SharedScope::new(common.scope),
        hook_builder: HookRegistryBuilder::new(),
        interceptor_installed: false,
        compact_state: None,
        usage_tracker: Arc::new(UsageTracker::new()),
        metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
        usage_history: Arc::new(Mutex::new(state.usage_history)),
        tracker: None,
        task_store,
        // Restore replays the saved system_prompt verbatim — no
        // template re-render on resume.
        system_prompt_template: None,
        alerter: None,
        event_tx: None,
        pending_notifies: NotifyBuffer::new(),
        pending_attachments: Arc::new(Mutex::new(Vec::new())),
        scope_allocation: Some(scope_allocation),
        callback_socket: None,
        prompts: common.prompts,
        workflow_registry: common.workflow_registry,
        memory_layout: common.memory_layout,
        inject_resident_knowledge: true,
        pending_scope_snapshot: Arc::new(Mutex::new(None)),
        extract_in_flight: Arc::new(AtomicBool::new(false)),
        consolidation_in_flight: Arc::new(AtomicBool::new(false)),
        extract_pointer: Arc::new(Mutex::new(extract_pointer)),
        memory_task: None,
        user_segments: state.user_segments,
    };
    pod.apply_permissions_from_manifest();
    pod.apply_prune_from_manifest();
    drain_skill_shadows(&pod, skill_shadows);
    Ok(pod)
}

/// Convenience: build a Pod from a single-layer TOML manifest string.
///
/// Parses the TOML into a [`PodManifestConfig`], converts to a
/// validated [`PodManifest`] via `TryFrom`, then delegates to
/// [`Pod::from_manifest`]. Useful for tests, debugging, and any
/// caller that wants to skip the cascade entirely.
pub async fn from_manifest_toml(toml: &str, store: St) -> Result {
    // Parse, then validate; each stage maps to its own `PodError` variant.
    let manifest = PodManifestConfig::from_toml(toml)
        .map_err(PodError::ManifestParse)
        .and_then(|config| PodManifest::try_from(config).map_err(PodError::ManifestResolve))?;
    Self::from_manifest(manifest, store, PromptLoader::builtins_only()).await
}
}

/// Push the worker-level settings of a manifest onto a Worker: request
/// config, max-turn limit and tool-output limits.
///
/// `system_prompt` is deliberately left alone here. It is a minijinja
/// template that `Pod::from_manifest` parses and that is rendered once,
/// at first turn, in `ensure_system_prompt_materialized`.
pub fn apply_worker_manifest(worker: &mut Worker, wm: &WorkerManifest) {
    let request_config = request_config_from_worker_manifest(wm);
    worker.set_request_config(request_config);

    let turn_limit = wm.max_turns.map(|limit| limit.get());
    worker.set_max_turns(turn_limit);

    let output_limits = ToolOutputLimits {
        default_max_bytes: wm.tool_output.default_max_bytes,
        per_tool: wm.tool_output.per_tool.clone(),
    };
    worker.set_tool_output_limits(Some(output_limits));
}

/// Translate the sampling-related manifest fields into a `RequestConfig`.
///
/// Optional numeric fields overwrite the value from `RequestConfig::new()`
/// only when the manifest sets them; `stop_sequences` and `reasoning` are
/// always copied over.
fn request_config_from_worker_manifest(wm: &WorkerManifest) -> RequestConfig {
    let mut request = RequestConfig::new();
    // `a.or(b)` keeps the manifest value when present and otherwise
    // leaves whatever `RequestConfig::new()` put there.
    request.max_tokens = wm.max_tokens.or(request.max_tokens);
    request.temperature = wm.temperature.or(request.temperature);
    request.top_p = wm.top_p.or(request.top_p);
    request.top_k = wm.top_k.or(request.top_k);
    request.stop_sequences = wm.stop_sequences.clone();
    request.reasoning = wm.reasoning.clone();
    request
}

/// How a Pod run ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PodRunResult {
    /// The LLM completed its turn normally.
    Finished,
    /// The LLM paused (e.g. a hook is awaiting user confirmation).
    Paused,
    /// The worker hit its configured max_turns limit.
    LimitReached,
}

impl From for PodRunResult {
    /// Map a terminal `WorkerResult` onto the public result type.
    ///
    /// # Panics
    ///
    /// Panics on `WorkerResult::Yielded`.
    fn from(r: WorkerResult) -> Self {
        match r {
            WorkerResult::Finished => Self::Finished,
            WorkerResult::Paused => Self::Paused,
            WorkerResult::LimitReached => Self::LimitReached,
            // Yielded stays inside the Pod: handle_worker_result always
            // intercepts it, so it never reaches this conversion.
            WorkerResult::Yielded => unreachable!("Yielded never converts to PodRunResult"),
        }
    }
}

/// Assemble the input handed to the compact worker: the instructions,
/// an optional list of recently-touched files, an optional task
/// snapshot, and the pruned conversation from [`build_summary_prompt`].
fn build_summary_input(
    items: &[Item],
    default_refs: &[PathBuf],
    task_snapshot: Option<&str>,
) -> String {
    let mut prompt = String::from(
        "Summarise the conversation below into a structured summary and nominate \
         files the next session needs.\n\n",
    );

    // The file section is omitted entirely when there is nothing to list.
    if !default_refs.is_empty() {
        prompt.push_str(
            "These files were touched recently in this session. Use `read_file` \
             on them as needed, then call `mark_read_required` for any whose \
             contents the next session must have, and `add_reference` for files \
             it should know about by name only.\n\n## Referenced files\n",
        );
        let listing: String = default_refs
            .iter()
            .map(|path| format!("- {}\n", path.display()))
            .collect();
        prompt.push_str(&listing);
        prompt.push('\n');
    }

    if let Some(snapshot) = task_snapshot {
        prompt.push_str(
            "## Current Session TaskStore\n\
             This is the full current task list. Use it as source material for the \
             summary, especially active (pending/inprogress) tasks, but do not edit tasks \
             from the compact worker.\n",
        );
        prompt.push_str(snapshot);
        prompt.push_str("\n\n");
    }

    prompt.push_str("## Conversation\n");
    prompt.push_str(&build_summary_prompt(items));
    prompt.push_str("\n\nWhen you are done, call `write_summary` with the final 5-section text.");
    prompt
}

/// Format conversation items into a text prompt for the summary Worker.
///
/// The summary should capture decisions and user intent, not recreate code.
/// File contents and tool IO belong in auto-read / references, not in the /// summary input. So this strips: /// - `ToolCall.arguments` (keep only the tool name) /// - `ToolResult.content` (keep only the summary line) /// - `Reasoning` entirely (intermediate thought, superseded by decisions) fn build_summary_prompt(items: &[Item]) -> String { let mut lines = Vec::new(); for item in items { match item { Item::Message { role, content, .. } => { let role_label = match role { llm_worker::Role::User => "User", llm_worker::Role::Assistant => "Assistant", llm_worker::Role::System => "System", }; let text: String = content .iter() .map(|p| p.as_text()) .collect::>() .join(""); lines.push(format!("[{role_label}] {text}")); } Item::ToolCall { name, .. } => { lines.push(format!("[ToolCall] {name}")); } Item::ToolResult { summary, .. } => { lines.push(format!("[ToolResult] {summary}")); } Item::Reasoning { .. } => {} } } lines.join("\n\n") } /// Pod errors. #[derive(Debug, thiserror::Error)] pub enum PodError { #[error(transparent)] Worker(#[from] WorkerError), #[error(transparent)] Store(#[from] StoreError), #[error(transparent)] Scope(ScopeError), #[error("pwd is not readable under the configured scope: {}", .pwd.display())] PwdOutsideScope { pwd: PathBuf }, #[error("failed to resolve pwd {}: {source}", .pwd.display())] InvalidPwd { pwd: PathBuf, #[source] source: std::io::Error, }, #[error("failed to parse manifest TOML: {0}")] ManifestParse(#[source] toml::de::Error), #[error("failed to resolve manifest config: {0}")] ManifestResolve(#[source] ResolveError), #[error(transparent)] Provider(#[from] provider::ProviderError), #[error("compaction thrash: context still exceeds threshold immediately after compact")] CompactThrash, #[error("compact worker did not produce a summary (write_summary was never called)")] CompactSummaryMissing, #[error("invalid system prompt template: {source}")] InvalidSystemPromptTemplate { #[source] source: SystemPromptError, }, #[error("failed to 
render system prompt template: {source}")] SystemPromptRender { #[source] source: SystemPromptError, }, #[error(transparent)] ScopeLock(#[from] ScopeLockError), #[error(transparent)] PromptCatalog(#[from] CatalogError), #[error("memory extract staging write failed: {0}")] ExtractStaging(#[source] memory::extract::StagingError), #[error("memory consolidation lock acquisition failed: {0}")] ConsolidationLock(#[source] memory::consolidate::LockError), #[error("workflow load failed: {0}")] WorkflowLoad(#[source] workflow_crate::WorkflowLoadError), #[error("workflow invocation failed: {0}")] WorkflowResolve(#[from] WorkflowResolveError), #[error("session {session_id} has no entries to restore")] SessionEmpty { session_id: SessionId }, #[error( "session {session_id} has no persisted scope snapshot; refusing resume without explicit scope" )] SessionScopeMissing { session_id: SessionId }, } /// Bundle of resources that every high-level Pod constructor needs: /// pwd, scope, an LLM client, the prompt catalog, and (optionally) a /// parsed system-prompt template. Built once by [`prepare_pod_common`] /// from the manifest cascade and then split into Pod fields. struct PodCommon { pwd: PathBuf, scope: Scope, client: Box, prompts: Arc, workflow_registry: workflow_crate::WorkflowRegistry, memory_layout: Option, system_prompt_template: Option, /// SKILL.md shadow events surfaced during workflow-registry build. /// The Pod constructor drains these into the notify buffer right /// after the Pod is materialised so the first LLM request observes /// any skill ↔ workflow collisions. skill_shadows: Vec, } /// Resolve pwd / scope / LLM client / prompt catalog from a validated /// manifest cascade. Used by `from_manifest`, `from_manifest_spawned`, /// and `restore_from_manifest` so they share one definition of "what /// pieces fall out of a manifest". /// /// `parse_template` controls whether the manifest's instruction is /// parsed as a system-prompt template. 
New Pods always parse so the /// template is rendered at first turn; restored Pods skip parsing /// because the saved session log replays a previously-rendered /// `system_prompt` verbatim. fn prepare_pod_common( manifest: &PodManifest, loader: &PromptLoader, parse_template: bool, ) -> Result { let pwd = current_pwd()?; let scope = build_scope_with_memory(manifest, &pwd)?; prepare_pod_common_from_scope(manifest, loader, parse_template, pwd, scope) } fn prepare_pod_common_with_scope( manifest: &PodManifest, loader: &PromptLoader, parse_template: bool, scope_config: ScopeConfig, ) -> Result { let pwd = current_pwd()?; let scope = Scope::from_config(&scope_config).map_err(PodError::Scope)?; prepare_pod_common_from_scope(manifest, loader, parse_template, pwd, scope) } fn prepare_pod_common_from_scope( manifest: &PodManifest, loader: &PromptLoader, parse_template: bool, pwd: PathBuf, scope: Scope, ) -> Result { if !scope.is_readable(&pwd) { return Err(PodError::PwdOutsideScope { pwd }); } let client = provider::build_client(&manifest.model)?; let prompts = PromptCatalog::load(loader, manifest.pod.prompt_pack.as_deref())?; let memory_layout = manifest .memory .as_ref() .map(|mem| memory::WorkspaceLayout::resolve(mem, &pwd)); let mut workflow_registry = match memory_layout.as_ref() { Some(layout) => workflow_crate::load_workflows(layout).map_err(PodError::WorkflowLoad)?, None => workflow_crate::WorkflowRegistry::empty(), }; let skill_shadows = ingest_skills(&mut workflow_registry, manifest); let system_prompt_template = if parse_template { Some( SystemPromptTemplate::parse(&manifest.worker.instruction, loader.clone()) .map_err(|source| PodError::InvalidSystemPromptTemplate { source })?, ) } else { None }; Ok(PodCommon { pwd, scope, client, prompts, workflow_registry, memory_layout, system_prompt_template, skill_shadows, }) } /// Ingest external SKILL.md sources into the workflow registry. 
///
/// Skills come exclusively from the manifest's `[skills] directories`
/// list (resolved against the manifest base directory). Internal
/// Workflows already loaded via [`workflow_crate::load_workflows`] take priority
/// over skills sharing the same slug; collisions are surfaced as
/// [`workflow_crate::ShadowedSkill`] events that the caller pushes onto the
/// Pod's notification buffer.
///
/// Returns an empty list when the manifest has no skills section; the
/// registry is left untouched in that case.
fn ingest_skills(
    registry: &mut workflow_crate::WorkflowRegistry,
    manifest: &PodManifest,
) -> Vec<workflow_crate::ShadowedSkill> {
    let Some(skills_cfg) = manifest.skills.as_ref() else {
        return Vec::new();
    };

    let mut shadows = Vec::new();
    for dir in &skills_cfg.directories {
        for skill in workflow_crate::load_skills_from_dir(dir) {
            // Tag each record with the directory it came from before merging.
            let source = workflow_crate::WorkflowSource::Skill { dir: dir.clone() };
            let record = skill.into_workflow_record(source);
            // `merge_skill` hands back a shadow event when the skill was not
            // the winner for its slug; collect those for the caller.
            if let Some(shadow) = registry.merge_skill(record) {
                shadows.push(shadow);
            }
        }
    }
    shadows
}

/// Drain skill-ingest shadow events into the Pod's notify buffer so the
/// first LLM request renders them as system-message attachments.
///
/// Each event becomes one notification of the form
/// `[Skill shadowed] <message>`.
fn drain_skill_shadows<C, S>(pod: &Pod<C, S>, shadows: Vec<workflow_crate::ShadowedSkill>)
where
    C: LlmClient,
    S: Store,
{
    for shadow in shadows {
        pod.push_notify(format!("[Skill shadowed] {}", shadow.message()));
    }
}

/// Build the Pod's runtime [`Scope`] from the manifest, layering the
/// memory subsystem's deny-write rules on top when `[memory]` is
/// present, and read-allow rules for any external Agent Skills
/// directories ingested. The deny rules cap generic CRUD tools so they
/// cannot touch `/memory/` or `/knowledge/` while
/// the memory tools (registered separately) bypass `ScopedFs` and write
/// through `std::fs` directly. Skill directories are added at
/// `Permission::Read` so the agent can `Read` `scripts/` / `references/`
/// / `assets/` referenced by the Workflow body.
///
/// # Errors
///
/// Returns [`PodError::Scope`] when the merged configuration cannot be
/// turned into a [`Scope`].
fn build_scope_with_memory(manifest: &PodManifest, pwd: &Path) -> Result<Scope, PodError> {
    // Work on a copy so the manifest's own scope section stays untouched.
    let mut scope_config = manifest.scope.clone();

    if let Some(mem) = manifest.memory.as_ref() {
        // Both the memory crate and the workflow crate contribute deny rules
        // derived from the same resolved layout.
        let layout = memory::WorkspaceLayout::resolve(mem, pwd);
        scope_config.deny.extend(memory::deny_write_rules(&layout));
        scope_config
            .deny
            .extend(workflow_crate::deny_write_rules(&layout));
    }

    scope_config.allow.extend(skill_dir_read_rules(manifest));
    Scope::from_config(&scope_config).map_err(PodError::Scope)
}

/// Allow-rules granting `Read` access to every skill directory the Pod
/// will ingest from the manifest's `[skills] directories`. Returned
/// rules are recursive so the entire skill bundle (`SKILL.md` +
/// `scripts/` + `references/` + `assets/`) is readable.
///
/// Yields an empty list when the manifest has no skills section.
fn skill_dir_read_rules(manifest: &PodManifest) -> Vec<ScopeRule> {
    let Some(skills_cfg) = manifest.skills.as_ref() else {
        return Vec::new();
    };

    skills_cfg
        .directories
        .iter()
        .map(|dir| ScopeRule {
            target: dir.clone(),
            permission: Permission::Read,
            recursive: true,
        })
        .collect()
}

/// Snapshot the process's current working directory as the Pod's pwd,
/// canonicalising symlinks and any `.`/`..` components. The Pod keeps
/// this value for its lifetime; changes to the process-wide cwd after
/// construction do not affect scope checks or the system prompt.
fn current_pwd() -> Result { let cwd = std::env::current_dir().map_err(|source| PodError::InvalidPwd { pwd: PathBuf::from("."), source, })?; cwd.canonicalize() .map_err(|source| PodError::InvalidPwd { pwd: cwd, source }) } #[cfg(test)] mod build_summary_prompt_tests { use super::*; #[test] fn strips_tool_call_arguments() { let items = vec![Item::tool_call_json( "call-1", "read_file", serde_json::json!({ "path": "src/main.rs" }), )]; let prompt = build_summary_prompt(&items); assert_eq!(prompt, "[ToolCall] read_file"); assert!(!prompt.contains("src/main.rs")); } #[test] fn strips_tool_result_content() { let items = vec![Item::tool_result_with_content( "call-1", "read 3 lines", "fn main() { println!(\"hello\"); }", )]; let prompt = build_summary_prompt(&items); assert_eq!(prompt, "[ToolResult] read 3 lines"); assert!(!prompt.contains("println")); } #[test] fn drops_reasoning_entirely() { let items = vec![ Item::user_message("hi"), Item::reasoning("internal deliberation"), Item::assistant_message("hello"), ]; let prompt = build_summary_prompt(&items); assert!(prompt.contains("[User] hi")); assert!(prompt.contains("[Assistant] hello")); assert!(!prompt.contains("Reasoning")); assert!(!prompt.contains("deliberation")); } #[test] fn worker_manifest_generation_settings_become_request_config() { let manifest = WorkerManifest { instruction: "unused".into(), language: manifest::defaults::WORKER_LANGUAGE.into(), max_tokens: Some(1024), max_turns: None, temperature: Some(0.2), top_p: Some(0.9), top_k: Some(40), stop_sequences: vec!["\n\n".into(), "".into()], reasoning: None, tool_output: manifest::ToolOutputLimits::default(), file_upload: manifest::FileUploadLimits::default(), }; let config = request_config_from_worker_manifest(&manifest); assert_eq!(config.max_tokens, Some(1024)); assert_eq!(config.temperature, Some(0.2)); assert_eq!(config.top_p, Some(0.9)); assert_eq!(config.top_k, Some(40)); assert_eq!(config.stop_sequences, vec!["\n\n", ""]); } #[test] fn 
keeps_user_and_assistant_messages() { let items = vec![ Item::user_message("fix the bug"), Item::assistant_message("done"), ]; let prompt = build_summary_prompt(&items); assert_eq!(prompt, "[User] fix the bug\n\n[Assistant] done"); } fn minimal_manifest_with_skills(dirs: Vec) -> PodManifest { // Construct the smallest possible PodManifest that resolves; only // the `skills` field matters for `skill_dir_read_rules`. let toml_str = r#" [pod] name = "x" [model] scheme = "anthropic" model_id = "claude-sonnet-4-20250514" [worker] [[scope.allow]] target = "/abs/scope" permission = "write" "#; let mut manifest = PodManifest::from_toml(toml_str).unwrap(); if !dirs.is_empty() { manifest.skills = Some(manifest::SkillsConfig { directories: dirs }); } manifest } #[test] fn skill_dir_read_rules_lists_workspace_skill_directories() { let manifest = minimal_manifest_with_skills(vec![ PathBuf::from("/abs/skills-a"), PathBuf::from("/abs/skills-b"), ]); let rules = skill_dir_read_rules(&manifest); let workspace_rules: Vec<_> = rules .iter() .filter(|r| { r.target == PathBuf::from("/abs/skills-a") || r.target == PathBuf::from("/abs/skills-b") }) .collect(); assert_eq!(workspace_rules.len(), 2); for rule in &workspace_rules { assert_eq!(rule.permission, Permission::Read); assert!(rule.recursive); } } #[test] fn skill_dir_read_rules_empty_when_skills_section_missing() { let manifest = minimal_manifest_with_skills(vec![]); let rules = skill_dir_read_rules(&manifest); assert!(rules.is_empty()); } #[test] fn ingest_skills_returns_empty_when_skills_section_missing() { let manifest = minimal_manifest_with_skills(vec![]); let mut registry = workflow_crate::WorkflowRegistry::empty(); let shadows = ingest_skills(&mut registry, &manifest); assert!(shadows.is_empty()); assert!(registry.is_empty()); } #[test] fn ingest_skills_loads_from_workspace_directories() { let dir = tempfile::tempdir().unwrap(); let skills_root = dir.path().join("skills"); 
std::fs::create_dir_all(skills_root.join("alpha")).unwrap(); std::fs::write( skills_root.join("alpha").join("SKILL.md"), "---\nname: alpha\ndescription: Alpha skill\n---\nbody\n", ) .unwrap(); let manifest = minimal_manifest_with_skills(vec![skills_root.clone()]); let mut registry = workflow_crate::WorkflowRegistry::empty(); let shadows = ingest_skills(&mut registry, &manifest); // workspace skill `alpha` should be registered (no collision). assert!( registry .get(&workflow_crate::Slug::parse("alpha").unwrap()) .is_some() ); // No workflow exists to shadow `alpha`, so no shadow event for it. assert!(shadows.iter().all(|s| s.slug.as_str() != "alpha")); } } #[cfg(test)] mod memory_extract_interceptor_tests { use super::*; use llm_worker::interceptor::{Interceptor, PreRequestAction}; use llm_worker::timeline::event::UsageEvent; fn make_usage(input: u64) -> UsageEvent { UsageEvent { input_tokens: Some(input), output_tokens: Some(0), total_tokens: Some(input), cache_read_input_tokens: None, cache_creation_input_tokens: None, } } #[tokio::test] async fn extract_interceptor_uses_occupancy_not_cumulative_usage() { let tracker = Arc::new(UsageTracker::new()); let interceptor = MemoryExtractWorkerInterceptor { usage_tracker: tracker.clone(), max_input_tokens: 150, }; let mut context = vec![Item::user_message("hello")]; assert!(matches!( interceptor.pre_llm_request(&mut context).await, PreRequestAction::Continue )); tracker.record_usage(&make_usage(100)); assert!(matches!( interceptor.pre_llm_request(&mut context).await, PreRequestAction::Continue )); tracker.record_usage(&make_usage(100)); // Two 100-token requests would exceed a cumulative 150-token cap, but // current occupancy is still the latest 100-token measurement. 
assert!(matches!( interceptor.pre_llm_request(&mut context).await, PreRequestAction::Continue )); } #[tokio::test] async fn extract_interceptor_cancels_when_occupancy_exceeds_cap() { let tracker = Arc::new(UsageTracker::new()); let interceptor = MemoryExtractWorkerInterceptor { usage_tracker: tracker.clone(), max_input_tokens: 99, }; let mut context = vec![Item::user_message("hello")]; assert!(matches!( interceptor.pre_llm_request(&mut context).await, PreRequestAction::Continue )); tracker.record_usage(&make_usage(100)); assert!(matches!( interceptor.pre_llm_request(&mut context).await, PreRequestAction::Cancel(message) if message.contains("occupancy") )); } }