diff --git a/crates/memory/src/lib.rs b/crates/memory/src/lib.rs index 622c9fc2..b34475d6 100644 --- a/crates/memory/src/lib.rs +++ b/crates/memory/src/lib.rs @@ -15,6 +15,7 @@ pub mod schema; pub mod scope; pub mod slug; pub mod tool; +pub mod workflow; pub mod workspace; pub use error::{LintError, LintWarning, MemoryError}; @@ -23,4 +24,8 @@ pub use linter::{LintReport, Linter}; pub use resident::{ResidentKnowledgeEntry, collect_resident_knowledge}; pub use scope::deny_write_rules; pub use slug::Slug; +pub use workflow::{ + ResidentWorkflowEntry, WORKFLOW_DESCRIPTION_HARD_CAP, WorkflowLoadError, WorkflowRecord, + WorkflowRegistry, load_workflows, +}; pub use workspace::WorkspaceLayout; diff --git a/crates/memory/src/linter/frontmatter.rs b/crates/memory/src/linter/frontmatter.rs index db05d72f..767e2ac2 100644 --- a/crates/memory/src/linter/frontmatter.rs +++ b/crates/memory/src/linter/frontmatter.rs @@ -44,7 +44,6 @@ fn parse_missing_field(msg: &str) -> Option<&'static str> { "model_invokation", "user_invocable", "last_sources", - "auto_invoke", "requires", ]; FIELDS.iter().copied().find(|n| *n == field_name) diff --git a/crates/memory/src/linter/mod.rs b/crates/memory/src/linter/mod.rs index 5b0fced7..a526ab0c 100644 --- a/crates/memory/src/linter/mod.rs +++ b/crates/memory/src/linter/mod.rs @@ -323,10 +323,9 @@ mod tests { fn workflow_write_rejected() { let (dir, linter) = workspace(); let path = dir.path().join(".insomnia/memory/workflow/wf.md"); - let content = format!( - "---\nupdated_at: {now}\ndescription: x\nauto_invoke: false\nuser_invocable: true\n---\nbody", - now = iso_now() - ); + let content = + "---\ndescription: x\nmodel_invokation: false\nuser_invocable: true\n---\nbody" + .to_string(); let report = linter.lint(&path, &content, WriteMode::Create); assert!( report @@ -499,10 +498,7 @@ mod tests { n = iso_now() ), ); - let wf = format!( - "---\nupdated_at: {n}\ndescription: do thing\nauto_invoke: false\nuser_invocable: true\nrequires: [foo]\n---\nstep 1\n", - n = iso_now() - ); + let wf = "---\ndescription: do thing\nmodel_invokation: false\nuser_invocable: true\nrequires: [foo]\n---\nstep 1\n".to_string(); let report = linter.lint_workflow(&wf); assert!(!report.has_errors(), "got errors: {:?}", report.errors); } @@ -510,10 +506,7 @@ mod tests { #[test] fn workflow_lint_flags_unknown_requires() { let (_dir, linter) = workspace(); - let wf = format!( - "---\nupdated_at: {n}\ndescription: x\nauto_invoke: false\nuser_invocable: true\nrequires: [missing-knowledge]\n---\n", - n = iso_now() - ); + let wf = "---\ndescription: x\nmodel_invokation: false\nuser_invocable: true\nrequires: [missing-knowledge]\n---\n".to_string(); let report = linter.lint_workflow(&wf); assert!(report.errors.iter().any(|e| matches!( e, @@ -528,10 +521,7 @@ mod tests { #[test] fn workflow_lint_collects_multiple_unknown_requires() { let (_dir, linter) = workspace(); - let wf = format!( - "---\nupdated_at: {n}\ndescription: x\nauto_invoke: false\nuser_invocable: true\nrequires: [a, b, c]\n---\n", - n = iso_now() - ); + let wf = "---\ndescription: x\nmodel_invokation: false\nuser_invocable: true\nrequires: [a, b, c]\n---\n".to_string(); let report = linter.lint_workflow(&wf); let unknown_count = report .errors diff --git a/crates/memory/src/schema/workflow.rs b/crates/memory/src/schema/workflow.rs index b3ac0662..7c38f606 100644 --- a/crates/memory/src/schema/workflow.rs +++ b/crates/memory/src/schema/workflow.rs @@ -13,25 +13,37 @@ use crate::slug::Slug; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct WorkflowFrontmatter { - /// Workflows don't carry sources/created_at requirements in the - /// plan doc; only `updated_at` is required at the schema level. - pub updated_at: DateTime, + /// Workflows do not require timestamps in the MVP. Human-authored files + /// may carry them; when absent the linter uses Unix epoch as a neutral + /// placeholder for the shared `Frontmatter` trait. + #[serde(default)] + pub updated_at: Option>, #[serde(default)] pub created_at: Option>, pub description: String, - pub auto_invoke: bool, + #[serde(default)] + pub model_invokation: bool, + #[serde(default = "default_user_invocable")] pub user_invocable: bool, #[serde(default)] pub requires: Vec, } +fn default_user_invocable() -> bool { + true +} + +fn epoch() -> DateTime { + DateTime::::from_timestamp(0, 0).expect("Unix epoch timestamp is valid") +} + impl Frontmatter for WorkflowFrontmatter { const BODY_LIMIT: usize = 8000; fn created_at(&self) -> DateTime { - self.created_at.unwrap_or(self.updated_at) + self.created_at.or(self.updated_at).unwrap_or_else(epoch) } fn updated_at(&self) -> DateTime { - self.updated_at + self.updated_at.unwrap_or_else(epoch) } } diff --git a/crates/memory/src/scope.rs b/crates/memory/src/scope.rs index 5c4a60f9..89b29633 100644 --- a/crates/memory/src/scope.rs +++ b/crates/memory/src/scope.rs @@ -15,7 +15,7 @@ use crate::workspace::WorkspaceLayout; /// Build deny rules that strip Write permission from `/memory/` /// and `/knowledge/`. Recursive — every descendant is capped -/// at Read for the generic tools. +/// at Read for the generic tools, including `memory/workflow/`. pub fn deny_write_rules(layout: &WorkspaceLayout) -> Vec { vec![ deny_write(layout.memory_dir().as_path()), diff --git a/crates/memory/src/workflow.rs b/crates/memory/src/workflow.rs new file mode 100644 index 00000000..c9adc66f --- /dev/null +++ b/crates/memory/src/workflow.rs @@ -0,0 +1,313 @@ +//! Workflow loader and registry. +//! +//! Workflows live under `/.insomnia/memory/workflow/.md`. +//! They are human-authored Markdown documents with YAML frontmatter. The loader +//! is intentionally strict about malformed records because Pod startup should +//! fail rather than silently ignoring a broken procedural instruction. + +use std::collections::BTreeMap; +use std::io; +use std::path::{Path, PathBuf}; + +use thiserror::Error; +use tracing::warn; + +use crate::error::LintError; +use crate::schema::{WorkflowFrontmatter, split_frontmatter}; +use crate::slug::Slug; +use crate::workspace::WorkspaceLayout; + +/// Hard cap on Workflow descriptions that are advertised resident. +/// Mirrors agent-skills and resident Knowledge descriptions. +pub const WORKFLOW_DESCRIPTION_HARD_CAP: usize = 1024; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WorkflowRecord { + pub slug: Slug, + pub description: String, + pub model_invokation: bool, + pub user_invocable: bool, + pub requires: Vec, + /// Markdown body after the closing frontmatter delimiter. + pub body: String, + pub path: PathBuf, +} + +#[derive(Debug, Clone, Default)] +pub struct WorkflowRegistry { + records: BTreeMap, +} + +impl WorkflowRegistry { + pub fn empty() -> Self { + Self::default() + } + + pub fn len(&self) -> usize { + self.records.len() + } + + pub fn is_empty(&self) -> bool { + self.records.is_empty() + } + + pub fn get(&self, slug: &Slug) -> Option<&WorkflowRecord> { + self.records.get(slug) + } + + pub fn iter(&self) -> impl Iterator { + self.records.values() + } + + pub fn resident_entries(&self) -> Vec { + self.records + .values() + .filter(|record| record.model_invokation) + .map(|record| ResidentWorkflowEntry { + slug: record.slug.to_string(), + description: record.description.clone(), + }) + .collect() + } + + pub fn list_user_invocable(&self, prefix: &str) -> Vec { + self.records + .values() + .filter(|record| record.user_invocable && record.slug.as_str().starts_with(prefix)) + .map(|record| record.slug.to_string()) + .collect() + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ResidentWorkflowEntry { + pub slug: String, + pub description: String, +} + +#[derive(Debug, Error)] +pub enum WorkflowLoadError { + #[error("failed to read workflow directory {}: {source}", .dir.display())] + ReadDir { dir: PathBuf, source: io::Error }, + #[error("failed to read workflow file {}: {source}", .path.display())] + ReadFile { path: PathBuf, source: io::Error }, + #[error("invalid workflow file name {}: {source}", .path.display())] + InvalidSlug { path: PathBuf, source: LintError }, + #[error("invalid workflow frontmatter in {}: {source}", .path.display())] + Frontmatter { path: PathBuf, source: LintError }, + #[error( + "Workflow {} with model_invokation: true cannot have description longer than {limit} chars (got {actual})", + .path.display() + )] + DescriptionTooLong { + path: PathBuf, + actual: usize, + limit: usize, + }, +} + +pub fn load_workflows(layout: &WorkspaceLayout) -> Result { + let dir = layout.workflow_dir(); + let entries = match std::fs::read_dir(&dir) { + Ok(entries) => entries, + Err(err) if err.kind() == io::ErrorKind::NotFound => { + return Ok(WorkflowRegistry::empty()); + } + Err(source) => return Err(WorkflowLoadError::ReadDir { dir, source }), + }; + + let mut paths = Vec::new(); + for entry in entries { + let entry = entry.map_err(|source| WorkflowLoadError::ReadDir { + dir: dir.clone(), + source, + })?; + let path = entry.path(); + if path.is_file() && path.extension().and_then(|e| e.to_str()) == Some("md") { + paths.push(path); + } + } + paths.sort(); + + let mut records = BTreeMap::new(); + for path in paths { + let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else { + continue; + }; + let slug = + Slug::parse(stem.to_string()).map_err(|source| WorkflowLoadError::InvalidSlug { + path: path.clone(), + source, + })?; + if records.contains_key(&slug) { + warn!(slug = %slug, path = %path.display(), "duplicate workflow slug encountered; keeping first record"); + continue; + } + let raw = std::fs::read_to_string(&path).map_err(|source| WorkflowLoadError::ReadFile { + path: path.clone(), + source, + })?; + let (yaml, body) = + split_frontmatter(&raw).map_err(|source| WorkflowLoadError::Frontmatter { + path: path.clone(), + source, + })?; + warn_unknown_workflow_fields(&path, yaml); + let frontmatter: WorkflowFrontmatter = + serde_yaml::from_str(yaml).map_err(|err| WorkflowLoadError::Frontmatter { + path: path.clone(), + source: map_serde_workflow_error(err), + })?; + if frontmatter.model_invokation + && frontmatter.description.chars().count() > WORKFLOW_DESCRIPTION_HARD_CAP + { + return Err(WorkflowLoadError::DescriptionTooLong { + path, + actual: frontmatter.description.chars().count(), + limit: WORKFLOW_DESCRIPTION_HARD_CAP, + }); + } + + let record = WorkflowRecord { + slug: slug.clone(), + description: frontmatter.description, + model_invokation: frontmatter.model_invokation, + user_invocable: frontmatter.user_invocable, + requires: frontmatter.requires, + body: body.to_string(), + path: path.clone(), + }; + records.insert(slug.clone(), record); + } + + Ok(WorkflowRegistry { records }) +} + +fn warn_unknown_workflow_fields(path: &Path, yaml: &str) { + let Ok(value) = serde_yaml::from_str::(yaml) else { + return; + }; + let Some(map) = value.as_mapping() else { + return; + }; + for key in map.keys().filter_map(|k| k.as_str()) { + if !matches!( + key, + "description" + | "model_invokation" + | "user_invocable" + | "requires" + | "created_at" + | "updated_at" + ) { + warn!(path = %path.display(), field = key, "unknown workflow frontmatter field ignored"); + } + } +} + +fn map_serde_workflow_error(err: serde_yaml::Error) -> LintError { + let msg = err.to_string(); + if let Some(field) = parse_missing_field(&msg) { + return LintError::MissingField(field); + } + LintError::MalformedFrontmatter(msg) +} + +fn parse_missing_field(msg: &str) -> Option<&'static str> { + let needle = "missing field `"; + let start = msg.find(needle)? + needle.len(); + let end = msg[start..].find('`')? + start; + match &msg[start..end] { + "description" => Some("description"), + "model_invokation" => Some("model_invokation"), + "user_invocable" => Some("user_invocable"), + "requires" => Some("requires"), + _ => None, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn setup() -> (TempDir, WorkspaceLayout) { + let dir = TempDir::new().unwrap(); + std::fs::create_dir_all(dir.path().join(".insomnia/memory/workflow")).unwrap(); + let layout = WorkspaceLayout::new(dir.path().to_path_buf()); + (dir, layout) + } + + fn write_workflow(root: &Path, slug: &str, frontmatter: &str, body: &str) { + let path = root + .join(".insomnia/memory/workflow") + .join(format!("{slug}.md")); + std::fs::write(path, format!("---\n{frontmatter}\n---\n{body}")).unwrap(); + } + + #[test] + fn missing_directory_loads_empty_registry() { + let dir = TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(dir.path().to_path_buf()); + let got = load_workflows(&layout).unwrap(); + assert!(got.is_empty()); + } + + #[test] + fn loads_valid_workflow_with_default_flags() { + let (dir, layout) = setup(); + write_workflow(dir.path(), "do-thing", "description: Do thing", "Step 1"); + let got = load_workflows(&layout).unwrap(); + let slug = Slug::parse("do-thing").unwrap(); + let record = got.get(&slug).unwrap(); + assert_eq!(record.description, "Do thing"); + assert!(!record.model_invokation); + assert!(record.user_invocable); + assert!(record.requires.is_empty()); + assert_eq!(record.body, "Step 1"); + } + + #[test] + fn model_invokation_uses_typo_field() { + let (dir, layout) = setup(); + write_workflow( + dir.path(), + "auto", + "description: Auto\nmodel_invokation: true\nuser_invocable: false", + "Body", + ); + let got = load_workflows(&layout).unwrap(); + assert_eq!(got.resident_entries()[0].slug, "auto"); + assert!(got.list_user_invocable("").is_empty()); + } + + #[test] + fn invalid_filename_is_hard_error() { + let (dir, layout) = setup(); + write_workflow(dir.path(), "Bad", "description: Bad", "Body"); + let err = load_workflows(&layout).unwrap_err(); + assert!(matches!(err, WorkflowLoadError::InvalidSlug { .. })); + } + + #[test] + fn missing_description_is_hard_error() { + let (dir, layout) = setup(); + write_workflow(dir.path(), "bad", "model_invokation: false", "Body"); + let err = load_workflows(&layout).unwrap_err(); + assert!(matches!(err, WorkflowLoadError::Frontmatter { .. })); + } + + #[test] + fn resident_description_cap_is_enforced() { + let (dir, layout) = setup(); + let desc = "x".repeat(WORKFLOW_DESCRIPTION_HARD_CAP + 1); + write_workflow( + dir.path(), + "bad", + &format!("description: {desc}\nmodel_invokation: true"), + "Body", + ); + let err = load_workflows(&layout).unwrap_err(); + assert!(matches!(err, WorkflowLoadError::DescriptionTooLong { .. })); + } +} diff --git a/crates/pod/src/controller.rs b/crates/pod/src/controller.rs index fc33a7a8..63cafa0c 100644 --- a/crates/pod/src/controller.rs +++ b/crates/pod/src/controller.rs @@ -251,19 +251,15 @@ impl PodController { permission: manifest::Permission::Read, recursive: true, }); - let scope_with_bash = manifest::Scope::from_config(&scope_config) - .map_err(std::io::Error::other)?; + let scope_with_bash = + manifest::Scope::from_config(&scope_config).map_err(std::io::Error::other)?; let fs = tools::ScopedFs::new(scope_with_bash, pwd_for_tools.clone()); let tracker = tools::Tracker::new(); // The same ScopedFs also powers the IPC `ListCompletions` // query — keep a clone for the FS view we attach below, // since the tools consume `fs` itself. fs_for_view = fs.clone(); - worker.register_tools(tools::builtin_tools( - fs, - tracker.clone(), - bash_output_dir, - )); + worker.register_tools(tools::builtin_tools(fs, tracker.clone(), bash_output_dir)); // Memory subsystem opt-in. When `[memory]` is present in // the manifest, register the memory-specific Read/Write/Edit @@ -316,6 +312,12 @@ impl PodController { shared_state.update_history(pod.worker().history().to_vec()); shared_state.set_user_segments(pod.user_segments().to_vec()); shared_state.set_fs_view(crate::fs_view::PodFsView::new(fs_for_view)); + shared_state.set_workflows( + pod.workflow_completions() + .into_iter() + .map(|slug| crate::shared_state::WorkflowCandidate { slug }) + .collect(), + ); runtime_dir.write_manifest(&manifest_toml).await?; runtime_dir.write_status(&shared_state).await?; runtime_dir.write_history(&shared_state).await?; @@ -360,6 +362,14 @@ impl PodController { }); continue; } + let was_paused = status_before == PodStatus::Paused; + if let Err(e) = pod.validate_workflow_invocations(&input) { + let _ = event_tx.send(Event::Error { + code: ErrorCode::InvalidRequest, + message: e.to_string(), + }); + continue; + } // Broadcast the accepted user message so every // subscriber (including the submitter) can // render the turn header + user line from a @@ -369,7 +379,6 @@ impl PodController { let _ = event_tx.send(Event::UserMessage { segments: input.clone(), }); - let was_paused = status_before == PodStatus::Paused; shared_state.set_status(PodStatus::Running); let _ = runtime_dir.write_status(&shared_state).await; diff --git a/crates/pod/src/factory.rs b/crates/pod/src/factory.rs index 072d92e5..897e07d2 100644 --- a/crates/pod/src/factory.rs +++ b/crates/pod/src/factory.rs @@ -647,6 +647,7 @@ permission = "write" tool_names: Vec::new(), agents_md: None, resident_knowledge: None, + resident_workflows: None, prompts: &catalog, }; let rendered = tmpl.render(&ctx).unwrap(); diff --git a/crates/pod/src/ipc/server.rs b/crates/pod/src/ipc/server.rs index d9e5d307..a414a653 100644 --- a/crates/pod/src/ipc/server.rs +++ b/crates/pod/src/ipc/server.rs @@ -102,11 +102,16 @@ async fn handle_connection(stream: tokio::net::UnixStream, handle: PodHandle) { is_dir: c.is_dir, }) .collect(), - // Knowledge / Workflow resolvers are not wired - // up yet — reply empty so the TUI sees a - // consistent shape regardless of kind. - protocol::CompletionKind::Knowledge - | protocol::CompletionKind::Workflow => Vec::new(), + protocol::CompletionKind::Knowledge => Vec::new(), + protocol::CompletionKind::Workflow => handle + .shared_state + .list_workflow_completions(&prefix) + .into_iter() + .map(|c| protocol::CompletionEntry { + value: c.slug, + is_dir: false, + }) + .collect(), }; if writer .write(&Event::Completions { kind, entries }) diff --git a/crates/pod/src/lib.rs b/crates/pod/src/lib.rs index 3b1e592c..8b8bcdfd 100644 --- a/crates/pod/src/lib.rs +++ b/crates/pod/src/lib.rs @@ -7,6 +7,7 @@ pub mod prompt; pub mod runtime; pub mod shared_state; pub mod spawn; +pub mod workflow; mod factory; mod interrupt_and_run; diff --git a/crates/pod/src/pod.rs b/crates/pod/src/pod.rs index ffca2185..4eee9fec 100644 --- a/crates/pod/src/pod.rs +++ b/crates/pod/src/pod.rs @@ -27,6 +27,7 @@ use crate::prompt::loader::PromptLoader; use crate::prompt::system::{SystemPromptContext, SystemPromptError, SystemPromptTemplate}; use crate::runtime::dir; use crate::runtime::pod_registry::{self, ScopeAllocationGuard, ScopeLockError}; +use crate::workflow::WorkflowResolveError; use async_trait::async_trait; use llm_worker::interceptor::PreRequestAction; use protocol::{AlertLevel, AlertSource, Event, Segment}; @@ -124,6 +125,12 @@ pub struct Pod { /// [`Self::from_manifest`], or defaults to the builtin pack when a /// Pod is constructed through lower-level paths that have no loader. prompts: Arc, + /// Registry loaded from `/.insomnia/memory/workflow/*.md` + /// when memory is enabled. Missing memory config keeps this empty. + workflow_registry: memory::WorkflowRegistry, + /// Memory workspace layout used by the workflow resolver to load required + /// Knowledge records by exact slug. + memory_layout: Option, /// When true (default), the system-prompt assembler walks /// `/knowledge/*` and appends a `## Resident knowledge` /// section listing records with `model_invokation: true`. @@ -200,6 +207,8 @@ impl Pod { scope_allocation: None, callback_socket: None, prompts, + workflow_registry: memory::WorkflowRegistry::empty(), + memory_layout: None, inject_resident_knowledge: true, extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), @@ -565,23 +574,31 @@ impl Pod { // Owned `Vec` lives for the duration of `render` below; the // context borrows a slice into it. let resident: Vec = if self.inject_resident_knowledge { - self.manifest - .memory + self.memory_layout .as_ref() - .map(|mem| { - let layout = memory::WorkspaceLayout::resolve(mem, &self.pwd); - memory::collect_resident_knowledge(&layout) - }) + .map(memory::collect_resident_knowledge) .unwrap_or_default() } else { Vec::new() }; let resident_slice: Option<&[memory::ResidentKnowledgeEntry]> = - if self.inject_resident_knowledge && self.manifest.memory.is_some() { + if self.inject_resident_knowledge && self.memory_layout.is_some() { Some(&resident) } else { None }; + let resident_workflows: Vec = + if self.inject_resident_knowledge && self.memory_layout.is_some() { + self.workflow_registry.resident_entries() + } else { + Vec::new() + }; + let resident_workflow_slice: Option<&[memory::ResidentWorkflowEntry]> = + if self.inject_resident_knowledge && self.memory_layout.is_some() { + Some(&resident_workflows) + } else { + None + }; let ctx = SystemPromptContext { now: chrono::Utc::now(), cwd: &self.pwd, @@ -589,6 +606,7 @@ impl Pod { tool_names, agents_md: agents_md_read.body, resident_knowledge: resident_slice, + resident_workflows: resident_workflow_slice, prompts: &self.prompts, }; let rendered = template @@ -636,11 +654,12 @@ impl Pod { .await?; self.user_segments.push(input.clone()); - // Resolve `@` refs to system messages stashed for the - // PodInterceptor to attach right after the user message. Failures - // surface as user-facing Alerts and the placeholder remains in - // the flattened text so the LLM sees the unresolved intent. - let attachments = self.resolve_file_refs(&input); + // Resolve `@` refs and `/` workflow invocations to + // system messages stashed for the PodInterceptor to attach right + // after the user message. File failures are non-fatal alerts; explicit + // workflow invocation failures abort before the Worker sees the turn. + let mut attachments = self.resolve_file_refs(&input); + attachments.extend(self.resolve_workflow_invocations(&input)?); if !attachments.is_empty() { *self .pending_attachments @@ -690,6 +709,63 @@ impl Pod { out } + fn resolve_workflow_invocations( + &self, + segments: &[Segment], + ) -> Result, WorkflowResolveError> { + let Some(layout) = self.memory_layout.as_ref() else { + if let Some(slug) = segments.iter().find_map(|seg| match seg { + Segment::WorkflowInvoke { slug } => Some(slug.clone()), + _ => None, + }) { + return Err(WorkflowResolveError::NotFound { slug }); + } + return Ok(Vec::new()); + }; + let mut out = Vec::new(); + for seg in segments { + let Segment::WorkflowInvoke { slug } = seg else { + continue; + }; + let items = crate::workflow::resolve_workflow_invocation( + &self.workflow_registry, + layout, + slug, + )?; + out.extend(items); + } + Ok(out) + } + + /// Validate explicit workflow invocations without reading dependency + /// bodies. Used by the controller before broadcasting `UserMessage` so + /// user-invocation errors are returned immediately and never reach the + /// Worker or client history. + pub fn validate_workflow_invocations( + &self, + segments: &[Segment], + ) -> Result<(), WorkflowResolveError> { + for seg in segments { + let Segment::WorkflowInvoke { slug } = seg else { + continue; + }; + let parsed = + memory::Slug::parse(slug.clone()).map_err(WorkflowResolveError::InvalidSlug)?; + let record = self + .workflow_registry + .get(&parsed) + .ok_or_else(|| WorkflowResolveError::NotFound { slug: slug.clone() })?; + if !record.user_invocable { + return Err(WorkflowResolveError::NotUserInvocable { slug: slug.clone() }); + } + } + Ok(()) + } + + pub fn workflow_completions(&self) -> Vec { + self.workflow_registry.list_user_invocable("") + } + /// Flatten a typed segment list into the single string the Worker /// receives as the user message, and emit user-facing alerts for /// segments that fall through to placeholder (knowledge / workflow @@ -711,16 +787,7 @@ impl Pod { ), ); } - Segment::WorkflowInvoke { slug } => { - self.alert( - AlertLevel::Warn, - AlertSource::Pod, - format!( - "workflow /{slug} cannot be resolved \ - (resolver not yet implemented); passed to LLM as placeholder" - ), - ); - } + Segment::WorkflowInvoke { .. } => {} Segment::Unknown => { self.alert( AlertLevel::Warn, @@ -1652,7 +1719,10 @@ impl Pod { worker.register_tool(memory::tool::write_tool(layout.clone())); worker.register_tool(memory::tool::edit_tool(layout.clone())); worker.register_tool(memory::tool::memory_query_tool(layout.clone(), query_cfg)); - worker.register_tool(memory::tool::knowledge_query_tool(layout.clone(), query_cfg)); + worker.register_tool(memory::tool::knowledge_query_tool( + layout.clone(), + query_cfg, + )); let tidy = consolidate::collect_tidy_hints(&layout); let candidates = consolidate::KnowledgeCandidateReport::empty(); @@ -1809,6 +1879,8 @@ impl Pod, St> { scope_allocation: Some(scope_allocation), callback_socket: None, prompts: common.prompts, + workflow_registry: common.workflow_registry, + memory_layout: common.memory_layout, inject_resident_knowledge: true, extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), @@ -1867,6 +1939,8 @@ impl Pod, St> { scope_allocation: Some(scope_allocation), callback_socket: Some(callback_socket), prompts: common.prompts, + workflow_registry: common.workflow_registry, + memory_layout: common.memory_layout, inject_resident_knowledge: true, extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), @@ -1977,6 +2051,8 @@ impl Pod, St> { scope_allocation: Some(scope_allocation), callback_socket: None, prompts: common.prompts, + workflow_registry: common.workflow_registry, + memory_layout: common.memory_layout, inject_resident_knowledge: true, extract_in_flight: Arc::new(AtomicBool::new(false)), consolidation_in_flight: Arc::new(AtomicBool::new(false)), @@ -2184,6 +2260,12 @@ pub enum PodError { #[error("memory Phase 2 lock acquisition failed: {0}")] ConsolidationLock(#[source] memory::consolidate::LockError), + #[error("workflow load failed: {0}")] + WorkflowLoad(#[source] memory::WorkflowLoadError), + + #[error("workflow invocation failed: {0}")] + WorkflowResolve(#[from] WorkflowResolveError), + #[error("session {session_id} has no entries to restore")] SessionEmpty { session_id: SessionId }, } @@ -2197,6 +2279,8 @@ struct PodCommon { scope: Scope, client: Box, prompts: Arc, + workflow_registry: memory::WorkflowRegistry, + memory_layout: Option, system_prompt_template: Option, } @@ -2223,6 +2307,14 @@ fn prepare_pod_common( let client = provider::build_client(&manifest.model)?; let prompts = PromptCatalog::load(loader, manifest.pod.prompt_pack.as_deref())?; + let memory_layout = manifest + .memory + .as_ref() + .map(|mem| memory::WorkspaceLayout::resolve(mem, &pwd)); + let workflow_registry = match memory_layout.as_ref() { + Some(layout) => memory::load_workflows(layout).map_err(PodError::WorkflowLoad)?, + None => memory::WorkflowRegistry::empty(), + }; let system_prompt_template = if parse_template { Some( @@ -2238,6 +2330,8 @@ fn prepare_pod_common( scope, client, prompts, + workflow_registry, + memory_layout, system_prompt_template, }) } diff --git a/crates/pod/src/prompt/catalog.rs b/crates/pod/src/prompt/catalog.rs index 57e60b23..28e2632d 100644 --- a/crates/pod/src/prompt/catalog.rs +++ b/crates/pod/src/prompt/catalog.rs @@ -79,6 +79,10 @@ pub enum PodPrompt { /// AGENTS.md section when memory is enabled and at least one /// `knowledge/*` record advertises `model_invokation: true`. ResidentKnowledgeSection, + /// Trailing `## Resident workflows` section, appended after resident + /// knowledge when memory is enabled and at least one workflow advertises + /// `model_invokation: true`. + ResidentWorkflowsSection, } impl PodPrompt { @@ -91,6 +95,7 @@ impl PodPrompt { Self::WorkingBoundariesSection => "working_boundaries_section", Self::AgentsMdSection => "agents_md_section", Self::ResidentKnowledgeSection => "resident_knowledge_section", + Self::ResidentWorkflowsSection => "resident_workflows_section", } } @@ -105,6 +110,7 @@ impl PodPrompt { PodPrompt::WorkingBoundariesSection, PodPrompt::AgentsMdSection, PodPrompt::ResidentKnowledgeSection, + PodPrompt::ResidentWorkflowsSection, ]; pub const KEYS: &'static [&'static str] = &[ @@ -115,6 +121,7 @@ impl PodPrompt { "working_boundaries_section", "agents_md_section", "resident_knowledge_section", + "resident_workflows_section", ]; } @@ -330,6 +337,15 @@ impl PromptCatalog { single("entries", entries), ) } + + /// Render `PodPrompt::ResidentWorkflowsSection` with `{{ entries }}` + /// (a pre-formatted list block authored by the caller). + pub fn resident_workflows_section(&self, entries: &str) -> Result { + self.render( + PodPrompt::ResidentWorkflowsSection, + single("entries", entries), + ) + } } fn single(key: &'static str, value: &str) -> Value { diff --git a/crates/pod/src/prompt/system.rs b/crates/pod/src/prompt/system.rs index dfbb94e8..e782f568 100644 --- a/crates/pod/src/prompt/system.rs +++ b/crates/pod/src/prompt/system.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use chrono::{DateTime, SecondsFormat, Utc}; use manifest::Scope; -use memory::ResidentKnowledgeEntry; +use memory::{ResidentKnowledgeEntry, ResidentWorkflowEntry}; use minijinja::value::Value; use minijinja::{Environment, ErrorKind, UndefinedBehavior}; use thiserror::Error; @@ -122,6 +122,7 @@ impl SystemPromptTemplate { ctx.scope, ctx.agents_md.as_deref(), ctx.resident_knowledge, + ctx.resident_workflows, ) } } @@ -153,6 +154,10 @@ pub struct SystemPromptContext<'a> { /// section entirely (memory disabled, or a Phase 2 worker that opts /// out); `Some(&[])` also yields no section. pub resident_knowledge: Option<&'a [ResidentKnowledgeEntry]>, + /// Resident workflow descriptions from `/memory/workflow/*` + /// whose frontmatter has `model_invokation: true`. `None` disables the + /// section; Phase 2 workers opt out together with resident Knowledge. + pub resident_workflows: Option<&'a [ResidentWorkflowEntry]>, /// Catalog used to render the fixed trailing section headers. /// Passed by reference so callers do not give up ownership across /// the short-lived render borrow. @@ -201,6 +206,7 @@ pub fn append_trailing_section( scope: &Scope, agents_md: Option<&str>, resident_knowledge: Option<&[ResidentKnowledgeEntry]>, + resident_workflows: Option<&[ResidentWorkflowEntry]>, ) -> Result { let mut out = String::with_capacity(body.len() + 256); out.push_str(body); @@ -227,6 +233,15 @@ pub fn append_trailing_section( out.push('\n'); } } + if let Some(entries) = resident_workflows { + if !entries.is_empty() { + out.push('\n'); + let formatted = format_resident_workflow_entries(entries); + let section = prompts.resident_workflows_section(&formatted)?; + out.push_str(section.trim_end_matches(&['\n', ' '][..])); + out.push('\n'); + } + } // Canonicalise the tail so the emitted prompt has a single form // regardless of how individual templates chose to end. while out.ends_with('\n') || out.ends_with(' ') { @@ -238,15 +253,31 @@ pub fn append_trailing_section( /// `- : ` per line. Description newlines are folded /// to spaces so a single entry stays on one row in the rendered prompt. fn format_resident_knowledge_entries(entries: &[ResidentKnowledgeEntry]) -> String { + format_resident_entries( + entries + .iter() + .map(|e| (e.slug.as_str(), e.description.as_str())), + ) +} + +fn format_resident_workflow_entries(entries: &[ResidentWorkflowEntry]) -> String { + format_resident_entries( + entries + .iter() + .map(|e| (e.slug.as_str(), e.description.as_str())), + ) +} + +fn format_resident_entries<'a>(entries: impl Iterator) -> String { let mut out = String::new(); - for (i, e) in entries.iter().enumerate() { + for (i, (slug, description)) in entries.enumerate() { if i > 0 { out.push('\n'); } out.push_str("- "); - out.push_str(&e.slug); + out.push_str(slug); out.push_str(": "); - for ch in e.description.chars() { + for ch in description.chars() { if ch == '\n' || ch == '\r' { out.push(' '); } else { @@ -300,6 +331,7 @@ mod tests { tool_names: tools, agents_md, resident_knowledge: None, + resident_workflows: None, prompts: test_prompts(), } } @@ -316,6 +348,7 @@ mod tests { tool_names: Vec::new(), agents_md: None, resident_knowledge: Some(resident), + resident_workflows: None, prompts: test_prompts(), } } diff --git a/crates/pod/src/shared_state.rs b/crates/pod/src/shared_state.rs index 216a695f..2ae0e5ab 100644 --- a/crates/pod/src/shared_state.rs +++ b/crates/pod/src/shared_state.rs @@ -7,6 +7,11 @@ use session_store::SessionId; use crate::fs_view::PodFsView; +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WorkflowCandidate { + pub slug: String, +} + /// Shared state between PodController and runtime directory. /// /// Controller updates this in-memory; RuntimeDir writes it to disk. @@ -31,6 +36,7 @@ pub struct PodSharedState { /// (only relevant for unit tests that build a `PodSharedState` /// directly without spinning up a controller). fs_view: OnceLock, + workflows: OnceLock>, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -57,6 +63,7 @@ impl PodSharedState { history: RwLock::new(Vec::new()), user_segments: RwLock::new(Vec::new()), fs_view: OnceLock::new(), + workflows: OnceLock::new(), } } @@ -72,6 +79,23 @@ impl PodSharedState { self.fs_view.get() } + pub fn set_workflows(&self, workflows: Vec) { + let _ = self.workflows.set(workflows); + } + + pub fn list_workflow_completions(&self, prefix: &str) -> Vec { + self.workflows + .get() + .map(|items| { + items + .iter() + .filter(|candidate| candidate.slug.starts_with(prefix)) + .cloned() + .collect() + }) + .unwrap_or_default() + } + pub fn user_segments(&self) -> Vec> { self.user_segments .read() diff --git a/crates/pod/src/workflow/mod.rs b/crates/pod/src/workflow/mod.rs new file mode 100644 index 00000000..bf5745ee --- /dev/null +++ b/crates/pod/src/workflow/mod.rs @@ -0,0 +1,197 @@ +//! Pod-side Workflow resolver. +//! +//! Turns `Segment::WorkflowInvoke { slug }` into system-message attachments: +//! dependency Knowledge bodies first, then the Workflow body. Resolution is +//! strict for explicit user invocations: missing workflows, non-user-invocable +//! workflows, and missing Knowledge requirements are returned as errors before +//! the turn is handed to the Worker. + +use std::fmt; + +use llm_worker::Item; +use memory::schema::split_frontmatter; +use memory::{Slug, WorkflowRegistry, WorkspaceLayout}; + +#[derive(Debug)] +pub enum WorkflowResolveError { + InvalidSlug(memory::LintError), + NotFound { + slug: String, + }, + NotUserInvocable { + slug: String, + }, + KnowledgeNotFound { + workflow: String, + slug: String, + }, + KnowledgeRead { + workflow: String, + slug: String, + source: std::io::Error, + }, + KnowledgeFrontmatter { + workflow: String, + slug: String, + source: memory::LintError, + }, +} + +impl fmt::Display for WorkflowResolveError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidSlug(e) => write!(f, "invalid workflow slug: {e}"), + Self::NotFound { slug } => write!(f, "workflow /{slug} is not registered"), + Self::NotUserInvocable { slug } => { + write!(f, "workflow /{slug} is not user-invocable") + } + Self::KnowledgeNotFound { workflow, slug } => write!( + f, + "workflow /{workflow} requires missing Knowledge slug `{slug}`" + ), + Self::KnowledgeRead { + workflow, + slug, + source, + } => write!( + f, + "workflow /{workflow} could not read required Knowledge `{slug}`: {source}" + ), + Self::KnowledgeFrontmatter { + workflow, + slug, + source, + } => write!( + f, + "workflow /{workflow} required Knowledge `{slug}` has invalid frontmatter: {source}" + ), + } + } +} + +impl std::error::Error for WorkflowResolveError {} + +pub fn resolve_workflow_invocation( + registry: &WorkflowRegistry, + layout: &WorkspaceLayout, + raw_slug: &str, +) -> Result, WorkflowResolveError> { + let slug = Slug::parse(raw_slug.to_string()).map_err(WorkflowResolveError::InvalidSlug)?; + let record = registry + .get(&slug) + .ok_or_else(|| WorkflowResolveError::NotFound { + slug: raw_slug.to_string(), + })?; + if !record.user_invocable { + return Err(WorkflowResolveError::NotUserInvocable { + slug: raw_slug.to_string(), + }); + } + + let mut out = Vec::new(); + for req in &record.requires { + let path = layout.knowledge_path(req); + let raw = std::fs::read_to_string(&path).map_err(|source| { + if source.kind() == std::io::ErrorKind::NotFound { + WorkflowResolveError::KnowledgeNotFound { + workflow: slug.to_string(), + slug: req.to_string(), + } + } else { + WorkflowResolveError::KnowledgeRead { + workflow: slug.to_string(), + slug: req.to_string(), + source, + } + } + })?; + let (_yaml, body) = split_frontmatter(&raw).map_err(|source| { + WorkflowResolveError::KnowledgeFrontmatter { + workflow: slug.to_string(), + slug: req.to_string(), + source, + } + })?; + out.push(Item::system_message(format!( + "[Workflow /{} requires Knowledge #{}]\n{}", + slug, + req, + body.trim_end() + ))); + } + out.push(Item::system_message(format!( + "[Workflow /{}]\n{}", + slug, + record.body.trim_end() + ))); + Ok(out) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn write(path: &std::path::Path, content: &str) { + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent).unwrap(); + } + std::fs::write(path, content).unwrap(); + } + + fn setup() -> (TempDir, WorkspaceLayout, WorkflowRegistry) { + let dir = TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(dir.path().to_path_buf()); + write( + &dir.path().join(".insomnia/knowledge/policy.md"), + "---\ncreated_at: 2026-01-01T00:00:00Z\nupdated_at: 2026-01-01T00:00:00Z\nkind: policy\ndescription: p\nmodel_invokation: false\nuser_invocable: true\nlast_sources: []\n---\npolicy body\n", + ); + write( + &dir.path().join(".insomnia/memory/workflow/run-it.md"), + "---\ndescription: run\nrequires: [policy]\n---\nworkflow body\n", + ); + let registry = memory::load_workflows(&layout).unwrap(); + (dir, layout, registry) + } + + #[test] + fn resolves_requires_before_workflow_body() { + let (_dir, layout, registry) = setup(); + let items = resolve_workflow_invocation(®istry, &layout, "run-it").unwrap(); + assert_eq!(items.len(), 2); + let first = format!("{:?}", items[0]); + let second = format!("{:?}", items[1]); + assert!(first.contains("Knowledge #policy")); + assert!(first.contains("policy body")); + assert!(second.contains("[Workflow /run-it]")); + assert!(second.contains("workflow body")); + } + + #[test] + fn user_invocable_false_errors() { + let (dir, layout, _registry) = setup(); + write( + &dir.path().join(".insomnia/memory/workflow/hidden.md"), + "---\ndescription: hidden\nuser_invocable: false\n---\nbody\n", + ); + let registry = memory::load_workflows(&layout).unwrap(); + let err = resolve_workflow_invocation(®istry, &layout, "hidden").unwrap_err(); + assert!(matches!(err, WorkflowResolveError::NotUserInvocable { .. })); + } + + #[test] + fn missing_required_knowledge_errors() { + let dir = TempDir::new().unwrap(); + let layout = WorkspaceLayout::new(dir.path().to_path_buf()); + write( + &dir.path().join(".insomnia/memory/workflow/bad.md"), + "---\ndescription: bad\nrequires: [ghost]\n---\nbody\n", + ); + let registry = memory::load_workflows(&layout).unwrap(); + let err = resolve_workflow_invocation(®istry, &layout, "bad").unwrap_err(); + assert!(matches!( + err, + WorkflowResolveError::KnowledgeNotFound { .. } + )); + } +} diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 2b1a3a2c..49f8812e 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -412,6 +412,7 @@ pub enum ErrorCode { NotPaused, ProviderError, ToolError, + InvalidRequest, Internal, } diff --git a/resources/prompts/internal.toml b/resources/prompts/internal.toml index 88a75906..b16ebcaa 100644 --- a/resources/prompts/internal.toml +++ b/resources/prompts/internal.toml @@ -43,3 +43,12 @@ The following knowledge records are advertised resident. Use the KnowledgeQuery {{ entries }}\ """ + +resident_workflows_section = """\ +--- +## Resident workflows + +The following workflows are advertised resident. Invoke one with / when its description matches the task. + +{{ entries }}\ +"""