merge: worker-history-append-contract
This commit is contained in:
commit
f1c886e451
|
|
@ -127,7 +127,7 @@ enum ToolExecutionResult {
|
|||
///
|
||||
/// // To edit between turns, unlock back to Mutable
|
||||
/// let mut worker = worker.unlock();
|
||||
/// worker.history_mut().truncate(5);
|
||||
/// worker.truncate_history(5);
|
||||
/// let out = worker.run("Continue").await?;
|
||||
/// let mut worker = out.worker;
|
||||
/// ```
|
||||
|
|
@ -400,7 +400,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
}
|
||||
}
|
||||
|
||||
fn extend_history_with_callbacks(&mut self, items: impl IntoIterator<Item = Item>) {
|
||||
fn append_history_items(&mut self, items: impl IntoIterator<Item = Item>) {
|
||||
for item in items {
|
||||
self.emit_history_append(&item);
|
||||
self.history.push(item);
|
||||
|
|
@ -985,7 +985,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
// get persisted by the upper layer that owns history.json.
|
||||
let pending = self.interceptor.pending_history_appends().await;
|
||||
if !pending.is_empty() {
|
||||
self.extend_history_with_callbacks(pending);
|
||||
self.append_history_items(pending);
|
||||
}
|
||||
|
||||
// Clone the history into a per-request context. Everything
|
||||
|
|
@ -1093,7 +1093,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
self.turn_count += 1;
|
||||
|
||||
// Collect and commit assistant items. Routed through
|
||||
// `extend_history_with_callbacks` so observers (e.g. the
|
||||
// `append_history_items` so observers (e.g. the
|
||||
// Pod-side per-item session-log committer) see each item
|
||||
// as it lands.
|
||||
let reasoning_items = self.reasoning_item_collector.take_collected();
|
||||
|
|
@ -1101,7 +1101,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
let tool_calls = self.tool_call_collector.take_collected();
|
||||
let assistant_items =
|
||||
self.build_assistant_items(&reasoning_items, &text_blocks, &tool_calls);
|
||||
self.extend_history_with_callbacks(assistant_items);
|
||||
self.append_history_items(assistant_items);
|
||||
|
||||
if tool_calls.is_empty() {
|
||||
match self.interceptor.on_turn_end(&self.history).await {
|
||||
|
|
@ -1110,7 +1110,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
return Ok(WorkerResult::Finished);
|
||||
}
|
||||
TurnEndAction::ContinueWithMessages(additional) => {
|
||||
self.extend_history_with_callbacks(additional);
|
||||
self.append_history_items(additional);
|
||||
continue;
|
||||
}
|
||||
TurnEndAction::Pause => {
|
||||
|
|
@ -1227,7 +1227,7 @@ impl<C: LlmClient, S: WorkerState> Worker<C, S> {
|
|||
result.is_error,
|
||||
)
|
||||
});
|
||||
self.extend_history_with_callbacks(items);
|
||||
self.append_history_items(items);
|
||||
Ok(None)
|
||||
}
|
||||
Err(err) => {
|
||||
|
|
@ -1411,38 +1411,28 @@ impl<C: LlmClient> Worker<C, Mutable> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Get a mutable reference to history
|
||||
/// Replace history during restore/rebuild without emitting append callbacks.
|
||||
///
|
||||
/// Available only in Mutable state.
|
||||
pub fn history_mut(&mut self) -> &mut Vec<Item> {
|
||||
&mut self.history
|
||||
}
|
||||
|
||||
/// Set history
|
||||
/// This is not a history-growth API. Live append paths must use
|
||||
/// [`append_history`](Self::append_history) so `on_history_append` observers
|
||||
/// see every inserted item.
|
||||
pub fn set_history(&mut self, items: Vec<Item>) {
|
||||
self.history = items;
|
||||
}
|
||||
|
||||
/// Add an item to history (builder pattern)
|
||||
pub fn with_item(mut self, item: Item) -> Self {
|
||||
self.history.push(item);
|
||||
self
|
||||
/// Append items to history and notify history-append observers for each
|
||||
/// item before it lands. This is the only public Mutable-state API for
|
||||
/// growing worker history; callers that need session-log persistence must
|
||||
/// install [`on_history_append`](Self::on_history_append) before calling it.
|
||||
pub fn append_history(&mut self, items: impl IntoIterator<Item = Item>) {
|
||||
self.append_history_items(items);
|
||||
}
|
||||
|
||||
/// Add an item to history
|
||||
pub fn push_item(&mut self, item: Item) {
|
||||
self.history.push(item);
|
||||
}
|
||||
|
||||
/// Add multiple items to history (builder pattern)
|
||||
pub fn with_items(mut self, items: impl IntoIterator<Item = Item>) -> Self {
|
||||
self.history.extend(items);
|
||||
self
|
||||
}
|
||||
|
||||
/// Add multiple items to history
|
||||
pub fn extend_history(&mut self, items: impl IntoIterator<Item = Item>) {
|
||||
self.history.extend(items);
|
||||
/// Truncate history without emitting append callbacks.
|
||||
///
|
||||
/// This is an edit operation, not a history-growth path.
|
||||
pub fn truncate_history(&mut self, len: usize) {
|
||||
self.history.truncate(len);
|
||||
}
|
||||
|
||||
/// Clear history
|
||||
|
|
@ -1575,9 +1565,9 @@ impl<C: LlmClient> Worker<C, Locked> {
|
|||
PromptAction::Continue => Vec::new(),
|
||||
PromptAction::ContinueWith(items) => items,
|
||||
};
|
||||
self.history.push(user_item);
|
||||
self.append_history_items(std::iter::once(user_item));
|
||||
if !extras.is_empty() {
|
||||
self.extend_history_with_callbacks(extras);
|
||||
self.append_history_items(extras);
|
||||
}
|
||||
let result = self.run_turn_loop().await;
|
||||
self.finalize_interruption(result).await
|
||||
|
|
|
|||
|
|
@ -5,8 +5,8 @@
|
|||
|
||||
mod common;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common::MockLlmClient;
|
||||
|
|
@ -44,14 +44,12 @@ fn test_mutable_history_manipulation() {
|
|||
assert!(worker.history().is_empty());
|
||||
|
||||
// Add to history
|
||||
worker.push_item(Item::user_message("Hello"));
|
||||
worker.push_item(Item::assistant_message("Hi there!"));
|
||||
worker.append_history(vec![Item::user_message("Hello")]);
|
||||
worker.append_history(vec![Item::assistant_message("Hi there!")]);
|
||||
assert_eq!(worker.history().len(), 2);
|
||||
|
||||
// Mutable access to history
|
||||
worker
|
||||
.history_mut()
|
||||
.push(Item::user_message("How are you?"));
|
||||
// Append to history via the callback-aware API.
|
||||
worker.append_history(vec![Item::user_message("How are you?")]);
|
||||
assert_eq!(worker.history().len(), 3);
|
||||
|
||||
// Clear history
|
||||
|
|
@ -71,34 +69,38 @@ fn test_mutable_history_manipulation() {
|
|||
#[test]
|
||||
fn test_mutable_builder_pattern() {
|
||||
let client = MockLlmClient::new(vec![]);
|
||||
let worker = Worker::new(client)
|
||||
.system_prompt("System prompt")
|
||||
.with_item(Item::user_message("Hello"))
|
||||
.with_item(Item::assistant_message("Hi!"))
|
||||
.with_items(vec![
|
||||
Item::user_message("How are you?"),
|
||||
Item::assistant_message("I'm fine!"),
|
||||
]);
|
||||
let worker = Worker::new(client).system_prompt("System prompt");
|
||||
|
||||
assert_eq!(worker.get_system_prompt(), Some("System prompt"));
|
||||
assert_eq!(worker.history().len(), 4);
|
||||
assert!(worker.history().is_empty());
|
||||
}
|
||||
|
||||
/// Verify that multiple items can be added with extend_history
|
||||
/// Verify that multiple items can be added with append_history and callbacks fire.
|
||||
#[test]
|
||||
fn test_mutable_extend_history() {
|
||||
fn test_mutable_append_history() {
|
||||
let client = MockLlmClient::new(vec![]);
|
||||
let observed = Arc::new(Mutex::new(Vec::new()));
|
||||
let observed_for_callback = Arc::clone(&observed);
|
||||
let mut worker = Worker::new(client);
|
||||
worker.on_history_append(move |item| {
|
||||
if let Some(text) = item.as_text() {
|
||||
observed_for_callback.lock().unwrap().push(text.to_string());
|
||||
}
|
||||
});
|
||||
|
||||
worker.push_item(Item::user_message("First"));
|
||||
worker.append_history(vec![Item::user_message("First")]);
|
||||
|
||||
worker.extend_history(vec![
|
||||
worker.append_history(vec![
|
||||
Item::assistant_message("Response 1"),
|
||||
Item::user_message("Second"),
|
||||
Item::assistant_message("Response 2"),
|
||||
]);
|
||||
|
||||
assert_eq!(worker.history().len(), 4);
|
||||
assert_eq!(
|
||||
observed.lock().unwrap().as_slice(),
|
||||
["First", "Response 1", "Second", "Response 2"]
|
||||
);
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
|
@ -162,8 +164,8 @@ fn test_lock_transition() {
|
|||
let mut worker = Worker::new(client);
|
||||
|
||||
worker.set_system_prompt("System");
|
||||
worker.push_item(Item::user_message("Hello"));
|
||||
worker.push_item(Item::assistant_message("Hi"));
|
||||
worker.append_history(vec![Item::user_message("Hello")]);
|
||||
worker.append_history(vec![Item::assistant_message("Hi")]);
|
||||
|
||||
// Lock
|
||||
let locked_worker = worker.lock();
|
||||
|
|
@ -180,14 +182,14 @@ fn test_unlock_transition() {
|
|||
let client = MockLlmClient::new(vec![]);
|
||||
let mut worker = Worker::new(client);
|
||||
|
||||
worker.push_item(Item::user_message("Hello"));
|
||||
worker.append_history(vec![Item::user_message("Hello")]);
|
||||
let locked_worker = worker.lock();
|
||||
|
||||
// Unlock
|
||||
let mut worker = locked_worker.unlock();
|
||||
|
||||
// History operations are available again in Mutable state
|
||||
worker.push_item(Item::assistant_message("Hi"));
|
||||
worker.append_history(vec![Item::assistant_message("Hi")]);
|
||||
worker.clear_history();
|
||||
assert!(worker.history().is_empty());
|
||||
}
|
||||
|
|
@ -310,8 +312,8 @@ async fn test_locked_prefix_len_tracking() {
|
|||
let mut worker = Worker::new(client);
|
||||
|
||||
// Add items beforehand
|
||||
worker.push_item(Item::user_message("Pre-existing message 1"));
|
||||
worker.push_item(Item::assistant_message("Pre-existing response 1"));
|
||||
worker.append_history(vec![Item::user_message("Pre-existing message 1")]);
|
||||
worker.append_history(vec![Item::assistant_message("Pre-existing response 1")]);
|
||||
|
||||
assert_eq!(worker.history().len(), 2);
|
||||
|
||||
|
|
@ -380,9 +382,11 @@ async fn test_unlock_edit_relock() {
|
|||
}),
|
||||
]]);
|
||||
|
||||
let worker = Worker::new(client)
|
||||
.with_item(Item::user_message("Hello"))
|
||||
.with_item(Item::assistant_message("Hi"));
|
||||
let mut worker = Worker::new(client);
|
||||
worker.append_history(vec![
|
||||
Item::user_message("Hello"),
|
||||
Item::assistant_message("Hi"),
|
||||
]);
|
||||
|
||||
// Lock -> Unlock
|
||||
let locked = worker.lock();
|
||||
|
|
@ -392,7 +396,7 @@ async fn test_unlock_edit_relock() {
|
|||
|
||||
// Edit history
|
||||
unlocked.clear_history();
|
||||
unlocked.push_item(Item::user_message("Fresh start"));
|
||||
unlocked.append_history(vec![Item::user_message("Fresh start")]);
|
||||
|
||||
// Re-lock
|
||||
let relocked = unlocked.lock();
|
||||
|
|
|
|||
|
|
@ -470,9 +470,10 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
|||
///
|
||||
/// `user_message` items are skipped because they are committed
|
||||
/// up-front via `commit_entry(LogEntry::UserInput { segments })`.
|
||||
/// `role:system` items are committed by `PodInterceptor` as typed
|
||||
/// `LogEntry::SystemItem` entries before they reach the worker's
|
||||
/// history (so this callback would otherwise double-write them).
|
||||
/// `role:system` items are committed as typed `LogEntry::SystemItem`
|
||||
/// entries by their producers (for example `PodInterceptor` and
|
||||
/// interrupted-turn prep) before they reach the worker's history, so this
|
||||
/// callback would otherwise double-write them.
|
||||
pub fn wire_history_persistence(&mut self) {
|
||||
let writer = self.log_writer_handle();
|
||||
self.worker_mut().on_history_append(move |item| {
|
||||
|
|
@ -1312,9 +1313,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
&mut self,
|
||||
snapshot: EmptyTurnRollbackSnapshot,
|
||||
) -> Result<(), StoreError> {
|
||||
self.worker_mut()
|
||||
.history_mut()
|
||||
.truncate(snapshot.history_len);
|
||||
self.worker_mut().truncate_history(snapshot.history_len);
|
||||
self.worker_mut()
|
||||
.set_last_run_interrupted(snapshot.last_run_interrupted);
|
||||
self.user_segments.truncate(snapshot.user_segments_len);
|
||||
|
|
@ -1661,10 +1660,18 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
|||
&tool_result_summary,
|
||||
);
|
||||
if !closures.is_empty() {
|
||||
self.worker_mut().extend_history(closures);
|
||||
self.worker_mut().append_history(closures);
|
||||
}
|
||||
self.commit_entry(LogEntry::SystemItem {
|
||||
ts: segment_log::now_millis(),
|
||||
item: SystemItem::Interrupt {
|
||||
body: system_note.clone(),
|
||||
},
|
||||
})?;
|
||||
self.worker_mut()
|
||||
.push_item(llm_worker::Item::system_message(system_note));
|
||||
.append_history(std::iter::once(llm_worker::Item::system_message(
|
||||
system_note,
|
||||
)));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
@ -3748,6 +3755,102 @@ mod build_summary_prompt_tests {
|
|||
assert_eq!(prompt, "[User] fix the bug\n\n[Assistant] done");
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct NoopClient;
|
||||
|
||||
#[async_trait]
|
||||
impl LlmClient for NoopClient {
|
||||
async fn stream(
|
||||
&self,
|
||||
_request: llm_worker::llm_client::Request,
|
||||
) -> Result<
|
||||
std::pin::Pin<
|
||||
Box<
|
||||
dyn futures::Stream<
|
||||
Item = Result<
|
||||
llm_worker::llm_client::event::Event,
|
||||
llm_worker::llm_client::ClientError,
|
||||
>,
|
||||
> + Send,
|
||||
>,
|
||||
>,
|
||||
llm_worker::llm_client::ClientError,
|
||||
> {
|
||||
Ok(Box::pin(futures::stream::empty()))
|
||||
}
|
||||
|
||||
fn clone_boxed(&self) -> Box<dyn LlmClient> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_interrupt_prep_appends_via_callback_and_logs_independent_entries() {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let manifest = minimal_manifest_with_skills(vec![]);
|
||||
let store = session_store::FsStore::new(dir.path().join("sessions")).unwrap();
|
||||
let pwd = dir.path().join("workspace");
|
||||
std::fs::create_dir_all(&pwd).unwrap();
|
||||
let scope = Scope::writable(&pwd).unwrap();
|
||||
let mut pod = Pod::new(manifest, Worker::new(NoopClient), store, pwd, scope)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
pod.ensure_segment_head().unwrap();
|
||||
pod.wire_history_persistence();
|
||||
pod.worker_mut()
|
||||
.set_history(vec![Item::tool_call("call-1", "Read", "{}")]);
|
||||
|
||||
pod.apply_interrupt_prep().unwrap();
|
||||
|
||||
let history = pod.worker().history();
|
||||
assert_eq!(history.len(), 3);
|
||||
assert!(matches!(history[1], Item::ToolResult { ref call_id, .. } if call_id == "call-1"));
|
||||
assert!(matches!(
|
||||
history[2],
|
||||
Item::Message {
|
||||
role: Role::System,
|
||||
..
|
||||
}
|
||||
));
|
||||
|
||||
let interrupt_note = history[2].as_text().unwrap().to_string();
|
||||
let entries = pod
|
||||
.store
|
||||
.read_all(
|
||||
pod.segment_state.session_id(),
|
||||
pod.segment_state.segment_id(),
|
||||
)
|
||||
.unwrap();
|
||||
let tool_result_count = entries
|
||||
.iter()
|
||||
.filter(|entry| {
|
||||
matches!(
|
||||
entry,
|
||||
LogEntry::ToolResult {
|
||||
item: session_store::LoggedItem::ToolResult { call_id, .. },
|
||||
..
|
||||
} if call_id == "call-1"
|
||||
)
|
||||
})
|
||||
.count();
|
||||
let interrupt_system_count = entries
|
||||
.iter()
|
||||
.filter(|entry| {
|
||||
matches!(
|
||||
entry,
|
||||
LogEntry::SystemItem {
|
||||
item: SystemItem::Interrupt { body },
|
||||
..
|
||||
} if body == &interrupt_note
|
||||
)
|
||||
})
|
||||
.count();
|
||||
|
||||
assert_eq!(tool_result_count, 1);
|
||||
assert_eq!(interrupt_system_count, 1);
|
||||
}
|
||||
|
||||
fn minimal_manifest_with_skills(dirs: Vec<PathBuf>) -> PodManifest {
|
||||
// Construct the smallest possible PodManifest that resolves; only
|
||||
// the `skills` field matters for `skill_dir_read_rules`.
|
||||
|
|
|
|||
|
|
@ -67,6 +67,11 @@ pub enum SystemItem {
|
|||
/// `/<slug>` Workflow invocation. `body` is the workflow's
|
||||
/// prompt body materialized into the LLM context.
|
||||
Workflow { slug: String, body: String },
|
||||
|
||||
/// Synthetic note inserted after an interrupted turn before the next
|
||||
/// user input. `body` is the exact LLM-context text explaining that the
|
||||
/// previous turn was cut short.
|
||||
Interrupt { body: String },
|
||||
}
|
||||
|
||||
impl SystemItem {
|
||||
|
|
@ -79,6 +84,7 @@ impl SystemItem {
|
|||
SystemItem::FileAttachment { body, .. } => body.clone(),
|
||||
SystemItem::Knowledge { body, .. } => body.clone(),
|
||||
SystemItem::Workflow { body, .. } => body.clone(),
|
||||
SystemItem::Interrupt { body } => body.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -97,6 +103,7 @@ impl SystemItem {
|
|||
SystemItem::FileAttachment { .. } => "file_attachment",
|
||||
SystemItem::Knowledge { .. } => "knowledge",
|
||||
SystemItem::Workflow { .. } => "workflow",
|
||||
SystemItem::Interrupt { .. } => "interrupt",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1160,7 +1160,8 @@ impl App {
|
|||
}
|
||||
session_store::SystemItem::FileAttachment { body, .. }
|
||||
| session_store::SystemItem::Knowledge { body, .. }
|
||||
| session_store::SystemItem::Workflow { body, .. } => {
|
||||
| session_store::SystemItem::Workflow { body, .. }
|
||||
| session_store::SystemItem::Interrupt { body } => {
|
||||
self.task_store.apply_system_message_text(&body);
|
||||
self.blocks.push(Block::SystemMessage { text: body });
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user