merge: session task reminders

This commit is contained in:
Keisuke Hirata 2026-05-29 13:30:25 +09:00
commit 4c3b09e789
No known key found for this signature in database
3 changed files with 331 additions and 8 deletions

View File

@ -23,6 +23,7 @@ use tracing::warn;
use crate::compact::state::CompactState;
use session_store::SystemItem;
use tools::{TaskEntry, TaskStatus, TaskStore};
use crate::hook::{
AbortInfo, HookPromptAction, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary,
@ -36,6 +37,53 @@ use llm_worker::token_counter::total_tokens;
/// Maximum number of bytes copied into `TurnEndInfo::final_text_preview`.
const FINAL_TEXT_PREVIEW_LIMIT: usize = 512;
const TASK_REMINDER_REQUEST_THRESHOLD: usize = 8;
const TASK_REMINDER_COOLDOWN_REQUESTS: usize = 8;
const TASK_MANAGEMENT_TOOL_NAMES: [&str; 2] = ["TaskCreate", "TaskUpdate"];
#[derive(Debug)]
pub(crate) struct TaskReminderState {
requests_since_last_task_management: AtomicUsize,
requests_since_last_reminder: AtomicUsize,
}
impl Default for TaskReminderState {
fn default() -> Self {
Self {
requests_since_last_task_management: AtomicUsize::new(0),
requests_since_last_reminder: AtomicUsize::new(TASK_REMINDER_COOLDOWN_REQUESTS),
}
}
}
impl TaskReminderState {
pub(crate) fn new() -> Self {
Self::default()
}
fn note_request(&self) -> (usize, usize) {
let since_task_management = self
.requests_since_last_task_management
.fetch_add(1, Ordering::Relaxed)
.saturating_add(1);
let since_reminder = self
.requests_since_last_reminder
.fetch_add(1, Ordering::Relaxed)
.saturating_add(1);
(since_task_management, since_reminder)
}
fn note_task_management(&self) {
self.requests_since_last_task_management
.store(0, Ordering::Relaxed);
}
fn note_reminder(&self) {
self.requests_since_last_reminder
.store(0, Ordering::Relaxed);
}
}
pub(crate) struct PodInterceptor {
registry: Arc<HookRegistry>,
compact_state: Option<Arc<CompactState>>,
@ -55,7 +103,12 @@ pub(crate) struct PodInterceptor {
/// `PromptAction::ContinueWith`. Populated by `Pod::run`
/// immediately before handing off to the worker.
pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
/// Prompt catalog used to render the injected notification wrapper.
/// Task state observed by built-in task tools. Used to nudge the main
/// worker when active tasks have gone unmentioned for several requests.
task_store: TaskStore,
task_reminder_state: Arc<TaskReminderState>,
/// Prompt catalog used to render pending notification entries into the
/// same system-message text that will be persisted in history.
prompts: Arc<PromptCatalog>,
/// Type-erased commit handle. The interceptor uses it to commit
/// `LogEntry::SystemItem` entries directly (sync) before
@ -76,6 +129,8 @@ impl PodInterceptor {
usage_history: Option<Arc<Mutex<Vec<UsageRecord>>>>,
pending_notifies: NotifyBuffer,
pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
task_store: TaskStore,
task_reminder_state: Arc<TaskReminderState>,
prompts: Arc<PromptCatalog>,
log_writer: Option<Arc<dyn SystemItemCommitter>>,
) -> Self {
@ -85,6 +140,8 @@ impl PodInterceptor {
usage_history,
pending_notifies,
pending_attachments,
task_store,
task_reminder_state,
prompts,
log_writer,
next_turn_index: AtomicUsize::new(0),
@ -121,6 +178,48 @@ impl PodInterceptor {
let records = handle.lock().expect("usage_history poisoned").clone();
Some(total_tokens(context, &records).tokens)
}
fn task_reminder_system_item(&self) -> Option<SystemItem> {
let active_tasks: Vec<TaskEntry> = self
.task_store
.list()
.into_iter()
.filter(|task| matches!(task.status, TaskStatus::Pending | TaskStatus::Inprogress))
.collect();
if active_tasks.is_empty() {
return None;
}
let (since_task_management, since_reminder) = self.task_reminder_state.note_request();
if since_task_management < TASK_REMINDER_REQUEST_THRESHOLD
|| since_reminder < TASK_REMINDER_COOLDOWN_REQUESTS
{
return None;
}
self.task_reminder_state.note_reminder();
Some(SystemItem::TaskReminder {
body: render_task_reminder(&active_tasks),
})
}
}
fn is_task_management_tool(name: &str) -> bool {
TASK_MANAGEMENT_TOOL_NAMES.contains(&name)
}
fn render_task_reminder(active_tasks: &[TaskEntry]) -> String {
let mut body = String::from(
"<system-reminder>\nActive session tasks are still open. If progress changed, call TaskUpdate.\n",
);
for task in active_tasks {
body.push_str(&format!(
"- taskid {} ({}) {}\n",
task.taskid, task.status, task.subject
));
}
body.push_str("</system-reminder>");
body
}
#[async_trait]
@ -161,11 +260,13 @@ impl Interceptor for PodInterceptor {
async fn pending_history_appends(&self) -> Vec<Item> {
let drained = self.pending_notifies.drain();
if drained.is_empty() {
let task_reminder = self.task_reminder_system_item();
if drained.is_empty() && task_reminder.is_none() {
return Vec::new();
}
let mut system_items: Vec<SystemItem> = Vec::with_capacity(drained.len());
let mut items: Vec<Item> = Vec::with_capacity(drained.len());
let mut system_items: Vec<SystemItem> = Vec::with_capacity(drained.len() + 1);
let mut items: Vec<Item> = Vec::with_capacity(drained.len() + 1);
for entry in drained {
match build_system_item(&entry, &self.prompts) {
Ok(system_item) => {
@ -188,6 +289,10 @@ impl Interceptor for PodInterceptor {
}
}
}
if let Some(system_item) = task_reminder {
items.push(system_item.to_history_item());
system_items.push(system_item);
}
self.commit_system_items(&system_items);
items
}
@ -237,6 +342,9 @@ impl Interceptor for PodInterceptor {
return action;
}
}
if is_task_management_tool(&info.call.name) {
self.task_reminder_state.note_task_management();
}
self.tool_calls_this_turn.fetch_add(1, Ordering::Relaxed);
PreToolAction::Continue
}
@ -333,12 +441,51 @@ mod tests {
}
}
fn registry_with_pre_llm_hook(counter: Arc<AtomicUsize>) -> Arc<HookRegistry> {
fn registry_with_pre_llm_hook(count: Arc<AtomicUsize>) -> Arc<HookRegistry> {
let mut builder = HookRegistryBuilder::new();
builder.add_pre_llm_request(CountingHook(counter));
builder.add_pre_llm_request(CountingHook(count));
Arc::new(builder.build())
}
fn interceptor_for_task_reminders(
task_store: TaskStore,
task_reminder_state: Arc<TaskReminderState>,
) -> PodInterceptor {
PodInterceptor::new(
Arc::new(HookRegistryBuilder::new().build()),
None,
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
task_store,
task_reminder_state,
PromptCatalog::builtins_only().unwrap(),
None,
)
}
async fn call_pre_tool(interceptor: &PodInterceptor, name: &str) {
let def = tools::task_tools(TaskStore::new())
.into_iter()
.find(|def| {
let (meta, _) = def();
meta.name == name
})
.expect("task tool definition");
let (meta, tool) = def();
let mut info = ToolCallInfo {
call: llm_worker::tool::ToolCall {
id: "call-id".into(),
name: name.into(),
input: serde_json::json!({}),
},
meta,
tool,
};
let action = interceptor.pre_tool_call(&mut info).await;
assert!(matches!(action, PreToolAction::Continue));
}
/// Build a usage_history handle with a single record pinned at the
/// current `context_len` so that `total_tokens` returns exactly
/// `tokens` (Measured, no interpolation or byte-based fallback).
@ -367,6 +514,8 @@ mod tests {
Some(history),
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -393,6 +542,8 @@ mod tests {
Some(history),
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -420,6 +571,8 @@ mod tests {
Some(history),
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -441,6 +594,8 @@ mod tests {
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -474,6 +629,8 @@ mod tests {
None,
buffer.clone(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -496,6 +653,150 @@ mod tests {
assert!(again.is_empty());
}
#[tokio::test]
async fn task_reminder_appends_after_inactive_request_threshold() {
let task_store = TaskStore::new();
task_store.create("keep going".into(), "long task description".into());
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
let items = interceptor.pending_history_appends().await;
assert_eq!(items.len(), 1);
let body = items[0].as_text().unwrap_or_default();
assert!(body.contains("<system-reminder>"));
assert!(body.contains("</system-reminder>"));
assert!(body.contains("taskid 1"));
assert!(body.contains("pending"));
assert!(body.contains("keep going"));
assert!(!body.contains("long task description"));
}
#[test]
fn task_reminder_state_starts_with_initial_cooldown_elapsed() {
let state = TaskReminderState::new();
assert_eq!(
state.requests_since_last_reminder.load(Ordering::Relaxed),
TASK_REMINDER_COOLDOWN_REQUESTS
);
assert_eq!(
state
.requests_since_last_task_management
.load(Ordering::Relaxed),
0
);
}
#[tokio::test]
async fn task_management_tool_call_resets_reminder_inactivity_counter() {
let task_store = TaskStore::new();
task_store.create("track me".into(), String::new());
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
call_pre_tool(&interceptor, "TaskUpdate").await;
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
assert_eq!(interceptor.pending_history_appends().await.len(), 1);
}
#[tokio::test]
async fn task_reminder_respects_cooldown_after_reminder() {
let task_store = TaskStore::new();
task_store.create("cooldown".into(), String::new());
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD {
let _ = interceptor.pending_history_appends().await;
}
for _ in 0..TASK_REMINDER_COOLDOWN_REQUESTS - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
assert_eq!(interceptor.pending_history_appends().await.len(), 1);
}
#[tokio::test]
async fn task_reminder_is_silent_when_no_active_tasks_exist() {
let task_store = TaskStore::new();
let done = task_store.create("done".into(), String::new()).taskid;
task_store
.update(done, Some(TaskStatus::Completed), None, None)
.expect("complete task");
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
}
#[tokio::test]
async fn inactive_requests_without_active_tasks_do_not_prime_task_reminder() {
let task_store = TaskStore::new();
let interceptor =
interceptor_for_task_reminders(task_store.clone(), Arc::new(TaskReminderState::new()));
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
task_store.create("new active".into(), String::new());
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
assert_eq!(interceptor.pending_history_appends().await.len(), 1);
}
#[tokio::test]
async fn task_create_reset_does_not_block_first_reminder_cooldown() {
let task_store = TaskStore::new();
let state = Arc::new(TaskReminderState::new());
let interceptor = interceptor_for_task_reminders(task_store.clone(), state.clone());
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
call_pre_tool(&interceptor, "TaskCreate").await;
task_store.create("created after idle".into(), String::new());
assert_eq!(
state.requests_since_last_reminder.load(Ordering::Relaxed),
TASK_REMINDER_COOLDOWN_REQUESTS,
"TaskCreate reset must not clear the initial reminder cooldown"
);
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 {
assert!(interceptor.pending_history_appends().await.is_empty());
}
assert_eq!(interceptor.pending_history_appends().await.len(), 1);
}
#[tokio::test]
async fn task_reminder_lands_in_pending_history_appends_lane() {
let task_store = TaskStore::new();
task_store.create("lane".into(), String::new());
let interceptor =
interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new()));
let mut ctx = vec![Item::user_message("hi")];
for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD {
let _ = interceptor.pending_history_appends().await;
}
let action = interceptor.pre_llm_request(&mut ctx).await;
assert!(matches!(action, PreRequestAction::Continue));
assert_eq!(ctx.len(), 1, "pre_llm_request must not inject reminders");
}
#[tokio::test]
async fn pre_llm_request_does_not_touch_pending_notifies() {
// The drain lane has moved to `pending_history_appends`;
@ -511,6 +812,8 @@ mod tests {
None,
buffer.clone(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);
@ -541,6 +844,8 @@ mod tests {
None,
NotifyBuffer::new(),
Arc::new(Mutex::new(Vec::new())),
TaskStore::new(),
Arc::new(TaskReminderState::new()),
PromptCatalog::builtins_only().unwrap(),
None,
);

View File

@ -29,7 +29,7 @@ use crate::hook::{
PreRequestInfo, PreToolCall,
};
use crate::ipc::alerter::Alerter;
use crate::ipc::interceptor::PodInterceptor;
use crate::ipc::interceptor::{PodInterceptor, TaskReminderState};
use crate::ipc::notify_buffer::NotifyBuffer;
use crate::prompt::agents_md::read_agents_md;
use crate::prompt::catalog::{CatalogError, PromptCatalog};
@ -272,6 +272,10 @@ pub struct Pod<C: LlmClient, St: Store> {
/// compaction by keeping the same handle while the Worker history is
/// replaced. Restored Pods reconstruct it by replaying Task* tool calls.
task_store: tools::TaskStore,
/// Session-lifetime counters for active-Task reminder nudges.
/// Restored Pods start these at zero; the only consequence is a delayed
/// first reminder after resume.
task_reminder_state: Arc<TaskReminderState>,
/// Parsed system-prompt template awaiting first-turn materialisation.
/// `Some` until `ensure_system_prompt_materialized` renders it once,
/// then `None` forever — including after compaction.
@ -431,6 +435,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
usage_history: self.usage_history.clone(),
tracker: None,
task_store: self.task_store.clone(),
task_reminder_state: self.task_reminder_state.clone(),
system_prompt_template: None,
alerter: self.alerter.clone(),
event_tx: self.event_tx.clone(),
@ -610,6 +615,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
usage_history: Arc::new(Mutex::new(Vec::<UsageRecord>::new())),
tracker: None,
task_store: tools::TaskStore::new(),
task_reminder_state: Arc::new(TaskReminderState::new()),
system_prompt_template: None,
alerter: None,
event_tx: None,
@ -1260,6 +1266,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
usage_history_handle,
self.pending_notifies.clone(),
self.pending_attachments.clone(),
self.task_store.clone(),
self.task_reminder_state.clone(),
self.prompts.clone(),
self.log_writer.clone(),
);
@ -3797,6 +3805,7 @@ where
usage_history: Arc::new(Mutex::new(Vec::new())),
tracker: None,
task_store: tools::TaskStore::new(),
task_reminder_state: Arc::new(TaskReminderState::new()),
system_prompt_template: common.system_prompt_template,
alerter: None,
event_tx: None,
@ -3876,6 +3885,7 @@ where
usage_history: Arc::new(Mutex::new(Vec::new())),
tracker: None,
task_store: tools::TaskStore::new(),
task_reminder_state: Arc::new(TaskReminderState::new()),
system_prompt_template: common.system_prompt_template,
alerter: None,
event_tx: None,
@ -4052,6 +4062,7 @@ where
usage_history: Arc::new(Mutex::new(state.usage_history)),
tracker: None,
task_store,
task_reminder_state: Arc::new(TaskReminderState::new()),
// Restore replays the saved system_prompt verbatim — no
// template re-render on resume.
system_prompt_template: None,

View File

@ -35,7 +35,7 @@ use serde::{Deserialize, Serialize};
/// resume.
///
/// New variants get added here as fresh injection kinds come online
/// (e.g. `Reminder`). The `kind` JSON tag is the snake_case form of
/// (e.g. `TaskReminder`). The `kind` JSON tag is the snake_case form of
/// the variant name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
@ -68,6 +68,11 @@ pub enum SystemItem {
/// prompt body materialized into the LLM context.
Workflow { slug: String, body: String },
/// Task-management inactivity reminder inserted before an LLM request.
/// `body` is the exact LLM-context text wrapped in a
/// `<system-reminder>` block.
TaskReminder { body: String },
/// Synthetic note inserted after an interrupted turn before the next
/// user input. `body` is the exact LLM-context text explaining that the
/// previous turn was cut short.
@ -84,6 +89,7 @@ impl SystemItem {
SystemItem::FileAttachment { body, .. } => body.clone(),
SystemItem::Knowledge { body, .. } => body.clone(),
SystemItem::Workflow { body, .. } => body.clone(),
SystemItem::TaskReminder { body } => body.clone(),
SystemItem::Interrupt { body } => body.clone(),
}
}
@ -103,6 +109,7 @@ impl SystemItem {
SystemItem::FileAttachment { .. } => "file_attachment",
SystemItem::Knowledge { .. } => "knowledge",
SystemItem::Workflow { .. } => "workflow",
SystemItem::TaskReminder { .. } => "task_reminder",
SystemItem::Interrupt { .. } => "interrupt",
}
}