fix: gate active workflow rehydration state

This commit is contained in:
Keisuke Hirata 2026-06-15 01:16:54 +09:00
parent 362fedfbe6
commit ff446052c7
No known key found for this signature in database
3 changed files with 274 additions and 89 deletions

View File

@ -17,8 +17,12 @@ use serde_json::json;
use session_store::{LogEntry, SystemItem, segment_log}; use session_store::{LogEntry, SystemItem, segment_log};
pub const DOMAIN: &str = "pod.active_workflows"; pub const DOMAIN: &str = "pod.active_workflows";
pub const REHYDRATION_MESSAGE_PREFIX: &str = "[Active workflow snapshot]";
pub const INACTIVE_MESSAGE_PREFIX: &str = "[Active workflow state]";
const SCHEMA_VERSION: u32 = 1; const SCHEMA_VERSION: u32 = 1;
pub type LogEntryCommitter = Arc<dyn Fn(LogEntry) + Send + Sync>;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActiveWorkflowSnapshot { pub struct ActiveWorkflowSnapshot {
pub schema_version: u32, pub schema_version: u32,
@ -221,6 +225,16 @@ impl ActiveWorkflowStore {
(!active.is_empty()).then(|| render_rehydration_message(&active)) (!active.is_empty()).then(|| render_rehydration_message(&active))
} }
pub fn sanitize_context(&self, context: &mut Vec<Item>) -> usize {
let removed = strip_rehydration_messages(context);
if let Some(message) = self.rehydration_message() {
context.push(Item::system_message(message));
} else if removed > 0 || context.iter().any(has_active_workflow_hint) {
context.push(Item::system_message(inactive_workflow_message()));
}
removed
}
pub fn extension_entry(&self) -> LogEntry { pub fn extension_entry(&self) -> LogEntry {
LogEntry::Extension { LogEntry::Extension {
ts: segment_log::now_millis(), ts: segment_log::now_millis(),
@ -232,14 +246,13 @@ impl ActiveWorkflowStore {
pub fn restore_from_history_and_extensions( pub fn restore_from_history_and_extensions(
&self, &self,
history: &[Item], _history: &[Item],
extensions: &[(String, serde_json::Value)], extensions: &[(String, serde_json::Value)],
) { ) {
let (mut snapshot, diagnostics) = fold_extensions(extensions); let (snapshot, diagnostics) = fold_extensions(extensions);
for diagnostic in diagnostics { for diagnostic in diagnostics {
tracing::warn!(diagnostic, "failed to restore active workflow state"); tracing::warn!(diagnostic, "failed to restore active workflow state");
} }
replay_history_tools(&mut snapshot, history);
self.replace_with(snapshot); self.replace_with(snapshot);
} }
} }
@ -271,49 +284,61 @@ pub fn fold_extensions(
(latest.unwrap_or_default(), diagnostics) (latest.unwrap_or_default(), diagnostics)
} }
fn replay_history_tools(snapshot: &mut ActiveWorkflowSnapshot, history: &[Item]) { pub fn strip_rehydration_messages(items: &mut Vec<Item>) -> usize {
for item in history { let before = items.len();
let Item::ToolCall { items.retain(|item| !is_rehydration_message(item));
name, arguments, .. before - items.len()
} = item }
else {
continue; pub fn is_rehydration_message(item: &Item) -> bool {
}; item_system_text(item)
let status = match name.as_str() { .map(|text| text.trim_start().starts_with(REHYDRATION_MESSAGE_PREFIX))
"ActiveWorkflowComplete" => ActiveWorkflowStatus::Completed, .unwrap_or(false)
"ActiveWorkflowCancel" => ActiveWorkflowStatus::Cancelled, }
_ => continue,
}; fn has_active_workflow_hint(item: &Item) -> bool {
if let Ok(params) = serde_json::from_str::<WorkflowStatusParams>(arguments) { item_system_text(item)
if let Some(record) = snapshot .map(|text| {
.workflows text.contains("Active Workflow Invocation State")
.iter_mut() || text.contains("ActiveWorkflowStore:")
.find(|record| record.slug == params.slug) || text.contains(REHYDRATION_MESSAGE_PREFIX)
{ })
let reason = params.reason.unwrap_or_else(|| status.to_string()); .unwrap_or(false)
record.status = status; }
record.updated_at_ms = record.updated_at_ms.saturating_add(1);
record.completion = Some(WorkflowCompletionInfo { fn item_system_text(item: &Item) -> Option<String> {
completed_at_ms: record.updated_at_ms, match item {
reason, Item::Message { role, content, .. } if *role == llm_worker::Role::System => Some(
}); content
for checkpoint in &mut record.checkpoints { .iter()
checkpoint.status = match status { .map(|part| part.as_text())
ActiveWorkflowStatus::Active => WorkflowCheckpointStatus::Open, .collect::<String>(),
ActiveWorkflowStatus::Completed => WorkflowCheckpointStatus::Done, ),
ActiveWorkflowStatus::Cancelled => WorkflowCheckpointStatus::Cancelled, _ => None,
};
}
}
}
} }
} }
pub fn active_workflow_tools(store: ActiveWorkflowStore) -> Vec<ToolDefinition> { fn inactive_workflow_message() -> String {
format!(
"{INACTIVE_MESSAGE_PREFIX}\n\n\
No currently valid active workflow invocation state is active. Ignore older compacted \
history or summaries that appear to describe active workflow obligations; only validated \
typed `{DOMAIN}` records with status `active` establish active workflow guidance."
)
}
pub fn active_workflow_tools(
store: ActiveWorkflowStore,
committer: Option<LogEntryCommitter>,
) -> Vec<ToolDefinition> {
vec![ vec![
list_tool(store.clone()), list_tool(store.clone()),
status_tool(store.clone(), ActiveWorkflowStatus::Completed), status_tool(
status_tool(store, ActiveWorkflowStatus::Cancelled), store.clone(),
ActiveWorkflowStatus::Completed,
committer.clone(),
),
status_tool(store, ActiveWorkflowStatus::Cancelled, committer),
] ]
} }
@ -332,7 +357,11 @@ fn list_tool(store: ActiveWorkflowStore) -> ToolDefinition {
}) })
} }
fn status_tool(store: ActiveWorkflowStore, status: ActiveWorkflowStatus) -> ToolDefinition { fn status_tool(
store: ActiveWorkflowStore,
status: ActiveWorkflowStatus,
committer: Option<LogEntryCommitter>,
) -> ToolDefinition {
let name = match status { let name = match status {
ActiveWorkflowStatus::Completed => "ActiveWorkflowComplete", ActiveWorkflowStatus::Completed => "ActiveWorkflowComplete",
ActiveWorkflowStatus::Cancelled => "ActiveWorkflowCancel", ActiveWorkflowStatus::Cancelled => "ActiveWorkflowCancel",
@ -348,6 +377,7 @@ fn status_tool(store: ActiveWorkflowStore, status: ActiveWorkflowStatus) -> Tool
ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"), ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"),
}; };
let store_for_tool = store.clone(); let store_for_tool = store.clone();
let committer_for_tool = committer.clone();
Arc::new(move || { Arc::new(move || {
( (
ToolMeta::new(name) ToolMeta::new(name)
@ -364,6 +394,7 @@ fn status_tool(store: ActiveWorkflowStore, status: ActiveWorkflowStatus) -> Tool
Arc::new(ActiveWorkflowStatusTool { Arc::new(ActiveWorkflowStatusTool {
store: store_for_tool.clone(), store: store_for_tool.clone(),
status, status,
committer: committer_for_tool.clone(),
}) as Arc<dyn Tool>, }) as Arc<dyn Tool>,
) )
}) })
@ -401,6 +432,7 @@ impl Tool for ActiveWorkflowListTool {
struct ActiveWorkflowStatusTool { struct ActiveWorkflowStatusTool {
store: ActiveWorkflowStore, store: ActiveWorkflowStore,
status: ActiveWorkflowStatus, status: ActiveWorkflowStatus,
committer: Option<LogEntryCommitter>,
} }
#[async_trait] #[async_trait]
@ -417,6 +449,9 @@ impl Tool for ActiveWorkflowStatusTool {
.store .store
.set_status(&params.slug, self.status, reason, segment_log::now_millis()) .set_status(&params.slug, self.status, reason, segment_log::now_millis())
.map_err(ToolError::InvalidArgument)?; .map_err(ToolError::InvalidArgument)?;
if let Some(committer) = &self.committer {
committer(self.store.extension_entry());
}
let content = serde_json::to_string_pretty(&record) let content = serde_json::to_string_pretty(&record)
.map_err(|err| ToolError::Internal(err.to_string()))?; .map_err(|err| ToolError::Internal(err.to_string()))?;
Ok(ToolOutput { Ok(ToolOutput {
@ -490,12 +525,12 @@ fn render_snapshot_text(records: &[ActiveWorkflowRecord]) -> String {
} }
fn render_rehydration_message(records: &[ActiveWorkflowRecord]) -> String { fn render_rehydration_message(records: &[ActiveWorkflowRecord]) -> String {
let mut out = String::from( let mut out = format!(
"[Active workflow snapshot]\n\n\ "{REHYDRATION_MESSAGE_PREFIX}\n\n\
The following workflow invocation state is durable state carried across compaction. \ The following workflow invocation state is durable state carried across compaction. \
Continue to follow each active workflow's snapshotted guidance until the governed task \ Continue to follow each active workflow's snapshotted guidance until the governed task \
is completed with ActiveWorkflowComplete or explicitly cancelled with ActiveWorkflowCancel. \ is completed with ActiveWorkflowComplete or explicitly cancelled with ActiveWorkflowCancel. \
Missing or obsolete workflow resources must not replace these invocation snapshots.\n", Missing or obsolete workflow resources must not replace these invocation snapshots.\n"
); );
for record in records { for record in records {
out.push_str(&format!( out.push_str(&format!(
@ -547,19 +582,29 @@ fn truncate_chars(text: &str, max_chars: usize) -> String {
mod tests { mod tests {
use super::*; use super::*;
#[test] fn store_with_active_workflow() -> ActiveWorkflowStore {
fn active_workflow_guidance_carries_merge_close_obligations() {
let store = ActiveWorkflowStore::new(); let store = ActiveWorkflowStore::new();
let items = vec![SystemItem::Workflow {
slug: "multi-agent-workflow".into(),
body: "# Multi-agent workflow\n- Delegate implementation to coder.\n- Require external review before merge.\n- Close the Ticket after merge and report evidence.\n".into(),
}];
assert!(store.activate_from_system_items( assert!(store.activate_from_system_items(
&items, &[SystemItem::Workflow {
slug: "multi-agent-workflow".into(),
body: "# Multi-agent workflow\n- Delegate implementation to coder.\n- Require external review before merge.\n- Close the Ticket after merge and report evidence.\n".into(),
}],
"/multi-agent-workflow implement ticket".into(), "/multi-agent-workflow implement ticket".into(),
42, 42,
)); ));
store
}
fn active_extension(store: &ActiveWorkflowStore) -> (String, serde_json::Value) {
(
DOMAIN.to_string(),
serde_json::to_value(store.snapshot()).expect("snapshot json"),
)
}
#[test]
fn active_workflow_guidance_carries_merge_close_obligations() {
let store = store_with_active_workflow();
let msg = store.rehydration_message().unwrap(); let msg = store.rehydration_message().unwrap();
assert!(msg.contains("multi-agent-workflow")); assert!(msg.contains("multi-agent-workflow"));
@ -568,6 +613,117 @@ mod tests {
assert!(msg.contains("Snapshotted workflow guidance")); assert!(msg.contains("Snapshotted workflow guidance"));
} }
#[test]
fn compacted_rehydration_message_is_removed_when_typed_state_missing_or_invalid() {
for extensions in [
Vec::new(),
vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))],
vec![(
DOMAIN.to_string(),
json!({"schema_version":999,"workflows":[]}),
)],
] {
let original = store_with_active_workflow();
let stale_message = original.rehydration_message().unwrap();
let mut context = vec![
Item::system_message(stale_message),
Item::user_message("continue"),
];
let restored = ActiveWorkflowStore::new();
restored.restore_from_history_and_extensions(&context, &extensions);
let removed = restored.sanitize_context(&mut context);
assert_eq!(removed, 1);
assert!(restored.active_records().is_empty());
assert!(!context.iter().any(is_rehydration_message));
}
}
#[test]
fn completion_or_cancellation_suppresses_old_compacted_guidance() {
for status in [
ActiveWorkflowStatus::Completed,
ActiveWorkflowStatus::Cancelled,
] {
let store = store_with_active_workflow();
let stale_message = store.rehydration_message().unwrap();
let mut context = vec![
Item::system_message(stale_message),
Item::user_message("continue"),
];
store
.set_status("multi-agent-workflow", status, status.to_string(), 84)
.expect("workflow exists");
let removed = store.sanitize_context(&mut context);
assert_eq!(removed, 1);
assert!(!context.iter().any(is_rehydration_message));
}
}
#[test]
fn unmatched_status_tool_calls_do_not_mutate_restored_state() {
let store = store_with_active_workflow();
let extensions = vec![active_extension(&store)];
let history = vec![
Item::tool_call(
"call-1",
"ActiveWorkflowCancel",
json!({"slug":"multi-agent-workflow","reason":"not durable"}).to_string(),
),
Item::tool_result_error("call-1", "error: failed"),
];
let restored = ActiveWorkflowStore::new();
restored.restore_from_history_and_extensions(&history, &extensions);
assert_eq!(restored.active_records().len(), 1);
assert_eq!(
restored.snapshot().workflows[0].status,
ActiveWorkflowStatus::Active
);
}
#[tokio::test]
async fn status_tool_persists_typed_extension_on_success() {
let store = store_with_active_workflow();
let committed = Arc::new(Mutex::new(Vec::<LogEntry>::new()));
let committed_for_tool = committed.clone();
let tools = active_workflow_tools(
store.clone(),
Some(Arc::new(move |entry| {
committed_for_tool
.lock()
.expect("committed entries mutex poisoned")
.push(entry);
})),
);
let (_, tool) = tools[1]();
tool.execute(
&json!({"slug":"multi-agent-workflow","reason":"review complete"}).to_string(),
ToolExecutionContext::default(),
)
.await
.expect("status tool succeeds");
let committed = committed.lock().expect("committed entries mutex poisoned");
let LogEntry::Extension {
domain, payload, ..
} = committed.last().expect("extension committed")
else {
panic!("expected typed active workflow extension");
};
assert_eq!(domain, DOMAIN);
let snapshot: ActiveWorkflowSnapshot = serde_json::from_value(payload.clone()).unwrap();
assert_eq!(
snapshot.workflows[0].status,
ActiveWorkflowStatus::Completed
);
}
#[test] #[test]
fn corrupt_extension_fails_closed_with_diagnostic() { fn corrupt_extension_fails_closed_with_diagnostic() {
let entries = vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))]; let entries = vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))];

View File

@ -22,6 +22,7 @@ use llm_worker::tool::ToolOutput;
use tracing::info; use tracing::info;
use tracing::warn; use tracing::warn;
use crate::active_workflow::ActiveWorkflowStore;
use crate::compact::state::CompactState; use crate::compact::state::CompactState;
use crate::compact::usage_tracker::UsageTracker; use crate::compact::usage_tracker::UsageTracker;
use session_store::SystemItem; use session_store::SystemItem;
@ -71,6 +72,10 @@ pub(crate) struct PodInterceptor {
/// worker. `None` in tests / `Pod::new` paths where no writer is /// worker. `None` in tests / `Pod::new` paths where no writer is
/// attached. /// attached.
log_writer: Option<Arc<dyn SystemItemCommitter>>, log_writer: Option<Arc<dyn SystemItemCommitter>>,
/// Active workflow state is durable typed Pod state. The interceptor
/// regenerates request-local workflow guidance from this store and strips
/// any stale compacted-history copies before each model request.
active_workflows: ActiveWorkflowStore,
/// Next turn index assigned by `on_prompt_submit`. /// Next turn index assigned by `on_prompt_submit`.
next_turn_index: AtomicUsize, next_turn_index: AtomicUsize,
/// Tool calls observed in the current turn (reset on each new prompt). /// Tool calls observed in the current turn (reset on each new prompt).
@ -86,6 +91,7 @@ impl PodInterceptor {
pending_attachments: Arc<Mutex<Vec<SystemItem>>>, pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
prompts: Arc<PromptCatalog>, prompts: Arc<PromptCatalog>,
log_writer: Option<Arc<dyn SystemItemCommitter>>, log_writer: Option<Arc<dyn SystemItemCommitter>>,
active_workflows: ActiveWorkflowStore,
) -> Self { ) -> Self {
Self { Self {
registry, registry,
@ -96,6 +102,7 @@ impl PodInterceptor {
pending_attachments, pending_attachments,
prompts, prompts,
log_writer, log_writer,
active_workflows,
next_turn_index: AtomicUsize::new(0), next_turn_index: AtomicUsize::new(0),
tool_calls_this_turn: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0),
} }
@ -234,6 +241,8 @@ impl Interceptor for PodInterceptor {
} }
async fn pre_llm_request(&self, context: &mut Vec<Item>) -> PreRequestAction { async fn pre_llm_request(&self, context: &mut Vec<Item>) -> PreRequestAction {
self.active_workflows.sanitize_context(context);
let initial_tokens = self.estimated_tokens(context); let initial_tokens = self.estimated_tokens(context);
if self.request_threshold_exceeded(initial_tokens, context) { if self.request_threshold_exceeded(initial_tokens, context) {
return PreRequestAction::Yield; return PreRequestAction::Yield;
@ -449,11 +458,13 @@ mod tests {
} }
impl SystemItemCommitter for RecordingSystemItemCommitter { impl SystemItemCommitter for RecordingSystemItemCommitter {
fn commit_system_item(&self, item: SystemItem) { fn commit_log_entry(&self, entry: session_store::LogEntry) {
self.committed if let session_store::LogEntry::SystemItem { item, .. } = entry {
.lock() self.committed
.expect("committed system-item list poisoned") .lock()
.push(item); .expect("committed system-item list poisoned")
.push(item);
}
} }
} }
@ -525,6 +536,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx = ctx_items; let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -557,6 +569,7 @@ mod tests {
Some(Arc::new(RecordingSystemItemCommitter { Some(Arc::new(RecordingSystemItemCommitter {
committed: Arc::clone(&committed), committed: Arc::clone(&committed),
})), })),
ActiveWorkflowStore::new(),
); );
let mut ctx = ctx_items; let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -593,6 +606,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
) )
.with_usage_tracker(usage_tracker); .with_usage_tracker(usage_tracker);
let mut ctx = ctx_items; let mut ctx = ctx_items;
@ -618,6 +632,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx = ctx_items; let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -659,6 +674,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx = ctx_items; let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -686,6 +702,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx = ctx_items; let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -707,6 +724,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx: Vec<Item> = Vec::new(); let mut ctx: Vec<Item> = Vec::new();
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -735,6 +753,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
Some(committer), Some(committer),
ActiveWorkflowStore::new(),
); );
let mut ctx: Vec<Item> = Vec::new(); let mut ctx: Vec<Item> = Vec::new();
@ -782,6 +801,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx: Vec<Item> = Vec::new(); let mut ctx: Vec<Item> = Vec::new();
@ -839,6 +859,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut info = task_tool_call_info("TaskList", serde_json::json!({"scope": "all"})); let mut info = task_tool_call_info("TaskList", serde_json::json!({"scope": "all"}));
@ -886,6 +907,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let info = task_tool_call_info("TaskList", serde_json::json!({})); let info = task_tool_call_info("TaskList", serde_json::json!({}));
let mut result_info = ToolResultInfo { let mut result_info = ToolResultInfo {
@ -935,6 +957,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let history = vec![Item::user_message("hi"), Item::assistant_message("done")]; let history = vec![Item::user_message("hi"), Item::assistant_message("done")];
@ -969,6 +992,7 @@ mod tests {
Some(Arc::new(RecordingSystemItemCommitter { Some(Arc::new(RecordingSystemItemCommitter {
committed: Arc::clone(&committed), committed: Arc::clone(&committed),
})), })),
ActiveWorkflowStore::new(),
) )
.with_usage_tracker(Arc::clone(&usage_tracker)); .with_usage_tracker(Arc::clone(&usage_tracker));
@ -1028,6 +1052,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let items = interceptor.pending_history_appends().await; let items = interceptor.pending_history_appends().await;
@ -1065,6 +1090,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx: Vec<Item> = vec![Item::user_message("hi")]; let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -1095,6 +1121,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx: Vec<Item> = Vec::new(); let mut ctx: Vec<Item> = Vec::new();
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;

View File

@ -198,20 +198,23 @@ where
/// interceptor commit `SystemItem`s without being generic over the /// interceptor commit `SystemItem`s without being generic over the
/// concrete `Store` type. /// concrete `Store` type.
pub trait SystemItemCommitter: Send + Sync { pub trait SystemItemCommitter: Send + Sync {
fn commit_system_item(&self, item: SystemItem); fn commit_log_entry(&self, entry: LogEntry);
fn commit_system_item(&self, item: SystemItem) {
self.commit_log_entry(LogEntry::SystemItem {
ts: segment_log::now_millis(),
item,
});
}
} }
impl<St> SystemItemCommitter for LogWriterHandle<St> impl<St> SystemItemCommitter for LogWriterHandle<St>
where where
St: Store + Clone + Send + Sync + 'static, St: Store + Clone + Send + Sync + 'static,
{ {
fn commit_system_item(&self, item: SystemItem) { fn commit_log_entry(&self, entry: LogEntry) {
let entry = LogEntry::SystemItem {
ts: segment_log::now_millis(),
item,
};
if let Err(err) = self.append_entry(entry) { if let Err(err) = self.append_entry(entry) {
warn!(error = %err, "system item commit failed; dropping"); warn!(error = %err, "session log entry commit failed; dropping");
} }
} }
} }
@ -822,8 +825,13 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
) -> FeatureRegistryInstallReport { ) -> FeatureRegistryInstallReport {
let worker = self.worker.as_mut().expect("worker taken during run"); let worker = self.worker.as_mut().expect("worker taken during run");
let report = registry.install_into_worker(worker, &mut self.hook_builder); let report = registry.install_into_worker(worker, &mut self.hook_builder);
let active_workflow_committer = self.log_writer.clone().map(|writer| {
Arc::new(move |entry| writer.commit_log_entry(entry))
as active_workflow::LogEntryCommitter
});
worker.register_tools(active_workflow::active_workflow_tools( worker.register_tools(active_workflow::active_workflow_tools(
self.active_workflows.clone(), self.active_workflows.clone(),
active_workflow_committer,
)); ));
report report
} }
@ -890,7 +898,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.task_feature.restore_from_history(&state.history); self.task_feature.restore_from_history(&state.history);
self.active_workflows self.active_workflows
.restore_from_history_and_extensions(&state.history, &state.extensions); .restore_from_history_and_extensions(&state.history, &state.extensions);
self.worker_mut().set_history(state.history); let mut history = state.history;
active_workflow::strip_rehydration_messages(&mut history);
self.worker_mut().set_history(history);
self.worker_mut().set_request_config(state.config); self.worker_mut().set_request_config(state.config);
self.worker_mut().set_turn_count(state.turn_count); self.worker_mut().set_turn_count(state.turn_count);
self.worker_mut() self.worker_mut()
@ -1256,6 +1266,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.pending_attachments.clone(), self.pending_attachments.clone(),
self.prompts.clone(), self.prompts.clone(),
self.log_writer.clone(), self.log_writer.clone(),
self.active_workflows.clone(),
) )
.with_usage_tracker(self.usage_tracker.clone()); .with_usage_tracker(self.usage_tracker.clone());
self.worker_mut().set_interceptor(interceptor); self.worker_mut().set_interceptor(interceptor);
@ -2391,8 +2402,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let worker = self.worker.as_ref().expect("worker taken during run"); let worker = self.worker.as_ref().expect("worker taken during run");
let history = worker.history(); let history = worker.history();
let retain_from = cut.index.min(history.len()); let retain_from = cut.index.min(history.len());
let retained_items = history[retain_from..].to_vec(); let mut retained_items = history[retain_from..].to_vec();
let items_to_summarise = history[..retain_from].to_vec(); let mut items_to_summarise = history[..retain_from].to_vec();
active_workflow::strip_rehydration_messages(&mut retained_items);
active_workflow::strip_rehydration_messages(&mut items_to_summarise);
// Compaction-related knobs. Fall through to manifest defaults when // Compaction-related knobs. Fall through to manifest defaults when
// `[compaction]` is omitted entirely. // `[compaction]` is omitted entirely.
@ -2634,31 +2647,24 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.filter(|i| i.is_user_message()) .filter(|i| i.is_user_message())
.count(); .count();
// Build new history: [summary, ...auto-read, references, ...retained, active workflow snapshot, task snapshot, TaskList synthetic call/result]. // Build new history: [summary, ...auto-read, references, ...retained, task snapshot, TaskList synthetic call/result].
// The active workflow snapshot is inserted from durable typed state so // Active workflow guidance is intentionally not persisted as an ordinary
// workflow-governed tasks keep their procedural authority after the // compacted-history system message. It is regenerated request-locally
// compacted segment starts. // from typed `pod.active_workflows` extension state so completed,
// cancelled, corrupt, or missing state cannot leak stale obligations.
// The TaskStore snapshot trails the retained items so that, on resume, // The TaskStore snapshot trails the retained items so that, on resume,
// `replay_history` walks any pre-compact Task* calls preserved verbatim // `replay_history` walks any pre-compact Task* calls preserved verbatim
// in retained_items first and the trailing snapshot's `replace_with` // in retained_items first and the trailing snapshot's `replace_with`
// is the final word — pre-compact `TaskCreate` calls cannot leak as // is the final word — pre-compact `TaskCreate` calls cannot leak as
// duplicate entries. // duplicate entries.
let active_workflow_message = self
.active_workflows
.rehydration_message()
.map(Item::system_message);
let mut new_history = Vec::with_capacity( let mut new_history = Vec::with_capacity(
1 + auto_read_messages.len() 1 + auto_read_messages.len()
+ 3 + 3
+ reference_message.is_some() as usize + reference_message.is_some() as usize
+ active_workflow_message.is_some() as usize
+ retained_items.len(), + retained_items.len(),
); );
let mut compact_introduced_system_messages = Vec::with_capacity( let mut compact_introduced_system_messages =
2 + auto_read_messages.len() Vec::with_capacity(2 + auto_read_messages.len() + reference_message.is_some() as usize);
+ reference_message.is_some() as usize
+ active_workflow_message.is_some() as usize,
);
let summary_message = let summary_message =
Item::system_message(format!("[Compacted context summary]\n\n{summary_text}")); Item::system_message(format!("[Compacted context summary]\n\n{summary_text}"));
compact_introduced_system_messages.push(summary_message.clone()); compact_introduced_system_messages.push(summary_message.clone());
@ -2671,9 +2677,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
This is the complete session task list preserved across compaction. \ This is the complete session task list preserved across compaction. \
The following TaskList tool result presents the same state through the tool lane." The following TaskList tool result presents the same state through the tool lane."
)); ));
if let Some(msg) = active_workflow_message.as_ref() {
compact_introduced_system_messages.push(msg.clone());
}
compact_introduced_system_messages.push(task_snapshot_message.clone()); compact_introduced_system_messages.push(task_snapshot_message.clone());
new_history.push(summary_message); new_history.push(summary_message);
@ -2682,9 +2685,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
new_history.push(msg); new_history.push(msg);
} }
new_history.extend(retained_items); new_history.extend(retained_items);
if let Some(msg) = active_workflow_message {
new_history.push(msg);
}
new_history.push(task_snapshot_message); new_history.push(task_snapshot_message);
new_history.push(Item::tool_call("compact-tasklist", "TaskList", "{}")); new_history.push(Item::tool_call("compact-tasklist", "TaskList", "{}"));
new_history.push(Item::tool_result_with_content( new_history.push(Item::tool_result_with_content(
@ -4150,7 +4150,9 @@ where
.. ..
}) })
); );
worker.set_history(state.history.clone()); let mut restored_history = state.history.clone();
active_workflow::strip_rehydration_messages(&mut restored_history);
worker.set_history(restored_history);
worker.set_request_config(state.config.clone()); worker.set_request_config(state.config.clone());
worker.set_turn_count(state.turn_count); worker.set_turn_count(state.turn_count);
worker.set_last_run_interrupted(state.last_run_interrupted); worker.set_last_run_interrupted(state.last_run_interrupted);