fix: preserve active workflows across compaction
This commit is contained in:
parent
73d0a6a452
commit
362fedfbe6
580
crates/pod/src/active_workflow.rs
Normal file
580
crates/pod/src/active_workflow.rs
Normal file
|
|
@ -0,0 +1,580 @@
|
||||||
|
//! Durable active workflow invocation state.
|
||||||
|
//!
|
||||||
|
//! Workflow bodies are resolved at invocation time and snapshotted here. The
|
||||||
|
//! snapshot, not whatever resource version is installed later, is the procedural
|
||||||
|
//! authority that survives compaction for the currently governed task.
|
||||||
|
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use llm_worker::Item;
|
||||||
|
use llm_worker::tool::{
|
||||||
|
Tool, ToolDefinition, ToolError, ToolExecutionContext, ToolMeta, ToolOutput,
|
||||||
|
};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use serde_json::json;
|
||||||
|
use session_store::{LogEntry, SystemItem, segment_log};
|
||||||
|
|
||||||
|
pub const DOMAIN: &str = "pod.active_workflows";
|
||||||
|
const SCHEMA_VERSION: u32 = 1;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct ActiveWorkflowSnapshot {
|
||||||
|
pub schema_version: u32,
|
||||||
|
pub workflows: Vec<ActiveWorkflowRecord>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ActiveWorkflowSnapshot {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
schema_version: SCHEMA_VERSION,
|
||||||
|
workflows: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct ActiveWorkflowRecord {
|
||||||
|
pub slug: String,
|
||||||
|
pub status: ActiveWorkflowStatus,
|
||||||
|
pub invocation: WorkflowInvocationInfo,
|
||||||
|
pub task_scope: String,
|
||||||
|
pub body_snapshot_policy: WorkflowBodySnapshotPolicy,
|
||||||
|
pub guidance_snapshot: String,
|
||||||
|
pub obligations: Vec<String>,
|
||||||
|
pub checkpoints: Vec<WorkflowCheckpoint>,
|
||||||
|
pub updated_at_ms: u64,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub completion: Option<WorkflowCompletionInfo>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, schemars::JsonSchema)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum ActiveWorkflowStatus {
|
||||||
|
Active,
|
||||||
|
Completed,
|
||||||
|
Cancelled,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for ActiveWorkflowStatus {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
let s = match self {
|
||||||
|
Self::Active => "active",
|
||||||
|
Self::Completed => "completed",
|
||||||
|
Self::Cancelled => "cancelled",
|
||||||
|
};
|
||||||
|
f.write_str(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct WorkflowInvocationInfo {
|
||||||
|
pub source: WorkflowInvocationSource,
|
||||||
|
pub invoked_at_ms: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum WorkflowInvocationSource {
|
||||||
|
UserWorkflowInvokeSegment,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum WorkflowBodySnapshotPolicy {
|
||||||
|
SnapshottedAtInvocation,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct WorkflowCheckpoint {
|
||||||
|
pub label: String,
|
||||||
|
pub status: WorkflowCheckpointStatus,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum WorkflowCheckpointStatus {
|
||||||
|
Open,
|
||||||
|
Done,
|
||||||
|
Cancelled,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct WorkflowCompletionInfo {
|
||||||
|
pub completed_at_ms: u64,
|
||||||
|
pub reason: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
pub struct ActiveWorkflowStore {
|
||||||
|
inner: Arc<Mutex<ActiveWorkflowSnapshot>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ActiveWorkflowStore {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn snapshot(&self) -> ActiveWorkflowSnapshot {
|
||||||
|
self.inner.lock().unwrap_or_else(|e| e.into_inner()).clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn replace_with(&self, snapshot: ActiveWorkflowSnapshot) {
|
||||||
|
*self.inner.lock().unwrap_or_else(|e| e.into_inner()) = snapshot;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn active_records(&self) -> Vec<ActiveWorkflowRecord> {
|
||||||
|
self.snapshot()
|
||||||
|
.workflows
|
||||||
|
.into_iter()
|
||||||
|
.filter(|record| record.status == ActiveWorkflowStatus::Active)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn activate_from_system_items(
|
||||||
|
&self,
|
||||||
|
items: &[SystemItem],
|
||||||
|
task_scope: String,
|
||||||
|
invoked_at_ms: u64,
|
||||||
|
) -> bool {
|
||||||
|
let mut grouped: BTreeMap<String, Vec<String>> = BTreeMap::new();
|
||||||
|
for item in items {
|
||||||
|
if let SystemItem::Workflow { slug, body } = item {
|
||||||
|
grouped.entry(slug.clone()).or_default().push(body.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if grouped.is_empty() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut snapshot = self.snapshot();
|
||||||
|
snapshot.schema_version = SCHEMA_VERSION;
|
||||||
|
for (slug, bodies) in grouped {
|
||||||
|
let guidance_snapshot = bodies.join("\n\n---\n\n");
|
||||||
|
let obligations = extract_obligations(&guidance_snapshot);
|
||||||
|
let checkpoints = obligations
|
||||||
|
.iter()
|
||||||
|
.take(32)
|
||||||
|
.map(|label| WorkflowCheckpoint {
|
||||||
|
label: label.clone(),
|
||||||
|
status: WorkflowCheckpointStatus::Open,
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
let record = ActiveWorkflowRecord {
|
||||||
|
slug: slug.clone(),
|
||||||
|
status: ActiveWorkflowStatus::Active,
|
||||||
|
invocation: WorkflowInvocationInfo {
|
||||||
|
source: WorkflowInvocationSource::UserWorkflowInvokeSegment,
|
||||||
|
invoked_at_ms,
|
||||||
|
},
|
||||||
|
task_scope: truncate_chars(&task_scope, 2_000),
|
||||||
|
body_snapshot_policy: WorkflowBodySnapshotPolicy::SnapshottedAtInvocation,
|
||||||
|
guidance_snapshot,
|
||||||
|
obligations,
|
||||||
|
checkpoints,
|
||||||
|
updated_at_ms: invoked_at_ms,
|
||||||
|
completion: None,
|
||||||
|
};
|
||||||
|
upsert_record(&mut snapshot.workflows, record);
|
||||||
|
}
|
||||||
|
self.replace_with(snapshot);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn set_status(
|
||||||
|
&self,
|
||||||
|
slug: &str,
|
||||||
|
status: ActiveWorkflowStatus,
|
||||||
|
reason: String,
|
||||||
|
now_ms: u64,
|
||||||
|
) -> Result<ActiveWorkflowRecord, String> {
|
||||||
|
let mut snapshot = self.inner.lock().unwrap_or_else(|e| e.into_inner());
|
||||||
|
let record = snapshot
|
||||||
|
.workflows
|
||||||
|
.iter_mut()
|
||||||
|
.find(|record| record.slug == slug)
|
||||||
|
.ok_or_else(|| format!("active workflow `{slug}` not found"))?;
|
||||||
|
record.status = status;
|
||||||
|
record.updated_at_ms = now_ms;
|
||||||
|
record.completion = Some(WorkflowCompletionInfo {
|
||||||
|
completed_at_ms: now_ms,
|
||||||
|
reason,
|
||||||
|
});
|
||||||
|
for checkpoint in &mut record.checkpoints {
|
||||||
|
checkpoint.status = match status {
|
||||||
|
ActiveWorkflowStatus::Active => WorkflowCheckpointStatus::Open,
|
||||||
|
ActiveWorkflowStatus::Completed => WorkflowCheckpointStatus::Done,
|
||||||
|
ActiveWorkflowStatus::Cancelled => WorkflowCheckpointStatus::Cancelled,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Ok(record.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn snapshot_text(&self) -> Option<String> {
|
||||||
|
let active = self.active_records();
|
||||||
|
(!active.is_empty()).then(|| render_snapshot_text(&active))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn rehydration_message(&self) -> Option<String> {
|
||||||
|
let active = self.active_records();
|
||||||
|
(!active.is_empty()).then(|| render_rehydration_message(&active))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn extension_entry(&self) -> LogEntry {
|
||||||
|
LogEntry::Extension {
|
||||||
|
ts: segment_log::now_millis(),
|
||||||
|
domain: DOMAIN.into(),
|
||||||
|
payload: serde_json::to_value(self.snapshot())
|
||||||
|
.expect("ActiveWorkflowSnapshot is always JSON-serializable"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn restore_from_history_and_extensions(
|
||||||
|
&self,
|
||||||
|
history: &[Item],
|
||||||
|
extensions: &[(String, serde_json::Value)],
|
||||||
|
) {
|
||||||
|
let (mut snapshot, diagnostics) = fold_extensions(extensions);
|
||||||
|
for diagnostic in diagnostics {
|
||||||
|
tracing::warn!(diagnostic, "failed to restore active workflow state");
|
||||||
|
}
|
||||||
|
replay_history_tools(&mut snapshot, history);
|
||||||
|
self.replace_with(snapshot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn fold_extensions(
|
||||||
|
extensions: &[(String, serde_json::Value)],
|
||||||
|
) -> (ActiveWorkflowSnapshot, Vec<String>) {
|
||||||
|
let mut latest = None;
|
||||||
|
let mut diagnostics = Vec::new();
|
||||||
|
for (domain, payload) in extensions {
|
||||||
|
if domain != DOMAIN {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
match serde_json::from_value::<ActiveWorkflowSnapshot>(payload.clone()) {
|
||||||
|
Ok(snapshot) if snapshot.schema_version == SCHEMA_VERSION => latest = Some(snapshot),
|
||||||
|
Ok(snapshot) => {
|
||||||
|
latest = None;
|
||||||
|
diagnostics.push(format!(
|
||||||
|
"unsupported active workflow schema_version {}",
|
||||||
|
snapshot.schema_version
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
latest = None;
|
||||||
|
diagnostics.push(format!("corrupt active workflow payload: {err}"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
(latest.unwrap_or_default(), diagnostics)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn replay_history_tools(snapshot: &mut ActiveWorkflowSnapshot, history: &[Item]) {
|
||||||
|
for item in history {
|
||||||
|
let Item::ToolCall {
|
||||||
|
name, arguments, ..
|
||||||
|
} = item
|
||||||
|
else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let status = match name.as_str() {
|
||||||
|
"ActiveWorkflowComplete" => ActiveWorkflowStatus::Completed,
|
||||||
|
"ActiveWorkflowCancel" => ActiveWorkflowStatus::Cancelled,
|
||||||
|
_ => continue,
|
||||||
|
};
|
||||||
|
if let Ok(params) = serde_json::from_str::<WorkflowStatusParams>(arguments) {
|
||||||
|
if let Some(record) = snapshot
|
||||||
|
.workflows
|
||||||
|
.iter_mut()
|
||||||
|
.find(|record| record.slug == params.slug)
|
||||||
|
{
|
||||||
|
let reason = params.reason.unwrap_or_else(|| status.to_string());
|
||||||
|
record.status = status;
|
||||||
|
record.updated_at_ms = record.updated_at_ms.saturating_add(1);
|
||||||
|
record.completion = Some(WorkflowCompletionInfo {
|
||||||
|
completed_at_ms: record.updated_at_ms,
|
||||||
|
reason,
|
||||||
|
});
|
||||||
|
for checkpoint in &mut record.checkpoints {
|
||||||
|
checkpoint.status = match status {
|
||||||
|
ActiveWorkflowStatus::Active => WorkflowCheckpointStatus::Open,
|
||||||
|
ActiveWorkflowStatus::Completed => WorkflowCheckpointStatus::Done,
|
||||||
|
ActiveWorkflowStatus::Cancelled => WorkflowCheckpointStatus::Cancelled,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn active_workflow_tools(store: ActiveWorkflowStore) -> Vec<ToolDefinition> {
|
||||||
|
vec![
|
||||||
|
list_tool(store.clone()),
|
||||||
|
status_tool(store.clone(), ActiveWorkflowStatus::Completed),
|
||||||
|
status_tool(store, ActiveWorkflowStatus::Cancelled),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn list_tool(store: ActiveWorkflowStore) -> ToolDefinition {
|
||||||
|
Arc::new(move || {
|
||||||
|
(
|
||||||
|
ToolMeta::new("ActiveWorkflowList")
|
||||||
|
.description("List durable active workflow invocations and their status")
|
||||||
|
.input_schema(
|
||||||
|
json!({"type":"object","properties":{},"additionalProperties":false}),
|
||||||
|
),
|
||||||
|
Arc::new(ActiveWorkflowListTool {
|
||||||
|
store: store.clone(),
|
||||||
|
}) as Arc<dyn Tool>,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn status_tool(store: ActiveWorkflowStore, status: ActiveWorkflowStatus) -> ToolDefinition {
|
||||||
|
let name = match status {
|
||||||
|
ActiveWorkflowStatus::Completed => "ActiveWorkflowComplete",
|
||||||
|
ActiveWorkflowStatus::Cancelled => "ActiveWorkflowCancel",
|
||||||
|
ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"),
|
||||||
|
};
|
||||||
|
let description = match status {
|
||||||
|
ActiveWorkflowStatus::Completed => {
|
||||||
|
"Mark an active workflow as completed when its governed task is finished"
|
||||||
|
}
|
||||||
|
ActiveWorkflowStatus::Cancelled => {
|
||||||
|
"Cancel an active workflow when the governed task is explicitly abandoned"
|
||||||
|
}
|
||||||
|
ActiveWorkflowStatus::Active => unreachable!("active status tool is not exposed"),
|
||||||
|
};
|
||||||
|
let store_for_tool = store.clone();
|
||||||
|
Arc::new(move || {
|
||||||
|
(
|
||||||
|
ToolMeta::new(name)
|
||||||
|
.description(description)
|
||||||
|
.input_schema(json!({
|
||||||
|
"type":"object",
|
||||||
|
"properties":{
|
||||||
|
"slug":{"type":"string","description":"Workflow slug to update"},
|
||||||
|
"reason":{"type":"string","description":"Brief completion/cancellation reason"}
|
||||||
|
},
|
||||||
|
"required":["slug"],
|
||||||
|
"additionalProperties":false
|
||||||
|
})),
|
||||||
|
Arc::new(ActiveWorkflowStatusTool {
|
||||||
|
store: store_for_tool.clone(),
|
||||||
|
status,
|
||||||
|
}) as Arc<dyn Tool>,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ActiveWorkflowListTool {
|
||||||
|
store: ActiveWorkflowStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Tool for ActiveWorkflowListTool {
|
||||||
|
async fn execute(
|
||||||
|
&self,
|
||||||
|
_input_json: &str,
|
||||||
|
_ctx: ToolExecutionContext,
|
||||||
|
) -> Result<ToolOutput, ToolError> {
|
||||||
|
let snapshot = self.store.snapshot();
|
||||||
|
let content = serde_json::to_string_pretty(&snapshot)
|
||||||
|
.map_err(|err| ToolError::Internal(err.to_string()))?;
|
||||||
|
let active = snapshot
|
||||||
|
.workflows
|
||||||
|
.iter()
|
||||||
|
.filter(|record| record.status == ActiveWorkflowStatus::Active)
|
||||||
|
.count();
|
||||||
|
Ok(ToolOutput {
|
||||||
|
summary: format!(
|
||||||
|
"ActiveWorkflowStore: {} workflow(s), {active} active",
|
||||||
|
snapshot.workflows.len()
|
||||||
|
),
|
||||||
|
content: Some(content),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ActiveWorkflowStatusTool {
|
||||||
|
store: ActiveWorkflowStore,
|
||||||
|
status: ActiveWorkflowStatus,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Tool for ActiveWorkflowStatusTool {
|
||||||
|
async fn execute(
|
||||||
|
&self,
|
||||||
|
input_json: &str,
|
||||||
|
_ctx: ToolExecutionContext,
|
||||||
|
) -> Result<ToolOutput, ToolError> {
|
||||||
|
let params: WorkflowStatusParams = serde_json::from_str(input_json)
|
||||||
|
.map_err(|err| ToolError::InvalidArgument(err.to_string()))?;
|
||||||
|
let reason = params.reason.unwrap_or_else(|| self.status.to_string());
|
||||||
|
let record = self
|
||||||
|
.store
|
||||||
|
.set_status(¶ms.slug, self.status, reason, segment_log::now_millis())
|
||||||
|
.map_err(ToolError::InvalidArgument)?;
|
||||||
|
let content = serde_json::to_string_pretty(&record)
|
||||||
|
.map_err(|err| ToolError::Internal(err.to_string()))?;
|
||||||
|
Ok(ToolOutput {
|
||||||
|
summary: format!("workflow {} marked {}", record.slug, record.status),
|
||||||
|
content: Some(content),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct WorkflowStatusParams {
|
||||||
|
slug: String,
|
||||||
|
#[serde(default)]
|
||||||
|
reason: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn upsert_record(records: &mut Vec<ActiveWorkflowRecord>, record: ActiveWorkflowRecord) {
|
||||||
|
if let Some(existing) = records
|
||||||
|
.iter_mut()
|
||||||
|
.find(|existing| existing.slug == record.slug)
|
||||||
|
{
|
||||||
|
*existing = record;
|
||||||
|
} else {
|
||||||
|
records.push(record);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn extract_obligations(body: &str) -> Vec<String> {
|
||||||
|
let mut obligations = Vec::new();
|
||||||
|
for line in body.lines() {
|
||||||
|
let trimmed = line.trim();
|
||||||
|
let candidate = trimmed
|
||||||
|
.strip_prefix("- ")
|
||||||
|
.or_else(|| trimmed.strip_prefix("* "))
|
||||||
|
.or_else(|| trimmed.strip_prefix("• "))
|
||||||
|
.unwrap_or(trimmed);
|
||||||
|
let lower = candidate.to_ascii_lowercase();
|
||||||
|
let looks_obligating = lower.contains("must")
|
||||||
|
|| lower.contains("require")
|
||||||
|
|| lower.contains("obligation")
|
||||||
|
|| lower.contains("review")
|
||||||
|
|| lower.contains("merge")
|
||||||
|
|| lower.contains("close")
|
||||||
|
|| lower.contains("report")
|
||||||
|
|| lower.contains("handoff");
|
||||||
|
if looks_obligating && !candidate.is_empty() {
|
||||||
|
obligations.push(truncate_chars(candidate, 240));
|
||||||
|
}
|
||||||
|
if obligations.len() >= 32 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if obligations.is_empty() {
|
||||||
|
obligations
|
||||||
|
.push("Follow the snapshotted workflow body until completion or cancellation".into());
|
||||||
|
}
|
||||||
|
obligations
|
||||||
|
}
|
||||||
|
|
||||||
|
fn render_snapshot_text(records: &[ActiveWorkflowRecord]) -> String {
|
||||||
|
let json = serde_json::to_string_pretty(&ActiveWorkflowSnapshot {
|
||||||
|
schema_version: SCHEMA_VERSION,
|
||||||
|
workflows: records.to_vec(),
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|_| String::from("{\"schema_version\":1,\"workflows\":[]}"));
|
||||||
|
format!(
|
||||||
|
"ActiveWorkflowStore: {} active workflow(s)\n\n```json\n{}\n```",
|
||||||
|
records.len(),
|
||||||
|
json
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn render_rehydration_message(records: &[ActiveWorkflowRecord]) -> String {
|
||||||
|
let mut out = String::from(
|
||||||
|
"[Active workflow snapshot]\n\n\
|
||||||
|
The following workflow invocation state is durable state carried across compaction. \
|
||||||
|
Continue to follow each active workflow's snapshotted guidance until the governed task \
|
||||||
|
is completed with ActiveWorkflowComplete or explicitly cancelled with ActiveWorkflowCancel. \
|
||||||
|
Missing or obsolete workflow resources must not replace these invocation snapshots.\n",
|
||||||
|
);
|
||||||
|
for record in records {
|
||||||
|
out.push_str(&format!(
|
||||||
|
"\n## /{} ({})\n- invoked_at_ms: {}\n- invocation_source: {:?}\n- body_snapshot_policy: {:?}\n- task_scope: {}\n\n### Current obligations/checkpoints\n",
|
||||||
|
record.slug,
|
||||||
|
record.status,
|
||||||
|
record.invocation.invoked_at_ms,
|
||||||
|
record.invocation.source,
|
||||||
|
record.body_snapshot_policy,
|
||||||
|
record.task_scope.replace('\n', " "),
|
||||||
|
));
|
||||||
|
for checkpoint in &record.checkpoints {
|
||||||
|
out.push_str(&format!(
|
||||||
|
"- [{}] {}\n",
|
||||||
|
checkpoint.status_label(),
|
||||||
|
checkpoint.label
|
||||||
|
));
|
||||||
|
}
|
||||||
|
out.push_str("\n### Snapshotted workflow guidance\n");
|
||||||
|
out.push_str(record.guidance_snapshot.trim_end());
|
||||||
|
out.push_str("\n");
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WorkflowCheckpoint {
|
||||||
|
fn status_label(&self) -> &'static str {
|
||||||
|
match self.status {
|
||||||
|
WorkflowCheckpointStatus::Open => "open",
|
||||||
|
WorkflowCheckpointStatus::Done => "done",
|
||||||
|
WorkflowCheckpointStatus::Cancelled => "cancelled",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn truncate_chars(text: &str, max_chars: usize) -> String {
|
||||||
|
let mut out = String::new();
|
||||||
|
for (idx, ch) in text.chars().enumerate() {
|
||||||
|
if idx >= max_chars {
|
||||||
|
out.push('…');
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
out.push(ch);
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn active_workflow_guidance_carries_merge_close_obligations() {
|
||||||
|
let store = ActiveWorkflowStore::new();
|
||||||
|
let items = vec![SystemItem::Workflow {
|
||||||
|
slug: "multi-agent-workflow".into(),
|
||||||
|
body: "# Multi-agent workflow\n- Delegate implementation to coder.\n- Require external review before merge.\n- Close the Ticket after merge and report evidence.\n".into(),
|
||||||
|
}];
|
||||||
|
|
||||||
|
assert!(store.activate_from_system_items(
|
||||||
|
&items,
|
||||||
|
"/multi-agent-workflow implement ticket".into(),
|
||||||
|
42,
|
||||||
|
));
|
||||||
|
let msg = store.rehydration_message().unwrap();
|
||||||
|
|
||||||
|
assert!(msg.contains("multi-agent-workflow"));
|
||||||
|
assert!(msg.contains("external review before merge"));
|
||||||
|
assert!(msg.contains("Close the Ticket after merge"));
|
||||||
|
assert!(msg.contains("Snapshotted workflow guidance"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn corrupt_extension_fails_closed_with_diagnostic() {
|
||||||
|
let entries = vec![(DOMAIN.to_string(), json!({"schema_version":"bad"}))];
|
||||||
|
|
||||||
|
let (snapshot, diagnostics) = fold_extensions(&entries);
|
||||||
|
|
||||||
|
assert!(snapshot.workflows.is_empty());
|
||||||
|
assert_eq!(diagnostics.len(), 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
pub mod active_workflow;
|
||||||
pub mod compact;
|
pub mod compact;
|
||||||
pub mod controller;
|
pub mod controller;
|
||||||
pub mod discovery;
|
pub mod discovery;
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,7 @@ use manifest::{
|
||||||
ScopeError, ScopeRule, SharedScope, WorkerManifest,
|
ScopeError, ScopeRule, SharedScope, WorkerManifest,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::active_workflow::{self, ActiveWorkflowStore};
|
||||||
use crate::compact::state::CompactState;
|
use crate::compact::state::CompactState;
|
||||||
use crate::compact::usage_tracker::UsageTracker;
|
use crate::compact::usage_tracker::UsageTracker;
|
||||||
use crate::feature::builtin::TaskFeature;
|
use crate::feature::builtin::TaskFeature;
|
||||||
|
|
@ -145,6 +146,7 @@ struct EmptyTurnRollbackSnapshot {
|
||||||
usage_history_len: usize,
|
usage_history_len: usize,
|
||||||
ai_activity_count: usize,
|
ai_activity_count: usize,
|
||||||
last_run_interrupted: bool,
|
last_run_interrupted: bool,
|
||||||
|
active_workflows: active_workflow::ActiveWorkflowSnapshot,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_ai_materialized_item(item: &Item) -> bool {
|
fn is_ai_materialized_item(item: &Item) -> bool {
|
||||||
|
|
@ -274,6 +276,10 @@ pub struct Pod<C: LlmClient, St: Store> {
|
||||||
/// the narrow snapshot/restore surface Pod needs for compaction and rewind.
|
/// the narrow snapshot/restore surface Pod needs for compaction and rewind.
|
||||||
/// Store/reminder ownership stays inside the Task feature module.
|
/// Store/reminder ownership stays inside the Task feature module.
|
||||||
task_feature: TaskFeature,
|
task_feature: TaskFeature,
|
||||||
|
/// Durable state for workflow invocations that are active for the current task.
|
||||||
|
/// The store is persisted as typed session-log extensions and rehydrated into
|
||||||
|
/// prompt context during compaction.
|
||||||
|
active_workflows: ActiveWorkflowStore,
|
||||||
/// Parsed system-prompt template awaiting first-turn materialisation.
|
/// Parsed system-prompt template awaiting first-turn materialisation.
|
||||||
/// `Some` until `ensure_system_prompt_materialized` renders it once,
|
/// `Some` until `ensure_system_prompt_materialized` renders it once,
|
||||||
/// then `None` forever — including after compaction.
|
/// then `None` forever — including after compaction.
|
||||||
|
|
@ -435,6 +441,7 @@ impl<C: LlmClient + Clone + 'static, St: Store + Clone + 'static> Pod<C, St> {
|
||||||
usage_history: self.usage_history.clone(),
|
usage_history: self.usage_history.clone(),
|
||||||
tracker: None,
|
tracker: None,
|
||||||
task_feature: self.task_feature.clone(),
|
task_feature: self.task_feature.clone(),
|
||||||
|
active_workflows: self.active_workflows.clone(),
|
||||||
system_prompt_template: None,
|
system_prompt_template: None,
|
||||||
alerter: self.alerter.clone(),
|
alerter: self.alerter.clone(),
|
||||||
event_tx: self.event_tx.clone(),
|
event_tx: self.event_tx.clone(),
|
||||||
|
|
@ -618,6 +625,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
usage_history: Arc::new(Mutex::new(Vec::<UsageRecord>::new())),
|
usage_history: Arc::new(Mutex::new(Vec::<UsageRecord>::new())),
|
||||||
tracker: None,
|
tracker: None,
|
||||||
task_feature: TaskFeature::new(),
|
task_feature: TaskFeature::new(),
|
||||||
|
active_workflows: ActiveWorkflowStore::new(),
|
||||||
system_prompt_template: None,
|
system_prompt_template: None,
|
||||||
alerter: None,
|
alerter: None,
|
||||||
event_tx: None,
|
event_tx: None,
|
||||||
|
|
@ -813,7 +821,11 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
registry: FeatureRegistryBuilder,
|
registry: FeatureRegistryBuilder,
|
||||||
) -> FeatureRegistryInstallReport {
|
) -> FeatureRegistryInstallReport {
|
||||||
let worker = self.worker.as_mut().expect("worker taken during run");
|
let worker = self.worker.as_mut().expect("worker taken during run");
|
||||||
registry.install_into_worker(worker, &mut self.hook_builder)
|
let report = registry.install_into_worker(worker, &mut self.hook_builder);
|
||||||
|
worker.register_tools(active_workflow::active_workflow_tools(
|
||||||
|
self.active_workflows.clone(),
|
||||||
|
));
|
||||||
|
report
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reference to the store.
|
/// Reference to the store.
|
||||||
|
|
@ -876,6 +888,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
self.sink.truncate_silent(truncate_entries);
|
self.sink.truncate_silent(truncate_entries);
|
||||||
|
|
||||||
self.task_feature.restore_from_history(&state.history);
|
self.task_feature.restore_from_history(&state.history);
|
||||||
|
self.active_workflows
|
||||||
|
.restore_from_history_and_extensions(&state.history, &state.extensions);
|
||||||
self.worker_mut().set_history(state.history);
|
self.worker_mut().set_history(state.history);
|
||||||
self.worker_mut().set_request_config(state.config);
|
self.worker_mut().set_request_config(state.config);
|
||||||
self.worker_mut().set_turn_count(state.turn_count);
|
self.worker_mut().set_turn_count(state.turn_count);
|
||||||
|
|
@ -1428,6 +1442,7 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
usage_history_len,
|
usage_history_len,
|
||||||
ai_activity_count: self.ai_activity_counter.load(Ordering::SeqCst),
|
ai_activity_count: self.ai_activity_counter.load(Ordering::SeqCst),
|
||||||
last_run_interrupted: self.worker().last_run_interrupted(),
|
last_run_interrupted: self.worker().last_run_interrupted(),
|
||||||
|
active_workflows: self.active_workflows.snapshot(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -1465,6 +1480,8 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
.truncate(snapshot.usage_history_len);
|
.truncate(snapshot.usage_history_len);
|
||||||
let _ = self.usage_tracker.drain();
|
let _ = self.usage_tracker.drain();
|
||||||
let _ = self.metrics_tracker.drain();
|
let _ = self.metrics_tracker.drain();
|
||||||
|
self.active_workflows
|
||||||
|
.replace_with(snapshot.active_workflows);
|
||||||
|
|
||||||
let loc = self.segment_state.location();
|
let loc = self.segment_state.location();
|
||||||
self.store
|
self.store
|
||||||
|
|
@ -1535,6 +1552,14 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
let mut attachments = self.resolve_file_refs(&input);
|
let mut attachments = self.resolve_file_refs(&input);
|
||||||
attachments.extend(self.resolve_knowledge_refs(&input));
|
attachments.extend(self.resolve_knowledge_refs(&input));
|
||||||
attachments.extend(self.resolve_workflow_invocations(&input)?);
|
attachments.extend(self.resolve_workflow_invocations(&input)?);
|
||||||
|
let flattened = self.flatten_segments(&input);
|
||||||
|
if self.active_workflows.activate_from_system_items(
|
||||||
|
&attachments,
|
||||||
|
flattened.clone(),
|
||||||
|
segment_log::now_millis(),
|
||||||
|
) {
|
||||||
|
self.commit_entry(self.active_workflows.extension_entry())?;
|
||||||
|
}
|
||||||
if !attachments.is_empty() {
|
if !attachments.is_empty() {
|
||||||
*self
|
*self
|
||||||
.pending_attachments
|
.pending_attachments
|
||||||
|
|
@ -1542,8 +1567,6 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
.expect("pending_attachments poisoned") = attachments;
|
.expect("pending_attachments poisoned") = attachments;
|
||||||
}
|
}
|
||||||
|
|
||||||
let flattened = self.flatten_segments(&input);
|
|
||||||
|
|
||||||
let history_before = self.worker.as_ref().unwrap().history().len();
|
let history_before = self.worker.as_ref().unwrap().history().len();
|
||||||
|
|
||||||
// lock → run → unlock
|
// lock → run → unlock
|
||||||
|
|
@ -2428,13 +2451,15 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
// Input text fed to the compact worker. Includes the default
|
// Input text fed to the compact worker. Includes the default
|
||||||
// references, current TaskStore snapshot, and the (pruned)
|
// references, current TaskStore snapshot, active workflow invocation
|
||||||
// conversation text.
|
// state, and the (pruned) conversation text.
|
||||||
let task_snapshot_text = self.task_feature.snapshot_text();
|
let task_snapshot_text = self.task_feature.snapshot_text();
|
||||||
|
let active_workflow_snapshot_text = self.active_workflows.snapshot_text();
|
||||||
let summary_input = build_summary_input(
|
let summary_input = build_summary_input(
|
||||||
&items_to_summarise,
|
&items_to_summarise,
|
||||||
&default_refs,
|
&default_refs,
|
||||||
Some(task_snapshot_text.as_str()),
|
Some(task_snapshot_text.as_str()),
|
||||||
|
active_workflow_snapshot_text.as_deref(),
|
||||||
SummaryInputOptions {
|
SummaryInputOptions {
|
||||||
overview_target_tokens,
|
overview_target_tokens,
|
||||||
overview_warning_tokens,
|
overview_warning_tokens,
|
||||||
|
|
@ -2609,20 +2634,31 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
.filter(|i| i.is_user_message())
|
.filter(|i| i.is_user_message())
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
// Build new history: [summary, ...auto-read, references, ...retained, task snapshot, TaskList synthetic call/result].
|
// Build new history: [summary, ...auto-read, references, ...retained, active workflow snapshot, task snapshot, TaskList synthetic call/result].
|
||||||
|
// The active workflow snapshot is inserted from durable typed state so
|
||||||
|
// workflow-governed tasks keep their procedural authority after the
|
||||||
|
// compacted segment starts.
|
||||||
// The TaskStore snapshot trails the retained items so that, on resume,
|
// The TaskStore snapshot trails the retained items so that, on resume,
|
||||||
// `replay_history` walks any pre-compact Task* calls preserved verbatim
|
// `replay_history` walks any pre-compact Task* calls preserved verbatim
|
||||||
// in retained_items first and the trailing snapshot's `replace_with`
|
// in retained_items first and the trailing snapshot's `replace_with`
|
||||||
// is the final word — pre-compact `TaskCreate` calls cannot leak as
|
// is the final word — pre-compact `TaskCreate` calls cannot leak as
|
||||||
// duplicate entries.
|
// duplicate entries.
|
||||||
|
let active_workflow_message = self
|
||||||
|
.active_workflows
|
||||||
|
.rehydration_message()
|
||||||
|
.map(Item::system_message);
|
||||||
let mut new_history = Vec::with_capacity(
|
let mut new_history = Vec::with_capacity(
|
||||||
1 + auto_read_messages.len()
|
1 + auto_read_messages.len()
|
||||||
+ 3
|
+ 3
|
||||||
+ reference_message.is_some() as usize
|
+ reference_message.is_some() as usize
|
||||||
|
+ active_workflow_message.is_some() as usize
|
||||||
+ retained_items.len(),
|
+ retained_items.len(),
|
||||||
);
|
);
|
||||||
let mut compact_introduced_system_messages =
|
let mut compact_introduced_system_messages = Vec::with_capacity(
|
||||||
Vec::with_capacity(2 + auto_read_messages.len() + reference_message.is_some() as usize);
|
2 + auto_read_messages.len()
|
||||||
|
+ reference_message.is_some() as usize
|
||||||
|
+ active_workflow_message.is_some() as usize,
|
||||||
|
);
|
||||||
let summary_message =
|
let summary_message =
|
||||||
Item::system_message(format!("[Compacted context summary]\n\n{summary_text}"));
|
Item::system_message(format!("[Compacted context summary]\n\n{summary_text}"));
|
||||||
compact_introduced_system_messages.push(summary_message.clone());
|
compact_introduced_system_messages.push(summary_message.clone());
|
||||||
|
|
@ -2635,6 +2671,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
This is the complete session task list preserved across compaction. \
|
This is the complete session task list preserved across compaction. \
|
||||||
The following TaskList tool result presents the same state through the tool lane."
|
The following TaskList tool result presents the same state through the tool lane."
|
||||||
));
|
));
|
||||||
|
if let Some(msg) = active_workflow_message.as_ref() {
|
||||||
|
compact_introduced_system_messages.push(msg.clone());
|
||||||
|
}
|
||||||
compact_introduced_system_messages.push(task_snapshot_message.clone());
|
compact_introduced_system_messages.push(task_snapshot_message.clone());
|
||||||
|
|
||||||
new_history.push(summary_message);
|
new_history.push(summary_message);
|
||||||
|
|
@ -2643,6 +2682,9 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
new_history.push(msg);
|
new_history.push(msg);
|
||||||
}
|
}
|
||||||
new_history.extend(retained_items);
|
new_history.extend(retained_items);
|
||||||
|
if let Some(msg) = active_workflow_message {
|
||||||
|
new_history.push(msg);
|
||||||
|
}
|
||||||
new_history.push(task_snapshot_message);
|
new_history.push(task_snapshot_message);
|
||||||
new_history.push(Item::tool_call("compact-tasklist", "TaskList", "{}"));
|
new_history.push(Item::tool_call("compact-tasklist", "TaskList", "{}"));
|
||||||
new_history.push(Item::tool_result_with_content(
|
new_history.push(Item::tool_result_with_content(
|
||||||
|
|
@ -2680,18 +2722,23 @@ impl<C: LlmClient, St: Store> Pod<C, St> {
|
||||||
at_turn_index: source_turn_count,
|
at_turn_index: source_turn_count,
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
|
let active_workflow_extension = self.active_workflows.extension_entry();
|
||||||
|
let initial_entries = vec![entry.clone(), active_workflow_extension.clone()];
|
||||||
self.store
|
self.store
|
||||||
.create_segment(old_loc.session_id, new_segment_id, &[entry.clone()])?;
|
.create_segment(old_loc.session_id, new_segment_id, &initial_entries)?;
|
||||||
self.segment_state.set_location(SegmentLocation {
|
self.segment_state.set_location(SegmentLocation {
|
||||||
session_id: old_loc.session_id,
|
session_id: old_loc.session_id,
|
||||||
segment_id: new_segment_id,
|
segment_id: new_segment_id,
|
||||||
});
|
});
|
||||||
self.segment_state.set_entries_written(1);
|
self.segment_state
|
||||||
|
.set_entries_written(initial_entries.len());
|
||||||
let session_start = entry;
|
let session_start = entry;
|
||||||
// Broadcast the SegmentStart through the sink. This atomically
|
// Broadcast the SegmentStart through the sink. This atomically
|
||||||
// resets the mirror to `[SegmentStart]` so any subscriber
|
// resets the mirror to the replacement segment prefix so any subscriber
|
||||||
// querying after this point sees the post-compaction prefix.
|
// querying after this point sees the post-compaction prefix, including
|
||||||
self.sink.reset_with_initial(session_start);
|
// durable extension state.
|
||||||
|
self.sink
|
||||||
|
.reset_with_initial_entries(vec![session_start, active_workflow_extension]);
|
||||||
// Keep pods.json pointing at the live segment_id. Without this
|
// Keep pods.json pointing at the live segment_id. Without this
|
||||||
// a concurrent `restore_from_manifest(new_segment_id)` would
|
// a concurrent `restore_from_manifest(new_segment_id)` would
|
||||||
// see no live writer and grab the session this Pod just moved
|
// see no live writer and grab the session this Pod just moved
|
||||||
|
|
@ -3794,6 +3841,7 @@ where
|
||||||
usage_history: Arc::new(Mutex::new(Vec::new())),
|
usage_history: Arc::new(Mutex::new(Vec::new())),
|
||||||
tracker: None,
|
tracker: None,
|
||||||
task_feature: TaskFeature::new(),
|
task_feature: TaskFeature::new(),
|
||||||
|
active_workflows: ActiveWorkflowStore::new(),
|
||||||
system_prompt_template: common.system_prompt_template,
|
system_prompt_template: common.system_prompt_template,
|
||||||
alerter: None,
|
alerter: None,
|
||||||
event_tx: None,
|
event_tx: None,
|
||||||
|
|
@ -3902,6 +3950,7 @@ where
|
||||||
usage_history: Arc::new(Mutex::new(Vec::new())),
|
usage_history: Arc::new(Mutex::new(Vec::new())),
|
||||||
tracker: None,
|
tracker: None,
|
||||||
task_feature: TaskFeature::new(),
|
task_feature: TaskFeature::new(),
|
||||||
|
active_workflows: ActiveWorkflowStore::new(),
|
||||||
system_prompt_template: common.system_prompt_template,
|
system_prompt_template: common.system_prompt_template,
|
||||||
alerter: None,
|
alerter: None,
|
||||||
event_tx: None,
|
event_tx: None,
|
||||||
|
|
@ -4111,6 +4160,8 @@ where
|
||||||
|
|
||||||
let extract_pointer = memory::extract::fold_pointer(&state.extensions);
|
let extract_pointer = memory::extract::fold_pointer(&state.extensions);
|
||||||
let task_feature = TaskFeature::from_history(&state.history);
|
let task_feature = TaskFeature::from_history(&state.history);
|
||||||
|
let active_workflows = ActiveWorkflowStore::new();
|
||||||
|
active_workflows.restore_from_history_and_extensions(&state.history, &state.extensions);
|
||||||
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
|
let pod_metadata_writer = Some(pod_metadata_writer_for_store(&store));
|
||||||
|
|
||||||
let mut pod = Self {
|
let mut pod = Self {
|
||||||
|
|
@ -4131,6 +4182,7 @@ where
|
||||||
usage_history: Arc::new(Mutex::new(state.usage_history)),
|
usage_history: Arc::new(Mutex::new(state.usage_history)),
|
||||||
tracker: None,
|
tracker: None,
|
||||||
task_feature,
|
task_feature,
|
||||||
|
active_workflows,
|
||||||
// Restore replays the saved system_prompt verbatim — no
|
// Restore replays the saved system_prompt verbatim — no
|
||||||
// template re-render on resume.
|
// template re-render on resume.
|
||||||
system_prompt_template: None,
|
system_prompt_template: None,
|
||||||
|
|
@ -4335,12 +4387,13 @@ struct SummaryInputBuild {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Build the compact worker's input: default-reference instructions,
|
/// Build the compact worker's input: default-reference instructions,
|
||||||
/// the list of recently-touched files, task snapshot, and a bounded overview
|
/// the list of recently-touched files, task snapshot, active workflow snapshot,
|
||||||
/// rather than a prefix-wide transcript.
|
/// and a bounded overview rather than a prefix-wide transcript.
|
||||||
fn build_summary_input(
|
fn build_summary_input(
|
||||||
items: &[Item],
|
items: &[Item],
|
||||||
default_refs: &[PathBuf],
|
default_refs: &[PathBuf],
|
||||||
task_snapshot: Option<&str>,
|
task_snapshot: Option<&str>,
|
||||||
|
active_workflow_snapshot: Option<&str>,
|
||||||
options: SummaryInputOptions,
|
options: SummaryInputOptions,
|
||||||
) -> SummaryInputBuild {
|
) -> SummaryInputBuild {
|
||||||
let overview = build_summary_overview(
|
let overview = build_summary_overview(
|
||||||
|
|
@ -4392,6 +4445,17 @@ fn build_summary_input(
|
||||||
out.push_str(task_snapshot);
|
out.push_str(task_snapshot);
|
||||||
out.push_str("\n\n");
|
out.push_str("\n\n");
|
||||||
}
|
}
|
||||||
|
if let Some(active_workflow_snapshot) = active_workflow_snapshot {
|
||||||
|
out.push_str(
|
||||||
|
"## Active Workflow Invocation State\n\
|
||||||
|
This is durable typed workflow state for workflow-governed tasks. Preserve active \
|
||||||
|
slugs, invocation scope, status, obligations/checkpoints, and the snapshotted \
|
||||||
|
workflow guidance in the summary; do not substitute advertised/latest workflow \
|
||||||
|
resources for this invocation state.\n",
|
||||||
|
);
|
||||||
|
out.push_str(active_workflow_snapshot);
|
||||||
|
out.push_str("\n\n");
|
||||||
|
}
|
||||||
out.push_str("## Conversation overview/index\n");
|
out.push_str("## Conversation overview/index\n");
|
||||||
out.push_str(&overview);
|
out.push_str(&overview);
|
||||||
out.push_str("\n\nWhen you are done, call `write_summary` with the final 5-section text.");
|
out.push_str("\n\nWhen you are done, call `write_summary` with the final 5-section text.");
|
||||||
|
|
@ -5278,6 +5342,7 @@ mod build_summary_prompt_tests {
|
||||||
items,
|
items,
|
||||||
&[],
|
&[],
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
SummaryInputOptions {
|
SummaryInputOptions {
|
||||||
overview_target_tokens: 512,
|
overview_target_tokens: 512,
|
||||||
overview_warning_tokens: 1024,
|
overview_warning_tokens: 1024,
|
||||||
|
|
@ -5326,6 +5391,27 @@ mod build_summary_prompt_tests {
|
||||||
assert!(!prompt.contains("deliberation"));
|
assert!(!prompt.contains("deliberation"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn includes_active_workflow_snapshot_section() {
|
||||||
|
let prompt = build_summary_input(
|
||||||
|
&[Item::user_message("continue after review")],
|
||||||
|
&[],
|
||||||
|
None,
|
||||||
|
Some("ActiveWorkflowStore: 1 active workflow\n- review before merge\n- close ticket"),
|
||||||
|
SummaryInputOptions {
|
||||||
|
overview_target_tokens: 512,
|
||||||
|
overview_warning_tokens: 1024,
|
||||||
|
overview_deadline_tokens: 2048,
|
||||||
|
summary_target_tokens: 256,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.text;
|
||||||
|
|
||||||
|
assert!(prompt.contains("## Active Workflow Invocation State"));
|
||||||
|
assert!(prompt.contains("review before merge"));
|
||||||
|
assert!(prompt.contains("close ticket"));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn overview_warning_does_not_drop_input() {
|
fn overview_warning_does_not_drop_input() {
|
||||||
let items = vec![Item::user_message("x".repeat(4_000))];
|
let items = vec![Item::user_message("x".repeat(4_000))];
|
||||||
|
|
@ -5333,6 +5419,7 @@ mod build_summary_prompt_tests {
|
||||||
&items,
|
&items,
|
||||||
&[],
|
&[],
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
SummaryInputOptions {
|
SummaryInputOptions {
|
||||||
overview_target_tokens: 10,
|
overview_target_tokens: 10,
|
||||||
overview_warning_tokens: 100,
|
overview_warning_tokens: 100,
|
||||||
|
|
@ -5352,6 +5439,7 @@ mod build_summary_prompt_tests {
|
||||||
&items,
|
&items,
|
||||||
&[],
|
&[],
|
||||||
None,
|
None,
|
||||||
|
None,
|
||||||
SummaryInputOptions {
|
SummaryInputOptions {
|
||||||
overview_target_tokens: 10,
|
overview_target_tokens: 10,
|
||||||
overview_warning_tokens: 10,
|
overview_warning_tokens: 10,
|
||||||
|
|
|
||||||
|
|
@ -147,6 +147,24 @@ impl SegmentLogSink {
|
||||||
let _ = self.inner.broadcast_tx.send(initial);
|
let _ = self.inner.broadcast_tx.send(initial);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Atomically swap the mirror to the supplied replacement-session prefix
|
||||||
|
/// and broadcast the first entry as the live rotation signal. Entries after
|
||||||
|
/// the first are already reflected in reconnect snapshots but are not
|
||||||
|
/// broadcast live; this is intended for non-live extension state that must
|
||||||
|
/// share the new segment prefix with SegmentStart.
|
||||||
|
pub fn reset_with_initial_entries(&self, entries: Vec<LogEntry>) {
|
||||||
|
let first = entries.first().cloned();
|
||||||
|
let mut mirror = self
|
||||||
|
.inner
|
||||||
|
.mirror
|
||||||
|
.lock()
|
||||||
|
.expect("session log mirror mutex poisoned");
|
||||||
|
*mirror = entries;
|
||||||
|
if let Some(initial) = first {
|
||||||
|
let _ = self.inner.broadcast_tx.send(initial);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Replace the mirror with the supplied prefix without broadcasting.
|
/// Replace the mirror with the supplied prefix without broadcasting.
|
||||||
///
|
///
|
||||||
/// Used by restore paths that load a session's complete log into
|
/// Used by restore paths that load a session's complete log into
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ The conversation input is a bounded overview/index, not the full transcript. Tre
|
||||||
|
|
||||||
## Workflow
|
## Workflow
|
||||||
|
|
||||||
1. Read the provided overview/index and current TaskStore snapshot.
|
1. Read the provided overview/index, current TaskStore snapshot, and any Active Workflow Invocation State section.
|
||||||
2. If the overview does not contain enough detail, use `search_session_log` to find relevant compact-target history items, then `read_session_items` to inspect only the needed range.
|
2. If the overview does not contain enough detail, use `search_session_log` to find relevant compact-target history items, then `read_session_items` to inspect only the needed range.
|
||||||
3. Use `read_file` to inspect referenced files before deciding what the next session needs. Prefer skimming over blind inclusion.
|
3. Use `read_file` to inspect referenced files before deciding what the next session needs. Prefer skimming over blind inclusion.
|
||||||
4. For files whose current contents are load-bearing for the active work, call `mark_read_required` to inject them into the next session. These count against the auto-read token budget — spend it deliberately.
|
4. For files whose current contents are load-bearing for the active work, call `mark_read_required` to inject them into the next session. These count against the auto-read token budget — spend it deliberately.
|
||||||
|
|
@ -39,5 +39,6 @@ Produce the summary in this exact format:
|
||||||
|
|
||||||
## Constraints
|
## Constraints
|
||||||
|
|
||||||
|
- Preserve active workflow invocation state when present: active slug, invocation scope/source/time, status, open obligations/checkpoints, and snapshotted workflow guidance. Do not replace a snapshotted invocation with merely advertised/latest workflow resources.
|
||||||
- Keep code snippets and raw tool output OUT of the summary — that is what auto-read and references are for.
|
- Keep code snippets and raw tool output OUT of the summary — that is what auto-read and references are for.
|
||||||
- Follow the summary target stated in the run input; if asked to shrink, call `write_summary` again with a shorter version.
|
- Follow the summary target stated in the run input; if asked to shrink, call `write_summary` again with a shorter version.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user