diff --git a/crates/pod/src/active_workflow.rs b/crates/pod/src/active_workflow.rs new file mode 100644 index 00000000..b682463c --- /dev/null +++ b/crates/pod/src/active_workflow.rs @@ -0,0 +1,736 @@ +//! Durable active workflow invocation state. +//! +//! Workflow bodies are resolved at invocation time and snapshotted here. The +//! snapshot, not whatever resource version is installed later, is the procedural +//! authority that survives compaction for the currently governed task. + +use std::collections::BTreeMap; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use llm_worker::Item; +use llm_worker::tool::{ + Tool, ToolDefinition, ToolError, ToolExecutionContext, ToolMeta, ToolOutput, +}; +use serde::{Deserialize, Serialize}; +use serde_json::json; +use session_store::{LogEntry, SystemItem, segment_log}; + +pub const DOMAIN: &str = "pod.active_workflows"; +pub const REHYDRATION_MESSAGE_PREFIX: &str = "[Active workflow snapshot]"; +pub const INACTIVE_MESSAGE_PREFIX: &str = "[Active workflow state]"; +const SCHEMA_VERSION: u32 = 1; + +pub type LogEntryCommitter = Arc; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ActiveWorkflowSnapshot { + pub schema_version: u32, + pub workflows: Vec, +} + +impl Default for ActiveWorkflowSnapshot { + fn default() -> Self { + Self { + schema_version: SCHEMA_VERSION, + workflows: Vec::new(), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ActiveWorkflowRecord { + pub slug: String, + pub status: ActiveWorkflowStatus, + pub invocation: WorkflowInvocationInfo, + pub task_scope: String, + pub body_snapshot_policy: WorkflowBodySnapshotPolicy, + pub guidance_snapshot: String, + pub obligations: Vec, + pub checkpoints: Vec, + pub updated_at_ms: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub completion: Option, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum ActiveWorkflowStatus { + Active, + Completed, + Cancelled, +} + +impl std::fmt::Display for ActiveWorkflowStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let s = match self { + Self::Active => "active", + Self::Completed => "completed", + Self::Cancelled => "cancelled", + }; + f.write_str(s) + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkflowInvocationInfo { + pub source: WorkflowInvocationSource, + pub invoked_at_ms: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkflowInvocationSource { + UserWorkflowInvokeSegment, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkflowBodySnapshotPolicy { + SnapshottedAtInvocation, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkflowCheckpoint { + pub label: String, + pub status: WorkflowCheckpointStatus, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkflowCheckpointStatus { + Open, + Done, + Cancelled, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkflowCompletionInfo { + pub completed_at_ms: u64, + pub reason: String, +} + +#[derive(Debug, Clone, Default)] +pub struct ActiveWorkflowStore { + inner: Arc>, +} + +impl ActiveWorkflowStore { + pub fn new() -> Self { + Self::default() + } + + pub fn snapshot(&self) -> ActiveWorkflowSnapshot { + self.inner.lock().unwrap_or_else(|e| e.into_inner()).clone() + } + + pub fn replace_with(&self, snapshot: ActiveWorkflowSnapshot) { + *self.inner.lock().unwrap_or_else(|e| e.into_inner()) = snapshot; + } + + pub fn active_records(&self) -> Vec { + self.snapshot() + .workflows + .into_iter() + .filter(|record| record.status == ActiveWorkflowStatus::Active) + .collect() + } + + pub fn activate_from_system_items( + &self, + items: &[SystemItem], + task_scope: String, + invoked_at_ms: u64, + ) -> bool { + let mut grouped: BTreeMap> = BTreeMap::new(); + for item in items { + if let SystemItem::Workflow { slug, body } = item { + grouped.entry(slug.clone()).or_default().push(body.clone()); + } + } + if grouped.is_empty() { + return false; + } + + let mut snapshot = self.snapshot(); + snapshot.schema_version = SCHEMA_VERSION; + for (slug, bodies) in grouped { + let guidance_snapshot = bodies.join("\n\n---\n\n"); + let obligations = extract_obligations(&guidance_snapshot); + let checkpoints = obligations + .iter() + .take(32) + .map(|label| WorkflowCheckpoint { + label: label.clone(), + status: WorkflowCheckpointStatus::Open, + }) + .collect(); + let record = ActiveWorkflowRecord { + slug: slug.clone(), + status: ActiveWorkflowStatus::Active, + invocation: WorkflowInvocationInfo { + source: WorkflowInvocationSource::UserWorkflowInvokeSegment, + invoked_at_ms, + }, + task_scope: truncate_chars(&task_scope, 2_000), + body_snapshot_policy: WorkflowBodySnapshotPolicy::SnapshottedAtInvocation, + guidance_snapshot, + obligations, + checkpoints, + updated_at_ms: invoked_at_ms, + completion: None, + }; + upsert_record(&mut snapshot.workflows, record); + } + self.replace_with(snapshot); + true + } + + pub fn set_status( + &self, + slug: &str, + status: ActiveWorkflowStatus, + reason: String, + now_ms: u64, + ) -> Result { + let mut snapshot = self.inner.lock().unwrap_or_else(|e| e.into_inner()); + let record = snapshot + .workflows + .iter_mut() + .find(|record| record.slug == slug) + .ok_or_else(|| format!("active workflow `{slug}` not found"))?; + record.status = status; + record.updated_at_ms = now_ms; + record.completion = Some(WorkflowCompletionInfo { + completed_at_ms: now_ms, + reason, + }); + for checkpoint in &mut record.checkpoints { + checkpoint.status = match status { + ActiveWorkflowStatus::Active => WorkflowCheckpointStatus::Open, + ActiveWorkflowStatus::Completed => WorkflowCheckpointStatus::Done, + ActiveWorkflowStatus::Cancelled => WorkflowCheckpointStatus::Cancelled, + }; + } + Ok(record.clone()) + } + + pub fn snapshot_text(&self) -> Option { + let active = self.active_records(); + (!active.is_empty()).then(|| render_snapshot_text(&active)) + } + + pub fn rehydration_message(&self) -> Option { + let active = self.active_records(); + (!active.is_empty()).then(|| render_rehydration_message(&active)) + } + + pub fn sanitize_context(&self, context: &mut Vec) -> usize { + let removed = strip_rehydration_messages(context); + if let Some(message) = self.rehydration_message() { + context.push(Item::system_message(message)); + } else if removed > 0 || context.iter().any(has_active_workflow_hint) { + context.push(Item::system_message(inactive_workflow_message())); + } + removed + } + + pub fn extension_entry(&self) -> LogEntry { + LogEntry::Extension { + ts: segment_log::now_millis(), + domain: DOMAIN.into(), + payload: serde_json::to_value(self.snapshot()) + .expect("ActiveWorkflowSnapshot is always JSON-serializable"), + } + } + + pub fn restore_from_history_and_extensions( + &self, + _history: &[Item], + extensions: &[(String, serde_json::Value)], + ) { + let (snapshot, diagnostics) = fold_extensions(extensions); + for diagnostic in diagnostics { + tracing::warn!(diagnostic, "failed to restore active workflow state"); + } + self.replace_with(snapshot); + } +} + +pub fn fold_extensions( + extensions: &[(String, serde_json::Value)], +) -> (ActiveWorkflowSnapshot, Vec) { + let mut latest = None; + let mut diagnostics = Vec::new(); + for (domain, payload) in extensions { + if domain != DOMAIN { + continue; + } + match serde_json::from_value::(payload.clone()) { + Ok(snapshot) if snapshot.schema_version == SCHEMA_VERSION => latest = Some(snapshot), + Ok(snapshot) => { + latest = None; + diagnostics.push(format!( + "unsupported active workflow schema_version {}", + snapshot.schema_version + )); + } + Err(err) => { + latest = None; + diagnostics.push(format!("corrupt active workflow payload: {err}")); + } + } + } + (latest.unwrap_or_default(), diagnostics) +} + +pub fn strip_rehydration_messages(items: &mut Vec) -> usize { + let before = items.len(); + items.retain(|item| !is_rehydration_message(item)); + before - items.len() +} + +pub fn is_rehydration_message(item: &Item) -> bool { + item_system_text(item) + .map(|text| text.trim_start().starts_with(REHYDRATION_MESSAGE_PREFIX)) + .unwrap_or(false) +} + +fn has_active_workflow_hint(item: &Item) -> bool { + item_system_text(item) + .map(|text| { + text.contains("Active Workflow Invocation State") + || text.contains("ActiveWorkflowStore:") + || text.contains(REHYDRATION_MESSAGE_PREFIX) + }) + .unwrap_or(false) +} + +fn item_system_text(item: &Item) -> Option { + match item { + Item::Message { role, content, .. } if *role == llm_worker::Role::System => Some( + content + .iter() + .map(|part| part.as_text()) + .collect::(), + ), + _ => None, + } +} + +fn inactive_workflow_message() -> String { + format!( + "{INACTIVE_MESSAGE_PREFIX}\n\n\ + No currently valid active workflow invocation state is active. Ignore older compacted \ + history or summaries that appear to describe active workflow obligations; only validated \ + typed `{DOMAIN}` records with status `active` establish active workflow guidance." + ) +} + +pub fn active_workflow_tools( + store: ActiveWorkflowStore, + committer: Option, +) -> Vec { + vec![ + list_tool(store.clone()), + status_tool( + store.clone(), + ActiveWorkflowStatus::Completed, + committer.clone(), + ), + status_tool(store, ActiveWorkflowStatus::Cancelled, committer), + ] +} + +fn list_tool(store: ActiveWorkflowStore) -> ToolDefinition { + Arc::new(move || { + ( + ToolMeta::new("ActiveWorkflowList") + .description("List durable active workflow invocations and their status") + .input_schema( + json!({"type":"object","properties":{},"additionalProperties":false}), + ), + Arc::new(ActiveWorkflowListTool { + store: store.clone(), + }) as Arc, + ) + }) +} + +fn status_tool( + store: ActiveWorkflowStore, + status: ActiveWorkflowStatus, + committer: Option, +) -> ToolDefinition { + let name = match status { + ActiveWorkflowStatus::Completed => "ActiveWorkflowComplete", + ActiveWorkflowStatus::Cancelled => "ActiveWorkflowCancel", + ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"), + }; + let description = match status { + ActiveWorkflowStatus::Completed => { + "Mark an active workflow as completed when its governed task is finished" + } + ActiveWorkflowStatus::Cancelled => { + "Cancel an active workflow when the governed task is explicitly abandoned" + } + ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"), + }; + let store_for_tool = store.clone(); + let committer_for_tool = committer.clone(); + Arc::new(move || { + ( + ToolMeta::new(name) + .description(description) + .input_schema(json!({ + "type":"object", + "properties":{ + "slug":{"type":"string","description":"Workflow slug to update"}, + "reason":{"type":"string","description":"Brief completion/cancellation reason"} + }, + "required":["slug"], + "additionalProperties":false + })), + Arc::new(ActiveWorkflowStatusTool { + store: store_for_tool.clone(), + status, + committer: committer_for_tool.clone(), + }) as Arc, + ) + }) +} + +struct ActiveWorkflowListTool { + store: ActiveWorkflowStore, +} + +#[async_trait] +impl Tool for ActiveWorkflowListTool { + async fn execute( + &self, + _input_json: &str, + _ctx: ToolExecutionContext, + ) -> Result { + let snapshot = self.store.snapshot(); + let content = serde_json::to_string_pretty(&snapshot) + .map_err(|err| ToolError::Internal(err.to_string()))?; + let active = snapshot + .workflows + .iter() + .filter(|record| record.status == ActiveWorkflowStatus::Active) + .count(); + Ok(ToolOutput { + summary: format!( + "ActiveWorkflowStore: {} workflow(s), {active} active", + snapshot.workflows.len() + ), + content: Some(content), + }) + } +} + +struct ActiveWorkflowStatusTool { + store: ActiveWorkflowStore, + status: ActiveWorkflowStatus, + committer: Option, +} + +#[async_trait] +impl Tool for ActiveWorkflowStatusTool { + async fn execute( + &self, + input_json: &str, + _ctx: ToolExecutionContext, + ) -> Result { + let params: WorkflowStatusParams = serde_json::from_str(input_json) + .map_err(|err| ToolError::InvalidArgument(err.to_string()))?; + let reason = params.reason.unwrap_or_else(|| self.status.to_string()); + let record = self + .store + .set_status(¶ms.slug, self.status, reason, segment_log::now_millis()) + .map_err(ToolError::InvalidArgument)?; + if let Some(committer) = &self.committer { + committer(self.store.extension_entry()); + } + let content = serde_json::to_string_pretty(&record) + .map_err(|err| ToolError::Internal(err.to_string()))?; + Ok(ToolOutput { + summary: format!("workflow {} marked {}", record.slug, record.status), + content: Some(content), + }) + } +} + +#[derive(Debug, Deserialize)] +struct WorkflowStatusParams { + slug: String, + #[serde(default)] + reason: Option, +} + +fn upsert_record(records: &mut Vec, record: ActiveWorkflowRecord) { + if let Some(existing) = records + .iter_mut() + .find(|existing| existing.slug == record.slug) + { + *existing = record; + } else { + records.push(record); + } +} + +fn extract_obligations(body: &str) -> Vec { + let mut obligations = Vec::new(); + for line in body.lines() { + let trimmed = line.trim(); + let candidate = trimmed + .strip_prefix("- ") + .or_else(|| trimmed.strip_prefix("* ")) + .or_else(|| trimmed.strip_prefix("• ")) + .unwrap_or(trimmed); + let lower = candidate.to_ascii_lowercase(); + let looks_obligating = lower.contains("must") + || lower.contains("require") + || lower.contains("obligation") + || lower.contains("review") + || lower.contains("merge") + || lower.contains("close") + || lower.contains("report") + || lower.contains("handoff"); + if looks_obligating && !candidate.is_empty() { + obligations.push(truncate_chars(candidate, 240)); + } + if obligations.len() >= 32 { + break; + } + } + if obligations.is_empty() { + obligations + .push("Follow the snapshotted workflow body until completion or cancellation".into()); + } + obligations +} + +fn render_snapshot_text(records: &[ActiveWorkflowRecord]) -> String { + let json = serde_json::to_string_pretty(&ActiveWorkflowSnapshot { + schema_version: SCHEMA_VERSION, + workflows: records.to_vec(), + }) + .unwrap_or_else(|_| String::from("{\"schema_version\":1,\"workflows\":[]}")); + format!( + "ActiveWorkflowStore: {} active workflow(s)\n\n```json\n{}\n```", + records.len(), + json + ) +} + +fn render_rehydration_message(records: &[ActiveWorkflowRecord]) -> String { + let mut out = format!( + "{REHYDRATION_MESSAGE_PREFIX}\n\n\ + The following workflow invocation state is durable state carried across compaction. \ + Continue to follow each active workflow's snapshotted guidance until the governed task \ + is completed with ActiveWorkflowComplete or explicitly cancelled with ActiveWorkflowCancel. \ + Missing or obsolete workflow resources must not replace these invocation snapshots.\n" + ); + for record in records { + out.push_str(&format!( + "\n## /{} ({})\n- invoked_at_ms: {}\n- invocation_source: {:?}\n- body_snapshot_policy: {:?}\n- task_scope: {}\n\n### Current obligations/checkpoints\n", + record.slug, + record.status, + record.invocation.invoked_at_ms, + record.invocation.source, + record.body_snapshot_policy, + record.task_scope.replace('\n', " "), + )); + for checkpoint in &record.checkpoints { + out.push_str(&format!( + "- [{}] {}\n", + checkpoint.status_label(), + checkpoint.label + )); + } + out.push_str("\n### Snapshotted workflow guidance\n"); + out.push_str(record.guidance_snapshot.trim_end()); + out.push_str("\n"); + } + out +} + +impl WorkflowCheckpoint { + fn status_label(&self) -> &'static str { + match self.status { + WorkflowCheckpointStatus::Open => "open", + WorkflowCheckpointStatus::Done => "done", + WorkflowCheckpointStatus::Cancelled => "cancelled", + } + } +} + +fn truncate_chars(text: &str, max_chars: usize) -> String { + let mut out = String::new(); + for (idx, ch) in text.chars().enumerate() { + if idx >= max_chars { + out.push('…'); + return out; + } + out.push(ch); + } + out +} + +#[cfg(test)] +mod tests { + use super::*; + + fn store_with_active_workflow() -> ActiveWorkflowStore { + let store = ActiveWorkflowStore::new(); + assert!(store.activate_from_system_items( + &[SystemItem::Workflow { + slug: "multi-agent-workflow".into(), + body: "# Multi-agent workflow\n- Delegate implementation to coder.\n- Require external review before merge.\n- Close the Ticket after merge and report evidence.\n".into(), + }], + "/multi-agent-workflow implement ticket".into(), + 42, + )); + store + } + + fn active_extension(store: &ActiveWorkflowStore) -> (String, serde_json::Value) { + ( + DOMAIN.to_string(), + serde_json::to_value(store.snapshot()).expect("snapshot json"), + ) + } + + #[test] + fn active_workflow_guidance_carries_merge_close_obligations() { + let store = store_with_active_workflow(); + let msg = store.rehydration_message().unwrap(); + + assert!(msg.contains("multi-agent-workflow")); + assert!(msg.contains("external review before merge")); + assert!(msg.contains("Close the Ticket after merge")); + assert!(msg.contains("Snapshotted workflow guidance")); + } + + #[test] + fn compacted_rehydration_message_is_removed_when_typed_state_missing_or_invalid() { + for extensions in [ + Vec::new(), + vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))], + vec![( + DOMAIN.to_string(), + json!({"schema_version":999,"workflows":[]}), + )], + ] { + let original = store_with_active_workflow(); + let stale_message = original.rehydration_message().unwrap(); + let mut context = vec![ + Item::system_message(stale_message), + Item::user_message("continue"), + ]; + let restored = ActiveWorkflowStore::new(); + + restored.restore_from_history_and_extensions(&context, &extensions); + let removed = restored.sanitize_context(&mut context); + + assert_eq!(removed, 1); + assert!(restored.active_records().is_empty()); + assert!(!context.iter().any(is_rehydration_message)); + } + } + + #[test] + fn completion_or_cancellation_suppresses_old_compacted_guidance() { + for status in [ + ActiveWorkflowStatus::Completed, + ActiveWorkflowStatus::Cancelled, + ] { + let store = store_with_active_workflow(); + let stale_message = store.rehydration_message().unwrap(); + let mut context = vec![ + Item::system_message(stale_message), + Item::user_message("continue"), + ]; + + store + .set_status("multi-agent-workflow", status, status.to_string(), 84) + .expect("workflow exists"); + let removed = store.sanitize_context(&mut context); + + assert_eq!(removed, 1); + assert!(!context.iter().any(is_rehydration_message)); + } + } + + #[test] + fn unmatched_status_tool_calls_do_not_mutate_restored_state() { + let store = store_with_active_workflow(); + let extensions = vec![active_extension(&store)]; + let history = vec![ + Item::tool_call( + "call-1", + "ActiveWorkflowCancel", + json!({"slug":"multi-agent-workflow","reason":"not durable"}).to_string(), + ), + Item::tool_result_error("call-1", "error: failed"), + ]; + let restored = ActiveWorkflowStore::new(); + + restored.restore_from_history_and_extensions(&history, &extensions); + + assert_eq!(restored.active_records().len(), 1); + assert_eq!( + restored.snapshot().workflows[0].status, + ActiveWorkflowStatus::Active + ); + } + + #[tokio::test] + async fn status_tool_persists_typed_extension_on_success() { + let store = store_with_active_workflow(); + let committed = Arc::new(Mutex::new(Vec::::new())); + let committed_for_tool = committed.clone(); + let tools = active_workflow_tools( + store.clone(), + Some(Arc::new(move |entry| { + committed_for_tool + .lock() + .expect("committed entries mutex poisoned") + .push(entry); + })), + ); + let (_, tool) = tools[1](); + + tool.execute( + &json!({"slug":"multi-agent-workflow","reason":"review complete"}).to_string(), + ToolExecutionContext::default(), + ) + .await + .expect("status tool succeeds"); + + let committed = committed.lock().expect("committed entries mutex poisoned"); + let LogEntry::Extension { + domain, payload, .. + } = committed.last().expect("extension committed") + else { + panic!("expected typed active workflow extension"); + }; + assert_eq!(domain, DOMAIN); + let snapshot: ActiveWorkflowSnapshot = serde_json::from_value(payload.clone()).unwrap(); + assert_eq!( + snapshot.workflows[0].status, + ActiveWorkflowStatus::Completed + ); + } + + #[test] + fn corrupt_extension_fails_closed_with_diagnostic() { + let entries = vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))]; + + let (snapshot, diagnostics) = fold_extensions(&entries); + + assert!(snapshot.workflows.is_empty()); + assert_eq!(diagnostics.len(), 1); + } +} diff --git a/crates/pod/src/ipc/interceptor.rs b/crates/pod/src/ipc/interceptor.rs index 41f36e61..d8a296a3 100644 --- a/crates/pod/src/ipc/interceptor.rs +++ b/crates/pod/src/ipc/interceptor.rs @@ -22,6 +22,7 @@ use llm_worker::tool::ToolOutput; use tracing::info; use tracing::warn; +use crate::active_workflow::ActiveWorkflowStore; use crate::compact::state::CompactState; use crate::compact::usage_tracker::UsageTracker; use session_store::SystemItem; @@ -71,6 +72,10 @@ pub(crate) struct PodInterceptor { /// worker. `None` in tests / `Pod::new` paths where no writer is /// attached. log_writer: Option>, + /// Active workflow state is durable typed Pod state. The interceptor + /// regenerates request-local workflow guidance from this store and strips + /// any stale compacted-history copies before each model request. + active_workflows: ActiveWorkflowStore, /// Next turn index assigned by `on_prompt_submit`. next_turn_index: AtomicUsize, /// Tool calls observed in the current turn (reset on each new prompt). @@ -86,6 +91,7 @@ impl PodInterceptor { pending_attachments: Arc>>, prompts: Arc, log_writer: Option>, + active_workflows: ActiveWorkflowStore, ) -> Self { Self { registry, @@ -96,6 +102,7 @@ impl PodInterceptor { pending_attachments, prompts, log_writer, + active_workflows, next_turn_index: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0), } @@ -234,6 +241,8 @@ impl Interceptor for PodInterceptor { } async fn pre_llm_request(&self, context: &mut Vec) -> PreRequestAction { + self.active_workflows.sanitize_context(context); + let initial_tokens = self.estimated_tokens(context); if self.request_threshold_exceeded(initial_tokens, context) { return PreRequestAction::Yield; @@ -449,11 +458,13 @@ mod tests { } impl SystemItemCommitter for RecordingSystemItemCommitter { - fn commit_system_item(&self, item: SystemItem) { - self.committed - .lock() - .expect("committed system-item list poisoned") - .push(item); + fn commit_log_entry(&self, entry: session_store::LogEntry) { + if let session_store::LogEntry::SystemItem { item, .. } = entry { + self.committed + .lock() + .expect("committed system-item list poisoned") + .push(item); + } } } @@ -525,6 +536,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -557,6 +569,7 @@ mod tests { Some(Arc::new(RecordingSystemItemCommitter { committed: Arc::clone(&committed), })), + ActiveWorkflowStore::new(), ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -593,6 +606,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ) .with_usage_tracker(usage_tracker); let mut ctx = ctx_items; @@ -618,6 +632,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -659,6 +674,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -686,6 +702,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx = ctx_items; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -707,6 +724,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; @@ -735,6 +753,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), Some(committer), + ActiveWorkflowStore::new(), ); let mut ctx: Vec = Vec::new(); @@ -782,6 +801,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx: Vec = Vec::new(); @@ -839,6 +859,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut info = task_tool_call_info("TaskList", serde_json::json!({"scope": "all"})); @@ -886,6 +907,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let info = task_tool_call_info("TaskList", serde_json::json!({})); let mut result_info = ToolResultInfo { @@ -935,6 +957,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let history = vec![Item::user_message("hi"), Item::assistant_message("done")]; @@ -969,6 +992,7 @@ mod tests { Some(Arc::new(RecordingSystemItemCommitter { committed: Arc::clone(&committed), })), + ActiveWorkflowStore::new(), ) .with_usage_tracker(Arc::clone(&usage_tracker)); @@ -1028,6 +1052,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let items = interceptor.pending_history_appends().await; @@ -1065,6 +1090,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx: Vec = vec![Item::user_message("hi")]; let action = interceptor.pre_llm_request(&mut ctx).await; @@ -1095,6 +1121,7 @@ mod tests { Arc::new(Mutex::new(Vec::new())), PromptCatalog::builtins_only().unwrap(), None, + ActiveWorkflowStore::new(), ); let mut ctx: Vec = Vec::new(); let action = interceptor.pre_llm_request(&mut ctx).await; diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index efc44e94..2153b268 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -1,3 +1,4 @@ +pub mod active_workflow; pub mod compact; pub mod controller; pub mod discovery; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index 0b029b28..524e573e 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -26,6 +26,7 @@ use manifest::{ ScopeError, ScopeRule, SharedScope, WorkerManifest, }; +use crate::active_workflow::{self, ActiveWorkflowStore}; use crate::compact::state::CompactState; use crate::compact::usage_tracker::UsageTracker; use crate::feature::builtin::TaskFeature; @@ -145,6 +146,7 @@ struct EmptyTurnRollbackSnapshot { usage_history_len: usize, ai_activity_count: usize, last_run_interrupted: bool, + active_workflows: active_workflow::ActiveWorkflowSnapshot, } fn is_ai_materialized_item(item: &Item) -> bool { @@ -196,20 +198,23 @@ where /// interceptor commit `SystemItem`s without being generic over the /// concrete `Store` type. pub trait SystemItemCommitter: Send + Sync { - fn commit_system_item(&self, item: SystemItem); + fn commit_log_entry(&self, entry: LogEntry); + + fn commit_system_item(&self, item: SystemItem) { + self.commit_log_entry(LogEntry::SystemItem { + ts: segment_log::now_millis(), + item, + }); + } } impl SystemItemCommitter for LogWriterHandle where St: Store + Clone + Send + Sync + 'static, { - fn commit_system_item(&self, item: SystemItem) { - let entry = LogEntry::SystemItem { - ts: segment_log::now_millis(), - item, - }; + fn commit_log_entry(&self, entry: LogEntry) { if let Err(err) = self.append_entry(entry) { - warn!(error = %err, "system item commit failed; dropping"); + warn!(error = %err, "session log entry commit failed; dropping"); } } } @@ -274,6 +279,10 @@ pub struct Pod { /// the narrow snapshot/restore surface Pod needs for compaction and rewind. /// Store/reminder ownership stays inside the Task feature module. task_feature: TaskFeature, + /// Durable state for workflow invocations that are active for the current task. + /// The store is persisted as typed session-log extensions and rehydrated into + /// prompt context during compaction. + active_workflows: ActiveWorkflowStore, /// Parsed system-prompt template awaiting first-turn materialisation. /// `Some` until `ensure_system_prompt_materialized` renders it once, /// then `None` forever — including after compaction. @@ -435,6 +444,7 @@ impl Pod { usage_history: self.usage_history.clone(), tracker: None, task_feature: self.task_feature.clone(), + active_workflows: self.active_workflows.clone(), system_prompt_template: None, alerter: self.alerter.clone(), event_tx: self.event_tx.clone(), @@ -618,6 +628,7 @@ impl Pod { usage_history: Arc::new(Mutex::new(Vec::::new())), tracker: None, task_feature: TaskFeature::new(), + active_workflows: ActiveWorkflowStore::new(), system_prompt_template: None, alerter: None, event_tx: None, @@ -813,7 +824,16 @@ impl Pod { registry: FeatureRegistryBuilder, ) -> FeatureRegistryInstallReport { let worker = self.worker.as_mut().expect("worker taken during run"); - registry.install_into_worker(worker, &mut self.hook_builder) + let report = registry.install_into_worker(worker, &mut self.hook_builder); + let active_workflow_committer = self.log_writer.clone().map(|writer| { + Arc::new(move |entry| writer.commit_log_entry(entry)) + as active_workflow::LogEntryCommitter + }); + worker.register_tools(active_workflow::active_workflow_tools( + self.active_workflows.clone(), + active_workflow_committer, + )); + report } /// Reference to the store. @@ -876,7 +896,11 @@ impl Pod { self.sink.truncate_silent(truncate_entries); self.task_feature.restore_from_history(&state.history); - self.worker_mut().set_history(state.history); + self.active_workflows + .restore_from_history_and_extensions(&state.history, &state.extensions); + let mut history = state.history; + active_workflow::strip_rehydration_messages(&mut history); + self.worker_mut().set_history(history); self.worker_mut().set_request_config(state.config); self.worker_mut().set_turn_count(state.turn_count); self.worker_mut() @@ -1242,6 +1266,7 @@ impl Pod { self.pending_attachments.clone(), self.prompts.clone(), self.log_writer.clone(), + self.active_workflows.clone(), ) .with_usage_tracker(self.usage_tracker.clone()); self.worker_mut().set_interceptor(interceptor); @@ -1428,6 +1453,7 @@ impl Pod { usage_history_len, ai_activity_count: self.ai_activity_counter.load(Ordering::SeqCst), last_run_interrupted: self.worker().last_run_interrupted(), + active_workflows: self.active_workflows.snapshot(), } } @@ -1465,6 +1491,8 @@ impl Pod { .truncate(snapshot.usage_history_len); let _ = self.usage_tracker.drain(); let _ = self.metrics_tracker.drain(); + self.active_workflows + .replace_with(snapshot.active_workflows); let loc = self.segment_state.location(); self.store @@ -1535,6 +1563,14 @@ impl Pod { let mut attachments = self.resolve_file_refs(&input); attachments.extend(self.resolve_knowledge_refs(&input)); attachments.extend(self.resolve_workflow_invocations(&input)?); + let flattened = self.flatten_segments(&input); + if self.active_workflows.activate_from_system_items( + &attachments, + flattened.clone(), + segment_log::now_millis(), + ) { + self.commit_entry(self.active_workflows.extension_entry())?; + } if !attachments.is_empty() { *self .pending_attachments @@ -1542,8 +1578,6 @@ impl Pod { .expect("pending_attachments poisoned") = attachments; } - let flattened = self.flatten_segments(&input); - let history_before = self.worker.as_ref().unwrap().history().len(); // lock → run → unlock @@ -2368,8 +2402,10 @@ impl Pod { let worker = self.worker.as_ref().expect("worker taken during run"); let history = worker.history(); let retain_from = cut.index.min(history.len()); - let retained_items = history[retain_from..].to_vec(); - let items_to_summarise = history[..retain_from].to_vec(); + let mut retained_items = history[retain_from..].to_vec(); + let mut items_to_summarise = history[..retain_from].to_vec(); + active_workflow::strip_rehydration_messages(&mut retained_items); + active_workflow::strip_rehydration_messages(&mut items_to_summarise); // Compaction-related knobs. Fall through to manifest defaults when // `[compaction]` is omitted entirely. @@ -2428,13 +2464,15 @@ impl Pod { .unwrap_or_default(); // Input text fed to the compact worker. Includes the default - // references, current TaskStore snapshot, and the (pruned) - // conversation text. + // references, current TaskStore snapshot, active workflow invocation + // state, and the (pruned) conversation text. let task_snapshot_text = self.task_feature.snapshot_text(); + let active_workflow_snapshot_text = self.active_workflows.snapshot_text(); let summary_input = build_summary_input( &items_to_summarise, &default_refs, Some(task_snapshot_text.as_str()), + active_workflow_snapshot_text.as_deref(), SummaryInputOptions { overview_target_tokens, overview_warning_tokens, @@ -2610,6 +2648,10 @@ impl Pod { .count(); // Build new history: [summary, ...auto-read, references, ...retained, task snapshot, TaskList synthetic call/result]. + // Active workflow guidance is intentionally not persisted as an ordinary + // compacted-history system message. It is regenerated request-locally + // from typed `pod.active_workflows` extension state so completed, + // cancelled, corrupt, or missing state cannot leak stale obligations. // The TaskStore snapshot trails the retained items so that, on resume, // `replay_history` walks any pre-compact Task* calls preserved verbatim // in retained_items first and the trailing snapshot's `replace_with` @@ -2680,18 +2722,23 @@ impl Pod { at_turn_index: source_turn_count, }), }; + let active_workflow_extension = self.active_workflows.extension_entry(); + let initial_entries = vec![entry.clone(), active_workflow_extension.clone()]; self.store - .create_segment(old_loc.session_id, new_segment_id, &[entry.clone()])?; + .create_segment(old_loc.session_id, new_segment_id, &initial_entries)?; self.segment_state.set_location(SegmentLocation { session_id: old_loc.session_id, segment_id: new_segment_id, }); - self.segment_state.set_entries_written(1); + self.segment_state + .set_entries_written(initial_entries.len()); let session_start = entry; // Broadcast the SegmentStart through the sink. This atomically - // resets the mirror to `[SegmentStart]` so any subscriber - // querying after this point sees the post-compaction prefix. - self.sink.reset_with_initial(session_start); + // resets the mirror to the replacement segment prefix so any subscriber + // querying after this point sees the post-compaction prefix, including + // durable extension state. + self.sink + .reset_with_initial_entries(vec![session_start, active_workflow_extension]); // Keep pods.json pointing at the live segment_id. Without this // a concurrent `restore_from_manifest(new_segment_id)` would // see no live writer and grab the session this Pod just moved @@ -3794,6 +3841,7 @@ where usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, task_feature: TaskFeature::new(), + active_workflows: ActiveWorkflowStore::new(), system_prompt_template: common.system_prompt_template, alerter: None, event_tx: None, @@ -3902,6 +3950,7 @@ where usage_history: Arc::new(Mutex::new(Vec::new())), tracker: None, task_feature: TaskFeature::new(), + active_workflows: ActiveWorkflowStore::new(), system_prompt_template: common.system_prompt_template, alerter: None, event_tx: None, @@ -4101,7 +4150,9 @@ where .. }) ); - worker.set_history(state.history.clone()); + let mut restored_history = state.history.clone(); + active_workflow::strip_rehydration_messages(&mut restored_history); + worker.set_history(restored_history); worker.set_request_config(state.config.clone()); worker.set_turn_count(state.turn_count); worker.set_last_run_interrupted(state.last_run_interrupted); @@ -4111,6 +4162,8 @@ where let extract_pointer = memory::extract::fold_pointer(&state.extensions); let task_feature = TaskFeature::from_history(&state.history); + let active_workflows = ActiveWorkflowStore::new(); + active_workflows.restore_from_history_and_extensions(&state.history, &state.extensions); let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store)); let mut pod = Self { @@ -4131,6 +4184,7 @@ where usage_history: Arc::new(Mutex::new(state.usage_history)), tracker: None, task_feature, + active_workflows, // Restore replays the saved system_prompt verbatim — no // template re-render on resume. system_prompt_template: None, @@ -4335,12 +4389,13 @@ struct SummaryInputBuild { } /// Build the compact worker's input: default-reference instructions, -/// the list of recently-touched files, task snapshot, and a bounded overview -/// rather than a prefix-wide transcript. +/// the list of recently-touched files, task snapshot, active workflow snapshot, +/// and a bounded overview rather than a prefix-wide transcript. fn build_summary_input( items: &[Item], default_refs: &[PathBuf], task_snapshot: Option<&str>, + active_workflow_snapshot: Option<&str>, options: SummaryInputOptions, ) -> SummaryInputBuild { let overview = build_summary_overview( @@ -4392,6 +4447,17 @@ fn build_summary_input( out.push_str(task_snapshot); out.push_str("\n\n"); } + if let Some(active_workflow_snapshot) = active_workflow_snapshot { + out.push_str( + "## Active Workflow Invocation State\n\ + This is durable typed workflow state for workflow-governed tasks. Preserve active \ + slugs, invocation scope, status, obligations/checkpoints, and the snapshotted \ + workflow guidance in the summary; do not substitute advertised/latest workflow \ + resources for this invocation state.\n", + ); + out.push_str(active_workflow_snapshot); + out.push_str("\n\n"); + } out.push_str("## Conversation overview/index\n"); out.push_str(&overview); out.push_str("\n\nWhen you are done, call `write_summary` with the final 5-section text."); @@ -5278,6 +5344,7 @@ mod build_summary_prompt_tests { items, &[], None, + None, SummaryInputOptions { overview_target_tokens: 512, overview_warning_tokens: 1024, @@ -5326,6 +5393,27 @@ mod build_summary_prompt_tests { assert!(!prompt.contains("deliberation")); } + #[test] + fn includes_active_workflow_snapshot_section() { + let prompt = build_summary_input( + &[Item::user_message("continue after review")], + &[], + None, + Some("ActiveWorkflowStore: 1 active workflow\n- review before merge\n- close ticket"), + SummaryInputOptions { + overview_target_tokens: 512, + overview_warning_tokens: 1024, + overview_deadline_tokens: 2048, + summary_target_tokens: 256, + }, + ) + .text; + + assert!(prompt.contains("## Active Workflow Invocation State")); + assert!(prompt.contains("review before merge")); + assert!(prompt.contains("close ticket")); + } + #[test] fn overview_warning_does_not_drop_input() { let items = vec![Item::user_message("x".repeat(4_000))]; @@ -5333,6 +5421,7 @@ mod build_summary_prompt_tests { &items, &[], None, + None, SummaryInputOptions { overview_target_tokens: 10, overview_warning_tokens: 100, @@ -5352,6 +5441,7 @@ mod build_summary_prompt_tests { &items, &[], None, + None, SummaryInputOptions { overview_target_tokens: 10, overview_warning_tokens: 10, diff --git a/crates/pod/src/segment_log_sink.rs b/crates/pod/src/segment_log_sink.rs index 42e6e6c4..cc4feff7 100644 --- a/crates/pod/src/segment_log_sink.rs +++ b/crates/pod/src/segment_log_sink.rs @@ -147,6 +147,24 @@ impl SegmentLogSink { let _ = self.inner.broadcast_tx.send(initial); } + /// Atomically swap the mirror to the supplied replacement-session prefix + /// and broadcast the first entry as the live rotation signal. Entries after + /// the first are already reflected in reconnect snapshots but are not + /// broadcast live; this is intended for non-live extension state that must + /// share the new segment prefix with SegmentStart. + pub fn reset_with_initial_entries(&self, entries: Vec) { + let first = entries.first().cloned(); + let mut mirror = self + .inner + .mirror + .lock() + .expect("session log mirror mutex poisoned"); + *mirror = entries; + if let Some(initial) = first { + let _ = self.inner.broadcast_tx.send(initial); + } + } + /// Replace the mirror with the supplied prefix without broadcasting. /// /// Used by restore paths that load a session's complete log into diff --git a/resources/prompts/internal/compact_system.md b/resources/prompts/internal/compact_system.md index 61a26b18..1241e54c 100644 --- a/resources/prompts/internal/compact_system.md +++ b/resources/prompts/internal/compact_system.md @@ -4,7 +4,7 @@ The conversation input is a bounded overview/index, not the full transcript. Tre ## Workflow -1. Read the provided overview/index and current TaskStore snapshot. +1. Read the provided overview/index, current TaskStore snapshot, and any Active Workflow Invocation State section. 2. If the overview does not contain enough detail, use `search_session_log` to find relevant compact-target history items, then `read_session_items` to inspect only the needed range. 3. Use `read_file` to inspect referenced files before deciding what the next session needs. Prefer skimming over blind inclusion. 4. For files whose current contents are load-bearing for the active work, call `mark_read_required` to inject them into the next session. These count against the auto-read token budget — spend it deliberately. @@ -39,5 +39,6 @@ Produce the summary in this exact format: ## Constraints +- Preserve active workflow invocation state when present: active slug, invocation scope/source/time, status, open obligations/checkpoints, and snapshotted workflow guidance. Do not replace a snapshotted invocation with merely advertised/latest workflow resources. - Keep code snippets and raw tool output OUT of the summary — that is what auto-read and references are for. - Follow the summary target stated in the run input; if asked to shrink, call `write_summary` again with a shorter version.