merge: move task state into feature

This commit is contained in:
Keisuke Hirata 2026-06-05 11:23:22 +09:00
commit 9eceb189ba
No known key found for this signature in database
8 changed files with 749 additions and 478 deletions

View File

@ -42,6 +42,12 @@ pub enum PreRequestAction {
/// to: the items are committed before the request so later turns can see
/// why the worker changed course.
ContinueWith(Vec<Item>),
/// Yield after appending these items to durable worker history.
///
/// This is for host-mediated pre-request appends that must be visible to
/// usage accounting and compaction checks before the current LLM request is
/// allowed to proceed.
YieldWith(Vec<Item>),
/// Cancel with a reason (treated as an error).
Cancel(String),
/// Yield control to the caller for external processing.

View File

@ -1177,6 +1177,16 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
self.last_run_interrupted = true;
return Err(WorkerError::Aborted(reason));
}
PreRequestAction::YieldWith(items) => {
self.append_history_items(items.clone());
request_context.extend(items);
info!("Yielded by interceptor after pre-request history append");
for cb in &self.turn_end_cbs {
cb(current_turn);
}
self.last_run_interrupted = true;
return Ok(WorkerResult::Yielded);
}
PreRequestAction::Yield => {
info!("Yielded by interceptor");
for cb in &self.turn_end_cbs {

View File

@ -9,7 +9,7 @@ use session_store::Store;
use tokio::sync::{broadcast, mpsc, oneshot};
use crate::discovery::{PodDiscovery, list_pods_tool, restore_pod_tool, send_to_peer_pod_tool};
use crate::feature::{FeatureRegistryBuilder, builtin::task_tools_feature};
use crate::feature::FeatureRegistryBuilder;
use crate::ipc::alerter::Alerter;
use crate::ipc::notify_buffer::NotifyBuffer;
use crate::ipc::server::SocketServer;
@ -493,7 +493,7 @@ where
// below so the worker borrow doesn't conflict with reads on `pod`.
let scope_handle = pod.scope().clone();
let pwd = pod.pwd().to_path_buf();
let task_store = pod.task_store();
let task_feature = pod.task_feature();
let session_id_for_usage = pod.segment_id().to_string();
let memory_config = pod.manifest().memory.clone();
let web_config = pod.manifest().web.clone();
@ -522,7 +522,7 @@ where
));
let mut feature_registry = FeatureRegistryBuilder::new();
feature_registry.add_module(task_tools_feature(task_store));
feature_registry.add_module(task_feature);
let _feature_install_report = pod.install_features(feature_registry);
let worker = pod.worker_mut();

View File

@ -1746,18 +1746,27 @@ mod tests {
}
#[test]
fn builtin_internal_task_feature_descriptor_has_exact_tools_and_no_authorities() {
let descriptor = builtin::task_tools_feature(tools::TaskStore::new()).descriptor();
fn builtin_internal_task_feature_descriptor_has_exact_tools_hooks_and_no_authorities() {
let descriptor = builtin::task_tools_feature().descriptor();
let tool_names: Vec<_> = descriptor
.tools
.iter()
.map(|tool| tool.name.as_str())
.collect();
let hook_points: Vec<_> = descriptor
.hooks
.iter()
.map(|hook| hook.point.clone())
.collect();
assert_eq!(descriptor.id.as_str(), "builtin:task-tools");
assert_eq!(descriptor.runtime, FeatureRuntimeKind::Builtin);
assert!(descriptor.requested_authorities.is_empty());
assert!(descriptor.hooks.is_empty());
assert_eq!(
hook_points,
vec![FeatureHookPoint::PreRequest, FeatureHookPoint::PreToolCall]
);
assert!(descriptor.background_tasks.is_empty());
assert!(descriptor.provides_services.is_empty());
assert!(descriptor.requires_services.is_empty());
@ -1769,11 +1778,10 @@ mod tests {
#[test]
fn builtin_internal_task_feature_installs_declared_tools_without_host_authorities() {
let task_store = tools::TaskStore::new();
let mut hook_builder = HookRegistryBuilder::default();
let mut pending_tools = Vec::new();
let mut builder = FeatureRegistryBuilder::new();
builder.add_module(builtin::task_tools_feature(task_store));
builder.add_module(builtin::task_tools_feature());
let mut declared_names: Vec<_> = builder.descriptors()[0]
.tools
.iter()
@ -1797,6 +1805,10 @@ mod tests {
);
assert!(report.reports[0].skipped.is_empty());
assert!(report.reports[0].diagnostics.is_empty());
assert_eq!(report.reports[0].installed_hooks.len(), 2);
let hook_registry = hook_builder.build();
assert_eq!(hook_registry.pre_llm_request.len(), 1);
assert_eq!(hook_registry.pre_tool_call.len(), 1);
assert_eq!(declared_names, sorted_installed_names);
assert_eq!(
installed_names,
@ -1810,11 +1822,10 @@ mod tests {
#[test]
fn builtin_task_feature_installs_through_worker_tool_path() {
let task_store = tools::TaskStore::new();
let mut worker = Worker::new(DummyClient);
let mut hook_builder = HookRegistryBuilder::default();
let report = FeatureRegistryBuilder::new()
.with_module(builtin::task_tools_feature(task_store))
.with_module(builtin::task_tools_feature())
.install_into_worker(&mut worker, &mut hook_builder);
worker.tool_server_handle().flush_pending();

View File

@ -6,4 +6,4 @@
pub mod task;
pub use task::task_tools_feature;
pub use task::{TaskFeature, task_tools_feature};

View File

@ -1,30 +1,102 @@
//! Task tools built-in feature module.
//!
//! This is the reference path for extracting an internal built-in module into
//! the feature contribution boundary. The Pod host still owns the Pod-lifetime
//! [`tools::TaskStore`] and passes the shared handle in at construction time;
//! the module requests no sandbox/external-plugin host authorities.
//! The built-in Task feature owns the session-lifetime [`tools::TaskStore`]
//! shared by the Task tools and reminder hooks. Pod hosts install this module
//! through the feature contribution boundary and use its narrow snapshot surface
//! for restore/rewind/compaction compatibility; Pod does not own Task-specific
//! store or reminder state.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use llm_worker::Item;
use crate::feature::{
FeatureDescriptor, FeatureInstallContext, FeatureInstallError, FeatureModule, ToolContribution,
ToolDeclaration,
FeatureDescriptor, FeatureHookPoint, FeatureInstallContext, FeatureInstallError, FeatureModule,
HookDeclaration, ToolContribution, ToolDeclaration,
};
use crate::hook::{
Hook, HookPreRequestAction, HookPreToolAction, PreLlmRequest, PreRequestContext, PreToolCall,
ToolCallSummary,
};
/// Construct the built-in Task tool feature module.
const TASK_REMINDER_REQUEST_THRESHOLD: usize = 24;
const TASK_REMINDER_COOLDOWN_REQUESTS: usize = 24;
const TASK_MANAGEMENT_TOOL_NAMES: [&str; 2] = ["TaskCreate", "TaskUpdate"];
/// Construct the built-in Task feature module with a fresh session store.
///
/// The returned module contributes only `TaskCreate`, `TaskUpdate`, `TaskGet`,
/// and `TaskList` through descriptor-approved tool registration. It does not
/// request host authorities; normal ToolRegistry and PreToolCall permission
/// policy still applies at call time.
pub fn task_tools_feature(task_store: tools::TaskStore) -> impl FeatureModule {
TaskToolsFeature { task_store }
/// The returned module contributes `TaskCreate`, `TaskUpdate`, `TaskGet`, and
/// `TaskList` through descriptor-approved tool registration, plus built-in hooks
/// that maintain Task-reminder state. It does not request sandbox/external-plugin
/// host authorities; normal ToolRegistry and PreToolCall permission policy still
/// applies at call time.
pub fn task_tools_feature() -> TaskFeature {
TaskFeature::new()
}
struct TaskToolsFeature {
/// Built-in Task feature state and contribution module.
#[derive(Clone, Debug)]
pub struct TaskFeature {
state: Arc<TaskFeatureState>,
}
#[derive(Debug)]
struct TaskFeatureState {
task_store: tools::TaskStore,
reminder_state: TaskReminderState,
}
impl FeatureModule for TaskToolsFeature {
impl TaskFeature {
pub fn new() -> Self {
Self::from_store(tools::TaskStore::new())
}
pub fn from_history(history: &[Item]) -> Self {
Self::from_store(tools::TaskStore::from_history(history))
}
fn from_store(task_store: tools::TaskStore) -> Self {
Self {
state: Arc::new(TaskFeatureState {
task_store,
reminder_state: TaskReminderState::new(),
}),
}
}
/// Restore the feature-owned store by replaying durable history into the
/// existing shared store handle. Existing Task tool instances and hooks keep
/// pointing at the same feature-owned store after rewind.
pub fn restore_from_history(&self, history: &[Item]) {
let restored = tools::TaskStore::from_history(history);
self.state.task_store.replace_with(restored.list());
}
/// Feature-owned snapshot text used by compaction to preserve Task state.
pub fn snapshot_text(&self) -> String {
self.state.task_store.snapshot_text()
}
/// Feature-owned compact summary used for the synthetic TaskList result.
pub fn snapshot_overview(&self) -> String {
tools::task::snapshot_overview(&self.state.task_store.list())
}
#[cfg(test)]
fn task_store(&self) -> tools::TaskStore {
self.state.task_store.clone()
}
}
impl Default for TaskFeature {
fn default() -> Self {
Self::new()
}
}
impl FeatureModule for TaskFeature {
fn descriptor(&self) -> FeatureDescriptor {
FeatureDescriptor::builtin("task-tools", "Task tools")
.with_description("Session-lifetime task tracking builtin tools")
@ -44,18 +116,458 @@ impl FeatureModule for TaskToolsFeature {
"TaskList",
"List session-lifetime user-visible tasks",
))
.with_hook(HookDeclaration::new(
"task-reminder-pre-request",
FeatureHookPoint::PreRequest,
))
.with_hook(HookDeclaration::new(
"task-reminder-tool-usage",
FeatureHookPoint::PreToolCall,
))
}
fn install(&self, context: &mut FeatureInstallContext<'_>) -> Result<(), FeatureInstallError> {
let names = ["TaskCreate", "TaskList", "TaskGet", "TaskUpdate"];
for (name, definition) in names
.into_iter()
.zip(tools::task_tools(self.task_store.clone()))
.zip(tools::task_tools(self.state.task_store.clone()))
{
context
.tools()
.register(ToolContribution::new(name, definition))?;
}
context.hooks().add_pre_request(
"task-reminder-pre-request",
TaskReminderPreRequestHook {
state: Arc::clone(&self.state),
},
)?;
context.hooks().add_pre_tool_call(
"task-reminder-tool-usage",
TaskReminderToolUsageHook {
state: Arc::clone(&self.state),
},
)?;
Ok(())
}
}
#[derive(Debug)]
struct TaskReminderState {
requests_since_last_task_management: AtomicUsize,
requests_since_last_reminder: AtomicUsize,
}
impl Default for TaskReminderState {
fn default() -> Self {
Self {
requests_since_last_task_management: AtomicUsize::new(0),
requests_since_last_reminder: AtomicUsize::new(TASK_REMINDER_COOLDOWN_REQUESTS),
}
}
}
impl TaskReminderState {
fn new() -> Self {
Self::default()
}
fn note_request(&self) -> (usize, usize) {
let since_task_management = self
.requests_since_last_task_management
.fetch_add(1, Ordering::Relaxed)
.saturating_add(1);
let since_reminder = self
.requests_since_last_reminder
.fetch_add(1, Ordering::Relaxed)
.saturating_add(1);
(since_task_management, since_reminder)
}
fn note_task_management(&self) {
self.requests_since_last_task_management
.store(0, Ordering::Relaxed);
}
fn note_reminder(&self) {
self.requests_since_last_reminder
.store(0, Ordering::Relaxed);
}
}
struct TaskReminderPreRequestHook {
state: Arc<TaskFeatureState>,
}
#[async_trait]
impl Hook<PreLlmRequest> for TaskReminderPreRequestHook {
async fn call(&self, input: &PreRequestContext) -> HookPreRequestAction {
let active_tasks: Vec<tools::TaskEntry> = self
.state
.task_store
.list()
.into_iter()
.filter(|task| {
matches!(
task.status,
tools::TaskStatus::Pending | tools::TaskStatus::Inprogress
)
})
.collect();
if active_tasks.is_empty() {
return HookPreRequestAction::Continue;
}
let (since_task_management, since_reminder) = self.state.reminder_state.note_request();
if since_task_management < TASK_REMINDER_REQUEST_THRESHOLD
|| since_reminder < TASK_REMINDER_COOLDOWN_REQUESTS
{
return HookPreRequestAction::Continue;
}
if let Some(system_items) = input.system_items() {
self.state.reminder_state.note_reminder();
system_items.append_task_reminder(render_task_reminder_body(&active_tasks));
}
HookPreRequestAction::Continue
}
}
struct TaskReminderToolUsageHook {
state: Arc<TaskFeatureState>,
}
#[async_trait]
impl Hook<PreToolCall> for TaskReminderToolUsageHook {
async fn call(&self, input: &ToolCallSummary) -> HookPreToolAction {
if is_task_management_tool(&input.tool_name) {
self.state.reminder_state.note_task_management();
}
HookPreToolAction::Continue
}
}
fn is_task_management_tool(name: &str) -> bool {
TASK_MANAGEMENT_TOOL_NAMES.contains(&name)
}
fn render_task_reminder_body(active_tasks: &[tools::TaskEntry]) -> String {
let mut body = String::from(
"Active session tasks are still open. If progress changed, call TaskUpdate.\n",
);
for task in active_tasks {
body.push_str(&format!(
"- taskid {} ({}) {}\n",
task.taskid, task.status, task.subject
));
}
body.trim_end_matches('\n').to_string()
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use session_store::{SystemItem, SystemReminderSource};
use super::*;
use crate::hook::{PreRequestInfo, SystemItemAppendHandle};
fn pre_request_context(pending: Arc<Mutex<Vec<SystemItem>>>) -> PreRequestContext {
PreRequestContext::new(
PreRequestInfo {
item_count: 1,
estimated_tokens: None,
turn_index: 0,
tool_calls_this_turn: 0,
},
Some(SystemItemAppendHandle::new(pending)),
)
}
fn tool_summary(name: &str) -> ToolCallSummary {
ToolCallSummary {
call_id: "call-id".into(),
tool_name: name.into(),
arguments: serde_json::json!({}),
}
}
#[tokio::test]
async fn task_reminder_hook_appends_after_inactive_request_threshold() {
let feature = TaskFeature::new();
feature
.task_store()
.create("keep going".into(), "long task description".into());
let hook = TaskReminderPreRequestHook {
state: Arc::clone(&feature.state),
};
let pending = Arc::new(Mutex::new(Vec::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
let _ = hook.call(&pre_request_context(Arc::clone(&pending))).await;
assert!(pending.lock().expect("pending queue poisoned").is_empty());
}
let _ = hook.call(&pre_request_context(Arc::clone(&pending))).await;
let queued = pending.lock().expect("pending queue poisoned");
assert_eq!(queued.len(), 1);
let SystemItem::TaskReminder { body, .. } = &queued[0] else {
panic!("unexpected system item: {:?}", queued[0]);
};
assert_eq!(body.matches("<system-reminder>").count(), 1);
assert_eq!(body.matches("</system-reminder>").count(), 1);
assert!(body.contains("taskid 1"));
assert!(body.contains("pending"));
assert!(body.contains("keep going"));
assert!(!body.contains("long task description"));
}
#[tokio::test]
async fn task_reminder_hook_retains_source() {
let feature = TaskFeature::new();
feature.task_store().create("typed".into(), String::new());
let hook = TaskReminderPreRequestHook {
state: Arc::clone(&feature.state),
};
let pending = Arc::new(Mutex::new(Vec::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD {
let _ = hook.call(&pre_request_context(Arc::clone(&pending))).await;
}
let queued = pending.lock().expect("pending queue poisoned");
let SystemItem::TaskReminder { source, body } = &queued[0] else {
panic!("unexpected system item: {:?}", queued[0]);
};
assert_eq!(*source, SystemReminderSource::TaskInactivity);
assert_eq!(body.matches("<system-reminder>").count(), 1);
assert_eq!(body.matches("</system-reminder>").count(), 1);
assert!(body.contains("typed"));
}
#[test]
fn render_task_reminder_body_is_unwrapped_for_system_reminder_helper() {
let feature = TaskFeature::new();
let task = feature.task_store().create("body".into(), String::new());
let body = render_task_reminder_body(&[task]);
assert!(!body.contains("<system-reminder>"));
assert!(!body.contains("</system-reminder>"));
assert!(body.contains("TaskUpdate"));
assert!(body.contains("taskid 1"));
}
#[test]
fn task_reminder_state_starts_with_initial_cooldown_elapsed() {
let state = TaskReminderState::new();
assert_eq!(
state.requests_since_last_reminder.load(Ordering::Relaxed),
TASK_REMINDER_COOLDOWN_REQUESTS
);
assert_eq!(
state
.requests_since_last_task_management
.load(Ordering::Relaxed),
0
);
}
#[tokio::test]
async fn task_management_tool_call_resets_reminder_inactivity_counter() {
let feature = TaskFeature::new();
feature
.task_store()
.create("track me".into(), String::new());
let pre_request = TaskReminderPreRequestHook {
state: Arc::clone(&feature.state),
};
let pre_tool = TaskReminderToolUsageHook {
state: Arc::clone(&feature.state),
};
let pending = Arc::new(Mutex::new(Vec::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
let _ = pre_request
.call(&pre_request_context(Arc::clone(&pending)))
.await;
assert!(pending.lock().expect("pending queue poisoned").is_empty());
}
let _ = pre_tool.call(&tool_summary("TaskUpdate")).await;
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
let _ = pre_request
.call(&pre_request_context(Arc::clone(&pending)))
.await;
assert!(pending.lock().expect("pending queue poisoned").is_empty());
}
let _ = pre_request
.call(&pre_request_context(Arc::clone(&pending)))
.await;
assert_eq!(pending.lock().expect("pending queue poisoned").len(), 1);
}
#[tokio::test]
async fn task_reminder_respects_cooldown_after_reminder() {
let feature = TaskFeature::new();
feature
.task_store()
.create("cooldown".into(), String::new());
let hook = TaskReminderPreRequestHook {
state: Arc::clone(&feature.state),
};
let pending = Arc::new(Mutex::new(Vec::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD {
let _ = hook.call(&pre_request_context(Arc::clone(&pending))).await;
}
pending.lock().expect("pending queue poisoned").clear();
for _ in 0..TASK_REMINDER_COOLDOWN_REQUESTS - 1 {
let _ = hook.call(&pre_request_context(Arc::clone(&pending))).await;
assert!(pending.lock().expect("pending queue poisoned").is_empty());
}
let _ = hook.call(&pre_request_context(Arc::clone(&pending))).await;
assert_eq!(pending.lock().expect("pending queue poisoned").len(), 1);
}
#[tokio::test]
async fn task_reminder_is_silent_when_no_active_tasks_exist() {
let feature = TaskFeature::new();
let done = feature
.task_store()
.create("done".into(), String::new())
.taskid;
feature
.task_store()
.update(done, Some(tools::TaskStatus::Completed), None, None)
.expect("complete task");
let hook = TaskReminderPreRequestHook {
state: Arc::clone(&feature.state),
};
let pending = Arc::new(Mutex::new(Vec::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 {
let _ = hook.call(&pre_request_context(Arc::clone(&pending))).await;
assert!(pending.lock().expect("pending queue poisoned").is_empty());
}
}
#[tokio::test]
async fn inactive_requests_without_active_tasks_do_not_prime_task_reminder() {
let feature = TaskFeature::new();
let hook = TaskReminderPreRequestHook {
state: Arc::clone(&feature.state),
};
let pending = Arc::new(Mutex::new(Vec::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 {
let _ = hook.call(&pre_request_context(Arc::clone(&pending))).await;
assert!(pending.lock().expect("pending queue poisoned").is_empty());
}
feature
.task_store()
.create("new active".into(), String::new());
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
let _ = hook.call(&pre_request_context(Arc::clone(&pending))).await;
assert!(pending.lock().expect("pending queue poisoned").is_empty());
}
let _ = hook.call(&pre_request_context(Arc::clone(&pending))).await;
assert_eq!(pending.lock().expect("pending queue poisoned").len(), 1);
}
#[tokio::test]
async fn task_create_reset_does_not_block_first_reminder_cooldown() {
let feature = TaskFeature::new();
let pre_request = TaskReminderPreRequestHook {
state: Arc::clone(&feature.state),
};
let pre_tool = TaskReminderToolUsageHook {
state: Arc::clone(&feature.state),
};
let pending = Arc::new(Mutex::new(Vec::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 {
let _ = pre_request
.call(&pre_request_context(Arc::clone(&pending)))
.await;
assert!(pending.lock().expect("pending queue poisoned").is_empty());
}
let _ = pre_tool.call(&tool_summary("TaskCreate")).await;
feature
.task_store()
.create("created after idle".into(), String::new());
assert_eq!(
feature
.state
.reminder_state
.requests_since_last_reminder
.load(Ordering::Relaxed),
TASK_REMINDER_COOLDOWN_REQUESTS,
"TaskCreate reset must not clear the initial reminder cooldown"
);
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
let _ = pre_request
.call(&pre_request_context(Arc::clone(&pending)))
.await;
assert!(pending.lock().expect("pending queue poisoned").is_empty());
}
let _ = pre_request
.call(&pre_request_context(Arc::clone(&pending)))
.await;
assert_eq!(pending.lock().expect("pending queue poisoned").len(), 1);
}
#[tokio::test]
async fn missing_system_item_handle_does_not_mark_reminder_sent() {
let feature = TaskFeature::new();
feature.task_store().create("handle".into(), String::new());
let hook = TaskReminderPreRequestHook {
state: Arc::clone(&feature.state),
};
let no_handle = PreRequestContext::new(
PreRequestInfo {
item_count: 1,
estimated_tokens: None,
turn_index: 0,
tool_calls_this_turn: 0,
},
None,
);
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD {
let _ = hook.call(&no_handle).await;
}
assert_eq!(
feature
.state
.reminder_state
.requests_since_last_reminder
.load(Ordering::Relaxed),
TASK_REMINDER_COOLDOWN_REQUESTS + TASK_REMINDER_REQUEST_THRESHOLD,
"without a handle the hook must not record a reminder as emitted"
);
}
#[test]
fn restore_from_history_keeps_existing_store_handle_for_installed_tools() {
let feature = TaskFeature::new();
let handle = feature.task_store();
handle.create("old".into(), String::new());
let history = vec![Item::tool_call(
"c1",
"TaskCreate",
r#"{"subject":"restored","description":"from history"}"#,
)];
feature.restore_from_history(&history);
let tasks = handle.list();
assert_eq!(tasks.len(), 1);
assert_eq!(tasks[0].subject, "restored");
}
}

View File

@ -7,6 +7,7 @@
//! event-specific read-only contexts and only return control-flow
//! decisions (continue / skip / abort / pause).
use std::borrow::Cow;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
@ -23,8 +24,7 @@ use tracing::warn;
use crate::compact::state::CompactState;
use crate::compact::usage_tracker::UsageTracker;
use session_store::{SystemItem, SystemReminder};
use tools::{TaskEntry, TaskStatus, TaskStore};
use session_store::SystemItem;
use crate::hook::{
AbortInfo, HookPostToolAction, HookPreRequestAction, HookPreToolAction, HookPromptAction,
@ -39,53 +39,6 @@ use llm_worker::token_counter::total_tokens;
/// Maximum number of bytes copied into `TurnEndInfo::final_text_preview`.
const FINAL_TEXT_PREVIEW_LIMIT: usize = 512;
const TASK_REMINDER_REQUEST_THRESHOLD: usize = 24;
const TASK_REMINDER_COOLDOWN_REQUESTS: usize = 24;
const TASK_MANAGEMENT_TOOL_NAMES: [&str; 2] = ["TaskCreate", "TaskUpdate"];
#[derive(Debug)]
pub(crate) struct TaskReminderState {
requests_since_last_task_management: AtomicUsize,
requests_since_last_reminder: AtomicUsize,
}
impl Default for TaskReminderState {
fn default() -> Self {
Self {
requests_since_last_task_management: AtomicUsize::new(0),
requests_since_last_reminder: AtomicUsize::new(TASK_REMINDER_COOLDOWN_REQUESTS),
}
}
}
impl TaskReminderState {
pub(crate) fn new() -> Self {
Self::default()
}
fn note_request(&self) -> (usize, usize) {
let since_task_management = self
.requests_since_last_task_management
.fetch_add(1, Ordering::Relaxed)
.saturating_add(1);
let since_reminder = self
.requests_since_last_reminder
.fetch_add(1, Ordering::Relaxed)
.saturating_add(1);
(since_task_management, since_reminder)
}
fn note_task_management(&self) {
self.requests_since_last_task_management
.store(0, Ordering::Relaxed);
}
fn note_reminder(&self) {
self.requests_since_last_reminder
.store(0, Ordering::Relaxed);
}
}
pub(crate) struct PodInterceptor {
registry: Arc<HookRegistry>,
compact_state: Option<Arc<CompactState>>,
@ -109,10 +62,6 @@ pub(crate) struct PodInterceptor {
/// `PromptAction::ContinueWith`. Populated by `Pod::run`
/// immediately before handing off to the worker.
pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
/// Task state observed by built-in task tools. Used to nudge the main
/// worker when active tasks have gone unmentioned for several requests.
task_store: TaskStore,
task_reminder_state: Arc<TaskReminderState>,
/// Prompt catalog used to render pending notification entries into the
/// same system-message text that will be persisted in history.
prompts: Arc<PromptCatalog>,
@ -135,8 +84,6 @@ impl PodInterceptor {
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
pending_notifies: NotifyBuffer,
pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
task_store: TaskStore,
task_reminder_state: Arc<TaskReminderState>,
prompts: Arc<PromptCatalog>,
log_writer: Option<Arc<dyn SystemItemCommitter>>,
) -> Self {
@ -147,8 +94,6 @@ impl PodInterceptor {
usage_tracker: None,
pending_notifies,
pending_attachments,
task_store,
task_reminder_state,
prompts,
log_writer,
next_turn_index: AtomicUsize::new(0),
@ -194,47 +139,28 @@ impl PodInterceptor {
Some(total_tokens(context, &records).tokens)
}
fn task_reminder_system_item(&self) -> Option<SystemItem> {
let active_tasks: Vec<TaskEntry> = self
.task_store
.list()
.into_iter()
.filter(|task| matches!(task.status, TaskStatus::Pending | TaskStatus::Inprogress))
.collect();
if active_tasks.is_empty() {
return None;
}
let (since_task_management, since_reminder) = self.task_reminder_state.note_request();
if since_task_management < TASK_REMINDER_REQUEST_THRESHOLD
|| since_reminder < TASK_REMINDER_COOLDOWN_REQUESTS
{
return None;
}
self.task_reminder_state.note_reminder();
Some(
SystemReminder::task_inactivity(render_task_reminder_body(&active_tasks))
.into_system_item(),
)
}
}
fn is_task_management_tool(name: &str) -> bool {
TASK_MANAGEMENT_TOOL_NAMES.contains(&name)
}
fn render_task_reminder_body(active_tasks: &[TaskEntry]) -> String {
let mut body = String::from(
"Active session tasks are still open. If progress changed, call TaskUpdate.\n",
fn request_threshold_exceeded(&self, current_tokens: Option<u64>, context: &[Item]) -> bool {
if let Some(state) = self.compact_state.as_ref() {
if !state.is_disabled() && !state.just_compacted() {
let current = current_tokens.unwrap_or(0);
if state.exceeds_request(current) {
let shape = context_shape(context);
info!(
input_tokens = current,
threshold = state.request_threshold().unwrap_or(0),
items_len = shape.items_len,
items_json_bytes = shape.items_json_bytes,
reasoning_items = shape.reasoning_items,
reasoning_encrypted_content_count = shape.reasoning_encrypted_content_count,
reasoning_encrypted_content_bytes = shape.reasoning_encrypted_content_bytes,
"Between-requests compaction threshold exceeded, yielding"
);
for task in active_tasks {
body.push_str(&format!(
"- taskid {} ({}) {}\n",
task.taskid, task.status, task.subject
));
return true;
}
}
}
false
}
body.trim_end_matches('\n').to_string()
}
#[async_trait]
@ -275,13 +201,12 @@ impl Interceptor for PodInterceptor {
async fn pending_history_appends(&self) -> Vec<Item> {
let drained = self.pending_notifies.drain();
let task_reminder = self.task_reminder_system_item();
if drained.is_empty() && task_reminder.is_none() {
if drained.is_empty() {
return Vec::new();
}
let mut system_items: Vec<SystemItem> = Vec::with_capacity(drained.len() + 1);
let mut items: Vec<Item> = Vec::with_capacity(drained.len() + 1);
let mut system_items: Vec<SystemItem> = Vec::with_capacity(drained.len());
let mut items: Vec<Item> = Vec::with_capacity(drained.len());
for entry in drained {
match build_system_item(&entry, &self.prompts) {
Ok(system_item) => {
@ -304,41 +229,18 @@ impl Interceptor for PodInterceptor {
}
}
}
if let Some(system_item) = task_reminder {
items.push(system_item.to_history_item());
system_items.push(system_item);
}
self.commit_system_items(&system_items);
items
}
async fn pre_llm_request(&self, context: &mut Vec<Item>) -> PreRequestAction {
let current_tokens = self.estimated_tokens(context);
// Internal mechanism: between-requests compaction trigger (safety net).
if let Some(state) = self.compact_state.as_ref() {
if !state.is_disabled() && !state.just_compacted() {
let current = current_tokens.unwrap_or(0);
if state.exceeds_request(current) {
let shape = context_shape(context);
info!(
input_tokens = current,
threshold = state.request_threshold().unwrap_or(0),
items_len = shape.items_len,
items_json_bytes = shape.items_json_bytes,
reasoning_items = shape.reasoning_items,
reasoning_encrypted_content_count = shape.reasoning_encrypted_content_count,
reasoning_encrypted_content_bytes = shape.reasoning_encrypted_content_bytes,
"Between-requests compaction threshold exceeded, yielding"
);
let initial_tokens = self.estimated_tokens(context);
if self.request_threshold_exceeded(initial_tokens, context) {
return PreRequestAction::Yield;
}
}
}
let info = PreRequestInfo {
item_count: context.len(),
estimated_tokens: current_tokens,
estimated_tokens: initial_tokens,
turn_index: self.current_turn_index(),
tool_calls_this_turn: self.tool_calls_this_turn.load(Ordering::Relaxed),
};
@ -360,16 +262,36 @@ impl Interceptor for PodInterceptor {
.lock()
.expect("pending hook system-item queue poisoned"),
);
let appended_items: Vec<Item> = system_items
.iter()
.map(SystemItem::to_history_item)
.collect();
let effective_context = if appended_items.is_empty() {
Cow::Borrowed(context.as_slice())
} else {
let mut effective = context.clone();
effective.extend(appended_items.clone());
Cow::Owned(effective)
};
let current_tokens = self.estimated_tokens(effective_context.as_ref());
if self.request_threshold_exceeded(current_tokens, effective_context.as_ref()) {
self.commit_system_items(&system_items);
return if appended_items.is_empty() {
PreRequestAction::Yield
} else {
PreRequestAction::YieldWith(appended_items)
};
}
if let Some(usage_tracker) = self.usage_tracker.as_ref() {
usage_tracker.note_request(effective_context.len());
}
if system_items.is_empty() {
return PreRequestAction::Continue;
}
self.commit_system_items(&system_items);
PreRequestAction::ContinueWith(
system_items
.into_iter()
.map(|item| item.to_history_item())
.collect(),
)
PreRequestAction::ContinueWith(appended_items)
}
async fn pre_tool_call(&self, info: &mut ToolCallInfo) -> PreToolAction {
@ -384,9 +306,6 @@ impl Interceptor for PodInterceptor {
return action.into_worker_action(summary.call_id.clone());
}
}
if is_task_management_tool(&info.call.name) {
self.task_reminder_state.note_task_management();
}
self.tool_calls_this_turn.fetch_add(1, Ordering::Relaxed);
PreToolAction::Continue
}
@ -502,11 +421,12 @@ mod tests {
use std::sync::atomic::{AtomicBool, AtomicUsize};
use super::*;
use crate::feature::FeatureRegistryBuilder;
use crate::feature::builtin::TaskFeature;
use crate::hook::{
Hook, HookPostToolAction, HookPreRequestAction, HookPreToolAction, HookRegistryBuilder,
HookTurnEndAction, OnTurnEnd, PostToolCall, PreLlmRequest, PreToolCall,
};
use session_store::SystemReminderSource;
struct CountingHook(Arc<AtomicUsize>);
@ -552,25 +472,8 @@ mod tests {
}
}
fn interceptor_for_task_reminders(
task_store: TaskStore,
task_reminder_state: Arc<TaskReminderState>,
) -> PodInterceptor {
PodInterceptor::new(
Arc::new(HookRegistryBuilder::new().build()),
None,
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
task_store,
task_reminder_state,
PromptCatalog::builtins_only().unwrap(),
None,
)
}
fn task_tool_call_info(name: &str, input: serde_json::Value) -> ToolCallInfo {
let def = tools::task_tools(TaskStore::new())
let def = tools::task_tools(tools::TaskStore::new())
.into_iter()
.find(|def| {
let (meta, _) = def();
@ -589,12 +492,6 @@ mod tests {
}
}
async fn call_pre_tool(interceptor: &PodInterceptor, name: &str) {
let mut info = task_tool_call_info(name, serde_json::json!({}));
let action = interceptor.pre_tool_call(&mut info).await;
assert!(matches!(action, PreToolAction::Continue));
}
/// Build a usage_history handle with a single record pinned at the
/// current `context_len` so that `total_tokens` returns exactly
/// `tokens` (Measured, no interpolation or byte-based fallback).
@ -623,8 +520,6 @@ mod tests {
Some(history),
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -636,6 +531,41 @@ mod tests {
assert_eq!(count.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn pre_llm_request_yields_with_hook_appends_when_post_append_threshold_exceeded() {
let saw_handle = Arc::new(AtomicBool::new(false));
let mut builder = HookRegistryBuilder::new();
builder.add_pre_llm_request(AppendingPreRequestHook {
saw_handle: Arc::clone(&saw_handle),
});
let registry = Arc::new(builder.build());
let state = Arc::new(CompactState::new(None, Some(50), 2));
let ctx_items = vec![Item::user_message("hi")];
let history = usage_handle_with(ctx_items.len(), 50);
let committed = Arc::new(Mutex::new(Vec::new()));
let interceptor = PodInterceptor::new(
registry,
Some(state),
Some(history),
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
Some(Arc::new(RecordingSystemItemCommitter {
committed: Arc::clone(&committed),
})),
);
let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await;
match action {
PreRequestAction::YieldWith(items) => assert_eq!(items.len(), 1),
other => panic!("expected YieldWith queued system item, got {other:?}"),
}
assert!(saw_handle.load(Ordering::Relaxed));
assert_eq!(committed.lock().expect("committed system items").len(), 1);
}
#[tokio::test]
async fn pre_llm_request_counts_in_flight_usage_records() {
let registry = Arc::new(HookRegistryBuilder::new().build());
@ -658,8 +588,6 @@ mod tests {
Some(history),
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
)
@ -685,8 +613,6 @@ mod tests {
Some(history),
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -728,8 +654,6 @@ mod tests {
Some(history),
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -757,8 +681,6 @@ mod tests {
Some(history),
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -780,8 +702,6 @@ mod tests {
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -810,8 +730,6 @@ mod tests {
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
Some(committer),
);
@ -859,8 +777,6 @@ mod tests {
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -918,8 +834,6 @@ mod tests {
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -967,8 +881,6 @@ mod tests {
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -1017,8 +929,6 @@ mod tests {
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -1030,6 +940,75 @@ mod tests {
assert_eq!(count.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn task_reminder_hook_append_is_counted_in_usage_request_len() {
let feature = TaskFeature::from_history(&[Item::tool_call(
"task-create-call",
"TaskCreate",
r#"{"subject":"track active work","description":"exercise reminder path"}"#,
)]);
let mut hook_builder = HookRegistryBuilder::new();
let mut pending_tools = Vec::new();
FeatureRegistryBuilder::new()
.with_module(feature)
.install_into_pending(&mut pending_tools, &mut hook_builder);
let registry = Arc::new(hook_builder.build());
let usage_tracker = Arc::new(UsageTracker::new());
let committed = Arc::new(Mutex::new(Vec::new()));
let interceptor = PodInterceptor::new(
registry,
None,
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(),
Some(Arc::new(RecordingSystemItemCommitter {
committed: Arc::clone(&committed),
})),
)
.with_usage_tracker(Arc::clone(&usage_tracker));
let ctx_items = vec![Item::user_message("hi")];
for _ in 0..23 {
let mut ctx = ctx_items.clone();
let action = interceptor.pre_llm_request(&mut ctx).await;
assert!(matches!(action, PreRequestAction::Continue));
usage_tracker.record_usage(&llm_worker::event::UsageEvent {
input_tokens: Some(10),
output_tokens: Some(0),
total_tokens: Some(10),
cache_read_input_tokens: Some(0),
cache_creation_input_tokens: Some(0),
});
}
let mut ctx = ctx_items.clone();
let action = interceptor.pre_llm_request(&mut ctx).await;
let appended_len = match action {
PreRequestAction::ContinueWith(items) => items.len(),
other => panic!("expected reminder append, got {other:?}"),
};
assert_eq!(appended_len, 1);
usage_tracker.record_usage(&llm_worker::event::UsageEvent {
input_tokens: Some(11),
output_tokens: Some(0),
total_tokens: Some(11),
cache_read_input_tokens: Some(0),
cache_creation_input_tokens: Some(0),
});
let records = usage_tracker.records();
assert_eq!(records.last().expect("usage record").history_len, 2);
let committed = committed
.lock()
.expect("committed system-item list poisoned");
assert_eq!(committed.len(), 1);
let SystemItem::TaskReminder { body, .. } = &committed[0] else {
panic!("expected task reminder, got {:?}", committed[0]);
};
assert!(body.contains("track active work"));
}
#[tokio::test]
async fn pending_history_appends_drains_buffer_into_items() {
let registry = Arc::new(HookRegistryBuilder::new().build());
@ -1043,8 +1022,6 @@ mod tests {
None,
buffer.clone(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -1067,207 +1044,6 @@ mod tests {
assert!(again.is_empty());
}
#[tokio::test]
async fn task_reminder_appends_after_inactive_request_threshold() {
let task_store = TaskStore::new();
task_store.create("keep going".into(), "long task description".into());
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
let items = interceptor.pending_history_appends().await;
assert_eq!(items.len(), 1);
let body = items[0].as_text().unwrap_or_default();
assert_eq!(body.matches("<system-reminder>").count(), 1);
assert_eq!(body.matches("</system-reminder>").count(), 1);
assert!(body.contains("taskid 1"));
assert!(body.contains("pending"));
assert!(body.contains("keep going"));
assert!(!body.contains("long task description"));
}
#[test]
fn task_reminder_system_item_retains_source() {
let task_store = TaskStore::new();
task_store.create("typed".into(), String::new());
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.task_reminder_system_item().is_none());
}
let item = interceptor.task_reminder_system_item().unwrap();
match item {
SystemItem::TaskReminder { source, body } => {
assert_eq!(source, SystemReminderSource::TaskInactivity);
assert_eq!(body.matches("<system-reminder>").count(), 1);
assert_eq!(body.matches("</system-reminder>").count(), 1);
assert!(body.contains("typed"));
}
other => panic!("unexpected: {other:?}"),
}
}
#[test]
fn render_task_reminder_body_is_unwrapped_for_system_reminder_helper() {
let task_store = TaskStore::new();
let task = task_store.create("body".into(), String::new());
let body = render_task_reminder_body(&[task]);
assert!(!body.contains("<system-reminder>"));
assert!(!body.contains("</system-reminder>"));
assert!(body.contains("TaskUpdate"));
assert!(body.contains("taskid 1"));
}
#[test]
fn task_reminder_state_starts_with_initial_cooldown_elapsed() {
let state = TaskReminderState::new();
assert_eq!(
state.requests_since_last_reminder.load(Ordering::Relaxed),
TASK_REMINDER_COOLDOWN_REQUESTS
);
assert_eq!(
state
.requests_since_last_task_management
.load(Ordering::Relaxed),
0
);
}
#[tokio::test]
async fn task_management_tool_call_resets_reminder_inactivity_counter() {
let task_store = TaskStore::new();
task_store.create("track me".into(), String::new());
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
call_pre_tool(&interceptor, "TaskUpdate").await;
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
assert_eq!(interceptor.pending_history_appends().await.len(), 1);
}
#[tokio::test]
async fn task_reminder_respects_cooldown_after_reminder() {
let task_store = TaskStore::new();
task_store.create("cooldown".into(), String::new());
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD {
let _ = interceptor.pending_history_appends().await;
}
for _ in 0..TASK_REMINDER_COOLDOWN_REQUESTS - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
assert_eq!(interceptor.pending_history_appends().await.len(), 1);
}
#[tokio::test]
async fn task_reminder_is_silent_when_no_active_tasks_exist() {
let task_store = TaskStore::new();
let done = task_store.create("done".into(), String::new()).taskid;
task_store
.update(done, Some(TaskStatus::Completed), None, None)
.expect("complete task");
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
}
#[tokio::test]
async fn inactive_requests_without_active_tasks_do_not_prime_task_reminder() {
let task_store = TaskStore::new();
let interceptor =
interceptor_for_task_reminders(task_store.clone(), Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
task_store.create("new active".into(), String::new());
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
assert_eq!(interceptor.pending_history_appends().await.len(), 1);
}
#[tokio::test]
async fn task_create_reset_does_not_block_first_reminder_cooldown() {
let task_store = TaskStore::new();
let state = Arc::new(TaskReminderState::new());
let interceptor = interceptor_for_task_reminders(task_store.clone(), state.clone());
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
call_pre_tool(&interceptor, "TaskCreate").await;
task_store.create("created after idle".into(), String::new());
assert_eq!(
state.requests_since_last_reminder.load(Ordering::Relaxed),
TASK_REMINDER_COOLDOWN_REQUESTS,
"TaskCreate reset must not clear the initial reminder cooldown"
);
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
assert_eq!(interceptor.pending_history_appends().await.len(), 1);
}
#[tokio::test]
async fn task_reminder_lands_in_pending_history_appends_lane() {
let task_store = TaskStore::new();
task_store.create("lane".into(), String::new());
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
let mut ctx = vec![Item::user_message("hi")];
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD {
let _ = interceptor.pending_history_appends().await;
}
let action = interceptor.pre_llm_request(&mut ctx).await;
assert!(matches!(action, PreRequestAction::Continue));
assert_eq!(ctx.len(), 1, "pre_llm_request must not inject reminders");
}
#[tokio::test]
async fn pre_llm_request_does_not_touch_task_reminder_lane() {
let task_store = TaskStore::new();
task_store.create("lane".into(), String::new());
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
let mut ctx = vec![Item::user_message("hi")];
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
let action = interceptor.pre_llm_request(&mut ctx).await;
assert!(matches!(action, PreRequestAction::Continue));
assert_eq!(ctx.len(), 1, "pre_llm_request must not inject reminders");
let pending = interceptor.pending_history_appends().await;
assert_eq!(
pending.len(),
1,
"reminders stay in pending_history_appends"
);
}
#[tokio::test]
async fn pre_llm_request_does_not_touch_pending_notifies() {
// The drain lane has moved to `pending_history_appends`;
@ -1283,8 +1059,6 @@ mod tests {
None,
buffer.clone(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -1315,8 +1089,6 @@ mod tests {
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);

View File

@ -28,13 +28,14 @@ use manifest::{
use crate::compact::state::CompactState;
use crate::compact::usage_tracker::UsageTracker;
use crate::feature::builtin::TaskFeature;
use crate::feature::{FeatureRegistryBuilder, FeatureRegistryInstallReport};
use crate::hook::{
Hook, HookPreRequestAction, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd,
PostToolCall, PreLlmRequest, PreRequestContext, PreToolCall,
Hook, HookRegistryBuilder, OnAbort, OnPromptSubmit, OnTurnEnd, PostToolCall, PreLlmRequest,
PreToolCall,
};
use crate::ipc::alerter::Alerter;
use crate::ipc::interceptor::{PodInterceptor, TaskReminderState};
use crate::ipc::interceptor::PodInterceptor;
use crate::ipc::notify_buffer::NotifyBuffer;
use crate::prompt::agents_md::read_agents_md;
use crate::prompt::catalog::{CatalogError, PromptCatalog};
@ -43,6 +44,7 @@ use crate::prompt::system::{SystemPromptContext, SystemPromptError, SystemPrompt
use crate::runtime::dir;
use crate::runtime::pod_registry::{self, ScopeAllocationGuard, ScopeLockError};
use crate::workflow::WorkflowResolveError;
#[cfg(test)]
use async_trait::async_trait;
use protocol::{
AlertLevel, AlertSource, Event, RewindSummary, RewindTarget, RewindTargetId, Segment,
@ -212,21 +214,6 @@ where
}
}
/// Pre-LLM-request hook that records `history.len()` at send time into a
/// shared `UsageTracker`. The on_usage callback later pairs this with the
/// aggregated UsageEvent to produce one `UsageRecord` per LLM call.
struct UsageTrackingHook {
tracker: Arc<UsageTracker>,
}
#[async_trait]
impl Hook<PreLlmRequest> for UsageTrackingHook {
async fn call(&self, info: &PreRequestContext) -> HookPreRequestAction {
self.tracker.note_request(info.item_count);
HookPreRequestAction::Continue
}
}
/// An independent agent execution unit.
///
/// Holds a [`Worker`] directly and persists session state via
@ -277,15 +264,10 @@ pub struct Pod<C: LlmClient, St: Store> {
/// tools so that Pod-owned operations (e.g. compaction) can consult
/// the recency of touched files.
tracker: Option<tools::Tracker>,
/// Pod-lifetime task store from the builtin `tools` crate. Shared by
/// TaskCreate / TaskUpdate / TaskList / TaskGet and preserved across
/// compaction by keeping the same handle while the Worker history is
/// replaced. Restored Pods reconstruct it by replaying Task* tool calls.
task_store: tools::TaskStore,
/// Session-lifetime counters for active-Task reminder nudges.
/// Restored Pods start these at zero; the only consequence is a delayed
/// first reminder after resume.
task_reminder_state: Arc<TaskReminderState>,
/// Built-in Task feature state shared by Task tools, reminder hooks, and
/// the narrow snapshot/restore surface Pod needs for compaction and rewind.
/// Store/reminder ownership stays inside the Task feature module.
task_feature: TaskFeature,
/// Parsed system-prompt template awaiting first-turn materialisation.
/// `Some` until `ensure_system_prompt_materialized` renders it once,
/// then `None` forever — including after compaction.
@ -440,8 +422,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
usage_history: self.usage_history.clone(),
tracker: None,
task_store: self.task_store.clone(),
task_reminder_state: self.task_reminder_state.clone(),
task_feature: self.task_feature.clone(),
system_prompt_template: None,
alerter: self.alerter.clone(),
event_tx: self.event_tx.clone(),
@ -619,8 +600,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
usage_history: Arc::new(Mutex::new(Vec::<UsageRecord>::new())),
tracker: None,
task_store: tools::TaskStore::new(),
task_reminder_state: Arc::new(TaskReminderState::new()),
task_feature: TaskFeature::new(),
system_prompt_template: None,
alerter: None,
event_tx: None,
@ -842,7 +822,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let tool_side_effect_warning = suffix_has_tool_side_effects(&entries[truncate_entries..]);
let state = segment_log::collect_state(&retained);
let extract_pointer = memory::extract::fold_pointer(&state.extensions);
let task_store = tools::TaskStore::from_history(&state.history);
let summary = RewindSummary {
truncated_to_entries: truncate_entries,
discarded_entries: entries.len().saturating_sub(truncate_entries),
@ -854,6 +833,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.segment_state.set_entries_written(truncate_entries);
self.sink.truncate_silent(truncate_entries);
self.task_feature.restore_from_history(&state.history);
self.worker_mut().set_history(state.history);
self.worker_mut().set_request_config(state.config);
self.worker_mut().set_turn_count(state.turn_count);
@ -869,7 +849,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.extract_pointer
.lock()
.expect("extract_pointer poisoned") = extract_pointer;
self.task_store = task_store;
Ok(RewindAppliedState {
entries: retained,
@ -1013,16 +992,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.tracker = Some(tracker);
}
/// Attach the session-scoped TaskStore from the builtin `tools` crate.
/// Called by the Controller before registering builtin tools so the Pod
/// and Worker share one store.
pub fn attach_task_store(&mut self, task_store: tools::TaskStore) {
self.task_store = task_store;
}
/// Shared TaskStore handle.
pub fn task_store(&self) -> tools::TaskStore {
self.task_store.clone()
/// Built-in Task feature module and snapshot/restore facade.
pub(crate) fn task_feature(&self) -> TaskFeature {
self.task_feature.clone()
}
/// The attached session-scoped file-operation tracker, if any.
@ -1181,13 +1153,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
/// occupancy through the `UsageRecord` timeline.
fn ensure_interceptor_installed(&mut self) {
if !self.interceptor_installed {
// Pre-LLM-request hook: record the item count at send time
// so the on_usage callback can pair it with the measured
// input_tokens.
self.hook_builder.add_pre_llm_request(UsageTrackingHook {
tracker: self.usage_tracker.clone(),
});
let builder = std::mem::take(&mut self.hook_builder);
let registry = Arc::new(builder.build());
@ -1233,8 +1198,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
usage_history_handle,
self.pending_notifies.clone(),
self.pending_attachments.clone(),
self.task_store.clone(),
self.task_reminder_state.clone(),
self.prompts.clone(),
self.log_writer.clone(),
)
@ -2425,7 +2388,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
// Input text fed to the compact worker. Includes the default
// references, current TaskStore snapshot, and the (pruned)
// conversation text.
let task_snapshot_text = self.task_store.snapshot_text();
let task_snapshot_text = self.task_feature.snapshot_text();
let summary_input = build_summary_input(
&items_to_summarise,
&default_refs,
@ -2642,7 +2605,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
new_history.push(Item::tool_call("compact-tasklist", "TaskList", "{}"));
new_history.push(Item::tool_result_with_content(
"compact-tasklist",
tools::task::snapshot_overview(&self.task_store.list()),
self.task_feature.snapshot_overview(),
task_snapshot_text.clone(),
));
let result_estimate = llm_worker::token_counter::total_tokens(&new_history, &[]);
@ -3768,8 +3731,7 @@ where
metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
usage_history: Arc::new(Mutex::new(Vec::new())),
tracker: None,
task_store: tools::TaskStore::new(),
task_reminder_state: Arc::new(TaskReminderState::new()),
task_feature: TaskFeature::new(),
system_prompt_template: common.system_prompt_template,
alerter: None,
event_tx: None,
@ -3847,8 +3809,7 @@ where
metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
usage_history: Arc::new(Mutex::new(Vec::new())),
tracker: None,
task_store: tools::TaskStore::new(),
task_reminder_state: Arc::new(TaskReminderState::new()),
task_feature: TaskFeature::new(),
system_prompt_template: common.system_prompt_template,
alerter: None,
event_tx: None,
@ -4007,7 +3968,7 @@ where
}
let extract_pointer = memory::extract::fold_pointer(&state.extensions);
let task_store = tools::TaskStore::from_history(&state.history);
let task_feature = TaskFeature::from_history(&state.history);
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
let mut pod = Self {
@ -4025,8 +3986,7 @@ where
metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()),
usage_history: Arc::new(Mutex::new(state.usage_history)),
tracker: None,
task_store,
task_reminder_state: Arc::new(TaskReminderState::new()),
task_feature,
// Restore replays the saved system_prompt verbatim — no
// template re-render on resume.
system_prompt_template: None,