merge: preserve active workflows across compaction

This commit is contained in:
Keisuke Hirata 2026-06-15 01:25:09 +09:00
commit 64d26f8490
No known key found for this signature in database
6 changed files with 902 additions and 29 deletions

View File

@ -0,0 +1,736 @@
//! Durable active workflow invocation state.
//!
//! Workflow bodies are resolved at invocation time and snapshotted here. The
//! snapshot, not whatever resource version is installed later, is the procedural
//! authority that survives compaction for the currently governed task.
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use llm_worker::Item;
use llm_worker::tool::{
Tool, ToolDefinition, ToolError, ToolExecutionContext, ToolMeta, ToolOutput,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use session_store::{LogEntry, SystemItem, segment_log};
pub const DOMAIN: &str = "pod.active_workflows";
pub const REHYDRATION_MESSAGE_PREFIX: &str = "[Active workflow snapshot]";
pub const INACTIVE_MESSAGE_PREFIX: &str = "[Active workflow state]";
const SCHEMA_VERSION: u32 = 1;
pub type LogEntryCommitter = Arc<dyn Fn(LogEntry) + Send + Sync>;
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActiveWorkflowSnapshot {
pub schema_version: u32,
pub workflows: Vec<ActiveWorkflowRecord>,
}
impl Default for ActiveWorkflowSnapshot {
fn default() -> Self {
Self {
schema_version: SCHEMA_VERSION,
workflows: Vec::new(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ActiveWorkflowRecord {
pub slug: String,
pub status: ActiveWorkflowStatus,
pub invocation: WorkflowInvocationInfo,
pub task_scope: String,
pub body_snapshot_policy: WorkflowBodySnapshotPolicy,
pub guidance_snapshot: String,
pub obligations: Vec<String>,
pub checkpoints: Vec<WorkflowCheckpoint>,
pub updated_at_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub completion: Option<WorkflowCompletionInfo>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum ActiveWorkflowStatus {
Active,
Completed,
Cancelled,
}
impl std::fmt::Display for ActiveWorkflowStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Self::Active => "active",
Self::Completed => "completed",
Self::Cancelled => "cancelled",
};
f.write_str(s)
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkflowInvocationInfo {
pub source: WorkflowInvocationSource,
pub invoked_at_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowInvocationSource {
UserWorkflowInvokeSegment,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowBodySnapshotPolicy {
SnapshottedAtInvocation,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkflowCheckpoint {
pub label: String,
pub status: WorkflowCheckpointStatus,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowCheckpointStatus {
Open,
Done,
Cancelled,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkflowCompletionInfo {
pub completed_at_ms: u64,
pub reason: String,
}
#[derive(Debug, Clone, Default)]
pub struct ActiveWorkflowStore {
inner: Arc<Mutex<ActiveWorkflowSnapshot>>,
}
impl ActiveWorkflowStore {
pub fn new() -> Self {
Self::default()
}
pub fn snapshot(&self) -> ActiveWorkflowSnapshot {
self.inner.lock().unwrap_or_else(|e| e.into_inner()).clone()
}
pub fn replace_with(&self, snapshot: ActiveWorkflowSnapshot) {
*self.inner.lock().unwrap_or_else(|e| e.into_inner()) = snapshot;
}
pub fn active_records(&self) -> Vec<ActiveWorkflowRecord> {
self.snapshot()
.workflows
.into_iter()
.filter(|record| record.status == ActiveWorkflowStatus::Active)
.collect()
}
pub fn activate_from_system_items(
&self,
items: &[SystemItem],
task_scope: String,
invoked_at_ms: u64,
) -> bool {
let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
for item in items {
if let SystemItem::Workflow { slug, body } = item {
grouped.entry(slug.clone()).or_default().push(body.clone());
}
}
if grouped.is_empty() {
return false;
}
let mut snapshot = self.snapshot();
snapshot.schema_version = SCHEMA_VERSION;
for (slug, bodies) in grouped {
let guidance_snapshot = bodies.join("\n\n---\n\n");
let obligations = extract_obligations(&guidance_snapshot);
let checkpoints = obligations
.iter()
.take(32)
.map(|label| WorkflowCheckpoint {
label: label.clone(),
status: WorkflowCheckpointStatus::Open,
})
.collect();
let record = ActiveWorkflowRecord {
slug: slug.clone(),
status: ActiveWorkflowStatus::Active,
invocation: WorkflowInvocationInfo {
source: WorkflowInvocationSource::UserWorkflowInvokeSegment,
invoked_at_ms,
},
task_scope: truncate_chars(&task_scope, 2_000),
body_snapshot_policy: WorkflowBodySnapshotPolicy::SnapshottedAtInvocation,
guidance_snapshot,
obligations,
checkpoints,
updated_at_ms: invoked_at_ms,
completion: None,
};
upsert_record(&mut snapshot.workflows, record);
}
self.replace_with(snapshot);
true
}
pub fn set_status(
&self,
slug: &str,
status: ActiveWorkflowStatus,
reason: String,
now_ms: u64,
) -> Result<ActiveWorkflowRecord, String> {
let mut snapshot = self.inner.lock().unwrap_or_else(|e| e.into_inner());
let record = snapshot
.workflows
.iter_mut()
.find(|record| record.slug == slug)
.ok_or_else(|| format!("active workflow `{slug}` not found"))?;
record.status = status;
record.updated_at_ms = now_ms;
record.completion = Some(WorkflowCompletionInfo {
completed_at_ms: now_ms,
reason,
});
for checkpoint in &mut record.checkpoints {
checkpoint.status = match status {
ActiveWorkflowStatus::Active => WorkflowCheckpointStatus::Open,
ActiveWorkflowStatus::Completed => WorkflowCheckpointStatus::Done,
ActiveWorkflowStatus::Cancelled => WorkflowCheckpointStatus::Cancelled,
};
}
Ok(record.clone())
}
pub fn snapshot_text(&self) -> Option<String> {
let active = self.active_records();
(!active.is_empty()).then(|| render_snapshot_text(&active))
}
pub fn rehydration_message(&self) -> Option<String> {
let active = self.active_records();
(!active.is_empty()).then(|| render_rehydration_message(&active))
}
pub fn sanitize_context(&self, context: &mut Vec<Item>) -> usize {
let removed = strip_rehydration_messages(context);
if let Some(message) = self.rehydration_message() {
context.push(Item::system_message(message));
} else if removed > 0 || context.iter().any(has_active_workflow_hint) {
context.push(Item::system_message(inactive_workflow_message()));
}
removed
}
pub fn extension_entry(&self) -> LogEntry {
LogEntry::Extension {
ts: segment_log::now_millis(),
domain: DOMAIN.into(),
payload: serde_json::to_value(self.snapshot())
.expect("ActiveWorkflowSnapshot is always JSON-serializable"),
}
}
pub fn restore_from_history_and_extensions(
&self,
_history: &[Item],
extensions: &[(String, serde_json::Value)],
) {
let (snapshot, diagnostics) = fold_extensions(extensions);
for diagnostic in diagnostics {
tracing::warn!(diagnostic, "failed to restore active workflow state");
}
self.replace_with(snapshot);
}
}
pub fn fold_extensions(
extensions: &[(String, serde_json::Value)],
) -> (ActiveWorkflowSnapshot, Vec<String>) {
let mut latest = None;
let mut diagnostics = Vec::new();
for (domain, payload) in extensions {
if domain != DOMAIN {
continue;
}
match serde_json::from_value::<ActiveWorkflowSnapshot>(payload.clone()) {
Ok(snapshot) if snapshot.schema_version == SCHEMA_VERSION => latest = Some(snapshot),
Ok(snapshot) => {
latest = None;
diagnostics.push(format!(
"unsupported active workflow schema_version {}",
snapshot.schema_version
));
}
Err(err) => {
latest = None;
diagnostics.push(format!("corrupt active workflow payload: {err}"));
}
}
}
(latest.unwrap_or_default(), diagnostics)
}
pub fn strip_rehydration_messages(items: &mut Vec<Item>) -> usize {
let before = items.len();
items.retain(|item| !is_rehydration_message(item));
before - items.len()
}
pub fn is_rehydration_message(item: &Item) -> bool {
item_system_text(item)
.map(|text| text.trim_start().starts_with(REHYDRATION_MESSAGE_PREFIX))
.unwrap_or(false)
}
fn has_active_workflow_hint(item: &Item) -> bool {
item_system_text(item)
.map(|text| {
text.contains("Active Workflow Invocation State")
|| text.contains("ActiveWorkflowStore:")
|| text.contains(REHYDRATION_MESSAGE_PREFIX)
})
.unwrap_or(false)
}
fn item_system_text(item: &Item) -> Option<String> {
match item {
Item::Message { role, content, .. } if *role == llm_worker::Role::System => Some(
content
.iter()
.map(|part| part.as_text())
.collect::<String>(),
),
_ => None,
}
}
fn inactive_workflow_message() -> String {
format!(
"{INACTIVE_MESSAGE_PREFIX}\n\n\
No currently valid active workflow invocation state is active. Ignore older compacted \
history or summaries that appear to describe active workflow obligations; only validated \
typed `{DOMAIN}` records with status `active` establish active workflow guidance."
)
}
pub fn active_workflow_tools(
store: ActiveWorkflowStore,
committer: Option<LogEntryCommitter>,
) -> Vec<ToolDefinition> {
vec![
list_tool(store.clone()),
status_tool(
store.clone(),
ActiveWorkflowStatus::Completed,
committer.clone(),
),
status_tool(store, ActiveWorkflowStatus::Cancelled, committer),
]
}
fn list_tool(store: ActiveWorkflowStore) -> ToolDefinition {
Arc::new(move || {
(
ToolMeta::new("ActiveWorkflowList")
.description("List durable active workflow invocations and their status")
.input_schema(
json!({"type":"object","properties":{},"additionalProperties":false}),
),
Arc::new(ActiveWorkflowListTool {
store: store.clone(),
}) as Arc<dyn Tool>,
)
})
}
fn status_tool(
store: ActiveWorkflowStore,
status: ActiveWorkflowStatus,
committer: Option<LogEntryCommitter>,
) -> ToolDefinition {
let name = match status {
ActiveWorkflowStatus::Completed => "ActiveWorkflowComplete",
ActiveWorkflowStatus::Cancelled => "ActiveWorkflowCancel",
ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"),
};
let description = match status {
ActiveWorkflowStatus::Completed => {
"Mark an active workflow as completed when its governed task is finished"
}
ActiveWorkflowStatus::Cancelled => {
"Cancel an active workflow when the governed task is explicitly abandoned"
}
ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"),
};
let store_for_tool = store.clone();
let committer_for_tool = committer.clone();
Arc::new(move || {
(
ToolMeta::new(name)
.description(description)
.input_schema(json!({
"type":"object",
"properties":{
"slug":{"type":"string","description":"Workflow slug to update"},
"reason":{"type":"string","description":"Brief completion/cancellation reason"}
},
"required":["slug"],
"additionalProperties":false
})),
Arc::new(ActiveWorkflowStatusTool {
store: store_for_tool.clone(),
status,
committer: committer_for_tool.clone(),
}) as Arc<dyn Tool>,
)
})
}
struct ActiveWorkflowListTool {
store: ActiveWorkflowStore,
}
#[async_trait]
impl Tool for ActiveWorkflowListTool {
async fn execute(
&self,
_input_json: &str,
_ctx: ToolExecutionContext,
) -> Result<ToolOutput, ToolError> {
let snapshot = self.store.snapshot();
let content = serde_json::to_string_pretty(&snapshot)
.map_err(|err| ToolError::Internal(err.to_string()))?;
let active = snapshot
.workflows
.iter()
.filter(|record| record.status == ActiveWorkflowStatus::Active)
.count();
Ok(ToolOutput {
summary: format!(
"ActiveWorkflowStore: {} workflow(s), {active} active",
snapshot.workflows.len()
),
content: Some(content),
})
}
}
struct ActiveWorkflowStatusTool {
store: ActiveWorkflowStore,
status: ActiveWorkflowStatus,
committer: Option<LogEntryCommitter>,
}
#[async_trait]
impl Tool for ActiveWorkflowStatusTool {
async fn execute(
&self,
input_json: &str,
_ctx: ToolExecutionContext,
) -> Result<ToolOutput, ToolError> {
let params: WorkflowStatusParams = serde_json::from_str(input_json)
.map_err(|err| ToolError::InvalidArgument(err.to_string()))?;
let reason = params.reason.unwrap_or_else(|| self.status.to_string());
let record = self
.store
.set_status(&params.slug, self.status, reason, segment_log::now_millis())
.map_err(ToolError::InvalidArgument)?;
if let Some(committer) = &self.committer {
committer(self.store.extension_entry());
}
let content = serde_json::to_string_pretty(&record)
.map_err(|err| ToolError::Internal(err.to_string()))?;
Ok(ToolOutput {
summary: format!("workflow {} marked {}", record.slug, record.status),
content: Some(content),
})
}
}
#[derive(Debug, Deserialize)]
struct WorkflowStatusParams {
slug: String,
#[serde(default)]
reason: Option<String>,
}
fn upsert_record(records: &mut Vec<ActiveWorkflowRecord>, record: ActiveWorkflowRecord) {
if let Some(existing) = records
.iter_mut()
.find(|existing| existing.slug == record.slug)
{
*existing = record;
} else {
records.push(record);
}
}
fn extract_obligations(body: &str) -> Vec<String> {
let mut obligations = Vec::new();
for line in body.lines() {
let trimmed = line.trim();
let candidate = trimmed
.strip_prefix("- ")
.or_else(|| trimmed.strip_prefix("* "))
.or_else(|| trimmed.strip_prefix(""))
.unwrap_or(trimmed);
let lower = candidate.to_ascii_lowercase();
let looks_obligating = lower.contains("must")
|| lower.contains("require")
|| lower.contains("obligation")
|| lower.contains("review")
|| lower.contains("merge")
|| lower.contains("close")
|| lower.contains("report")
|| lower.contains("handoff");
if looks_obligating && !candidate.is_empty() {
obligations.push(truncate_chars(candidate, 240));
}
if obligations.len() >= 32 {
break;
}
}
if obligations.is_empty() {
obligations
.push("Follow the snapshotted workflow body until completion or cancellation".into());
}
obligations
}
fn render_snapshot_text(records: &[ActiveWorkflowRecord]) -> String {
let json = serde_json::to_string_pretty(&ActiveWorkflowSnapshot {
schema_version: SCHEMA_VERSION,
workflows: records.to_vec(),
})
.unwrap_or_else(|_| String::from("{\"schema_version\":1,\"workflows\":[]}"));
format!(
"ActiveWorkflowStore: {} active workflow(s)\n\n```json\n{}\n```",
records.len(),
json
)
}
fn render_rehydration_message(records: &[ActiveWorkflowRecord]) -> String {
let mut out = format!(
"{REHYDRATION_MESSAGE_PREFIX}\n\n\
The following workflow invocation state is durable state carried across compaction. \
Continue to follow each active workflow's snapshotted guidance until the governed task \
is completed with ActiveWorkflowComplete or explicitly cancelled with ActiveWorkflowCancel. \
Missing or obsolete workflow resources must not replace these invocation snapshots.\n"
);
for record in records {
out.push_str(&format!(
"\n## /{} ({})\n- invoked_at_ms: {}\n- invocation_source: {:?}\n- body_snapshot_policy: {:?}\n- task_scope: {}\n\n### Current obligations/checkpoints\n",
record.slug,
record.status,
record.invocation.invoked_at_ms,
record.invocation.source,
record.body_snapshot_policy,
record.task_scope.replace('\n', " "),
));
for checkpoint in &record.checkpoints {
out.push_str(&format!(
"- [{}] {}\n",
checkpoint.status_label(),
checkpoint.label
));
}
out.push_str("\n### Snapshotted workflow guidance\n");
out.push_str(record.guidance_snapshot.trim_end());
out.push_str("\n");
}
out
}
impl WorkflowCheckpoint {
fn status_label(&self) -> &'static str {
match self.status {
WorkflowCheckpointStatus::Open => "open",
WorkflowCheckpointStatus::Done => "done",
WorkflowCheckpointStatus::Cancelled => "cancelled",
}
}
}
fn truncate_chars(text: &str, max_chars: usize) -> String {
let mut out = String::new();
for (idx, ch) in text.chars().enumerate() {
if idx >= max_chars {
out.push('…');
return out;
}
out.push(ch);
}
out
}
#[cfg(test)]
mod tests {
use super::*;
fn store_with_active_workflow() -> ActiveWorkflowStore {
let store = ActiveWorkflowStore::new();
assert!(store.activate_from_system_items(
&[SystemItem::Workflow {
slug: "multi-agent-workflow".into(),
body: "# Multi-agent workflow\n- Delegate implementation to coder.\n- Require external review before merge.\n- Close the Ticket after merge and report evidence.\n".into(),
}],
"/multi-agent-workflow implement ticket".into(),
42,
));
store
}
fn active_extension(store: &ActiveWorkflowStore) -> (String, serde_json::Value) {
(
DOMAIN.to_string(),
serde_json::to_value(store.snapshot()).expect("snapshot json"),
)
}
#[test]
fn active_workflow_guidance_carries_merge_close_obligations() {
let store = store_with_active_workflow();
let msg = store.rehydration_message().unwrap();
assert!(msg.contains("multi-agent-workflow"));
assert!(msg.contains("external review before merge"));
assert!(msg.contains("Close the Ticket after merge"));
assert!(msg.contains("Snapshotted workflow guidance"));
}
#[test]
fn compacted_rehydration_message_is_removed_when_typed_state_missing_or_invalid() {
for extensions in [
Vec::new(),
vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))],
vec![(
DOMAIN.to_string(),
json!({"schema_version":999,"workflows":[]}),
)],
] {
let original = store_with_active_workflow();
let stale_message = original.rehydration_message().unwrap();
let mut context = vec![
Item::system_message(stale_message),
Item::user_message("continue"),
];
let restored = ActiveWorkflowStore::new();
restored.restore_from_history_and_extensions(&context, &extensions);
let removed = restored.sanitize_context(&mut context);
assert_eq!(removed, 1);
assert!(restored.active_records().is_empty());
assert!(!context.iter().any(is_rehydration_message));
}
}
#[test]
fn completion_or_cancellation_suppresses_old_compacted_guidance() {
for status in [
ActiveWorkflowStatus::Completed,
ActiveWorkflowStatus::Cancelled,
] {
let store = store_with_active_workflow();
let stale_message = store.rehydration_message().unwrap();
let mut context = vec![
Item::system_message(stale_message),
Item::user_message("continue"),
];
store
.set_status("multi-agent-workflow", status, status.to_string(), 84)
.expect("workflow exists");
let removed = store.sanitize_context(&mut context);
assert_eq!(removed, 1);
assert!(!context.iter().any(is_rehydration_message));
}
}
#[test]
fn unmatched_status_tool_calls_do_not_mutate_restored_state() {
let store = store_with_active_workflow();
let extensions = vec![active_extension(&store)];
let history = vec![
Item::tool_call(
"call-1",
"ActiveWorkflowCancel",
json!({"slug":"multi-agent-workflow","reason":"not durable"}).to_string(),
),
Item::tool_result_error("call-1", "error: failed"),
];
let restored = ActiveWorkflowStore::new();
restored.restore_from_history_and_extensions(&history, &extensions);
assert_eq!(restored.active_records().len(), 1);
assert_eq!(
restored.snapshot().workflows[0].status,
ActiveWorkflowStatus::Active
);
}
#[tokio::test]
async fn status_tool_persists_typed_extension_on_success() {
let store = store_with_active_workflow();
let committed = Arc::new(Mutex::new(Vec::<LogEntry>::new()));
let committed_for_tool = committed.clone();
let tools = active_workflow_tools(
store.clone(),
Some(Arc::new(move |entry| {
committed_for_tool
.lock()
.expect("committed entries mutex poisoned")
.push(entry);
})),
);
let (_, tool) = tools[1]();
tool.execute(
&json!({"slug":"multi-agent-workflow","reason":"review complete"}).to_string(),
ToolExecutionContext::default(),
)
.await
.expect("status tool succeeds");
let committed = committed.lock().expect("committed entries mutex poisoned");
let LogEntry::Extension {
domain, payload, ..
} = committed.last().expect("extension committed")
else {
panic!("expected typed active workflow extension");
};
assert_eq!(domain, DOMAIN);
let snapshot: ActiveWorkflowSnapshot = serde_json::from_value(payload.clone()).unwrap();
assert_eq!(
snapshot.workflows[0].status,
ActiveWorkflowStatus::Completed
);
}
#[test]
fn corrupt_extension_fails_closed_with_diagnostic() {
let entries = vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))];
let (snapshot, diagnostics) = fold_extensions(&entries);
assert!(snapshot.workflows.is_empty());
assert_eq!(diagnostics.len(), 1);
}
}

View File

@ -22,6 +22,7 @@ use llm_worker::tool::ToolOutput;
use tracing::info; use tracing::info;
use tracing::warn; use tracing::warn;
use crate::active_workflow::ActiveWorkflowStore;
use crate::compact::state::CompactState; use crate::compact::state::CompactState;
use crate::compact::usage_tracker::UsageTracker; use crate::compact::usage_tracker::UsageTracker;
use session_store::SystemItem; use session_store::SystemItem;
@ -71,6 +72,10 @@ pub(crate) struct PodInterceptor {
/// worker. `None` in tests / `Pod::new` paths where no writer is /// worker. `None` in tests / `Pod::new` paths where no writer is
/// attached. /// attached.
log_writer: Option<Arc<dyn SystemItemCommitter>>, log_writer: Option<Arc<dyn SystemItemCommitter>>,
/// Active workflow state is durable typed Pod state. The interceptor
/// regenerates request-local workflow guidance from this store and strips
/// any stale compacted-history copies before each model request.
active_workflows: ActiveWorkflowStore,
/// Next turn index assigned by `on_prompt_submit`. /// Next turn index assigned by `on_prompt_submit`.
next_turn_index: AtomicUsize, next_turn_index: AtomicUsize,
/// Tool calls observed in the current turn (reset on each new prompt). /// Tool calls observed in the current turn (reset on each new prompt).
@ -86,6 +91,7 @@ impl PodInterceptor {
pending_attachments: Arc<Mutex<Vec<SystemItem>>>, pending_attachments: Arc<Mutex<Vec<SystemItem>>>,
prompts: Arc<PromptCatalog>, prompts: Arc<PromptCatalog>,
log_writer: Option<Arc<dyn SystemItemCommitter>>, log_writer: Option<Arc<dyn SystemItemCommitter>>,
active_workflows: ActiveWorkflowStore,
) -> Self { ) -> Self {
Self { Self {
registry, registry,
@ -96,6 +102,7 @@ impl PodInterceptor {
pending_attachments, pending_attachments,
prompts, prompts,
log_writer, log_writer,
active_workflows,
next_turn_index: AtomicUsize::new(0), next_turn_index: AtomicUsize::new(0),
tool_calls_this_turn: AtomicUsize::new(0), tool_calls_this_turn: AtomicUsize::new(0),
} }
@ -234,6 +241,8 @@ impl Interceptor for PodInterceptor {
} }
async fn pre_llm_request(&self, context: &mut Vec<Item>) -> PreRequestAction { async fn pre_llm_request(&self, context: &mut Vec<Item>) -> PreRequestAction {
self.active_workflows.sanitize_context(context);
let initial_tokens = self.estimated_tokens(context); let initial_tokens = self.estimated_tokens(context);
if self.request_threshold_exceeded(initial_tokens, context) { if self.request_threshold_exceeded(initial_tokens, context) {
return PreRequestAction::Yield; return PreRequestAction::Yield;
@ -449,13 +458,15 @@ mod tests {
} }
impl SystemItemCommitter for RecordingSystemItemCommitter { impl SystemItemCommitter for RecordingSystemItemCommitter {
fn commit_system_item(&self, item: SystemItem) { fn commit_log_entry(&self, entry: session_store::LogEntry) {
if let session_store::LogEntry::SystemItem { item, .. } = entry {
self.committed self.committed
.lock() .lock()
.expect("committed system-item list poisoned") .expect("committed system-item list poisoned")
.push(item); .push(item);
} }
} }
}
struct AppendingPreRequestHook { struct AppendingPreRequestHook {
saw_handle: Arc<AtomicBool>, saw_handle: Arc<AtomicBool>,
@ -525,6 +536,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx = ctx_items; let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -557,6 +569,7 @@ mod tests {
Some(Arc::new(RecordingSystemItemCommitter { Some(Arc::new(RecordingSystemItemCommitter {
committed: Arc::clone(&committed), committed: Arc::clone(&committed),
})), })),
ActiveWorkflowStore::new(),
); );
let mut ctx = ctx_items; let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -593,6 +606,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
) )
.with_usage_tracker(usage_tracker); .with_usage_tracker(usage_tracker);
let mut ctx = ctx_items; let mut ctx = ctx_items;
@ -618,6 +632,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx = ctx_items; let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -659,6 +674,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx = ctx_items; let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -686,6 +702,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx = ctx_items; let mut ctx = ctx_items;
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -707,6 +724,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx: Vec<Item> = Vec::new(); let mut ctx: Vec<Item> = Vec::new();
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -735,6 +753,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
Some(committer), Some(committer),
ActiveWorkflowStore::new(),
); );
let mut ctx: Vec<Item> = Vec::new(); let mut ctx: Vec<Item> = Vec::new();
@ -782,6 +801,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx: Vec<Item> = Vec::new(); let mut ctx: Vec<Item> = Vec::new();
@ -839,6 +859,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut info = task_tool_call_info("TaskList", serde_json::json!({"scope": "all"})); let mut info = task_tool_call_info("TaskList", serde_json::json!({"scope": "all"}));
@ -886,6 +907,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let info = task_tool_call_info("TaskList", serde_json::json!({})); let info = task_tool_call_info("TaskList", serde_json::json!({}));
let mut result_info = ToolResultInfo { let mut result_info = ToolResultInfo {
@ -935,6 +957,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let history = vec![Item::user_message("hi"), Item::assistant_message("done")]; let history = vec![Item::user_message("hi"), Item::assistant_message("done")];
@ -969,6 +992,7 @@ mod tests {
Some(Arc::new(RecordingSystemItemCommitter { Some(Arc::new(RecordingSystemItemCommitter {
committed: Arc::clone(&committed), committed: Arc::clone(&committed),
})), })),
ActiveWorkflowStore::new(),
) )
.with_usage_tracker(Arc::clone(&usage_tracker)); .with_usage_tracker(Arc::clone(&usage_tracker));
@ -1028,6 +1052,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let items = interceptor.pending_history_appends().await; let items = interceptor.pending_history_appends().await;
@ -1065,6 +1090,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx: Vec<Item> = vec![Item::user_message("hi")]; let mut ctx: Vec<Item> = vec![Item::user_message("hi")];
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;
@ -1095,6 +1121,7 @@ mod tests {
Arc::new(Mutex::new(Vec::new())), Arc::new(Mutex::new(Vec::new())),
PromptCatalog::builtins_only().unwrap(), PromptCatalog::builtins_only().unwrap(),
None, None,
ActiveWorkflowStore::new(),
); );
let mut ctx: Vec<Item> = Vec::new(); let mut ctx: Vec<Item> = Vec::new();
let action = interceptor.pre_llm_request(&mut ctx).await; let action = interceptor.pre_llm_request(&mut ctx).await;

View File

@ -1,3 +1,4 @@
pub mod active_workflow;
pub mod compact; pub mod compact;
pub mod controller; pub mod controller;
pub mod discovery; pub mod discovery;

View File

@ -26,6 +26,7 @@ use manifest::{
ScopeError, ScopeRule, SharedScope, WorkerManifest, ScopeError, ScopeRule, SharedScope, WorkerManifest,
}; };
use crate::active_workflow::{self, ActiveWorkflowStore};
use crate::compact::state::CompactState; use crate::compact::state::CompactState;
use crate::compact::usage_tracker::UsageTracker; use crate::compact::usage_tracker::UsageTracker;
use crate::feature::builtin::TaskFeature; use crate::feature::builtin::TaskFeature;
@ -145,6 +146,7 @@ struct EmptyTurnRollbackSnapshot {
usage_history_len: usize, usage_history_len: usize,
ai_activity_count: usize, ai_activity_count: usize,
last_run_interrupted: bool, last_run_interrupted: bool,
active_workflows: active_workflow::ActiveWorkflowSnapshot,
} }
fn is_ai_materialized_item(item: &Item) -> bool { fn is_ai_materialized_item(item: &Item) -> bool {
@ -196,20 +198,23 @@ where
/// interceptor commit `SystemItem`s without being generic over the /// interceptor commit `SystemItem`s without being generic over the
/// concrete `Store` type. /// concrete `Store` type.
pub trait SystemItemCommitter: Send + Sync { pub trait SystemItemCommitter: Send + Sync {
fn commit_system_item(&self, item: SystemItem); fn commit_log_entry(&self, entry: LogEntry);
fn commit_system_item(&self, item: SystemItem) {
self.commit_log_entry(LogEntry::SystemItem {
ts: segment_log::now_millis(),
item,
});
}
} }
impl<St> SystemItemCommitter for LogWriterHandle<St> impl<St> SystemItemCommitter for LogWriterHandle<St>
where where
St: Store + Clone + Send + Sync + 'static, St: Store + Clone + Send + Sync + 'static,
{ {
fn commit_system_item(&self, item: SystemItem) { fn commit_log_entry(&self, entry: LogEntry) {
let entry = LogEntry::SystemItem {
ts: segment_log::now_millis(),
item,
};
if let Err(err) = self.append_entry(entry) { if let Err(err) = self.append_entry(entry) {
warn!(error = %err, "system item commit failed; dropping"); warn!(error = %err, "session log entry commit failed; dropping");
} }
} }
} }
@ -274,6 +279,10 @@ pub struct Pod<C: LlmClient, St: Store> {
/// the narrow snapshot/restore surface Pod needs for compaction and rewind. /// the narrow snapshot/restore surface Pod needs for compaction and rewind.
/// Store/reminder ownership stays inside the Task feature module. /// Store/reminder ownership stays inside the Task feature module.
task_feature: TaskFeature, task_feature: TaskFeature,
/// Durable state for workflow invocations that are active for the current task.
/// The store is persisted as typed session-log extensions and rehydrated into
/// prompt context during compaction.
active_workflows: ActiveWorkflowStore,
/// Parsed system-prompt template awaiting first-turn materialisation. /// Parsed system-prompt template awaiting first-turn materialisation.
/// `Some` until `ensure_system_prompt_materialized` renders it once, /// `Some` until `ensure_system_prompt_materialized` renders it once,
/// then `None` forever — including after compaction. /// then `None` forever — including after compaction.
@ -435,6 +444,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
usage_history: self.usage_history.clone(), usage_history: self.usage_history.clone(),
tracker: None, tracker: None,
task_feature: self.task_feature.clone(), task_feature: self.task_feature.clone(),
active_workflows: self.active_workflows.clone(),
system_prompt_template: None, system_prompt_template: None,
alerter: self.alerter.clone(), alerter: self.alerter.clone(),
event_tx: self.event_tx.clone(), event_tx: self.event_tx.clone(),
@ -618,6 +628,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
usage_history: Arc::new(Mutex::new(Vec::<UsageRecord>::new())), usage_history: Arc::new(Mutex::new(Vec::<UsageRecord>::new())),
tracker: None, tracker: None,
task_feature: TaskFeature::new(), task_feature: TaskFeature::new(),
active_workflows: ActiveWorkflowStore::new(),
system_prompt_template: None, system_prompt_template: None,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
@ -813,7 +824,16 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
registry: FeatureRegistryBuilder, registry: FeatureRegistryBuilder,
) -> FeatureRegistryInstallReport { ) -> FeatureRegistryInstallReport {
let worker = self.worker.as_mut().expect("worker taken during run"); let worker = self.worker.as_mut().expect("worker taken during run");
registry.install_into_worker(worker, &mut self.hook_builder) let report = registry.install_into_worker(worker, &mut self.hook_builder);
let active_workflow_committer = self.log_writer.clone().map(|writer| {
Arc::new(move |entry| writer.commit_log_entry(entry))
as active_workflow::LogEntryCommitter
});
worker.register_tools(active_workflow::active_workflow_tools(
self.active_workflows.clone(),
active_workflow_committer,
));
report
} }
/// Reference to the store. /// Reference to the store.
@ -876,7 +896,11 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.sink.truncate_silent(truncate_entries); self.sink.truncate_silent(truncate_entries);
self.task_feature.restore_from_history(&state.history); self.task_feature.restore_from_history(&state.history);
self.worker_mut().set_history(state.history); self.active_workflows
.restore_from_history_and_extensions(&state.history, &state.extensions);
let mut history = state.history;
active_workflow::strip_rehydration_messages(&mut history);
self.worker_mut().set_history(history);
self.worker_mut().set_request_config(state.config); self.worker_mut().set_request_config(state.config);
self.worker_mut().set_turn_count(state.turn_count); self.worker_mut().set_turn_count(state.turn_count);
self.worker_mut() self.worker_mut()
@ -1242,6 +1266,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
self.pending_attachments.clone(), self.pending_attachments.clone(),
self.prompts.clone(), self.prompts.clone(),
self.log_writer.clone(), self.log_writer.clone(),
self.active_workflows.clone(),
) )
.with_usage_tracker(self.usage_tracker.clone()); .with_usage_tracker(self.usage_tracker.clone());
self.worker_mut().set_interceptor(interceptor); self.worker_mut().set_interceptor(interceptor);
@ -1428,6 +1453,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
usage_history_len, usage_history_len,
ai_activity_count: self.ai_activity_counter.load(Ordering::SeqCst), ai_activity_count: self.ai_activity_counter.load(Ordering::SeqCst),
last_run_interrupted: self.worker().last_run_interrupted(), last_run_interrupted: self.worker().last_run_interrupted(),
active_workflows: self.active_workflows.snapshot(),
} }
} }
@ -1465,6 +1491,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.truncate(snapshot.usage_history_len); .truncate(snapshot.usage_history_len);
let _ = self.usage_tracker.drain(); let _ = self.usage_tracker.drain();
let _ = self.metrics_tracker.drain(); let _ = self.metrics_tracker.drain();
self.active_workflows
.replace_with(snapshot.active_workflows);
let loc = self.segment_state.location(); let loc = self.segment_state.location();
self.store self.store
@ -1535,6 +1563,14 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let mut attachments = self.resolve_file_refs(&input); let mut attachments = self.resolve_file_refs(&input);
attachments.extend(self.resolve_knowledge_refs(&input)); attachments.extend(self.resolve_knowledge_refs(&input));
attachments.extend(self.resolve_workflow_invocations(&input)?); attachments.extend(self.resolve_workflow_invocations(&input)?);
let flattened = self.flatten_segments(&input);
if self.active_workflows.activate_from_system_items(
&attachments,
flattened.clone(),
segment_log::now_millis(),
) {
self.commit_entry(self.active_workflows.extension_entry())?;
}
if !attachments.is_empty() { if !attachments.is_empty() {
*self *self
.pending_attachments .pending_attachments
@ -1542,8 +1578,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.expect("pending_attachments poisoned") = attachments; .expect("pending_attachments poisoned") = attachments;
} }
let flattened = self.flatten_segments(&input);
let history_before = self.worker.as_ref().unwrap().history().len(); let history_before = self.worker.as_ref().unwrap().history().len();
// lock → run → unlock // lock → run → unlock
@ -2368,8 +2402,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
let worker = self.worker.as_ref().expect("worker taken during run"); let worker = self.worker.as_ref().expect("worker taken during run");
let history = worker.history(); let history = worker.history();
let retain_from = cut.index.min(history.len()); let retain_from = cut.index.min(history.len());
let retained_items = history[retain_from..].to_vec(); let mut retained_items = history[retain_from..].to_vec();
let items_to_summarise = history[..retain_from].to_vec(); let mut items_to_summarise = history[..retain_from].to_vec();
active_workflow::strip_rehydration_messages(&mut retained_items);
active_workflow::strip_rehydration_messages(&mut items_to_summarise);
// Compaction-related knobs. Fall through to manifest defaults when // Compaction-related knobs. Fall through to manifest defaults when
// `[compaction]` is omitted entirely. // `[compaction]` is omitted entirely.
@ -2428,13 +2464,15 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.unwrap_or_default(); .unwrap_or_default();
// Input text fed to the compact worker. Includes the default // Input text fed to the compact worker. Includes the default
// references, current TaskStore snapshot, and the (pruned) // references, current TaskStore snapshot, active workflow invocation
// conversation text. // state, and the (pruned) conversation text.
let task_snapshot_text = self.task_feature.snapshot_text(); let task_snapshot_text = self.task_feature.snapshot_text();
let active_workflow_snapshot_text = self.active_workflows.snapshot_text();
let summary_input = build_summary_input( let summary_input = build_summary_input(
&items_to_summarise, &items_to_summarise,
&default_refs, &default_refs,
Some(task_snapshot_text.as_str()), Some(task_snapshot_text.as_str()),
active_workflow_snapshot_text.as_deref(),
SummaryInputOptions { SummaryInputOptions {
overview_target_tokens, overview_target_tokens,
overview_warning_tokens, overview_warning_tokens,
@ -2610,6 +2648,10 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
.count(); .count();
// Build new history: [summary, ...auto-read, references, ...retained, task snapshot, TaskList synthetic call/result]. // Build new history: [summary, ...auto-read, references, ...retained, task snapshot, TaskList synthetic call/result].
// Active workflow guidance is intentionally not persisted as an ordinary
// compacted-history system message. It is regenerated request-locally
// from typed `pod.active_workflows` extension state so completed,
// cancelled, corrupt, or missing state cannot leak stale obligations.
// The TaskStore snapshot trails the retained items so that, on resume, // The TaskStore snapshot trails the retained items so that, on resume,
// `replay_history` walks any pre-compact Task* calls preserved verbatim // `replay_history` walks any pre-compact Task* calls preserved verbatim
// in retained_items first and the trailing snapshot's `replace_with` // in retained_items first and the trailing snapshot's `replace_with`
@ -2680,18 +2722,23 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
at_turn_index: source_turn_count, at_turn_index: source_turn_count,
}), }),
}; };
let active_workflow_extension = self.active_workflows.extension_entry();
let initial_entries = vec![entry.clone(), active_workflow_extension.clone()];
self.store self.store
.create_segment(old_loc.session_id, new_segment_id, &[entry.clone()])?; .create_segment(old_loc.session_id, new_segment_id, &initial_entries)?;
self.segment_state.set_location(SegmentLocation { self.segment_state.set_location(SegmentLocation {
session_id: old_loc.session_id, session_id: old_loc.session_id,
segment_id: new_segment_id, segment_id: new_segment_id,
}); });
self.segment_state.set_entries_written(1); self.segment_state
.set_entries_written(initial_entries.len());
let session_start = entry; let session_start = entry;
// Broadcast the SegmentStart through the sink. This atomically // Broadcast the SegmentStart through the sink. This atomically
// resets the mirror to `[SegmentStart]` so any subscriber // resets the mirror to the replacement segment prefix so any subscriber
// querying after this point sees the post-compaction prefix. // querying after this point sees the post-compaction prefix, including
self.sink.reset_with_initial(session_start); // durable extension state.
self.sink
.reset_with_initial_entries(vec![session_start, active_workflow_extension]);
// Keep pods.json pointing at the live segment_id. Without this // Keep pods.json pointing at the live segment_id. Without this
// a concurrent `restore_from_manifest(new_segment_id)` would // a concurrent `restore_from_manifest(new_segment_id)` would
// see no live writer and grab the session this Pod just moved // see no live writer and grab the session this Pod just moved
@ -3794,6 +3841,7 @@ where
usage_history: Arc::new(Mutex::new(Vec::new())), usage_history: Arc::new(Mutex::new(Vec::new())),
tracker: None, tracker: None,
task_feature: TaskFeature::new(), task_feature: TaskFeature::new(),
active_workflows: ActiveWorkflowStore::new(),
system_prompt_template: common.system_prompt_template, system_prompt_template: common.system_prompt_template,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
@ -3902,6 +3950,7 @@ where
usage_history: Arc::new(Mutex::new(Vec::new())), usage_history: Arc::new(Mutex::new(Vec::new())),
tracker: None, tracker: None,
task_feature: TaskFeature::new(), task_feature: TaskFeature::new(),
active_workflows: ActiveWorkflowStore::new(),
system_prompt_template: common.system_prompt_template, system_prompt_template: common.system_prompt_template,
alerter: None, alerter: None,
event_tx: None, event_tx: None,
@ -4101,7 +4150,9 @@ where
.. ..
}) })
); );
worker.set_history(state.history.clone()); let mut restored_history = state.history.clone();
active_workflow::strip_rehydration_messages(&mut restored_history);
worker.set_history(restored_history);
worker.set_request_config(state.config.clone()); worker.set_request_config(state.config.clone());
worker.set_turn_count(state.turn_count); worker.set_turn_count(state.turn_count);
worker.set_last_run_interrupted(state.last_run_interrupted); worker.set_last_run_interrupted(state.last_run_interrupted);
@ -4111,6 +4162,8 @@ where
let extract_pointer = memory::extract::fold_pointer(&state.extensions); let extract_pointer = memory::extract::fold_pointer(&state.extensions);
let task_feature = TaskFeature::from_history(&state.history); let task_feature = TaskFeature::from_history(&state.history);
let active_workflows = ActiveWorkflowStore::new();
active_workflows.restore_from_history_and_extensions(&state.history, &state.extensions);
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store)); let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
let mut pod = Self { let mut pod = Self {
@ -4131,6 +4184,7 @@ where
usage_history: Arc::new(Mutex::new(state.usage_history)), usage_history: Arc::new(Mutex::new(state.usage_history)),
tracker: None, tracker: None,
task_feature, task_feature,
active_workflows,
// Restore replays the saved system_prompt verbatim — no // Restore replays the saved system_prompt verbatim — no
// template re-render on resume. // template re-render on resume.
system_prompt_template: None, system_prompt_template: None,
@ -4335,12 +4389,13 @@ struct SummaryInputBuild {
} }
/// Build the compact worker's input: default-reference instructions, /// Build the compact worker's input: default-reference instructions,
/// the list of recently-touched files, task snapshot, and a bounded overview /// the list of recently-touched files, task snapshot, active workflow snapshot,
/// rather than a prefix-wide transcript. /// and a bounded overview rather than a prefix-wide transcript.
fn build_summary_input( fn build_summary_input(
items: &[Item], items: &[Item],
default_refs: &[PathBuf], default_refs: &[PathBuf],
task_snapshot: Option<&str>, task_snapshot: Option<&str>,
active_workflow_snapshot: Option<&str>,
options: SummaryInputOptions, options: SummaryInputOptions,
) -> SummaryInputBuild { ) -> SummaryInputBuild {
let overview = build_summary_overview( let overview = build_summary_overview(
@ -4392,6 +4447,17 @@ fn build_summary_input(
out.push_str(task_snapshot); out.push_str(task_snapshot);
out.push_str("\n\n"); out.push_str("\n\n");
} }
if let Some(active_workflow_snapshot) = active_workflow_snapshot {
out.push_str(
"## Active Workflow Invocation State\n\
This is durable typed workflow state for workflow-governed tasks. Preserve active \
slugs, invocation scope, status, obligations/checkpoints, and the snapshotted \
workflow guidance in the summary; do not substitute advertised/latest workflow \
resources for this invocation state.\n",
);
out.push_str(active_workflow_snapshot);
out.push_str("\n\n");
}
out.push_str("## Conversation overview/index\n"); out.push_str("## Conversation overview/index\n");
out.push_str(&overview); out.push_str(&overview);
out.push_str("\n\nWhen you are done, call `write_summary` with the final 5-section text."); out.push_str("\n\nWhen you are done, call `write_summary` with the final 5-section text.");
@ -5278,6 +5344,7 @@ mod build_summary_prompt_tests {
items, items,
&[], &[],
None, None,
None,
SummaryInputOptions { SummaryInputOptions {
overview_target_tokens: 512, overview_target_tokens: 512,
overview_warning_tokens: 1024, overview_warning_tokens: 1024,
@ -5326,6 +5393,27 @@ mod build_summary_prompt_tests {
assert!(!prompt.contains("deliberation")); assert!(!prompt.contains("deliberation"));
} }
#[test]
fn includes_active_workflow_snapshot_section() {
let prompt = build_summary_input(
&[Item::user_message("continue after review")],
&[],
None,
Some("ActiveWorkflowStore: 1 active workflow\n- review before merge\n- close ticket"),
SummaryInputOptions {
overview_target_tokens: 512,
overview_warning_tokens: 1024,
overview_deadline_tokens: 2048,
summary_target_tokens: 256,
},
)
.text;
assert!(prompt.contains("## Active Workflow Invocation State"));
assert!(prompt.contains("review before merge"));
assert!(prompt.contains("close ticket"));
}
#[test] #[test]
fn overview_warning_does_not_drop_input() { fn overview_warning_does_not_drop_input() {
let items = vec![Item::user_message("x".repeat(4_000))]; let items = vec![Item::user_message("x".repeat(4_000))];
@ -5333,6 +5421,7 @@ mod build_summary_prompt_tests {
&items, &items,
&[], &[],
None, None,
None,
SummaryInputOptions { SummaryInputOptions {
overview_target_tokens: 10, overview_target_tokens: 10,
overview_warning_tokens: 100, overview_warning_tokens: 100,
@ -5352,6 +5441,7 @@ mod build_summary_prompt_tests {
&items, &items,
&[], &[],
None, None,
None,
SummaryInputOptions { SummaryInputOptions {
overview_target_tokens: 10, overview_target_tokens: 10,
overview_warning_tokens: 10, overview_warning_tokens: 10,

View File

@ -147,6 +147,24 @@ impl SegmentLogSink {
let _ = self.inner.broadcast_tx.send(initial); let _ = self.inner.broadcast_tx.send(initial);
} }
/// Atomically swap the mirror to the supplied replacement-session prefix
/// and broadcast the first entry as the live rotation signal. Entries after
/// the first are already reflected in reconnect snapshots but are not
/// broadcast live; this is intended for non-live extension state that must
/// share the new segment prefix with SegmentStart.
pub fn reset_with_initial_entries(&self, entries: Vec<LogEntry>) {
let first = entries.first().cloned();
let mut mirror = self
.inner
.mirror
.lock()
.expect("session log mirror mutex poisoned");
*mirror = entries;
if let Some(initial) = first {
let _ = self.inner.broadcast_tx.send(initial);
}
}
/// Replace the mirror with the supplied prefix without broadcasting. /// Replace the mirror with the supplied prefix without broadcasting.
/// ///
/// Used by restore paths that load a session's complete log into /// Used by restore paths that load a session's complete log into

View File

@ -4,7 +4,7 @@ The conversation input is a bounded overview/index, not the full transcript. Tre
## Workflow ## Workflow
1. Read the provided overview/index and current TaskStore snapshot. 1. Read the provided overview/index, current TaskStore snapshot, and any Active Workflow Invocation State section.
2. If the overview does not contain enough detail, use `search_session_log` to find relevant compact-target history items, then `read_session_items` to inspect only the needed range. 2. If the overview does not contain enough detail, use `search_session_log` to find relevant compact-target history items, then `read_session_items` to inspect only the needed range.
3. Use `read_file` to inspect referenced files before deciding what the next session needs. Prefer skimming over blind inclusion. 3. Use `read_file` to inspect referenced files before deciding what the next session needs. Prefer skimming over blind inclusion.
4. For files whose current contents are load-bearing for the active work, call `mark_read_required` to inject them into the next session. These count against the auto-read token budget — spend it deliberately. 4. For files whose current contents are load-bearing for the active work, call `mark_read_required` to inject them into the next session. These count against the auto-read token budget — spend it deliberately.
@ -39,5 +39,6 @@ Produce the summary in this exact format:
## Constraints ## Constraints
- Preserve active workflow invocation state when present: active slug, invocation scope/source/time, status, open obligations/checkpoints, and snapshotted workflow guidance. Do not replace a snapshotted invocation with merely advertised/latest workflow resources.
- Keep code snippets and raw tool output OUT of the summary — that is what auto-read and references are for. - Keep code snippets and raw tool output OUT of the summary — that is what auto-read and references are for.
- Follow the summary target stated in the run input; if asked to shrink, call `write_summary` again with a shorter version. - Follow the summary target stated in the run input; if asked to shrink, call `write_summary` again with a shorter version.