From 3cbd75939715cf5f709db2218c97b279bfe87dcd Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 3 May 2026 19:03:52 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20=E3=82=BB=E3=83=83=E3=82=B7=E3=83=A7?= =?UTF-8?q?=E3=83=B3=E5=86=85=20Task=20=E3=83=84=E3=83=BC=E3=83=AB=20(Task?= =?UTF-8?q?Create/List/Get/Update=20+=20=E5=B1=A5=E6=AD=B4=20replay=20+=20?= =?UTF-8?q?compact=20=E8=B7=A8=E3=81=8E)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/pod/src/controller.rs | 11 +- crates/pod/src/ipc/interceptor.rs | 2 +- crates/pod/src/pod.rs | 62 ++- crates/tools/src/lib.rs | 9 +- crates/tools/src/task.rs | 612 ++++++++++++++++++++++++++++++ crates/tools/src/tracker.rs | 3 +- crates/tools/tests/edge_cases.rs | 9 +- crates/tools/tests/integration.rs | 68 +++- 8 files changed, 756 insertions(+), 20 deletions(-) create mode 100644 crates/tools/src/task.rs diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index 4056c260..ce4a6a33 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -133,6 +133,7 @@ impl PodController { // Stashed during tool registration below so we can attach a // `PodFsView` to the shared state once the latter exists. let fs_for_view: tools::ScopedFs; + let task_store = pod.task_store(); // Register event bridge callbacks on the worker { @@ -257,13 +258,19 @@ impl PodController { // worker) reads from it, and any future scope mutation // (SpawnPod-style revoke, future GrantScope) propagates // through it. - let fs = tools::ScopedFs::with_shared_scope(scope_handle.clone(), pwd_for_tools.clone()); + let fs = + tools::ScopedFs::with_shared_scope(scope_handle.clone(), pwd_for_tools.clone()); let tracker = tools::Tracker::new(); // The same ScopedFs also powers the IPC `ListCompletions` // query — keep a clone for the FS view we attach below, // since the tools consume `fs` itself. fs_for_view = fs.clone(); - worker.register_tools(tools::builtin_tools(fs, tracker.clone(), bash_output_dir)); + worker.register_tools(tools::builtin_tools( + fs, + tracker.clone(), + task_store.clone(), + bash_output_dir, + )); // Memory subsystem opt-in. When `[memory]` is present in // the manifest, register the memory-specific Read/Write/Edit diff --git a/crates/pod/src/ipc/interceptor.rs b/crates/pod/src/ipc/interceptor.rs index f7b9d9de..359ed9fe 100644 --- a/crates/pod/src/ipc/interceptor.rs +++ b/crates/pod/src/ipc/interceptor.rs @@ -127,7 +127,7 @@ impl Interceptor for PodInterceptor { // Internal mechanism: between-requests compaction trigger (safety net). if let Some(state) = self.compact_state.as_ref() { - if !state.is_disabled() { + if !state.is_disabled() && !state.just_compacted() { let current = current_tokens.unwrap_or(0); if state.exceeds_request(current) { info!( diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index b3a4c073..8791c304 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -95,6 +95,11 @@ pub struct Pod { /// tools so that Pod-owned operations (e.g. compaction) can consult /// the recency of touched files. tracker: Option, + /// Session-lifetime task store from the builtin `tools` crate. Shared by + /// TaskCreate / TaskUpdate / TaskList / TaskGet and preserved across + /// compaction by keeping the same handle while the Worker history is + /// replaced. Restored Pods reconstruct it by replaying Task* tool calls. + task_store: tools::TaskStore, /// Parsed system-prompt template awaiting first-turn materialisation. /// `Some` until `ensure_system_prompt_materialized` renders it once, /// then `None` forever — including after compaction. @@ -211,6 +216,7 @@ impl Pod { metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), usage_history: Arc::new(Mutex::new(Vec::::new())), tracker: None, + task_store: tools::TaskStore::new(), system_prompt_template: None, alerter: None, event_tx: None, @@ -425,6 +431,18 @@ impl Pod { self.tracker = Some(tracker); } + /// Attach the session-scoped TaskStore from the builtin `tools` crate. + /// Called by the Controller before registering builtin tools so the Pod + /// and Worker share one store. + pub fn attach_task_store(&mut self, task_store: tools::TaskStore) { + self.task_store = task_store; + } + + /// Shared TaskStore handle. + pub fn task_store(&self) -> tools::TaskStore { + self.task_store.clone() + } + /// The attached session-scoped file-operation tracker, if any. pub fn tracker(&self) -> Option<&tools::Tracker> { self.tracker.as_ref() @@ -1255,8 +1273,14 @@ impl Pod { .unwrap_or_default(); // Input text fed to the compact worker. Includes the default - // references and the (pruned) conversation text. - let summary_input = build_summary_input(&items_to_summarise, &default_refs); + // references, current TaskStore snapshot, and the (pruned) + // conversation text. + let task_snapshot_text = self.task_store.snapshot_text(); + let summary_input = build_summary_input( + &items_to_summarise, + &default_refs, + Some(task_snapshot_text.as_str()), + ); // Worker-side state collected by the compact worker's tool calls. let ctx = Arc::new(std::sync::Mutex::new(CompactWorkerContext::with_budget( @@ -1371,9 +1395,10 @@ impl Pod { .filter(|i| i.is_user_message()) .count(); - // Build new history: [summary, ...auto-read, references, ...retained]. + // Build new history: [summary, ...auto-read, task snapshot, TaskList result, references, ...retained]. let mut new_history = Vec::with_capacity( 1 + auto_read_messages.len() + + 3 + reference_message.is_some() as usize + retained_items.len(), ); @@ -1381,6 +1406,17 @@ impl Pod { "[Compacted context summary]\n\n{summary_text}" ))); new_history.extend(auto_read_messages); + new_history.push(Item::system_message(format!( + "[Session TaskStore snapshot]\n\n{task_snapshot_text}\n\n\ + This is the complete session task list preserved across compaction. \ + The following TaskList tool result presents the same state through the tool lane." + ))); + new_history.push(Item::tool_call("compact-tasklist", "TaskList", "{}")); + new_history.push(Item::tool_result_with_content( + "compact-tasklist", + tools::task::snapshot_overview(&self.task_store.list()), + task_snapshot_text.clone(), + )); if let Some(msg) = reference_message { new_history.push(msg); } @@ -1978,6 +2014,7 @@ impl Pod, St> { metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, + task_store: tools::TaskStore::new(), system_prompt_template: common.system_prompt_template, alerter: None, event_tx: None, @@ -2040,6 +2077,7 @@ impl Pod, St> { metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, + task_store: tools::TaskStore::new(), system_prompt_template: common.system_prompt_template, alerter: None, event_tx: None, @@ -2136,6 +2174,7 @@ impl Pod, St> { } let extract_pointer = memory::extract::fold_pointer(&state.extensions); + let task_store = tools::TaskStore::from_history(&state.history); let mut pod = Self { manifest, @@ -2152,6 +2191,7 @@ impl Pod, St> { metrics_tracker: Arc::new(crate::compact::metrics_tracker::MetricsTracker::new()), usage_history: Arc::new(Mutex::new(state.usage_history)), tracker: None, + task_store, // Restore replays the saved system_prompt verbatim — no // template re-render on resume. system_prompt_template: None, @@ -2247,7 +2287,11 @@ impl From for PodRunResult { /// Build the compact worker's input: default-reference instructions, /// the list of recently-touched files, and the pruned conversation /// produced by [`build_summary_prompt`]. -fn build_summary_input(items: &[Item], default_refs: &[PathBuf]) -> String { +fn build_summary_input( + items: &[Item], + default_refs: &[PathBuf], + task_snapshot: Option<&str>, +) -> String { let mut out = String::new(); out.push_str( "Summarise the conversation below into a structured summary and nominate \ @@ -2267,6 +2311,16 @@ fn build_summary_input(items: &[Item], default_refs: &[PathBuf]) -> String { } out.push('\n'); } + if let Some(task_snapshot) = task_snapshot { + out.push_str( + "## Current Session TaskStore\n\ + This is the full current task list. Use it as source material for the \ + summary, especially active (pending/inprogress) tasks, but do not edit tasks \ + from the compact worker.\n", + ); + out.push_str(task_snapshot); + out.push_str("\n\n"); + } out.push_str("## Conversation\n"); out.push_str(&build_summary_prompt(items)); out.push_str("\n\nWhen you are done, call `write_summary` with the final 5-section text."); diff --git a/crates/tools/src/lib.rs b/crates/tools/src/lib.rs index 28139410..f4af7a39 100644 --- a/crates/tools/src/lib.rs +++ b/crates/tools/src/lib.rs @@ -20,6 +20,7 @@ pub mod error; pub mod scoped_fs; +pub mod task; pub mod tracker; mod bash; @@ -36,6 +37,7 @@ pub use glob::glob_tool; pub use grep::grep_tool; pub use read::read_tool; pub use scoped_fs::ScopedFs; +pub use task::{TaskEntry, TaskSnapshot, TaskStatus, TaskStore, task_tools}; pub use tracker::Tracker; pub use write::write_tool; @@ -53,14 +55,17 @@ pub use write::write_tool; pub fn builtin_tools( fs: ScopedFs, tracker: Tracker, + task_store: TaskStore, bash_output_dir: std::path::PathBuf, ) -> Vec { - vec![ + let mut defs = vec![ read_tool(fs.clone(), tracker.clone()), write_tool(fs.clone(), tracker.clone()), edit_tool(fs.clone(), tracker), glob_tool(fs.clone()), grep_tool(fs.clone()), bash_tool(fs, bash_output_dir), - ] + ]; + defs.extend(task_tools(task_store)); + defs } diff --git a/crates/tools/src/task.rs b/crates/tools/src/task.rs new file mode 100644 index 00000000..9261d396 --- /dev/null +++ b/crates/tools/src/task.rs @@ -0,0 +1,612 @@ +//! Session-scoped TaskStore and builtin task tools. +//! +//! The store is Pod/session-lifetime state shared by the four Task* tools. It +//! is reconstructed on resume by replaying TaskCreate / TaskUpdate tool-call +//! arguments from persisted history. + +use std::collections::HashMap; +use std::fmt::Write as _; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use llm_worker::Item; +use llm_worker::tool::{Tool, ToolDefinition, ToolError, ToolMeta, ToolOutput}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)] +#[serde(rename_all = "lowercase")] +pub enum TaskStatus { + Pending, + Inprogress, + Completed, + Deleted, +} + +impl std::fmt::Display for TaskStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let s = match self { + Self::Pending => "pending", + Self::Inprogress => "inprogress", + Self::Completed => "completed", + Self::Deleted => "deleted", + }; + f.write_str(s) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)] +pub struct TaskEntry { + pub taskid: u64, + pub status: TaskStatus, + pub subject: String, + pub description: String, +} + +#[derive(Debug, Default)] +struct Inner { + next_taskid: u64, + tasks: Vec, +} + +#[derive(Debug, Clone, Default)] +pub struct TaskStore { + inner: Arc>, +} + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)] +pub struct TaskSnapshot { + pub tasks: Vec, +} + +impl TaskStore { + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(Inner { + next_taskid: 1, + tasks: Vec::new(), + })), + } + } + + pub fn create(&self, subject: String, description: String) -> TaskEntry { + let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner()); + let task = TaskEntry { + taskid: inner.next_taskid, + status: TaskStatus::Pending, + subject, + description, + }; + inner.next_taskid = inner.next_taskid.saturating_add(1); + inner.tasks.push(task.clone()); + task + } + + pub fn list(&self) -> Vec { + self.inner + .lock() + .unwrap_or_else(|e| e.into_inner()) + .tasks + .clone() + } + + pub fn get(&self, taskid: u64) -> Option { + self.inner + .lock() + .unwrap_or_else(|e| e.into_inner()) + .tasks + .iter() + .find(|t| t.taskid == taskid) + .cloned() + } + + pub fn update( + &self, + taskid: u64, + status: Option, + subject: Option, + description: Option, + ) -> Result { + if status.is_none() && subject.is_none() && description.is_none() { + return Err(TaskStoreError::NoFields); + } + let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner()); + let task = inner + .tasks + .iter_mut() + .find(|t| t.taskid == taskid) + .ok_or(TaskStoreError::Missing(taskid))?; + if let Some(status) = status { + task.status = status; + } + if let Some(subject) = subject { + task.subject = subject; + } + if let Some(description) = description { + task.description = description; + } + Ok(task.clone()) + } + + pub fn snapshot(&self) -> TaskSnapshot { + TaskSnapshot { tasks: self.list() } + } + + pub fn replay_history(&self, history: &[Item]) { + for item in history { + match item { + Item::Message { content, .. } => { + for part in content { + let text = part.as_text(); + if let Some(snapshot) = parse_compact_snapshot_text(text) { + self.replace_with(snapshot); + } + } + } + Item::ToolCall { + name, arguments, .. + } => match name.as_str() { + "TaskCreate" => { + if let Ok(params) = serde_json::from_str::(arguments) { + let _ = self.create(params.subject, params.description); + } + } + "TaskUpdate" => { + if let Ok(params) = serde_json::from_str::(arguments) { + let _ = self.update( + params.taskid, + params.status, + params.subject, + params.description, + ); + } + } + _ => {} + }, + _ => {} + } + } + } + + pub fn replace_with(&self, tasks: Vec) { + let next_taskid = tasks + .iter() + .map(|t| t.taskid) + .max() + .unwrap_or(0) + .saturating_add(1) + .max(1); + let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner()); + inner.tasks = tasks; + inner.next_taskid = next_taskid; + } + + pub fn from_history(history: &[Item]) -> Self { + let store = Self::new(); + store.replay_history(history); + store + } + + pub fn snapshot_text(&self) -> String { + render_snapshot(&self.list()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TaskStoreError { + Missing(u64), + NoFields, +} + +impl std::fmt::Display for TaskStoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Missing(id) => write!(f, "taskid {id} not found"), + Self::NoFields => { + f.write_str("at least one of status, subject, description is required") + } + } + } +} + +impl std::error::Error for TaskStoreError {} + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct TaskCreateParams { + /// One-line task subject. + subject: String, + /// Detailed task description. + description: String, +} + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct TaskListParams {} + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct TaskGetParams { + taskid: u64, +} + +#[derive(Debug, Deserialize, schemars::JsonSchema)] +struct TaskUpdateParams { + taskid: u64, + #[serde(default)] + status: Option, + #[serde(default)] + subject: Option, + #[serde(default)] + description: Option, +} + +struct TaskCreateTool { + store: TaskStore, +} + +struct TaskListTool { + store: TaskStore, +} + +struct TaskGetTool { + store: TaskStore, +} + +struct TaskUpdateTool { + store: TaskStore, +} + +const CREATE_DESCRIPTION: &str = "Create a session-lifetime task. Input only `subject` and \ +`description`; `taskid` is assigned automatically and initial `status` is `pending`."; +const LIST_DESCRIPTION: &str = "List every session-lifetime task, including completed and \ +deleted entries. Takes an empty object as input."; +const GET_DESCRIPTION: &str = "Get one session-lifetime task by `taskid`. Returns an error if \ +the task does not exist."; +const UPDATE_DESCRIPTION: &str = "Update an existing session-lifetime task. Provide `taskid` and \ +at least one of `status`, `subject`, or `description`. `status` must be one of `pending`, \ +`inprogress`, `completed`, or `deleted`; deletion is logical (`status = deleted`)."; + +#[async_trait] +impl Tool for TaskCreateTool { + async fn execute(&self, input_json: &str) -> Result { + let params: TaskCreateParams = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid TaskCreate input: {e}")))?; + let created = self.store.create(params.subject, params.description); + let tasks = self.store.list(); + Ok(task_output( + format!( + "Created task {} ({})\n{}", + created.taskid, + created.status, + snapshot_overview(&tasks) + ), + &created, + &tasks, + )) + } +} + +#[async_trait] +impl Tool for TaskListTool { + async fn execute(&self, input_json: &str) -> Result { + let _: TaskListParams = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid TaskList input: {e}")))?; + let tasks = self.store.list(); + Ok(ToolOutput { + summary: snapshot_overview(&tasks), + content: Some(render_snapshot(&tasks)), + }) + } +} + +#[async_trait] +impl Tool for TaskGetTool { + async fn execute(&self, input_json: &str) -> Result { + let params: TaskGetParams = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid TaskGet input: {e}")))?; + let task = self.store.get(params.taskid).ok_or_else(|| { + ToolError::ExecutionFailed(format!("taskid {} not found", params.taskid)) + })?; + let content = serde_json::to_string_pretty(&task).unwrap_or_else(|_| format!("{task:?}")); + Ok(ToolOutput { + summary: format!("Task {} ({}) {}", task.taskid, task.status, task.subject), + content: Some(content), + }) + } +} + +#[async_trait] +impl Tool for TaskUpdateTool { + async fn execute(&self, input_json: &str) -> Result { + let params: TaskUpdateParams = serde_json::from_str(input_json) + .map_err(|e| ToolError::InvalidArgument(format!("invalid TaskUpdate input: {e}")))?; + let updated = self + .store + .update( + params.taskid, + params.status, + params.subject, + params.description, + ) + .map_err(|e| ToolError::ExecutionFailed(e.to_string()))?; + let tasks = self.store.list(); + Ok(task_output( + format!( + "Updated task {} ({})\n{}", + updated.taskid, + updated.status, + snapshot_overview(&tasks) + ), + &updated, + &tasks, + )) + } +} + +fn task_output(summary: String, task: &TaskEntry, tasks: &[TaskEntry]) -> ToolOutput { + let content = serde_json::json!({ + "task": task, + "snapshot": { "tasks": tasks }, + }); + ToolOutput { + summary, + content: Some(serde_json::to_string_pretty(&content).unwrap_or_default()), + } +} + +pub fn snapshot_overview(tasks: &[TaskEntry]) -> String { + let pending = tasks + .iter() + .filter(|t| t.status == TaskStatus::Pending) + .count(); + let inprogress = tasks + .iter() + .filter(|t| t.status == TaskStatus::Inprogress) + .count(); + let completed = tasks + .iter() + .filter(|t| t.status == TaskStatus::Completed) + .count(); + let deleted = tasks + .iter() + .filter(|t| t.status == TaskStatus::Deleted) + .count(); + format!( + "TaskStore: {} task(s) (pending: {pending}, inprogress: {inprogress}, completed: {completed}, deleted: {deleted})", + tasks.len() + ) +} + +pub fn render_snapshot(tasks: &[TaskEntry]) -> String { + if tasks.is_empty() { + return "TaskStore is empty.".to_string(); + } + let mut out = String::new(); + let _ = writeln!(&mut out, "{}", snapshot_overview(tasks)); + for task in tasks { + let _ = writeln!( + &mut out, + "\n- taskid: {}\n status: {}\n subject: {}\n description: {}", + task.taskid, task.status, task.subject, task.description + ); + } + out +} + +fn parse_compact_snapshot_text(text: &str) -> Option> { + if !text.starts_with("[Session TaskStore snapshot]") { + return None; + } + let body = text.split_once("\n\n")?.1; + parse_rendered_snapshot(body).or_else(|| { + if body.contains("TaskStore is empty.") { + Some(Vec::new()) + } else { + None + } + }) +} + +fn parse_rendered_snapshot(text: &str) -> Option> { + if text.contains("TaskStore is empty.") { + return Some(Vec::new()); + } + let mut tasks = Vec::new(); + let mut current: HashMap<&str, String> = HashMap::new(); + for line in text.lines() { + let line = line.trim_start(); + if let Some(value) = line.strip_prefix("- taskid: ") { + if !current.is_empty() { + tasks.push(task_from_fields(¤t)?); + current.clear(); + } + current.insert("taskid", value.to_string()); + } else if let Some(value) = line.strip_prefix("status: ") { + current.insert("status", value.to_string()); + } else if let Some(value) = line.strip_prefix("subject: ") { + current.insert("subject", value.to_string()); + } else if let Some(value) = line.strip_prefix("description: ") { + current.insert("description", value.to_string()); + } + } + if !current.is_empty() { + tasks.push(task_from_fields(¤t)?); + } + Some(tasks) +} + +fn task_from_fields(fields: &HashMap<&str, String>) -> Option { + let status = match fields.get("status")?.as_str() { + "pending" => TaskStatus::Pending, + "inprogress" => TaskStatus::Inprogress, + "completed" => TaskStatus::Completed, + "deleted" => TaskStatus::Deleted, + _ => return None, + }; + Some(TaskEntry { + taskid: fields.get("taskid")?.parse().ok()?, + status, + subject: fields.get("subject")?.clone(), + description: fields.get("description")?.clone(), + }) +} + +fn task_create_tool(store: TaskStore) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(TaskCreateParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("TaskCreate") + .description(CREATE_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(TaskCreateTool { + store: store.clone(), + }); + (meta, tool) + }) +} + +fn task_list_tool(store: TaskStore) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(TaskListParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("TaskList") + .description(LIST_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(TaskListTool { + store: store.clone(), + }); + (meta, tool) + }) +} + +fn task_get_tool(store: TaskStore) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(TaskGetParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("TaskGet") + .description(GET_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(TaskGetTool { + store: store.clone(), + }); + (meta, tool) + }) +} + +fn task_update_tool(store: TaskStore) -> ToolDefinition { + Arc::new(move || { + let schema = schemars::schema_for!(TaskUpdateParams); + let schema_value = serde_json::to_value(schema).unwrap_or(serde_json::json!({})); + let meta = ToolMeta::new("TaskUpdate") + .description(UPDATE_DESCRIPTION) + .input_schema(schema_value); + let tool: Arc = Arc::new(TaskUpdateTool { + store: store.clone(), + }); + (meta, tool) + }) +} + +pub fn task_tools(store: TaskStore) -> Vec { + vec![ + task_create_tool(store.clone()), + task_list_tool(store.clone()), + task_get_tool(store.clone()), + task_update_tool(store), + ] +} + +#[cfg(test)] +mod tests { + use super::*; + + fn tool(def: ToolDefinition) -> Arc { + let (_, tool) = def(); + tool + } + + #[tokio::test] + async fn task_tools_create_list_get_update() { + let store = TaskStore::new(); + let create = tool(task_create_tool(store.clone())); + let list = tool(task_list_tool(store.clone())); + let get = tool(task_get_tool(store.clone())); + let update = tool(task_update_tool(store.clone())); + + let out = create + .execute(r#"{"subject":"implement","description":"write code"}"#) + .await + .unwrap(); + assert!(out.summary.contains("Created task 1")); + assert_eq!(store.get(1).unwrap().status, TaskStatus::Pending); + + let out = update + .execute(r#"{"taskid":1,"status":"inprogress","subject":"implement tasks"}"#) + .await + .unwrap(); + assert!(out.summary.contains("Updated task 1")); + let task = store.get(1).unwrap(); + assert_eq!(task.status, TaskStatus::Inprogress); + assert_eq!(task.subject, "implement tasks"); + + let out = get.execute(r#"{"taskid":1}"#).await.unwrap(); + assert!(out.summary.contains("Task 1 (inprogress)")); + assert!(out.content.unwrap().contains("implement tasks")); + + let out = list.execute("{}").await.unwrap(); + assert!(out.summary.contains("1 task(s)")); + assert!(out.content.unwrap().contains("taskid: 1")); + } + + #[tokio::test] + async fn task_update_validates_existing_and_at_least_one_field() { + let store = TaskStore::new(); + store.create("s".into(), "d".into()); + let update = tool(task_update_tool(store)); + + let err = update.execute(r#"{"taskid":1}"#).await.unwrap_err(); + assert!(err.to_string().contains("at least one")); + + let err = update + .execute(r#"{"taskid":99,"status":"deleted"}"#) + .await + .unwrap_err(); + assert!(err.to_string().contains("taskid 99 not found")); + } + + #[test] + fn replay_history_reconstructs_store_and_ignores_malformed_calls() { + let history = vec![ + Item::tool_call("c1", "TaskCreate", r#"{"subject":"a","description":"A"}"#), + Item::tool_call("bad", "TaskCreate", r#"{"subject":1}"#), + Item::tool_call("c2", "TaskCreate", r#"{"subject":"b","description":"B"}"#), + Item::tool_call("u1", "TaskUpdate", r#"{"taskid":2,"status":"completed"}"#), + Item::tool_call("bad2", "TaskUpdate", r#"{"taskid":99,"status":"deleted"}"#), + ]; + let store = TaskStore::from_history(&history); + let tasks = store.list(); + assert_eq!(tasks.len(), 2); + assert_eq!(tasks[0].taskid, 1); + assert_eq!(tasks[0].status, TaskStatus::Pending); + assert_eq!(tasks[1].taskid, 2); + assert_eq!(tasks[1].status, TaskStatus::Completed); + } + + #[test] + fn replay_history_uses_compact_snapshot_and_continues_updates() { + let snapshot = "[Session TaskStore snapshot]\n\nTaskStore: 1 task(s) (pending: 0, inprogress: 1, completed: 0, deleted: 0)\n\n- taskid: 1\n status: inprogress\n subject: kept\n description: from compact\n"; + let history = vec![ + Item::system_message(snapshot), + Item::tool_call("u1", "TaskUpdate", r#"{"taskid":1,"status":"completed"}"#), + Item::tool_call( + "c2", + "TaskCreate", + r#"{"subject":"new","description":"after compact"}"#, + ), + ]; + let store = TaskStore::from_history(&history); + let tasks = store.list(); + assert_eq!(tasks.len(), 2); + assert_eq!(tasks[0].taskid, 1); + assert_eq!(tasks[0].status, TaskStatus::Completed); + assert_eq!(tasks[1].taskid, 2); + assert_eq!(tasks[1].subject, "new"); + } +} diff --git a/crates/tools/src/tracker.rs b/crates/tools/src/tracker.rs index 36671ecd..2d4b6973 100644 --- a/crates/tools/src/tracker.rs +++ b/crates/tools/src/tracker.rs @@ -32,7 +32,8 @@ //! let fs = ScopedFs::new(scope, PathBuf::from("/workspace")); // pod lifetime //! let tracker = Tracker::new(); // session lifetime //! let bash_outputs = PathBuf::from("/run/insomnia/bash-output"); -//! let defs = builtin_tools(fs, tracker, bash_outputs); +//! let task_store = tools::TaskStore::new(); +//! let defs = builtin_tools(fs, tracker, task_store, bash_outputs); //! ``` use std::collections::{HashMap, VecDeque}; diff --git a/crates/tools/tests/edge_cases.rs b/crates/tools/tests/edge_cases.rs index 5f96fef7..0c9c70ac 100644 --- a/crates/tools/tests/edge_cases.rs +++ b/crates/tools/tests/edge_cases.rs @@ -6,7 +6,7 @@ use llm_worker::tool::{Tool, ToolDefinition}; use manifest::{Permission, Scope, ScopeConfig, ScopeRule}; use serde_json::json; use tempfile::TempDir; -use tools::{ScopedFs, Tracker, builtin_tools}; +use tools::{ScopedFs, TaskStore, Tracker, builtin_tools}; struct Registry { entries: Vec<(llm_worker::tool::ToolMeta, Arc)>, @@ -43,7 +43,12 @@ fn setup() -> (TempDir, TempDir, Registry) { let scope = Scope::from_config(&config).unwrap(); let fs = ScopedFs::new(scope, dir.path().to_path_buf()); let tracker = Tracker::new(); - let reg = Registry::new(builtin_tools(fs, tracker, spill.path().to_path_buf())); + let reg = Registry::new(builtin_tools( + fs, + tracker, + TaskStore::new(), + spill.path().to_path_buf(), + )); (dir, spill, reg) } diff --git a/crates/tools/tests/integration.rs b/crates/tools/tests/integration.rs index 328eea18..9ab3cb68 100644 --- a/crates/tools/tests/integration.rs +++ b/crates/tools/tests/integration.rs @@ -11,7 +11,7 @@ use llm_worker::tool::{Tool, ToolDefinition, ToolMeta}; use manifest::{Permission, Scope, ScopeConfig, ScopeRule}; use serde_json::json; use tempfile::TempDir; -use tools::{ScopedFs, Tracker, builtin_tools}; +use tools::{ScopedFs, TaskStore, Tracker, builtin_tools}; fn scope_with_spill(workspace: &Path, spill: &Path) -> Scope { let base = Scope::writable(workspace).unwrap(); @@ -56,7 +56,12 @@ fn setup() -> (TempDir, TempDir, Registry) { let scope = scope_with_spill(dir.path(), spill.path()); let fs = ScopedFs::new(scope, dir.path().to_path_buf()); let tracker = Tracker::new(); - let reg = Registry::new(builtin_tools(fs, tracker, spill.path().to_path_buf())); + let reg = Registry::new(builtin_tools( + fs, + tracker, + TaskStore::new(), + spill.path().to_path_buf(), + )); (dir, spill, reg) } @@ -77,7 +82,21 @@ fn builtin_tools_registers_full_set() { let (_dir, _spill, reg) = setup(); let mut names = reg.names(); names.sort(); - assert_eq!(names, vec!["Bash", "Edit", "Glob", "Grep", "Read", "Write"]); + assert_eq!( + names, + vec![ + "Bash", + "Edit", + "Glob", + "Grep", + "Read", + "TaskCreate", + "TaskGet", + "TaskList", + "TaskUpdate", + "Write" + ] + ); } #[test] @@ -270,16 +289,41 @@ async fn edit_requires_read_across_tools() { #[tokio::test] async fn deterministic_tool_order_is_registration_order() { let (_dir, _spill, reg) = setup(); - // Registration order from builtin_tools(): Read, Write, Edit, Glob, Grep, Bash + // Registration order from builtin_tools(): Read, Write, Edit, Glob, Grep, Bash, TaskCreate, TaskList, TaskGet, TaskUpdate let names: Vec<&str> = reg.entries.iter().map(|(m, _)| m.name.as_str()).collect(); - assert_eq!(names, vec!["Read", "Write", "Edit", "Glob", "Grep", "Bash"]); + assert_eq!( + names, + vec![ + "Read", + "Write", + "Edit", + "Glob", + "Grep", + "Bash", + "TaskCreate", + "TaskList", + "TaskGet", + "TaskUpdate" + ] + ); } // Regression: tool name capitalization matches Claude Code reference #[test] fn tool_names_match_reference_spec() { let (_dir, _spill, reg) = setup(); - for expected in ["Read", "Write", "Edit", "Glob", "Grep", "Bash"] { + for expected in [ + "Read", + "Write", + "Edit", + "Glob", + "Grep", + "Bash", + "TaskCreate", + "TaskList", + "TaskGet", + "TaskUpdate", + ] { assert!( reg.entries.iter().any(|(m, _)| m.name == expected), "missing tool {expected}" @@ -295,7 +339,12 @@ async fn tracker_recent_files_tracks_read_write_edit() { let scope = scope_with_spill(dir.path(), spill.path()); let fs = ScopedFs::new(scope, dir.path().to_path_buf()); let tracker = Tracker::new(); - let reg = Registry::new(builtin_tools(fs, tracker.clone(), spill.path().to_path_buf())); + let reg = Registry::new(builtin_tools( + fs, + tracker.clone(), + TaskStore::new(), + spill.path().to_path_buf(), + )); let a = dir.path().join("a.txt"); let b = dir.path().join("b.txt"); @@ -379,7 +428,10 @@ async fn bash_spilled_file_is_readable_via_read_tool() { let read_body = read_out.content.expect("Read returned content"); // The full 200 lines should be in the saved file even though Bash // returned only the tail of 80. - assert!(read_body.contains("line 1\n"), "missing line 1: {read_body}"); + assert!( + read_body.contains("line 1\n"), + "missing line 1: {read_body}" + ); assert!(read_body.contains("line 200"), "missing line 200"); }