diff --git a/crates/pod/src/active_workflow.rs b/crates/pod/src/active_workflow.rs new file mode 100644 index 00000000..07cda103 --- /dev/null +++ b/crates/pod/src/active_workflow.rs @@ -0,0 +1,580 @@ +//! Durable active workflow invocation state. +//! +//! Workflow bodies are resolved at invocation time and snapshotted here. The +//! snapshot, not whatever resource version is installed later, is the procedural +//! authority that survives compaction for the currently governed task. + +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use llm_worker::Item; +use llm_worker::tool::{ + Tool, ToolDefinition, ToolError, ToolExecutionContext, ToolMeta, ToolOutput, +}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use session_store::{LogEntry, SystemItem, segment_log}; + +pub const DOMAIN: &str = "pod.active_workflows"; +const SCHEMA_VERSION: u32 = 1; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ActiveWorkflowSnapshot { + pub schema_version: u32, + pub workflows: Vec, +} + +impl Default for ActiveWorkflowSnapshot { + fn default() -> Self { + Self { + schema_version: SCHEMA_VERSION, + workflows: Vec::new(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ActiveWorkflowRecord { + pub slug: String, + pub status: ActiveWorkflowStatus, + pub invocation: WorkflowInvocationInfo, + pub task_scope: String, + pub body_snapshot_policy: WorkflowBodySnapshotPolicy, + pub guidance_snapshot: String, + pub obligations: Vec, + pub checkpoints: Vec, + pub updated_at_ms: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub completion: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum ActiveWorkflowStatus { + Active, + Completed, + Cancelled, +} + +impl std::fmt::Display for ActiveWorkflowStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let s = match self { + Self::Active => "active", + Self::Completed => "completed", + Self::Cancelled => "cancelled", + }; + f.write_str(s) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkflowInvocationInfo { + pub source: WorkflowInvocationSource, + pub invoked_at_ms: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkflowInvocationSource { + UserWorkflowInvokeSegment, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkflowBodySnapshotPolicy { + SnapshottedAtInvocation, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkflowCheckpoint { + pub label: String, + pub status: WorkflowCheckpointStatus, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkflowCheckpointStatus { + Open, + Done, + Cancelled, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkflowCompletionInfo { + pub completed_at_ms: u64, + pub reason: String, +} + +#[derive(Debug, Clone, Default)] +pub struct ActiveWorkflowStore { + inner: Arc>, +} + +impl ActiveWorkflowStore { + pub fn new() -> Self { + Self::default() + } + + pub fn snapshot(&self) -> ActiveWorkflowSnapshot { + self.inner.lock().unwrap_or_else(|e| e.into_inner()).clone() + } + + pub fn replace_with(&self, snapshot: ActiveWorkflowSnapshot) { + *self.inner.lock().unwrap_or_else(|e| e.into_inner()) = snapshot; + } + + pub fn active_records(&self) -> Vec { + self.snapshot() + .workflows + .into_iter() + .filter(|record| record.status == ActiveWorkflowStatus::Active) + .collect() + } + + pub fn activate_from_system_items( + &self, + items: &[SystemItem], + task_scope: String, + invoked_at_ms: u64, + ) -> bool { + let mut grouped: BTreeMap> = BTreeMap::new(); + for item in items { + if let SystemItem::Workflow { slug, body } = item { + grouped.entry(slug.clone()).or_default().push(body.clone()); + } + } + if grouped.is_empty() { + return false; + } + + let mut snapshot = self.snapshot(); + snapshot.schema_version = SCHEMA_VERSION; + for (slug, bodies) in grouped { + let guidance_snapshot = bodies.join("\n\n---\n\n"); + let obligations = extract_obligations(&guidance_snapshot); + let checkpoints = obligations + .iter() + .take(32) + .map(|label| WorkflowCheckpoint { + label: label.clone(), + status: WorkflowCheckpointStatus::Open, + }) + .collect(); + let record = ActiveWorkflowRecord { + slug: slug.clone(), + status: ActiveWorkflowStatus::Active, + invocation: WorkflowInvocationInfo { + source: WorkflowInvocationSource::UserWorkflowInvokeSegment, + invoked_at_ms, + }, + task_scope: truncate_chars(&task_scope, 2_000), + body_snapshot_policy: WorkflowBodySnapshotPolicy::SnapshottedAtInvocation, + guidance_snapshot, + obligations, + checkpoints, + updated_at_ms: invoked_at_ms, + completion: None, + }; + upsert_record(&mut snapshot.workflows, record); + } + self.replace_with(snapshot); + true + } + + pub fn set_status( + &self, + slug: &str, + status: ActiveWorkflowStatus, + reason: String, + now_ms: u64, + ) -> Result { + let mut snapshot = self.inner.lock().unwrap_or_else(|e| e.into_inner()); + let record = snapshot + .workflows + .iter_mut() + .find(|record| record.slug == slug) + .ok_or_else(|| format!("active workflow `{slug}` not found"))?; + record.status = status; + record.updated_at_ms = now_ms; + record.completion = Some(WorkflowCompletionInfo { + completed_at_ms: now_ms, + reason, + }); + for checkpoint in &mut record.checkpoints { + checkpoint.status = match status { + ActiveWorkflowStatus::Active => WorkflowCheckpointStatus::Open, + ActiveWorkflowStatus::Completed => WorkflowCheckpointStatus::Done, + ActiveWorkflowStatus::Cancelled => WorkflowCheckpointStatus::Cancelled, + }; + } + Ok(record.clone()) + } + + pub fn snapshot_text(&self) -> Option { + let active = self.active_records(); + (!active.is_empty()).then(|| render_snapshot_text(&active)) + } + + pub fn rehydration_message(&self) -> Option { + let active = self.active_records(); + (!active.is_empty()).then(|| render_rehydration_message(&active)) + } + + pub fn extension_entry(&self) -> LogEntry { + LogEntry::Extension { + ts: segment_log::now_millis(), + domain: DOMAIN.into(), + payload: serde_json::to_value(self.snapshot()) + .expect("ActiveWorkflowSnapshot is always JSON-serializable"), + } + } + + pub fn restore_from_history_and_extensions( + &self, + history: &[Item], + extensions: &[(String, serde_json::Value)], + ) { + let (mut snapshot, diagnostics) = fold_extensions(extensions); + for diagnostic in diagnostics { + tracing::warn!(diagnostic, "failed to restore active workflow state"); + } + replay_history_tools(&mut snapshot, history); + self.replace_with(snapshot); + } +} + +pub fn fold_extensions( + extensions: &[(String, serde_json::Value)], +) -> (ActiveWorkflowSnapshot, Vec) { + let mut latest = None; + let mut diagnostics = Vec::new(); + for (domain, payload) in extensions { + if domain != DOMAIN { + continue; + } + match serde_json::from_value::(payload.clone()) { + Ok(snapshot) if snapshot.schema_version == SCHEMA_VERSION => latest = Some(snapshot), + Ok(snapshot) => { + latest = None; + diagnostics.push(format!( + "unsupported active workflow schema_version {}", + snapshot.schema_version + )); + } + Err(err) => { + latest = None; + diagnostics.push(format!("corrupt active workflow payload: {err}")); + } + } + } + (latest.unwrap_or_default(), diagnostics) +} + +fn replay_history_tools(snapshot: &mut ActiveWorkflowSnapshot, history: &[Item]) { + for item in history { + let Item::ToolCall { + name, arguments, .. + } = item + else { + continue; + }; + let status = match name.as_str() { + "ActiveWorkflowComplete" => ActiveWorkflowStatus::Completed, + "ActiveWorkflowCancel" => ActiveWorkflowStatus::Cancelled, + _ => continue, + }; + if let Ok(params) = serde_json::from_str::(arguments) { + if let Some(record) = snapshot + .workflows + .iter_mut() + .find(|record| record.slug == params.slug) + { + let reason = params.reason.unwrap_or_else(|| status.to_string()); + record.status = status; + record.updated_at_ms = record.updated_at_ms.saturating_add(1); + record.completion = Some(WorkflowCompletionInfo { + completed_at_ms: record.updated_at_ms, + reason, + }); + for checkpoint in &mut record.checkpoints { + checkpoint.status = match status { + ActiveWorkflowStatus::Active => WorkflowCheckpointStatus::Open, + ActiveWorkflowStatus::Completed => WorkflowCheckpointStatus::Done, + ActiveWorkflowStatus::Cancelled => WorkflowCheckpointStatus::Cancelled, + }; + } + } + } + } +} + +pub fn active_workflow_tools(store: ActiveWorkflowStore) -> Vec { + vec![ + list_tool(store.clone()), + status_tool(store.clone(), ActiveWorkflowStatus::Completed), + status_tool(store, ActiveWorkflowStatus::Cancelled), + ] +} + +fn list_tool(store: ActiveWorkflowStore) -> ToolDefinition { + Arc::new(move || { + ( + ToolMeta::new("ActiveWorkflowList") + .description("List durable active workflow invocations and their status") + .input_schema( + json!({"type":"object","properties":{},"additionalProperties":false}), + ), + Arc::new(ActiveWorkflowListTool { + store: store.clone(), + }) as Arc, + ) + }) +} + +fn status_tool(store: ActiveWorkflowStore, status: ActiveWorkflowStatus) -> ToolDefinition { + let name = match status { + ActiveWorkflowStatus::Completed => "ActiveWorkflowComplete", + ActiveWorkflowStatus::Cancelled => "ActiveWorkflowCancel", + ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"), + }; + let description = match status { + ActiveWorkflowStatus::Completed => { + "Mark an active workflow as completed when its governed task is finished" + } + ActiveWorkflowStatus::Cancelled => { + "Cancel an active workflow when the governed task is explicitly abandoned" + } + ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"), + }; + let store_for_tool = store.clone(); + Arc::new(move || { + ( + ToolMeta::new(name) + .description(description) + .input_schema(json!({ + "type":"object", + "properties":{ + "slug":{"type":"string","description":"Workflow slug to update"}, + "reason":{"type":"string","description":"Brief completion/cancellation reason"} + }, + "required":["slug"], + "additionalProperties":false + })), + Arc::new(ActiveWorkflowStatusTool { + store: store_for_tool.clone(), + status, + }) as Arc, + ) + }) +} + +struct ActiveWorkflowListTool { + store: ActiveWorkflowStore, +} + +#[async_trait] +impl Tool for ActiveWorkflowListTool { + async fn execute( + &self, + _input_json: &str, + _ctx: ToolExecutionContext, + ) -> Result { + let snapshot = self.store.snapshot(); + let content = serde_json::to_string_pretty(&snapshot) + .map_err(|err| ToolError::Internal(err.to_string()))?; + let active = snapshot + .workflows + .iter() + .filter(|record| record.status == ActiveWorkflowStatus::Active) + .count(); + Ok(ToolOutput { + summary: format!( + "ActiveWorkflowStore: {} workflow(s), {active} active", + snapshot.workflows.len() + ), + content: Some(content), + }) + } +} + +struct ActiveWorkflowStatusTool { + store: ActiveWorkflowStore, + status: ActiveWorkflowStatus, +} + +#[async_trait] +impl Tool for ActiveWorkflowStatusTool { + async fn execute( + &self, + input_json: &str, + _ctx: ToolExecutionContext, + ) -> Result { + let params: WorkflowStatusParams = serde_json::from_str(input_json) + .map_err(|err| ToolError::InvalidArgument(err.to_string()))?; + let reason = params.reason.unwrap_or_else(|| self.status.to_string()); + let record = self + .store + .set_status(¶ms.slug, self.status, reason, segment_log::now_millis()) + .map_err(ToolError::InvalidArgument)?; + let content = serde_json::to_string_pretty(&record) + .map_err(|err| ToolError::Internal(err.to_string()))?; + Ok(ToolOutput { + summary: format!("workflow {} marked {}", record.slug, record.status), + content: Some(content), + }) + } +} + +#[derive(Debug, Deserialize)] +struct WorkflowStatusParams { + slug: String, + #[serde(default)] + reason: Option, +} + +fn upsert_record(records: &mut Vec, record: ActiveWorkflowRecord) { + if let Some(existing) = records + .iter_mut() + .find(|existing| existing.slug == record.slug) + { + *existing = record; + } else { + records.push(record); + } +} + +fn extract_obligations(body: &str) -> Vec { + let mut obligations = Vec::new(); + for line in body.lines() { + let trimmed = line.trim(); + let candidate = trimmed + .strip_prefix("- ") + .or_else(|| trimmed.strip_prefix("* ")) + .or_else(|| trimmed.strip_prefix("• ")) + .unwrap_or(trimmed); + let lower = candidate.to_ascii_lowercase(); + let looks_obligating = lower.contains("must") + || lower.contains("require") + || lower.contains("obligation") + || lower.contains("review") + || lower.contains("merge") + || lower.contains("close") + || lower.contains("report") + || lower.contains("handoff"); + if looks_obligating && !candidate.is_empty() { + obligations.push(truncate_chars(candidate, 240)); + } + if obligations.len() >= 32 { + break; + } + } + if obligations.is_empty() { + obligations + .push("Follow the snapshotted workflow body until completion or cancellation".into()); + } + obligations +} + +fn render_snapshot_text(records: &[ActiveWorkflowRecord]) -> String { + let json = serde_json::to_string_pretty(&ActiveWorkflowSnapshot { + schema_version: SCHEMA_VERSION, + workflows: records.to_vec(), + }) + .unwrap_or_else(|_| String::from("{\"schema_version\":1,\"workflows\":[]}")); + format!( + "ActiveWorkflowStore: {} active workflow(s)\n\n```json\n{}\n```", + records.len(), + json + ) +} + +fn render_rehydration_message(records: &[ActiveWorkflowRecord]) -> String { + let mut out = String::from( + "[Active workflow snapshot]\n\n\ + The following workflow invocation state is durable state carried across compaction. \ + Continue to follow each active workflow's snapshotted guidance until the governed task \ + is completed with ActiveWorkflowComplete or explicitly cancelled with ActiveWorkflowCancel. \ + Missing or obsolete workflow resources must not replace these invocation snapshots.\n", + ); + for record in records { + out.push_str(&format!( + "\n## /{} ({})\n- invoked_at_ms: {}\n- invocation_source: {:?}\n- body_snapshot_policy: {:?}\n- task_scope: {}\n\n### Current obligations/checkpoints\n", + record.slug, + record.status, + record.invocation.invoked_at_ms, + record.invocation.source, + record.body_snapshot_policy, + record.task_scope.replace('\n', " "), + )); + for checkpoint in &record.checkpoints { + out.push_str(&format!( + "- [{}] {}\n", + checkpoint.status_label(), + checkpoint.label + )); + } + out.push_str("\n### Snapshotted workflow guidance\n"); + out.push_str(record.guidance_snapshot.trim_end()); + out.push_str("\n"); + } + out +} + +impl WorkflowCheckpoint { + fn status_label(&self) -> &'static str { + match self.status { + WorkflowCheckpointStatus::Open => "open", + WorkflowCheckpointStatus::Done => "done", + WorkflowCheckpointStatus::Cancelled => "cancelled", + } + } +} + +fn truncate_chars(text: &str, max_chars: usize) -> String { + let mut out = String::new(); + for (idx, ch) in text.chars().enumerate() { + if idx >= max_chars { + out.push('…'); + return out; + } + out.push(ch); + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn active_workflow_guidance_carries_merge_close_obligations() { + let store = ActiveWorkflowStore::new(); + let items = vec![SystemItem::Workflow { + slug: "multi-agent-workflow".into(), + body: "# Multi-agent workflow\n- Delegate implementation to coder.\n- Require external review before merge.\n- Close the Ticket after merge and report evidence.\n".into(), + }]; + + assert!(store.activate_from_system_items( + &items, + "/multi-agent-workflow implement ticket".into(), + 42, + )); + let msg = store.rehydration_message().unwrap(); + + assert!(msg.contains("multi-agent-workflow")); + assert!(msg.contains("external review before merge")); + assert!(msg.contains("Close the Ticket after merge")); + assert!(msg.contains("Snapshotted workflow guidance")); + } + + #[test] + fn corrupt_extension_fails_closed_with_diagnostic() { + let entries = vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))]; + + let (snapshot, diagnostics) = fold_extensions(&entries); + + assert!(snapshot.workflows.is_empty()); + assert_eq!(diagnostics.len(), 1); + } +} diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index efc44e94..2153b268 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -1,3 +1,4 @@ +pub mod active_workflow; pub mod compact; pub mod controller; pub mod discovery; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 0b029b28..c2cee73a 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -26,6 +26,7 @@ use manifest::{ ScopeError, ScopeRule, SharedScope, WorkerManifest, }; +use crate::active_workflow::{self, ActiveWorkflowStore}; use crate::compact::state::CompactState; use crate::compact::usage_tracker::UsageTracker; use crate::feature::builtin::TaskFeature; @@ -145,6 +146,7 @@ struct EmptyTurnRollbackSnapshot { usage_history_len: usize, ai_activity_count: usize, last_run_interrupted: bool, + active_workflows: active_workflow::ActiveWorkflowSnapshot, } fn is_ai_materialized_item(item: &Item) -> bool { @@ -274,6 +276,10 @@ pub struct Pod { /// the narrow snapshot/restore surface Pod needs for compaction and rewind. /// Store/reminder ownership stays inside the Task feature module. task_feature: TaskFeature, + /// Durable state for workflow invocations that are active for the current task. + /// The store is persisted as typed session-log extensions and rehydrated into + /// prompt context during compaction. + active_workflows: ActiveWorkflowStore, /// Parsed system-prompt template awaiting first-turn materialisation. /// `Some` until `ensure_system_prompt_materialized` renders it once, /// then `None` forever — including after compaction. @@ -435,6 +441,7 @@ impl Pod { usage_history: self.usage_history.clone(), tracker: None, task_feature: self.task_feature.clone(), + active_workflows: self.active_workflows.clone(), system_prompt_template: None, alerter: self.alerter.clone(), event_tx: self.event_tx.clone(), @@ -618,6 +625,7 @@ impl Pod { usage_history: Arc::new(Mutex::new(Vec::::new())), tracker: None, task_feature: TaskFeature::new(), + active_workflows: ActiveWorkflowStore::new(), system_prompt_template: None, alerter: None, event_tx: None, @@ -813,7 +821,11 @@ impl Pod { registry: FeatureRegistryBuilder, ) -> FeatureRegistryInstallReport { let worker = self.worker.as_mut().expect("worker taken during run"); - registry.install_into_worker(worker, &mut self.hook_builder) + let report = registry.install_into_worker(worker, &mut self.hook_builder); + worker.register_tools(active_workflow::active_workflow_tools( + self.active_workflows.clone(), + )); + report } /// Reference to the store. @@ -876,6 +888,8 @@ impl Pod { self.sink.truncate_silent(truncate_entries); self.task_feature.restore_from_history(&state.history); + self.active_workflows + .restore_from_history_and_extensions(&state.history, &state.extensions); self.worker_mut().set_history(state.history); self.worker_mut().set_request_config(state.config); self.worker_mut().set_turn_count(state.turn_count); @@ -1428,6 +1442,7 @@ impl Pod { usage_history_len, ai_activity_count: self.ai_activity_counter.load(Ordering::SeqCst), last_run_interrupted: self.worker().last_run_interrupted(), + active_workflows: self.active_workflows.snapshot(), } } @@ -1465,6 +1480,8 @@ impl Pod { .truncate(snapshot.usage_history_len); let _ = self.usage_tracker.drain(); let _ = self.metrics_tracker.drain(); + self.active_workflows + .replace_with(snapshot.active_workflows); let loc = self.segment_state.location(); self.store @@ -1535,6 +1552,14 @@ impl Pod { let mut attachments = self.resolve_file_refs(&input); attachments.extend(self.resolve_knowledge_refs(&input)); attachments.extend(self.resolve_workflow_invocations(&input)?); + let flattened = self.flatten_segments(&input); + if self.active_workflows.activate_from_system_items( + &attachments, + flattened.clone(), + segment_log::now_millis(), + ) { + self.commit_entry(self.active_workflows.extension_entry())?; + } if !attachments.is_empty() { *self .pending_attachments @@ -1542,8 +1567,6 @@ impl Pod { .expect("pending_attachments poisoned") = attachments; } - let flattened = self.flatten_segments(&input); - let history_before = self.worker.as_ref().unwrap().history().len(); // lock → run → unlock @@ -2428,13 +2451,15 @@ impl Pod { .unwrap_or_default(); // Input text fed to the compact worker. Includes the default - // references, current TaskStore snapshot, and the (pruned) - // conversation text. + // references, current TaskStore snapshot, active workflow invocation + // state, and the (pruned) conversation text. let task_snapshot_text = self.task_feature.snapshot_text(); + let active_workflow_snapshot_text = self.active_workflows.snapshot_text(); let summary_input = build_summary_input( &items_to_summarise, &default_refs, Some(task_snapshot_text.as_str()), + active_workflow_snapshot_text.as_deref(), SummaryInputOptions { overview_target_tokens, overview_warning_tokens, @@ -2609,20 +2634,31 @@ impl Pod { .filter(|i| i.is_user_message()) .count(); - // Build new history: [summary, ...auto-read, references, ...retained, task snapshot, TaskList synthetic call/result]. + // Build new history: [summary, ...auto-read, references, ...retained, active workflow snapshot, task snapshot, TaskList synthetic call/result]. + // The active workflow snapshot is inserted from durable typed state so + // workflow-governed tasks keep their procedural authority after the + // compacted segment starts. // The TaskStore snapshot trails the retained items so that, on resume, // `replay_history` walks any pre-compact Task* calls preserved verbatim // in retained_items first and the trailing snapshot's `replace_with` // is the final word — pre-compact `TaskCreate` calls cannot leak as // duplicate entries. + let active_workflow_message = self + .active_workflows + .rehydration_message() + .map(Item::system_message); let mut new_history = Vec::with_capacity( 1 + auto_read_messages.len() + 3 + reference_message.is_some() as usize + + active_workflow_message.is_some() as usize + retained_items.len(), ); - let mut compact_introduced_system_messages = - Vec::with_capacity(2 + auto_read_messages.len() + reference_message.is_some() as usize); + let mut compact_introduced_system_messages = Vec::with_capacity( + 2 + auto_read_messages.len() + + reference_message.is_some() as usize + + active_workflow_message.is_some() as usize, + ); let summary_message = Item::system_message(format!("[Compacted context summary]\n\n{summary_text}")); compact_introduced_system_messages.push(summary_message.clone()); @@ -2635,6 +2671,9 @@ impl Pod { This is the complete session task list preserved across compaction. \ The following TaskList tool result presents the same state through the tool lane." )); + if let Some(msg) = active_workflow_message.as_ref() { + compact_introduced_system_messages.push(msg.clone()); + } compact_introduced_system_messages.push(task_snapshot_message.clone()); new_history.push(summary_message); @@ -2643,6 +2682,9 @@ impl Pod { new_history.push(msg); } new_history.extend(retained_items); + if let Some(msg) = active_workflow_message { + new_history.push(msg); + } new_history.push(task_snapshot_message); new_history.push(Item::tool_call("compact-tasklist", "TaskList", "{}")); new_history.push(Item::tool_result_with_content( @@ -2680,18 +2722,23 @@ impl Pod { at_turn_index: source_turn_count, }), }; + let active_workflow_extension = self.active_workflows.extension_entry(); + let initial_entries = vec![entry.clone(), active_workflow_extension.clone()]; self.store - .create_segment(old_loc.session_id, new_segment_id, &[entry.clone()])?; + .create_segment(old_loc.session_id, new_segment_id, &initial_entries)?; self.segment_state.set_location(SegmentLocation { session_id: old_loc.session_id, segment_id: new_segment_id, }); - self.segment_state.set_entries_written(1); + self.segment_state + .set_entries_written(initial_entries.len()); let session_start = entry; // Broadcast the SegmentStart through the sink. This atomically - // resets the mirror to `[SegmentStart]` so any subscriber - // querying after this point sees the post-compaction prefix. - self.sink.reset_with_initial(session_start); + // resets the mirror to the replacement segment prefix so any subscriber + // querying after this point sees the post-compaction prefix, including + // durable extension state. + self.sink + .reset_with_initial_entries(vec![session_start, active_workflow_extension]); // Keep pods.json pointing at the live segment_id. Without this // a concurrent `restore_from_manifest(new_segment_id)` would // see no live writer and grab the session this Pod just moved @@ -3794,6 +3841,7 @@ where usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, task_feature: TaskFeature::new(), + active_workflows: ActiveWorkflowStore::new(), system_prompt_template: common.system_prompt_template, alerter: None, event_tx: None, @@ -3902,6 +3950,7 @@ where usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, task_feature: TaskFeature::new(), + active_workflows: ActiveWorkflowStore::new(), system_prompt_template: common.system_prompt_template, alerter: None, event_tx: None, @@ -4111,6 +4160,8 @@ where let extract_pointer = memory::extract::fold_pointer(&state.extensions); let task_feature = TaskFeature::from_history(&state.history); + let active_workflows = ActiveWorkflowStore::new(); + active_workflows.restore_from_history_and_extensions(&state.history, &state.extensions); let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store)); let mut pod = Self { @@ -4131,6 +4182,7 @@ where usage_history: Arc::new(Mutex::new(state.usage_history)), tracker: None, task_feature, + active_workflows, // Restore replays the saved system_prompt verbatim — no // template re-render on resume. system_prompt_template: None, @@ -4335,12 +4387,13 @@ struct SummaryInputBuild { } /// Build the compact worker's input: default-reference instructions, -/// the list of recently-touched files, task snapshot, and a bounded overview -/// rather than a prefix-wide transcript. +/// the list of recently-touched files, task snapshot, active workflow snapshot, +/// and a bounded overview rather than a prefix-wide transcript. fn build_summary_input( items: &[Item], default_refs: &[PathBuf], task_snapshot: Option<&str>, + active_workflow_snapshot: Option<&str>, options: SummaryInputOptions, ) -> SummaryInputBuild { let overview = build_summary_overview( @@ -4392,6 +4445,17 @@ fn build_summary_input( out.push_str(task_snapshot); out.push_str("\n\n"); } + if let Some(active_workflow_snapshot) = active_workflow_snapshot { + out.push_str( + "## Active Workflow Invocation State\n\ + This is durable typed workflow state for workflow-governed tasks. Preserve active \ + slugs, invocation scope, status, obligations/checkpoints, and the snapshotted \ + workflow guidance in the summary; do not substitute advertised/latest workflow \ + resources for this invocation state.\n", + ); + out.push_str(active_workflow_snapshot); + out.push_str("\n\n"); + } out.push_str("## Conversation overview/index\n"); out.push_str(&overview); out.push_str("\n\nWhen you are done, call `write_summary` with the final 5-section text."); @@ -5278,6 +5342,7 @@ mod build_summary_prompt_tests { items, &[], None, + None, SummaryInputOptions { overview_target_tokens: 512, overview_warning_tokens: 1024, @@ -5326,6 +5391,27 @@ mod build_summary_prompt_tests { assert!(!prompt.contains("deliberation")); } + #[test] + fn includes_active_workflow_snapshot_section() { + let prompt = build_summary_input( + &[Item::user_message("continue after review")], + &[], + None, + Some("ActiveWorkflowStore: 1 active workflow\n- review before merge\n- close ticket"), + SummaryInputOptions { + overview_target_tokens: 512, + overview_warning_tokens: 1024, + overview_deadline_tokens: 2048, + summary_target_tokens: 256, + }, + ) + .text; + + assert!(prompt.contains("## Active Workflow Invocation State")); + assert!(prompt.contains("review before merge")); + assert!(prompt.contains("close ticket")); + } + #[test] fn overview_warning_does_not_drop_input() { let items = vec![Item::user_message("x".repeat(4_000))]; @@ -5333,6 +5419,7 @@ mod build_summary_prompt_tests { &items, &[], None, + None, SummaryInputOptions { overview_target_tokens: 10, overview_warning_tokens: 100, @@ -5352,6 +5439,7 @@ mod build_summary_prompt_tests { &items, &[], None, + None, SummaryInputOptions { overview_target_tokens: 10, overview_warning_tokens: 10, diff --git a/crates/pod/src/segment_log_sink.rs b/crates/pod/src/segment_log_sink.rs index 42e6e6c4..cc4feff7 100644 --- a/crates/pod/src/segment_log_sink.rs +++ b/crates/pod/src/segment_log_sink.rs @@ -147,6 +147,24 @@ impl SegmentLogSink { let _ = self.inner.broadcast_tx.send(initial); } + /// Atomically swap the mirror to the supplied replacement-session prefix + /// and broadcast the first entry as the live rotation signal. Entries after + /// the first are already reflected in reconnect snapshots but are not + /// broadcast live; this is intended for non-live extension state that must + /// share the new segment prefix with SegmentStart. + pub fn reset_with_initial_entries(&self, entries: Vec) { + let first = entries.first().cloned(); + let mut mirror = self + .inner + .mirror + .lock() + .expect("session log mirror mutex poisoned"); + *mirror = entries; + if let Some(initial) = first { + let _ = self.inner.broadcast_tx.send(initial); + } + } + /// Replace the mirror with the supplied prefix without broadcasting. /// /// Used by restore paths that load a session's complete log into diff --git a/resources/prompts/internal/compact_system.md b/resources/prompts/internal/compact_system.md index 61a26b18..1241e54c 100644 --- a/resources/prompts/internal/compact_system.md +++ b/resources/prompts/internal/compact_system.md @@ -4,7 +4,7 @@ The conversation input is a bounded overview/index, not the full transcript. Tre ## Workflow -1. Read the provided overview/index and current TaskStore snapshot. +1. Read the provided overview/index, current TaskStore snapshot, and any Active Workflow Invocation State section. 2. If the overview does not contain enough detail, use `search_session_log` to find relevant compact-target history items, then `read_session_items` to inspect only the needed range. 3. Use `read_file` to inspect referenced files before deciding what the next session needs. Prefer skimming over blind inclusion. 4. For files whose current contents are load-bearing for the active work, call `mark_read_required` to inject them into the next session. These count against the auto-read token budget — spend it deliberately. @@ -39,5 +39,6 @@ Produce the summary in this exact format: ## Constraints +- Preserve active workflow invocation state when present: active slug, invocation scope/source/time, status, open obligations/checkpoints, and snapshotted workflow guidance. Do not replace a snapshotted invocation with merely advertised/latest workflow resources. - Keep code snippets and raw tool output OUT of the summary — that is what auto-read and references are for. - Follow the summary target stated in the run input; if asked to shrink, call `write_summary` again with a shorter version.