From ff446052c7a72f669a65832bb865387c11ad1c01 Mon Sep 17 00:00:00 2001 From: Hare Date: Mon, 15 Jun 2026 01:16:54 +0900 Subject: [PATCH] fix: gate active workflow rehydration state --- crates/pod/src/active_workflow.rs | 262 ++++++++++++++++++++++++------ crates/pod/src/ipc/interceptor.rs | 37 ++++- crates/pod/src/pod.rs | 64 ++++---- 3 files changed, 274 insertions(+), 89 deletions(-) diff --git a/crates/pod/src/active_workflow.rs b/crates/pod/src/active_workflow.rs index 07cda103..b682463c 100644 --- a/crates/pod/src/active_workflow.rs +++ b/crates/pod/src/active_workflow.rs @@ -17,8 +17,12 @@ use serde_json::json; use session_store::{LogEntry, SystemItem, segment_log}; pub const DOMAIN: &str = "pod.active_workflows"; +pub const REHYDRATION_MESSAGE_PREFIX: &str = "[Active workflow snapshot]"; +pub const INACTIVE_MESSAGE_PREFIX: &str = "[Active workflow state]"; const SCHEMA_VERSION: u32 = 1; +pub type LogEntryCommitter = Arc; + #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ActiveWorkflowSnapshot { pub schema_version: u32, @@ -221,6 +225,16 @@ impl ActiveWorkflowStore { (!active.is_empty()).then(|| render_rehydration_message(&active)) } + pub fn sanitize_context(&self, context: &mut Vec) -> usize { + let removed = strip_rehydration_messages(context); + if let Some(message) = self.rehydration_message() { + context.push(Item::system_message(message)); + } else if removed > 0 || context.iter().any(has_active_workflow_hint) { + context.push(Item::system_message(inactive_workflow_message())); + } + removed + } + pub fn extension_entry(&self) -> LogEntry { LogEntry::Extension { ts: segment_log::now_millis(), @@ -232,14 +246,13 @@ impl ActiveWorkflowStore { pub fn restore_from_history_and_extensions( &self, - history: &[Item], + _history: &[Item], extensions: &[(String, serde_json::Value)], ) { - let (mut snapshot, diagnostics) = fold_extensions(extensions); + let (snapshot, diagnostics) = fold_extensions(extensions); for diagnostic in diagnostics { tracing::warn!(diagnostic, "failed to restore active workflow state"); } - replay_history_tools(&mut snapshot, history); self.replace_with(snapshot); } } @@ -271,49 +284,61 @@ pub fn fold_extensions( (latest.unwrap_or_default(), diagnostics) } -fn replay_history_tools(snapshot: &mut ActiveWorkflowSnapshot, history: &[Item]) { - for item in history { - let Item::ToolCall { - name, arguments, .. - } = item - else { - continue; - }; - let status = match name.as_str() { - "ActiveWorkflowComplete" => ActiveWorkflowStatus::Completed, - "ActiveWorkflowCancel" => ActiveWorkflowStatus::Cancelled, - _ => continue, - }; - if let Ok(params) = serde_json::from_str::(arguments) { - if let Some(record) = snapshot - .workflows - .iter_mut() - .find(|record| record.slug == params.slug) - { - let reason = params.reason.unwrap_or_else(|| status.to_string()); - record.status = status; - record.updated_at_ms = record.updated_at_ms.saturating_add(1); - record.completion = Some(WorkflowCompletionInfo { - completed_at_ms: record.updated_at_ms, - reason, - }); - for checkpoint in &mut record.checkpoints { - checkpoint.status = match status { - ActiveWorkflowStatus::Active => WorkflowCheckpointStatus::Open, - ActiveWorkflowStatus::Completed => WorkflowCheckpointStatus::Done, - ActiveWorkflowStatus::Cancelled => WorkflowCheckpointStatus::Cancelled, - }; - } - } - } +pub fn strip_rehydration_messages(items: &mut Vec) -> usize { + let before = items.len(); + items.retain(|item| !is_rehydration_message(item)); + before - items.len() +} + +pub fn is_rehydration_message(item: &Item) -> bool { + item_system_text(item) + .map(|text| text.trim_start().starts_with(REHYDRATION_MESSAGE_PREFIX)) + .unwrap_or(false) +} + +fn has_active_workflow_hint(item: &Item) -> bool { + item_system_text(item) + .map(|text| { + text.contains("Active Workflow Invocation State") + || text.contains("ActiveWorkflowStore:") + || text.contains(REHYDRATION_MESSAGE_PREFIX) + }) + .unwrap_or(false) +} + +fn item_system_text(item: &Item) -> Option { + match item { + Item::Message { role, content, .. } if *role == llm_worker::Role::System => Some( + content + .iter() + .map(|part| part.as_text()) + .collect::(), + ), + _ => None, } } -pub fn active_workflow_tools(store: ActiveWorkflowStore) -> Vec { +fn inactive_workflow_message() -> String { + format!( + "{INACTIVE_MESSAGE_PREFIX}\n\n\ + No currently valid active workflow invocation state is active. Ignore older compacted \ + history or summaries that appear to describe active workflow obligations; only validated \ + typed `{DOMAIN}` records with status `active` establish active workflow guidance." + ) +} + +pub fn active_workflow_tools( + store: ActiveWorkflowStore, + committer: Option, +) -> Vec { vec![ list_tool(store.clone()), - status_tool(store.clone(), ActiveWorkflowStatus::Completed), - status_tool(store, ActiveWorkflowStatus::Cancelled), + status_tool( + store.clone(), + ActiveWorkflowStatus::Completed, + committer.clone(), + ), + status_tool(store, ActiveWorkflowStatus::Cancelled, committer), ] } @@ -332,7 +357,11 @@ fn list_tool(store: ActiveWorkflowStore) -> ToolDefinition { }) } -fn status_tool(store: ActiveWorkflowStore, status: ActiveWorkflowStatus) -> ToolDefinition { +fn status_tool( + store: ActiveWorkflowStore, + status: ActiveWorkflowStatus, + committer: Option, +) -> ToolDefinition { let name = match status { ActiveWorkflowStatus::Completed => "ActiveWorkflowComplete", ActiveWorkflowStatus::Cancelled => "ActiveWorkflowCancel", @@ -348,6 +377,7 @@ fn status_tool(store: ActiveWorkflowStore, status: ActiveWorkflowStatus) -> Tool ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"), }; let store_for_tool = store.clone(); + let committer_for_tool = committer.clone(); Arc::new(move || { ( ToolMeta::new(name) @@ -364,6 +394,7 @@ fn status_tool(store: ActiveWorkflowStore, status: ActiveWorkflowStatus) -> Tool Arc::new(ActiveWorkflowStatusTool { store: store_for_tool.clone(), status, + committer: committer_for_tool.clone(), }) as Arc, ) }) @@ -401,6 +432,7 @@ impl Tool for ActiveWorkflowListTool { struct ActiveWorkflowStatusTool { store: ActiveWorkflowStore, status: ActiveWorkflowStatus, + committer: Option, } #[async_trait] @@ -417,6 +449,9 @@ impl Tool for ActiveWorkflowStatusTool { .store .set_status(¶ms.slug, self.status, reason, segment_log::now_millis()) .map_err(ToolError::InvalidArgument)?; + if let Some(committer) = &self.committer { + committer(self.store.extension_entry()); + } let content = serde_json::to_string_pretty(&record) .map_err(|err| ToolError::Internal(err.to_string()))?; Ok(ToolOutput { @@ -490,12 +525,12 @@ fn render_snapshot_text(records: &[ActiveWorkflowRecord]) -> String { } fn render_rehydration_message(records: &[ActiveWorkflowRecord]) -> String { - let mut out = String::from( - "[Active workflow snapshot]\n\n\ + let mut out = format!( + "{REHYDRATION_MESSAGE_PREFIX}\n\n\ The following workflow invocation state is durable state carried across compaction. \ Continue to follow each active workflow's snapshotted guidance until the governed task \ is completed with ActiveWorkflowComplete or explicitly cancelled with ActiveWorkflowCancel. \ - Missing or obsolete workflow resources must not replace these invocation snapshots.\n", + Missing or obsolete workflow resources must not replace these invocation snapshots.\n" ); for record in records { out.push_str(&format!( @@ -547,19 +582,29 @@ fn truncate_chars(text: &str, max_chars: usize) -> String { mod tests { use super::*; - #[test] - fn active_workflow_guidance_carries_merge_close_obligations() { + fn store_with_active_workflow() -> ActiveWorkflowStore { let store = ActiveWorkflowStore::new(); - let items = vec![SystemItem::Workflow { - slug: "multi-agent-workflow".into(), - body: "# Multi-agent workflow\n- Delegate implementation to coder.\n- Require external review before merge.\n- Close the Ticket after merge and report evidence.\n".into(), - }]; - assert!(store.activate_from_system_items( - &items, + &[SystemItem::Workflow { + slug: "multi-agent-workflow".into(), + body: "# Multi-agent workflow\n- Delegate implementation to coder.\n- Require external review before merge.\n- Close the Ticket after merge and report evidence.\n".into(), + }], "/multi-agent-workflow implement ticket".into(), 42, )); + store + } + + fn active_extension(store: &ActiveWorkflowStore) -> (String, serde_json::Value) { + ( + DOMAIN.to_string(), + serde_json::to_value(store.snapshot()).expect("snapshot json"), + ) + } + + #[test] + fn active_workflow_guidance_carries_merge_close_obligations() { + let store = store_with_active_workflow(); let msg = store.rehydration_message().unwrap(); assert!(msg.contains("multi-agent-workflow")); @@ -568,6 +613,117 @@ mod tests { assert!(msg.contains("Snapshotted workflow guidance")); } + #[test] + fn compacted_rehydration_message_is_removed_when_typed_state_missing_or_invalid() { + for extensions in [ + Vec::new(), + vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))], + vec![( + DOMAIN.to_string(), + json!({"schema_version":999,"workflows":[]}), + )], + ] { + let original = store_with_active_workflow(); + let stale_message = original.rehydration_message().unwrap(); + let mut context = vec![ + Item::system_message(stale_message), + Item::user_message("continue"), + ]; + let restored = ActiveWorkflowStore::new(); + + restored.restore_from_history_and_extensions(&context, &extensions); + let removed = restored.sanitize_context(&mut context); + + assert_eq!(removed, 1); + assert!(restored.active_records().is_empty()); + assert!(!context.iter().any(is_rehydration_message)); + } + } + + #[test] + fn completion_or_cancellation_suppresses_old_compacted_guidance() { + for status in [ + ActiveWorkflowStatus::Completed, + ActiveWorkflowStatus::Cancelled, + ] { + let store = store_with_active_workflow(); + let stale_message = store.rehydration_message().unwrap(); + let mut context = vec![ + Item::system_message(stale_message), + Item::user_message("continue"), + ]; + + store + .set_status("multi-agent-workflow", status, status.to_string(), 84) + .expect("workflow exists"); + let removed = store.sanitize_context(&mut context); + + assert_eq!(removed, 1); + assert!(!context.iter().any(is_rehydration_message)); + } + } + + #[test] + fn unmatched_status_tool_calls_do_not_mutate_restored_state() { + let store = store_with_active_workflow(); + let extensions = vec![active_extension(&store)]; + let history = vec![ + Item::tool_call( + "call-1", + "ActiveWorkflowCancel", + json!({"slug":"multi-agent-workflow","reason":"not durable"}).to_string(), + ), + Item::tool_result_error("call-1", "error: failed"), + ]; + let restored = ActiveWorkflowStore::new(); + + restored.restore_from_history_and_extensions(&history, &extensions); + + assert_eq!(restored.active_records().len(), 1); + assert_eq!( + restored.snapshot().workflows[0].status, + ActiveWorkflowStatus::Active + ); + } + + #[tokio::test] + async fn status_tool_persists_typed_extension_on_success() { + let store = store_with_active_workflow(); + let committed = Arc::new(Mutex::new(Vec::::new())); + let committed_for_tool = committed.clone(); + let tools = active_workflow_tools( + store.clone(), + Some(Arc::new(move |entry| { + committed_for_tool + .lock() + .expect("committed entries mutex poisoned") + .push(entry); + })), + ); + let (_, tool) = tools[1](); + + tool.execute( + &json!({"slug":"multi-agent-workflow","reason":"review complete"}).to_string(), + ToolExecutionContext::default(), + ) + .await + .expect("status tool succeeds"); + + let committed = committed.lock().expect("committed entries mutex poisoned"); + let LogEntry::Extension { + domain, payload, .. + } = committed.last().expect("extension committed") + else { + panic!("expected typed active workflow extension"); + }; + assert_eq!(domain, DOMAIN); + let snapshot: ActiveWorkflowSnapshot = serde_json::from_value(payload.clone()).unwrap(); + assert_eq!( + snapshot.workflows[0].status, + ActiveWorkflowStatus::Completed + ); + } + #[test] fn corrupt_extension_fails_closed_with_diagnostic() { let entries = vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))]; diff --git a/crates/pod/src/ipc/interceptor.rs b/crates/pod/src/ipc/interceptor.rs index 41f36e61..d8a296a3 100644 --- a/crates/pod/src/ipc/interceptor.rs +++ b/crates/pod/src/ipc/interceptor.rs @@ -22,6 +22,7 @@ use llm_worker::tool::ToolOutput; use tracing::info; use tracing::warn; +use crate::active_workflow::ActiveWorkflowStore; use crate::compact::state::CompactState; use crate::compact::usage_tracker::UsageTracker; use session_store::SystemItem; @@ -71,6 +72,10 @@ pub(crate) struct PodInterceptor { /// worker. `None` in tests / `Pod::new` paths where no writer is /// attached. log_writer: Option>, + /// Active workflow state is durable typed Pod state. The interceptor + /// regenerates request-local workflow guidance from this store and strips + /// any stale compacted-history copies before each model request. + active_workflows: ActiveWorkflowStore, /// Next turn index assigned by `on_prompt_submit`. next_turn_index: AtomicUsize, /// Tool calls observed in the current turn (reset on each new prompt). @@ -86,6 +91,7 @@ impl PodInterceptor { pending_attachments: Arc>>, prompts: Arc, log_writer: Option>, + active_workflows: ActiveWorkflowStore, ) -> Self { Self { registry, @@ -96,6 +102,7 @@ impl PodInterceptor { pending_attachments, prompts, log_writer, + active_workflows, next_turn_index: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0), } @@ -234,6 +241,8 @@ impl Interceptor for PodInterceptor { } async fn pre_llm_request(&self, context: &mut Vec) -> PreRequestAction { + self.active_workflows.sanitize_context(context); + let initial_tokens = self.estimated_tokens(context); if self.request_threshold_exceeded(initial_tokens, context) { return PreRequestAction::Yield; @@ -449,11 +458,13 @@ mod tests { } impl SystemItemCommitter for RecordingSystemItemCommitter { - fn commit_system_item(&self, item: SystemItem) { - self.committed - .lock() - .expect("committed system-item list poisoned") - .push(item); + fn commit_log_entry(&self, entry: session_store::LogEntry) { + if let session_store::LogEntry::SystemItem { item, .. } = entry { + self.committed + .lock() + .expect("committed system-item list poisoned") + .push(item); + } } } @@ -525,6 +536,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -557,6 +569,7 @@ mod tests { Some(Arc::new(RecordingSystemItemCommitter { committed: Arc::clone(&committed), })), + ActiveWorkflowStore::new(), ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -593,6 +606,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ) .with_usage_tracker(usage_tracker); let mut ctx = ctx_items; @@ -618,6 +632,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -659,6 +674,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -686,6 +702,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -707,6 +724,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; @@ -735,6 +753,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), Some(committer), + ActiveWorkflowStore::new(), ); let mut ctx: Vec = Vec::new(); @@ -782,6 +801,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx: Vec = Vec::new(); @@ -839,6 +859,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut info = task_tool_call_info("TaskList", serde_json::json!({"scope": "all"})); @@ -886,6 +907,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let info = task_tool_call_info("TaskList", serde_json::json!({})); let mut result_info = ToolResultInfo { @@ -935,6 +957,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let history = vec![Item::user_message("hi"), Item::assistant_message("done")]; @@ -969,6 +992,7 @@ mod tests { Some(Arc::new(RecordingSystemItemCommitter { committed: Arc::clone(&committed), })), + ActiveWorkflowStore::new(), ) .with_usage_tracker(Arc::clone(&usage_tracker)); @@ -1028,6 +1052,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let items = interceptor.pending_history_appends().await; @@ -1065,6 +1090,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx: Vec = vec![Item::user_message("hi")]; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -1095,6 +1121,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index c2cee73a..524e573e 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -198,20 +198,23 @@ where /// interceptor commit `SystemItem`s without being generic over the /// concrete `Store` type. pub trait SystemItemCommitter: Send + Sync { - fn commit_system_item(&self, item: SystemItem); + fn commit_log_entry(&self, entry: LogEntry); + + fn commit_system_item(&self, item: SystemItem) { + self.commit_log_entry(LogEntry::SystemItem { + ts: segment_log::now_millis(), + item, + }); + } } impl SystemItemCommitter for LogWriterHandle where St: Store + Clone + Send + Sync + 'static, { - fn commit_system_item(&self, item: SystemItem) { - let entry = LogEntry::SystemItem { - ts: segment_log::now_millis(), - item, - }; + fn commit_log_entry(&self, entry: LogEntry) { if let Err(err) = self.append_entry(entry) { - warn!(error = %err, "system item commit failed; dropping"); + warn!(error = %err, "session log entry commit failed; dropping"); } } } @@ -822,8 +825,13 @@ impl Pod { ) -> FeatureRegistryInstallReport { let worker = self.worker.as_mut().expect("worker taken during run"); let report = registry.install_into_worker(worker, &mut self.hook_builder); + let active_workflow_committer = self.log_writer.clone().map(|writer| { + Arc::new(move |entry| writer.commit_log_entry(entry)) + as active_workflow::LogEntryCommitter + }); worker.register_tools(active_workflow::active_workflow_tools( self.active_workflows.clone(), + active_workflow_committer, )); report } @@ -890,7 +898,9 @@ impl Pod { self.task_feature.restore_from_history(&state.history); self.active_workflows .restore_from_history_and_extensions(&state.history, &state.extensions); - self.worker_mut().set_history(state.history); + let mut history = state.history; + active_workflow::strip_rehydration_messages(&mut history); + self.worker_mut().set_history(history); self.worker_mut().set_request_config(state.config); self.worker_mut().set_turn_count(state.turn_count); self.worker_mut() @@ -1256,6 +1266,7 @@ impl Pod { self.pending_attachments.clone(), self.prompts.clone(), self.log_writer.clone(), + self.active_workflows.clone(), ) .with_usage_tracker(self.usage_tracker.clone()); self.worker_mut().set_interceptor(interceptor); @@ -2391,8 +2402,10 @@ impl Pod { let worker = self.worker.as_ref().expect("worker taken during run"); let history = worker.history(); let retain_from = cut.index.min(history.len()); - let retained_items = history[retain_from..].to_vec(); - let items_to_summarise = history[..retain_from].to_vec(); + let mut retained_items = history[retain_from..].to_vec(); + let mut items_to_summarise = history[..retain_from].to_vec(); + active_workflow::strip_rehydration_messages(&mut retained_items); + active_workflow::strip_rehydration_messages(&mut items_to_summarise); // Compaction-related knobs. Fall through to manifest defaults when // `[compaction]` is omitted entirely. @@ -2634,31 +2647,24 @@ impl Pod { .filter(|i| i.is_user_message()) .count(); - // Build new history: [summary, ...auto-read, references, ...retained, active workflow snapshot, task snapshot, TaskList synthetic call/result]. - // The active workflow snapshot is inserted from durable typed state so - // workflow-governed tasks keep their procedural authority after the - // compacted segment starts. + // Build new history: [summary, ...auto-read, references, ...retained, task snapshot, TaskList synthetic call/result]. + // Active workflow guidance is intentionally not persisted as an ordinary + // compacted-history system message. It is regenerated request-locally + // from typed `pod.active_workflows` extension state so completed, + // cancelled, corrupt, or missing state cannot leak stale obligations. // The TaskStore snapshot trails the retained items so that, on resume, // `replay_history` walks any pre-compact Task* calls preserved verbatim // in retained_items first and the trailing snapshot's `replace_with` // is the final word — pre-compact `TaskCreate` calls cannot leak as // duplicate entries. - let active_workflow_message = self - .active_workflows - .rehydration_message() - .map(Item::system_message); let mut new_history = Vec::with_capacity( 1 + auto_read_messages.len() + 3 + reference_message.is_some() as usize - + active_workflow_message.is_some() as usize + retained_items.len(), ); - let mut compact_introduced_system_messages = Vec::with_capacity( - 2 + auto_read_messages.len() - + reference_message.is_some() as usize - + active_workflow_message.is_some() as usize, - ); + let mut compact_introduced_system_messages = + Vec::with_capacity(2 + auto_read_messages.len() + reference_message.is_some() as usize); let summary_message = Item::system_message(format!("[Compacted context summary]\n\n{summary_text}")); compact_introduced_system_messages.push(summary_message.clone()); @@ -2671,9 +2677,6 @@ impl Pod { This is the complete session task list preserved across compaction. \ The following TaskList tool result presents the same state through the tool lane." )); - if let Some(msg) = active_workflow_message.as_ref() { - compact_introduced_system_messages.push(msg.clone()); - } compact_introduced_system_messages.push(task_snapshot_message.clone()); new_history.push(summary_message); @@ -2682,9 +2685,6 @@ impl Pod { new_history.push(msg); } new_history.extend(retained_items); - if let Some(msg) = active_workflow_message { - new_history.push(msg); - } new_history.push(task_snapshot_message); new_history.push(Item::tool_call("compact-tasklist", "TaskList", "{}")); new_history.push(Item::tool_result_with_content( @@ -4150,7 +4150,9 @@ where .. }) ); - worker.set_history(state.history.clone()); + let mut restored_history = state.history.clone(); + active_workflow::strip_rehydration_messages(&mut restored_history); + worker.set_history(restored_history); worker.set_request_config(state.config.clone()); worker.set_turn_count(state.turn_count); worker.set_last_run_interrupted(state.last_run_interrupted);