From 4f1b17b9bf07124249d0407708f87064ab063693 Mon Sep 17 00:00:00 2001 From: Hare Date: Fri, 29 May 2026 13:23:01 +0900 Subject: [PATCH 1/2] pod: remind inactive task management --- crates/pod/src/ipc/interceptor.rs | 268 +++++++++++++++++++++++- crates/pod/src/pod.rs | 13 +- crates/session-store/src/system_item.rs | 9 +- 3 files changed, 282 insertions(+), 8 deletions(-) diff --git a/crates/pod/src/ipc/interceptor.rs b/crates/pod/src/ipc/interceptor.rs index 2a0b8e14..42b4ca51 100644 --- a/crates/pod/src/ipc/interceptor.rs +++ b/crates/pod/src/ipc/interceptor.rs @@ -23,6 +23,7 @@ use tracing::warn; use crate::compact::state::CompactState; use session_store::SystemItem; +use tools::{TaskEntry, TaskStatus, TaskStore}; use crate::hook::{ AbortInfo, HookPromptAction, HookRegistry, PreRequestInfo, PromptSubmitInfo, ToolCallSummary, @@ -36,6 +37,44 @@ use llm_worker::token_counter::total_tokens; /// Maximum number of bytes copied into `TurnEndInfo::final_text_preview`. const FINAL_TEXT_PREVIEW_LIMIT: usize = 512; +const TASK_REMINDER_REQUEST_THRESHOLD: usize = 8; +const TASK_REMINDER_COOLDOWN_REQUESTS: usize = 8; +const TASK_MANAGEMENT_TOOL_NAMES: [&str; 2] = ["TaskCreate", "TaskUpdate"]; + +#[derive(Debug, Default)] +pub(crate) struct TaskReminderState { + requests_since_last_task_management: AtomicUsize, + requests_since_last_reminder: AtomicUsize, +} + +impl TaskReminderState { + pub(crate) fn new() -> Self { + Self::default() + } + + fn note_request(&self) -> (usize, usize) { + let since_task_management = self + .requests_since_last_task_management + .fetch_add(1, Ordering::Relaxed) + .saturating_add(1); + let since_reminder = self + .requests_since_last_reminder + .fetch_add(1, Ordering::Relaxed) + .saturating_add(1); + (since_task_management, since_reminder) + } + + fn note_task_management(&self) { + self.requests_since_last_task_management + .store(0, Ordering::Relaxed); + } + + fn note_reminder(&self) { + self.requests_since_last_reminder + .store(0, Ordering::Relaxed); + } +} + pub(crate) struct PodInterceptor { registry: Arc, compact_state: Option>, @@ -55,7 +94,12 @@ pub(crate) struct PodInterceptor { /// `PromptAction::ContinueWith`. Populated by `Pod::run` /// immediately before handing off to the worker. pending_attachments: Arc>>, - /// Prompt catalog used to render the injected notification wrapper. + /// Task state observed by built-in task tools. Used to nudge the main + /// worker when active tasks have gone unmentioned for several requests. + task_store: TaskStore, + task_reminder_state: Arc, + /// Prompt catalog used to render pending notification entries into the + /// same system-message text that will be persisted in history. prompts: Arc, /// Type-erased commit handle. The interceptor uses it to commit /// `LogEntry::SystemItem` entries directly (sync) before @@ -76,6 +120,8 @@ impl PodInterceptor { usage_history: Option>>>, pending_notifies: NotifyBuffer, pending_attachments: Arc>>, + task_store: TaskStore, + task_reminder_state: Arc, prompts: Arc, log_writer: Option>, ) -> Self { @@ -85,6 +131,8 @@ impl PodInterceptor { usage_history, pending_notifies, pending_attachments, + task_store, + task_reminder_state, prompts, log_writer, next_turn_index: AtomicUsize::new(0), @@ -121,6 +169,48 @@ impl PodInterceptor { let records = handle.lock().expect("usage_history poisoned").clone(); Some(total_tokens(context, &records).tokens) } + + fn task_reminder_system_item(&self) -> Option { + let active_tasks: Vec = self + .task_store + .list() + .into_iter() + .filter(|task| matches!(task.status, TaskStatus::Pending | TaskStatus::Inprogress)) + .collect(); + if active_tasks.is_empty() { + return None; + } + + let (since_task_management, since_reminder) = self.task_reminder_state.note_request(); + if since_task_management < TASK_REMINDER_REQUEST_THRESHOLD + || since_reminder < TASK_REMINDER_COOLDOWN_REQUESTS + { + return None; + } + + self.task_reminder_state.note_reminder(); + Some(SystemItem::TaskReminder { + body: render_task_reminder(&active_tasks), + }) + } +} + +fn is_task_management_tool(name: &str) -> bool { + TASK_MANAGEMENT_TOOL_NAMES.contains(&name) +} + +fn render_task_reminder(active_tasks: &[TaskEntry]) -> String { + let mut body = String::from( + "\nActive session tasks are still open. If progress changed, call TaskUpdate.\n", + ); + for task in active_tasks { + body.push_str(&format!( + "- taskid {} ({}) {}\n", + task.taskid, task.status, task.subject + )); + } + body.push_str(""); + body } #[async_trait] @@ -161,11 +251,13 @@ impl Interceptor for PodInterceptor { async fn pending_history_appends(&self) -> Vec { let drained = self.pending_notifies.drain(); - if drained.is_empty() { + let task_reminder = self.task_reminder_system_item(); + if drained.is_empty() && task_reminder.is_none() { return Vec::new(); } - let mut system_items: Vec = Vec::with_capacity(drained.len()); - let mut items: Vec = Vec::with_capacity(drained.len()); + + let mut system_items: Vec = Vec::with_capacity(drained.len() + 1); + let mut items: Vec = Vec::with_capacity(drained.len() + 1); for entry in drained { match build_system_item(&entry, &self.prompts) { Ok(system_item) => { @@ -188,6 +280,10 @@ impl Interceptor for PodInterceptor { } } } + if let Some(system_item) = task_reminder { + items.push(system_item.to_history_item()); + system_items.push(system_item); + } self.commit_system_items(&system_items); items } @@ -237,6 +333,9 @@ impl Interceptor for PodInterceptor { return action; } } + if is_task_management_tool(&info.call.name) { + self.task_reminder_state.note_task_management(); + } self.tool_calls_this_turn.fetch_add(1, Ordering::Relaxed); PreToolAction::Continue } @@ -333,12 +432,51 @@ mod tests { } } - fn registry_with_pre_llm_hook(counter: Arc) -> Arc { + fn registry_with_pre_llm_hook(count: Arc) -> Arc { let mut builder = HookRegistryBuilder::new(); - builder.add_pre_llm_request(CountingHook(counter)); + builder.add_pre_llm_request(CountingHook(count)); Arc::new(builder.build()) } + fn interceptor_for_task_reminders( + task_store: TaskStore, + task_reminder_state: Arc, + ) -> PodInterceptor { + PodInterceptor::new( + Arc::new(HookRegistryBuilder::new().build()), + None, + None, + NotifyBuffer::new(), + Arc::new(Mutex::new(Vec::new())), + task_store, + task_reminder_state, + PromptCatalog::builtins_only().unwrap(), + None, + ) + } + + async fn call_pre_tool(interceptor: &PodInterceptor, name: &str) { + let def = tools::task_tools(TaskStore::new()) + .into_iter() + .find(|def| { + let (meta, _) = def(); + meta.name == name + }) + .expect("task tool definition"); + let (meta, tool) = def(); + let mut info = ToolCallInfo { + call: llm_worker::tool::ToolCall { + id: "call-id".into(), + name: name.into(), + input: serde_json::json!({}), + }, + meta, + tool, + }; + let action = interceptor.pre_tool_call(&mut info).await; + assert!(matches!(action, PreToolAction::Continue)); + } + /// Build a usage_history handle with a single record pinned at the /// current `context_len` so that `total_tokens` returns exactly /// `tokens` (Measured, no interpolation or byte-based fallback). @@ -367,6 +505,8 @@ mod tests { Some(history), NotifyBuffer::new(), Arc::new(Mutex::new(Vec::new())), + TaskStore::new(), + Arc::new(TaskReminderState::new()), PromptCatalog::builtins_only().unwrap(), None, ); @@ -393,6 +533,8 @@ mod tests { Some(history), NotifyBuffer::new(), Arc::new(Mutex::new(Vec::new())), + TaskStore::new(), + Arc::new(TaskReminderState::new()), PromptCatalog::builtins_only().unwrap(), None, ); @@ -420,6 +562,8 @@ mod tests { Some(history), NotifyBuffer::new(), Arc::new(Mutex::new(Vec::new())), + TaskStore::new(), + Arc::new(TaskReminderState::new()), PromptCatalog::builtins_only().unwrap(), None, ); @@ -441,6 +585,8 @@ mod tests { None, NotifyBuffer::new(), Arc::new(Mutex::new(Vec::new())), + TaskStore::new(), + Arc::new(TaskReminderState::new()), PromptCatalog::builtins_only().unwrap(), None, ); @@ -474,6 +620,8 @@ mod tests { None, buffer.clone(), Arc::new(Mutex::new(Vec::new())), + TaskStore::new(), + Arc::new(TaskReminderState::new()), PromptCatalog::builtins_only().unwrap(), None, ); @@ -496,6 +644,110 @@ mod tests { assert!(again.is_empty()); } + #[tokio::test] + async fn task_reminder_appends_after_inactive_request_threshold() { + let task_store = TaskStore::new(); + task_store.create("keep going".into(), "long task description".into()); + let interceptor = + interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new())); + + for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 { + assert!(interceptor.pending_history_appends().await.is_empty()); + } + let items = interceptor.pending_history_appends().await; + assert_eq!(items.len(), 1); + let body = items[0].as_text().unwrap_or_default(); + assert!(body.contains("")); + assert!(body.contains("")); + assert!(body.contains("taskid 1")); + assert!(body.contains("pending")); + assert!(body.contains("keep going")); + assert!(!body.contains("long task description")); + } + + #[tokio::test] + async fn task_management_tool_call_resets_reminder_inactivity_counter() { + let task_store = TaskStore::new(); + task_store.create("track me".into(), String::new()); + let interceptor = + interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new())); + + for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 { + assert!(interceptor.pending_history_appends().await.is_empty()); + } + call_pre_tool(&interceptor, "TaskUpdate").await; + + for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 { + assert!(interceptor.pending_history_appends().await.is_empty()); + } + assert_eq!(interceptor.pending_history_appends().await.len(), 1); + } + + #[tokio::test] + async fn task_reminder_respects_cooldown_after_reminder() { + let task_store = TaskStore::new(); + task_store.create("cooldown".into(), String::new()); + let interceptor = + interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new())); + + for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD { + let _ = interceptor.pending_history_appends().await; + } + for _ in 0..TASK_REMINDER_COOLDOWN_REQUESTS - 1 { + assert!(interceptor.pending_history_appends().await.is_empty()); + } + assert_eq!(interceptor.pending_history_appends().await.len(), 1); + } + + #[tokio::test] + async fn task_reminder_is_silent_when_no_active_tasks_exist() { + let task_store = TaskStore::new(); + let done = task_store.create("done".into(), String::new()).taskid; + task_store + .update(done, Some(TaskStatus::Completed), None, None) + .expect("complete task"); + let interceptor = + interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new())); + + for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 { + assert!(interceptor.pending_history_appends().await.is_empty()); + } + } + + #[tokio::test] + async fn inactive_requests_without_active_tasks_do_not_prime_task_reminder() { + let task_store = TaskStore::new(); + let interceptor = + interceptor_for_task_reminders(task_store.clone(), Arc::new(TaskReminderState::new())); + + for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 { + assert!(interceptor.pending_history_appends().await.is_empty()); + } + + task_store.create("new active".into(), String::new()); + for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 { + assert!(interceptor.pending_history_appends().await.is_empty()); + } + assert_eq!(interceptor.pending_history_appends().await.len(), 1); + } + + #[tokio::test] + async fn task_reminder_lands_in_pending_history_appends_lane() { + let task_store = TaskStore::new(); + task_store.create("lane".into(), String::new()); + let interceptor = + interceptor_for_task_reminders(task_store, Arc::new(TaskReminderState::new())); + let mut ctx = vec![Item::user_message("hi")]; + + for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD { + let _ = interceptor.pending_history_appends().await; + } + let action = interceptor.pre_llm_request(&mut ctx).await; + + assert!(matches!(action, PreRequestAction::Continue)); + assert_eq!(ctx.len(), 1, "pre_llm_request must not inject reminders"); + } + #[tokio::test] async fn pre_llm_request_does_not_touch_pending_notifies() { // The drain lane has moved to `pending_history_appends`; @@ -511,6 +763,8 @@ mod tests { None, buffer.clone(), Arc::new(Mutex::new(Vec::new())), + TaskStore::new(), + Arc::new(TaskReminderState::new()), PromptCatalog::builtins_only().unwrap(), None, ); @@ -541,6 +795,8 @@ mod tests { None, NotifyBuffer::new(), Arc::new(Mutex::new(Vec::new())), + TaskStore::new(), + Arc::new(TaskReminderState::new()), PromptCatalog::builtins_only().unwrap(), None, ); diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 04c4c826..198e4593 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -29,7 +29,7 @@ use crate::hook::{ PreRequestInfo, PreToolCall, }; use crate::ipc::alerter::Alerter; -use crate::ipc::interceptor::PodInterceptor; +use crate::ipc::interceptor::{PodInterceptor, TaskReminderState}; use crate::ipc::notify_buffer::NotifyBuffer; use crate::prompt::agents_md::read_agents_md; use crate::prompt::catalog::{CatalogError, PromptCatalog}; @@ -272,6 +272,10 @@ pub struct Pod { /// compaction by keeping the same handle while the Worker history is /// replaced. Restored Pods reconstruct it by replaying Task* tool calls. task_store: tools::TaskStore, + /// Session-lifetime counters for active-Task reminder nudges. + /// Restored Pods start these at zero; the only consequence is a delayed + /// first reminder after resume. + task_reminder_state: Arc, /// Parsed system-prompt template awaiting first-turn materialisation. /// `Some` until `ensure_system_prompt_materialized` renders it once, /// then `None` forever — including after compaction. @@ -431,6 +435,7 @@ impl Pod { usage_history: self.usage_history.clone(), tracker: None, task_store: self.task_store.clone(), + task_reminder_state: self.task_reminder_state.clone(), system_prompt_template: None, alerter: self.alerter.clone(), event_tx: self.event_tx.clone(), @@ -610,6 +615,7 @@ impl Pod { usage_history: Arc::new(Mutex::new(Vec::::new())), tracker: None, task_store: tools::TaskStore::new(), + task_reminder_state: Arc::new(TaskReminderState::new()), system_prompt_template: None, alerter: None, event_tx: None, @@ -1260,6 +1266,8 @@ impl Pod { usage_history_handle, self.pending_notifies.clone(), self.pending_attachments.clone(), + self.task_store.clone(), + self.task_reminder_state.clone(), self.prompts.clone(), self.log_writer.clone(), ); @@ -3797,6 +3805,7 @@ where usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, task_store: tools::TaskStore::new(), + task_reminder_state: Arc::new(TaskReminderState::new()), system_prompt_template: common.system_prompt_template, alerter: None, event_tx: None, @@ -3876,6 +3885,7 @@ where usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, task_store: tools::TaskStore::new(), + task_reminder_state: Arc::new(TaskReminderState::new()), system_prompt_template: common.system_prompt_template, alerter: None, event_tx: None, @@ -4052,6 +4062,7 @@ where usage_history: Arc::new(Mutex::new(state.usage_history)), tracker: None, task_store, + task_reminder_state: Arc::new(TaskReminderState::new()), // Restore replays the saved system_prompt verbatim — no // template re-render on resume. system_prompt_template: None, diff --git a/crates/session-store/src/system_item.rs b/crates/session-store/src/system_item.rs index ed318410..5ba621cb 100644 --- a/crates/session-store/src/system_item.rs +++ b/crates/session-store/src/system_item.rs @@ -35,7 +35,7 @@ use serde::{Deserialize, Serialize}; /// resume. /// /// New variants get added here as fresh injection kinds come online -/// (e.g. `Reminder`). The `kind` JSON tag is the snake_case form of +/// (e.g. `TaskReminder`). The `kind` JSON tag is the snake_case form of /// the variant name. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] @@ -68,6 +68,11 @@ pub enum SystemItem { /// prompt body materialized into the LLM context. Workflow { slug: String, body: String }, + /// Task-management inactivity reminder inserted before an LLM request. + /// `body` is the exact LLM-context text wrapped in a + /// `` block. + TaskReminder { body: String }, + /// Synthetic note inserted after an interrupted turn before the next /// user input. `body` is the exact LLM-context text explaining that the /// previous turn was cut short. @@ -84,6 +89,7 @@ impl SystemItem { SystemItem::FileAttachment { body, .. } => body.clone(), SystemItem::Knowledge { body, .. } => body.clone(), SystemItem::Workflow { body, .. } => body.clone(), + SystemItem::TaskReminder { body } => body.clone(), SystemItem::Interrupt { body } => body.clone(), } } @@ -103,6 +109,7 @@ impl SystemItem { SystemItem::FileAttachment { .. } => "file_attachment", SystemItem::Knowledge { .. } => "knowledge", SystemItem::Workflow { .. } => "workflow", + SystemItem::TaskReminder { .. } => "task_reminder", SystemItem::Interrupt { .. } => "interrupt", } } From e5f0107fa899b040c0965204ee6cd41d05e13f93 Mon Sep 17 00:00:00 2001 From: Hare Date: Fri, 29 May 2026 13:28:50 +0900 Subject: [PATCH 2/2] pod: initialize task reminder cooldown --- crates/pod/src/ipc/interceptor.rs | 51 ++++++++++++++++++++++++++++++- 1 file changed, 50 insertions(+), 1 deletion(-) diff --git a/crates/pod/src/ipc/interceptor.rs b/crates/pod/src/ipc/interceptor.rs index 42b4ca51..6857b514 100644 --- a/crates/pod/src/ipc/interceptor.rs +++ b/crates/pod/src/ipc/interceptor.rs @@ -41,12 +41,21 @@ const TASK_REMINDER_REQUEST_THRESHOLD: usize = 8; const TASK_REMINDER_COOLDOWN_REQUESTS: usize = 8; const TASK_MANAGEMENT_TOOL_NAMES: [&str; 2] = ["TaskCreate", "TaskUpdate"]; -#[derive(Debug, Default)] +#[derive(Debug)] pub(crate) struct TaskReminderState { requests_since_last_task_management: AtomicUsize, requests_since_last_reminder: AtomicUsize, } +impl Default for TaskReminderState { + fn default() -> Self { + Self { + requests_since_last_task_management: AtomicUsize::new(0), + requests_since_last_reminder: AtomicUsize::new(TASK_REMINDER_COOLDOWN_REQUESTS), + } + } +} + impl TaskReminderState { pub(crate) fn new() -> Self { Self::default() @@ -665,6 +674,22 @@ mod tests { assert!(!body.contains("long task description")); } + #[test] + fn task_reminder_state_starts_with_initial_cooldown_elapsed() { + let state = TaskReminderState::new(); + + assert_eq!( + state.requests_since_last_reminder.load(Ordering::Relaxed), + TASK_REMINDER_COOLDOWN_REQUESTS + ); + assert_eq!( + state + .requests_since_last_task_management + .load(Ordering::Relaxed), + 0 + ); + } + #[tokio::test] async fn task_management_tool_call_resets_reminder_inactivity_counter() { let task_store = TaskStore::new(); @@ -731,6 +756,30 @@ mod tests { assert_eq!(interceptor.pending_history_appends().await.len(), 1); } + #[tokio::test] + async fn task_create_reset_does_not_block_first_reminder_cooldown() { + let task_store = TaskStore::new(); + let state = Arc::new(TaskReminderState::new()); + let interceptor = interceptor_for_task_reminders(task_store.clone(), state.clone()); + + for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD * 2 { + assert!(interceptor.pending_history_appends().await.is_empty()); + } + + call_pre_tool(&interceptor, "TaskCreate").await; + task_store.create("created after idle".into(), String::new()); + assert_eq!( + state.requests_since_last_reminder.load(Ordering::Relaxed), + TASK_REMINDER_COOLDOWN_REQUESTS, + "TaskCreate reset must not clear the initial reminder cooldown" + ); + + for _ in 0..TASK_REMINDER_REQUEST_THRESHOLD - 1 { + assert!(interceptor.pending_history_appends().await.is_empty()); + } + assert_eq!(interceptor.pending_history_appends().await.len(), 1); + } + #[tokio::test] async fn task_reminder_lands_in_pending_history_appends_lane() { let task_store = TaskStore::new();