fix: route worker history appends through callbacks

This commit is contained in:
Keisuke Hirata 2026-05-24 06:44:19 +09:00
parent b64e098b5b
commit 65c399e6d9
5 changed files with 178 additions and 73 deletions

View File

@ -127,7 +127,7 @@ enum ToolExecutionResult {
///
/// // To edit between turns, unlock back to Mutable
/// let mut worker = worker.unlock();
/// worker.history_mut().truncate(5);
/// worker.truncate_history(5);
/// let out = worker.run("Continue").await?;
/// let mut worker = out.worker;
/// ```
@ -400,7 +400,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
}
}
fn extend_history_with_callbacks(&mut self, items: impl IntoIterator<Item = Item>) {
fn append_history_items(&mut self, items: impl IntoIterator<Item = Item>) {
for item in items {
self.emit_history_append(&item);
self.history.push(item);
@ -985,7 +985,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
// get persisted by the upper layer that owns history.json.
let pending = self.interceptor.pending_history_appends().await;
if !pending.is_empty() {
self.extend_history_with_callbacks(pending);
self.append_history_items(pending);
}
// Clone the history into a per-request context. Everything
@ -1093,7 +1093,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
self.turn_count += 1;
// Collect and commit assistant items. Routed through
// `extend_history_with_callbacks` so observers (e.g. the
// `append_history_items` so observers (e.g. the
// Pod-side per-item session-log committer) see each item
// as it lands.
let reasoning_items = self.reasoning_item_collector.take_collected();
@ -1101,7 +1101,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
let tool_calls = self.tool_call_collector.take_collected();
let assistant_items =
self.build_assistant_items(&reasoning_items, &text_blocks, &tool_calls);
self.extend_history_with_callbacks(assistant_items);
self.append_history_items(assistant_items);
if tool_calls.is_empty() {
match self.interceptor.on_turn_end(&self.history).await {
@ -1110,7 +1110,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
return Ok(WorkerResult::Finished);
}
TurnEndAction::ContinueWithMessages(additional) => {
self.extend_history_with_callbacks(additional);
self.append_history_items(additional);
continue;
}
TurnEndAction::Pause => {
@ -1227,7 +1227,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
result.is_error,
)
});
self.extend_history_with_callbacks(items);
self.append_history_items(items);
Ok(None)
}
Err(err) => {
@ -1411,38 +1411,28 @@ impl<C: LlmClient> Worker<C, Mutable> {
}
}
/// Get a mutable reference to history
/// Replace history during restore/rebuild without emitting append callbacks.
///
/// Available only in Mutable state.
pub fn history_mut(&mut self) -> &mut Vec<Item> {
&mut self.history
}
/// Set history
/// This is not a history-growth API. Live append paths must use
/// [`append_history`](Self::append_history) so `on_history_append` observers
/// see every inserted item.
pub fn set_history(&mut self, items: Vec<Item>) {
self.history = items;
}
/// Add an item to history (builder pattern)
pub fn with_item(mut self, item: Item) -> Self {
self.history.push(item);
self
/// Append items to history and notify history-append observers for each
/// item before it lands. This is the only public Mutable-state API for
/// growing worker history; callers that need session-log persistence must
/// install [`on_history_append`](Self::on_history_append) before calling it.
pub fn append_history(&mut self, items: impl IntoIterator<Item = Item>) {
self.append_history_items(items);
}
/// Add an item to history
pub fn push_item(&mut self, item: Item) {
self.history.push(item);
}
/// Add multiple items to history (builder pattern)
pub fn with_items(mut self, items: impl IntoIterator<Item = Item>) -> Self {
self.history.extend(items);
self
}
/// Add multiple items to history
pub fn extend_history(&mut self, items: impl IntoIterator<Item = Item>) {
self.history.extend(items);
/// Truncate history without emitting append callbacks.
///
/// This is an edit operation, not a history-growth path.
pub fn truncate_history(&mut self, len: usize) {
self.history.truncate(len);
}
/// Clear history
@ -1575,9 +1565,9 @@ impl<C: LlmClient> Worker<C, Locked> {
PromptAction::Continue => Vec::new(),
PromptAction::ContinueWith(items) => items,
};
self.history.push(user_item);
self.append_history_items(std::iter::once(user_item));
if !extras.is_empty() {
self.extend_history_with_callbacks(extras);
self.append_history_items(extras);
}
let result = self.run_turn_loop().await;
self.finalize_interruption(result).await

View File

@ -5,8 +5,8 @@
mod common;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use common::MockLlmClient;
@ -44,14 +44,12 @@ fn test_mutable_history_manipulation() {
assert!(worker.history().is_empty());
// Add to history
worker.push_item(Item::user_message("Hello"));
worker.push_item(Item::assistant_message("Hi there!"));
worker.append_history(vec![Item::user_message("Hello")]);
worker.append_history(vec![Item::assistant_message("Hi there!")]);
assert_eq!(worker.history().len(), 2);
// Mutable access to history
worker
.history_mut()
.push(Item::user_message("How are you?"));
// Append to history via the callback-aware API.
worker.append_history(vec![Item::user_message("How are you?")]);
assert_eq!(worker.history().len(), 3);
// Clear history
@ -71,34 +69,38 @@ fn test_mutable_history_manipulation() {
#[test]
fn test_mutable_builder_pattern() {
let client = MockLlmClient::new(vec![]);
let worker = Worker::new(client)
.system_prompt("System prompt")
.with_item(Item::user_message("Hello"))
.with_item(Item::assistant_message("Hi!"))
.with_items(vec![
Item::user_message("How are you?"),
Item::assistant_message("I'm fine!"),
]);
let worker = Worker::new(client).system_prompt("System prompt");
assert_eq!(worker.get_system_prompt(), Some("System prompt"));
assert_eq!(worker.history().len(), 4);
assert!(worker.history().is_empty());
}
/// Verify that multiple items can be added with extend_history
/// Verify that multiple items can be added with append_history and callbacks fire.
#[test]
fn test_mutable_extend_history() {
fn test_mutable_append_history() {
let client = MockLlmClient::new(vec![]);
let observed = Arc::new(Mutex::new(Vec::new()));
let observed_for_callback = Arc::clone(&observed);
let mut worker = Worker::new(client);
worker.on_history_append(move |item| {
if let Some(text) = item.as_text() {
observed_for_callback.lock().unwrap().push(text.to_string());
}
});
worker.push_item(Item::user_message("First"));
worker.append_history(vec![Item::user_message("First")]);
worker.extend_history(vec![
worker.append_history(vec![
Item::assistant_message("Response 1"),
Item::user_message("Second"),
Item::assistant_message("Response 2"),
]);
assert_eq!(worker.history().len(), 4);
assert_eq!(
observed.lock().unwrap().as_slice(),
["First", "Response 1", "Second", "Response 2"]
);
}
#[derive(Clone)]
@ -162,8 +164,8 @@ fn test_lock_transition() {
let mut worker = Worker::new(client);
worker.set_system_prompt("System");
worker.push_item(Item::user_message("Hello"));
worker.push_item(Item::assistant_message("Hi"));
worker.append_history(vec![Item::user_message("Hello")]);
worker.append_history(vec![Item::assistant_message("Hi")]);
// Lock
let locked_worker = worker.lock();
@ -180,14 +182,14 @@ fn test_unlock_transition() {
let client = MockLlmClient::new(vec![]);
let mut worker = Worker::new(client);
worker.push_item(Item::user_message("Hello"));
worker.append_history(vec![Item::user_message("Hello")]);
let locked_worker = worker.lock();
// Unlock
let mut worker = locked_worker.unlock();
// History operations are available again in Mutable state
worker.push_item(Item::assistant_message("Hi"));
worker.append_history(vec![Item::assistant_message("Hi")]);
worker.clear_history();
assert!(worker.history().is_empty());
}
@ -310,8 +312,8 @@ async fn test_locked_prefix_len_tracking() {
let mut worker = Worker::new(client);
// Add items beforehand
worker.push_item(Item::user_message("Pre-existing message 1"));
worker.push_item(Item::assistant_message("Pre-existing response 1"));
worker.append_history(vec![Item::user_message("Pre-existing message 1")]);
worker.append_history(vec![Item::assistant_message("Pre-existing response 1")]);
assert_eq!(worker.history().len(), 2);
@ -380,9 +382,11 @@ async fn test_unlock_edit_relock() {
}),
]]);
let worker = Worker::new(client)
.with_item(Item::user_message("Hello"))
.with_item(Item::assistant_message("Hi"));
let mut worker = Worker::new(client);
worker.append_history(vec![
Item::user_message("Hello"),
Item::assistant_message("Hi"),
]);
// Lock -> Unlock
let locked = worker.lock();
@ -392,7 +396,7 @@ async fn test_unlock_edit_relock() {
// Edit history
unlocked.clear_history();
unlocked.push_item(Item::user_message("Fresh start"));
unlocked.append_history(vec![Item::user_message("Fresh start")]);
// Re-lock
let relocked = unlocked.lock();

View File

@ -470,9 +470,10 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
///
/// `user_message` items are skipped because they are committed
/// up-front via `commit_entry(LogEntry::UserInput { segments })`.
/// `role:system` items are committed by `PodInterceptor` as typed
/// `LogEntry::SystemItem` entries before they reach the worker's
/// history (so this callback would otherwise double-write them).
/// `role:system` items are committed as typed `LogEntry::SystemItem`
/// entries by their producers (for example `PodInterceptor` and
/// interrupted-turn prep) before they reach the worker's history, so this
/// callback would otherwise double-write them.
pub fn wire_history_persistence(&mut self) {
let writer = self.log_writer_handle();
self.worker_mut().on_history_append(move |item| {
@ -1312,9 +1313,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
&mut self,
snapshot: EmptyTurnRollbackSnapshot,
) -> Result<(), StoreError> {
self.worker_mut()
.history_mut()
.truncate(snapshot.history_len);
self.worker_mut().truncate_history(snapshot.history_len);
self.worker_mut()
.set_last_run_interrupted(snapshot.last_run_interrupted);
self.user_segments.truncate(snapshot.user_segments_len);
@ -1661,10 +1660,18 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
&tool_result_summary,
);
if !closures.is_empty() {
self.worker_mut().extend_history(closures);
self.worker_mut().append_history(closures);
}
self.commit_entry(LogEntry::SystemItem {
ts: segment_log::now_millis(),
item: SystemItem::Interrupt {
body: system_note.clone(),
},
})?;
self.worker_mut()
.push_item(llm_worker::Item::system_message(system_note));
.append_history(std::iter::once(llm_worker::Item::system_message(
system_note,
)));
Ok(())
}
@ -3748,6 +3755,102 @@ mod build_summary_prompt_tests {
assert_eq!(prompt, "[User] fix the bug\n\n[Assistant] done");
}
#[derive(Clone)]
struct NoopClient;
#[async_trait]
impl LlmClient for NoopClient {
async fn stream(
&self,
_request: llm_worker::llm_client::Request,
) -> Result<
std::pin::Pin<
Box<
dyn futures::Stream<
Item = Result<
llm_worker::llm_client::event::Event,
llm_worker::llm_client::ClientError,
>,
> + Send,
>,
>,
llm_worker::llm_client::ClientError,
> {
Ok(Box::pin(futures::stream::empty()))
}
fn clone_boxed(&self) -> Box<dyn LlmClient> {
Box::new(self.clone())
}
}
#[tokio::test]
async fn apply_interrupt_prep_appends_via_callback_and_logs_independent_entries() {
let dir = tempfile::tempdir().unwrap();
let manifest = minimal_manifest_with_skills(vec![]);
let store = session_store::FsStore::new(dir.path().join("sessions")).unwrap();
let pwd = dir.path().join("workspace");
std::fs::create_dir_all(&pwd).unwrap();
let scope = Scope::writable(&pwd).unwrap();
let mut pod = Pod::new(manifest, Worker::new(NoopClient), store, pwd, scope)
.await
.unwrap();
pod.ensure_segment_head().unwrap();
pod.wire_history_persistence();
pod.worker_mut()
.set_history(vec![Item::tool_call("call-1", "Read", "{}")]);
pod.apply_interrupt_prep().unwrap();
let history = pod.worker().history();
assert_eq!(history.len(), 3);
assert!(matches!(history[1], Item::ToolResult { ref call_id, .. } if call_id == "call-1"));
assert!(matches!(
history[2],
Item::Message {
role: Role::System,
..
}
));
let interrupt_note = history[2].as_text().unwrap().to_string();
let entries = pod
.store
.read_all(
pod.segment_state.session_id(),
pod.segment_state.segment_id(),
)
.unwrap();
let tool_result_count = entries
.iter()
.filter(|entry| {
matches!(
entry,
LogEntry::ToolResult {
item: session_store::LoggedItem::ToolResult { call_id, .. },
..
} if call_id == "call-1"
)
})
.count();
let interrupt_system_count = entries
.iter()
.filter(|entry| {
matches!(
entry,
LogEntry::SystemItem {
item: SystemItem::Interrupt { body },
..
} if body == &interrupt_note
)
})
.count();
assert_eq!(tool_result_count, 1);
assert_eq!(interrupt_system_count, 1);
}
fn minimal_manifest_with_skills(dirs: Vec<PathBuf>) -> PodManifest {
// Construct the smallest possible PodManifest that resolves; only
// the `skills` field matters for `skill_dir_read_rules`.

View File

@ -67,6 +67,11 @@ pub enum SystemItem {
/// `/<slug>` Workflow invocation. `body` is the workflow's
/// prompt body materialized into the LLM context.
Workflow { slug: String, body: String },
/// Synthetic note inserted after an interrupted turn before the next
/// user input. `body` is the exact LLM-context text explaining that the
/// previous turn was cut short.
Interrupt { body: String },
}
impl SystemItem {
@ -79,6 +84,7 @@ impl SystemItem {
SystemItem::FileAttachment { body, .. } => body.clone(),
SystemItem::Knowledge { body, .. } => body.clone(),
SystemItem::Workflow { body, .. } => body.clone(),
SystemItem::Interrupt { body } => body.clone(),
}
}
@ -97,6 +103,7 @@ impl SystemItem {
SystemItem::FileAttachment { .. } => "file_attachment",
SystemItem::Knowledge { .. } => "knowledge",
SystemItem::Workflow { .. } => "workflow",
SystemItem::Interrupt { .. } => "interrupt",
}
}
}

View File

@ -1160,7 +1160,8 @@ impl App {
}
session_store::SystemItem::FileAttachment { body, .. }
| session_store::SystemItem::Knowledge { body, .. }
| session_store::SystemItem::Workflow { body, .. } => {
| session_store::SystemItem::Workflow { body, .. }
| session_store::SystemItem::Interrupt { body } => {
self.task_store.apply_system_message_text(&body);
self.blocks.push(Block::SystemMessage { text: body });
}