From f1bd498df78895b950f04c1b825a7876e3645b2b Mon Sep 17 00:00:00 2001 From: Hare Date: Sun, 24 May 2026 06:44:19 +0900 Subject: [PATCH] fix: route worker history appends through callbacks --- crates/llm-worker/src/worker.rs | 58 ++++----- crates/llm-worker/tests/worker_state_test.rs | 64 +++++----- crates/pod/src/pod.rs | 119 +++++++++++++++++-- crates/session-store/src/system_item.rs | 7 ++ crates/tui/src/app.rs | 3 +- 5 files changed, 178 insertions(+), 73 deletions(-) diff --git a/crates/llm-worker/src/worker.rs b/crates/llm-worker/src/worker.rs index d5d00bbc..b7da3917 100644 --- a/crates/llm-worker/src/worker.rs +++ b/crates/llm-worker/src/worker.rs @@ -127,7 +127,7 @@ enum ToolExecutionResult { /// /// // To edit between turns, unlock back to Mutable /// let mut worker = worker.unlock(); -/// worker.history_mut().truncate(5); +/// worker.truncate_history(5); /// let out = worker.run("Continue").await?; /// let mut worker = out.worker; /// ``` @@ -400,7 +400,7 @@ impl Worker { } } - fn extend_history_with_callbacks(&mut self, items: impl IntoIterator) { + fn append_history_items(&mut self, items: impl IntoIterator) { for item in items { self.emit_history_append(&item); self.history.push(item); @@ -985,7 +985,7 @@ impl Worker { // get persisted by the upper layer that owns history.json. let pending = self.interceptor.pending_history_appends().await; if !pending.is_empty() { - self.extend_history_with_callbacks(pending); + self.append_history_items(pending); } // Clone the history into a per-request context. Everything @@ -1093,7 +1093,7 @@ impl Worker { self.turn_count += 1; // Collect and commit assistant items. Routed through - // `extend_history_with_callbacks` so observers (e.g. the + // `append_history_items` so observers (e.g. the // Pod-side per-item session-log committer) see each item // as it lands. let reasoning_items = self.reasoning_item_collector.take_collected(); @@ -1101,7 +1101,7 @@ impl Worker { let tool_calls = self.tool_call_collector.take_collected(); let assistant_items = self.build_assistant_items(&reasoning_items, &text_blocks, &tool_calls); - self.extend_history_with_callbacks(assistant_items); + self.append_history_items(assistant_items); if tool_calls.is_empty() { match self.interceptor.on_turn_end(&self.history).await { @@ -1110,7 +1110,7 @@ impl Worker { return Ok(WorkerResult::Finished); } TurnEndAction::ContinueWithMessages(additional) => { - self.extend_history_with_callbacks(additional); + self.append_history_items(additional); continue; } TurnEndAction::Pause => { @@ -1227,7 +1227,7 @@ impl Worker { result.is_error, ) }); - self.extend_history_with_callbacks(items); + self.append_history_items(items); Ok(None) } Err(err) => { @@ -1411,38 +1411,28 @@ impl Worker { } } - /// Get a mutable reference to history + /// Replace history during restore/rebuild without emitting append callbacks. /// - /// Available only in Mutable state. - pub fn history_mut(&mut self) -> &mut Vec { - &mut self.history - } - - /// Set history + /// This is not a history-growth API. Live append paths must use + /// [`append_history`](Self::append_history) so `on_history_append` observers + /// see every inserted item. pub fn set_history(&mut self, items: Vec) { self.history = items; } - /// Add an item to history (builder pattern) - pub fn with_item(mut self, item: Item) -> Self { - self.history.push(item); - self + /// Append items to history and notify history-append observers for each + /// item before it lands. This is the only public Mutable-state API for + /// growing worker history; callers that need session-log persistence must + /// install [`on_history_append`](Self::on_history_append) before calling it. + pub fn append_history(&mut self, items: impl IntoIterator) { + self.append_history_items(items); } - /// Add an item to history - pub fn push_item(&mut self, item: Item) { - self.history.push(item); - } - - /// Add multiple items to history (builder pattern) - pub fn with_items(mut self, items: impl IntoIterator) -> Self { - self.history.extend(items); - self - } - - /// Add multiple items to history - pub fn extend_history(&mut self, items: impl IntoIterator) { - self.history.extend(items); + /// Truncate history without emitting append callbacks. + /// + /// This is an edit operation, not a history-growth path. + pub fn truncate_history(&mut self, len: usize) { + self.history.truncate(len); } /// Clear history @@ -1575,9 +1565,9 @@ impl Worker { PromptAction::Continue => Vec::new(), PromptAction::ContinueWith(items) => items, }; - self.history.push(user_item); + self.append_history_items(std::iter::once(user_item)); if !extras.is_empty() { - self.extend_history_with_callbacks(extras); + self.append_history_items(extras); } let result = self.run_turn_loop().await; self.finalize_interruption(result).await diff --git a/crates/llm-worker/tests/worker_state_test.rs b/crates/llm-worker/tests/worker_state_test.rs index 656adcb5..c8a513ad 100644 --- a/crates/llm-worker/tests/worker_state_test.rs +++ b/crates/llm-worker/tests/worker_state_test.rs @@ -5,8 +5,8 @@ mod common; -use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; use common::MockLlmClient; @@ -44,14 +44,12 @@ fn test_mutable_history_manipulation() { assert!(worker.history().is_empty()); // Add to history - worker.push_item(Item::user_message("Hello")); - worker.push_item(Item::assistant_message("Hi there!")); + worker.append_history(vec![Item::user_message("Hello")]); + worker.append_history(vec![Item::assistant_message("Hi there!")]); assert_eq!(worker.history().len(), 2); - // Mutable access to history - worker - .history_mut() - .push(Item::user_message("How are you?")); + // Append to history via the callback-aware API. + worker.append_history(vec![Item::user_message("How are you?")]); assert_eq!(worker.history().len(), 3); // Clear history @@ -71,34 +69,38 @@ fn test_mutable_history_manipulation() { #[test] fn test_mutable_builder_pattern() { let client = MockLlmClient::new(vec![]); - let worker = Worker::new(client) - .system_prompt("System prompt") - .with_item(Item::user_message("Hello")) - .with_item(Item::assistant_message("Hi!")) - .with_items(vec![ - Item::user_message("How are you?"), - Item::assistant_message("I'm fine!"), - ]); + let worker = Worker::new(client).system_prompt("System prompt"); assert_eq!(worker.get_system_prompt(), Some("System prompt")); - assert_eq!(worker.history().len(), 4); + assert!(worker.history().is_empty()); } -/// Verify that multiple items can be added with extend_history +/// Verify that multiple items can be added with append_history and callbacks fire. #[test] -fn test_mutable_extend_history() { +fn test_mutable_append_history() { let client = MockLlmClient::new(vec![]); + let observed = Arc::new(Mutex::new(Vec::new())); + let observed_for_callback = Arc::clone(&observed); let mut worker = Worker::new(client); + worker.on_history_append(move |item| { + if let Some(text) = item.as_text() { + observed_for_callback.lock().unwrap().push(text.to_string()); + } + }); - worker.push_item(Item::user_message("First")); + worker.append_history(vec![Item::user_message("First")]); - worker.extend_history(vec![ + worker.append_history(vec![ Item::assistant_message("Response 1"), Item::user_message("Second"), Item::assistant_message("Response 2"), ]); assert_eq!(worker.history().len(), 4); + assert_eq!( + observed.lock().unwrap().as_slice(), + ["First", "Response 1", "Second", "Response 2"] + ); } #[derive(Clone)] @@ -162,8 +164,8 @@ fn test_lock_transition() { let mut worker = Worker::new(client); worker.set_system_prompt("System"); - worker.push_item(Item::user_message("Hello")); - worker.push_item(Item::assistant_message("Hi")); + worker.append_history(vec![Item::user_message("Hello")]); + worker.append_history(vec![Item::assistant_message("Hi")]); // Lock let locked_worker = worker.lock(); @@ -180,14 +182,14 @@ fn test_unlock_transition() { let client = MockLlmClient::new(vec![]); let mut worker = Worker::new(client); - worker.push_item(Item::user_message("Hello")); + worker.append_history(vec![Item::user_message("Hello")]); let locked_worker = worker.lock(); // Unlock let mut worker = locked_worker.unlock(); // History operations are available again in Mutable state - worker.push_item(Item::assistant_message("Hi")); + worker.append_history(vec![Item::assistant_message("Hi")]); worker.clear_history(); assert!(worker.history().is_empty()); } @@ -310,8 +312,8 @@ async fn test_locked_prefix_len_tracking() { let mut worker = Worker::new(client); // Add items beforehand - worker.push_item(Item::user_message("Pre-existing message 1")); - worker.push_item(Item::assistant_message("Pre-existing response 1")); + worker.append_history(vec![Item::user_message("Pre-existing message 1")]); + worker.append_history(vec![Item::assistant_message("Pre-existing response 1")]); assert_eq!(worker.history().len(), 2); @@ -380,9 +382,11 @@ async fn test_unlock_edit_relock() { }), ]]); - let worker = Worker::new(client) - .with_item(Item::user_message("Hello")) - .with_item(Item::assistant_message("Hi")); + let mut worker = Worker::new(client); + worker.append_history(vec![ + Item::user_message("Hello"), + Item::assistant_message("Hi"), + ]); // Lock -> Unlock let locked = worker.lock(); @@ -392,7 +396,7 @@ async fn test_unlock_edit_relock() { // Edit history unlocked.clear_history(); - unlocked.push_item(Item::user_message("Fresh start")); + unlocked.append_history(vec![Item::user_message("Fresh start")]); // Re-lock let relocked = unlocked.lock(); diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index ec8bc52c..396bdcea 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -470,9 +470,10 @@ impl Pod { /// /// `user_message` items are skipped because they are committed /// up-front via `commit_entry(LogEntry::UserInput { segments })`. - /// `role:system` items are committed by `PodInterceptor` as typed - /// `LogEntry::SystemItem` entries before they reach the worker's - /// history (so this callback would otherwise double-write them). + /// `role:system` items are committed as typed `LogEntry::SystemItem` + /// entries by their producers (for example `PodInterceptor` and + /// interrupted-turn prep) before they reach the worker's history, so this + /// callback would otherwise double-write them. pub fn wire_history_persistence(&mut self) { let writer = self.log_writer_handle(); self.worker_mut().on_history_append(move |item| { @@ -1312,9 +1313,7 @@ impl Pod { &mut self, snapshot: EmptyTurnRollbackSnapshot, ) -> Result<(), StoreError> { - self.worker_mut() - .history_mut() - .truncate(snapshot.history_len); + self.worker_mut().truncate_history(snapshot.history_len); self.worker_mut() .set_last_run_interrupted(snapshot.last_run_interrupted); self.user_segments.truncate(snapshot.user_segments_len); @@ -1661,10 +1660,18 @@ impl Pod { &tool_result_summary, ); if !closures.is_empty() { - self.worker_mut().extend_history(closures); + self.worker_mut().append_history(closures); } + self.commit_entry(LogEntry::SystemItem { + ts: segment_log::now_millis(), + item: SystemItem::Interrupt { + body: system_note.clone(), + }, + })?; self.worker_mut() - .push_item(llm_worker::Item::system_message(system_note)); + .append_history(std::iter::once(llm_worker::Item::system_message( + system_note, + ))); Ok(()) } @@ -3748,6 +3755,102 @@ mod build_summary_prompt_tests { assert_eq!(prompt, "[User] fix the bug\n\n[Assistant] done"); } + #[derive(Clone)] + struct NoopClient; + + #[async_trait] + impl LlmClient for NoopClient { + async fn stream( + &self, + _request: llm_worker::llm_client::Request, + ) -> Result< + std::pin::Pin< + Box< + dyn futures::Stream< + Item = Result< + llm_worker::llm_client::event::Event, + llm_worker::llm_client::ClientError, + >, + > + Send, + >, + >, + llm_worker::llm_client::ClientError, + > { + Ok(Box::pin(futures::stream::empty())) + } + + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + } + + #[tokio::test] + async fn apply_interrupt_prep_appends_via_callback_and_logs_independent_entries() { + let dir = tempfile::tempdir().unwrap(); + let manifest = minimal_manifest_with_skills(vec![]); + let store = session_store::FsStore::new(dir.path().join("sessions")).unwrap(); + let pwd = dir.path().join("workspace"); + std::fs::create_dir_all(&pwd).unwrap(); + let scope = Scope::writable(&pwd).unwrap(); + let mut pod = Pod::new(manifest, Worker::new(NoopClient), store, pwd, scope) + .await + .unwrap(); + + pod.ensure_segment_head().unwrap(); + pod.wire_history_persistence(); + pod.worker_mut() + .set_history(vec![Item::tool_call("call-1", "Read", "{}")]); + + pod.apply_interrupt_prep().unwrap(); + + let history = pod.worker().history(); + assert_eq!(history.len(), 3); + assert!(matches!(history[1], Item::ToolResult { ref call_id, .. } if call_id == "call-1")); + assert!(matches!( + history[2], + Item::Message { + role: Role::System, + .. + } + )); + + let interrupt_note = history[2].as_text().unwrap().to_string(); + let entries = pod + .store + .read_all( + pod.segment_state.session_id(), + pod.segment_state.segment_id(), + ) + .unwrap(); + let tool_result_count = entries + .iter() + .filter(|entry| { + matches!( + entry, + LogEntry::ToolResult { + item: session_store::LoggedItem::ToolResult { call_id, .. }, + .. + } if call_id == "call-1" + ) + }) + .count(); + let interrupt_system_count = entries + .iter() + .filter(|entry| { + matches!( + entry, + LogEntry::SystemItem { + item: SystemItem::Interrupt { body }, + .. + } if body == &interrupt_note + ) + }) + .count(); + + assert_eq!(tool_result_count, 1); + assert_eq!(interrupt_system_count, 1); + } + fn minimal_manifest_with_skills(dirs: Vec) -> PodManifest { // Construct the smallest possible PodManifest that resolves; only // the `skills` field matters for `skill_dir_read_rules`. diff --git a/crates/session-store/src/system_item.rs b/crates/session-store/src/system_item.rs index c85e29a0..ed318410 100644 --- a/crates/session-store/src/system_item.rs +++ b/crates/session-store/src/system_item.rs @@ -67,6 +67,11 @@ pub enum SystemItem { /// `/` Workflow invocation. `body` is the workflow's /// prompt body materialized into the LLM context. Workflow { slug: String, body: String }, + + /// Synthetic note inserted after an interrupted turn before the next + /// user input. `body` is the exact LLM-context text explaining that the + /// previous turn was cut short. + Interrupt { body: String }, } impl SystemItem { @@ -79,6 +84,7 @@ impl SystemItem { SystemItem::FileAttachment { body, .. } => body.clone(), SystemItem::Knowledge { body, .. } => body.clone(), SystemItem::Workflow { body, .. } => body.clone(), + SystemItem::Interrupt { body } => body.clone(), } } @@ -97,6 +103,7 @@ impl SystemItem { SystemItem::FileAttachment { .. } => "file_attachment", SystemItem::Knowledge { .. } => "knowledge", SystemItem::Workflow { .. } => "workflow", + SystemItem::Interrupt { .. } => "interrupt", } } } diff --git a/crates/tui/src/app.rs b/crates/tui/src/app.rs index 3b32f84a..89705aad 100644 --- a/crates/tui/src/app.rs +++ b/crates/tui/src/app.rs @@ -1160,7 +1160,8 @@ impl App { } session_store::SystemItem::FileAttachment { body, .. } | session_store::SystemItem::Knowledge { body, .. } - | session_store::SystemItem::Workflow { body, .. } => { + | session_store::SystemItem::Workflow { body, .. } + | session_store::SystemItem::Interrupt { body } => { self.task_store.apply_system_message_text(&body); self.blocks.push(Block::SystemMessage { text: body }); }