From 9b2cae32eaee9c1fb9c416a793d5f86b795726f4 Mon Sep 17 00:00:00 2001 From: Hare Date: Fri, 26 Jun 2026 01:31:09 +0900 Subject: [PATCH] feat: add memory worker runtime crate --- Cargo.lock | 8 + Cargo.toml | 3 + crates/worker-runtime/Cargo.toml | 10 + crates/worker-runtime/src/catalog.rs | 179 ++++++ crates/worker-runtime/src/diagnostics.rs | 21 + crates/worker-runtime/src/error.rs | 38 ++ crates/worker-runtime/src/identity.rs | 82 +++ crates/worker-runtime/src/interaction.rs | 44 ++ crates/worker-runtime/src/lib.rs | 17 + crates/worker-runtime/src/management.rs | 66 ++ crates/worker-runtime/src/observation.rs | 95 +++ crates/worker-runtime/src/runtime.rs | 762 +++++++++++++++++++++++ 12 files changed, 1325 insertions(+) create mode 100644 crates/worker-runtime/Cargo.toml create mode 100644 crates/worker-runtime/src/catalog.rs create mode 100644 crates/worker-runtime/src/diagnostics.rs create mode 100644 crates/worker-runtime/src/error.rs create mode 100644 crates/worker-runtime/src/identity.rs create mode 100644 crates/worker-runtime/src/interaction.rs create mode 100644 crates/worker-runtime/src/lib.rs create mode 100644 crates/worker-runtime/src/management.rs create mode 100644 crates/worker-runtime/src/observation.rs create mode 100644 crates/worker-runtime/src/runtime.rs diff --git a/Cargo.lock b/Cargo.lock index 0e89b577..35c287d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5901,6 +5901,14 @@ dependencies = [ "yoi-plugin-pdk", ] +[[package]] +name = "worker-runtime" +version = "0.1.0" +dependencies = [ + "serde", + "thiserror 2.0.18", +] + [[package]] name = "workflow" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 7f5ab38f..36662456 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "crates/manifest", "crates/mcp", "crates/worker", + "crates/worker-runtime", "crates/plugin-pdk", "crates/yoi", "crates/pod-store", @@ -36,6 +37,7 @@ default-members = [ "crates/manifest", "crates/mcp", "crates/worker", + "crates/worker-runtime", 
"crates/plugin-pdk", "crates/yoi", "crates/pod-store", @@ -70,6 +72,7 @@ memory = { path = "crates/memory" } ticket = { path = "crates/ticket" } project-record = { path = "crates/project-record" } worker = { path = "crates/worker" } +worker-runtime = { path = "crates/worker-runtime" } yoi-plugin-pdk = { path = "crates/plugin-pdk" } yoi = { path = "crates/yoi" } pod-registry = { path = "crates/pod-registry" } diff --git a/crates/worker-runtime/Cargo.toml b/crates/worker-runtime/Cargo.toml new file mode 100644 index 00000000..e9b53282 --- /dev/null +++ b/crates/worker-runtime/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "worker-runtime" +description = "Embedded memory-backed Runtime API for Worker management" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +serde = { workspace = true, features = ["derive"] } +thiserror = { workspace = true } diff --git a/crates/worker-runtime/src/catalog.rs b/crates/worker-runtime/src/catalog.rs new file mode 100644 index 00000000..cf9cd735 --- /dev/null +++ b/crates/worker-runtime/src/catalog.rs @@ -0,0 +1,179 @@ +use crate::identity::{RuntimeId, WorkerId, WorkerRef}; +use serde::{Deserialize, Serialize}; + +/// Intent supplied when a Worker is created. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum WorkerIntent { + Assistant { + #[serde(default, skip_serializing_if = "Option::is_none")] + purpose: Option, + }, + Task { + objective: String, + }, + Role { + role: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + purpose: Option, + }, +} + +impl Default for WorkerIntent { + fn default() -> Self { + Self::Assistant { purpose: None } + } +} + +/// Profile selector boundary. This is a selector, not a resolved config bundle. 
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "kind", content = "value", rename_all = "snake_case")] +pub enum ProfileSelector { + RuntimeDefault, + Builtin(String), + Named(String), +} + +impl Default for ProfileSelector { + fn default() -> Self { + Self::RuntimeDefault + } +} + +/// Placeholder for future config-bundle synchronization. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ConfigBundleRef { + pub id: String, +} + +/// Requested capability name plus optional human-readable reason. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct CapabilityRequest { + pub name: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub reason: Option, +} + +impl CapabilityRequest { + pub fn named(name: impl Into) -> Self { + Self { + name: name.into(), + reason: None, + } + } +} + +/// Opaque workspace reference supplied by a caller. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkspaceRef { + pub name: String, + pub reference: String, +} + +/// Opaque mount reference supplied by a caller. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct MountRef { + pub name: String, + pub reference: String, +} + +/// Worker creation request for the catalog/lifecycle API. 
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct CreateWorkerRequest { + pub intent: WorkerIntent, + pub profile: ProfileSelector, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub config_bundle: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub requested_capabilities: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub workspace_refs: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub mount_refs: Vec, +} + +impl Default for CreateWorkerRequest { + fn default() -> Self { + Self { + intent: WorkerIntent::default(), + profile: ProfileSelector::default(), + config_bundle: None, + requested_capabilities: Vec::new(), + workspace_refs: Vec::new(), + mount_refs: Vec::new(), + } + } +} + +impl CreateWorkerRequest { + /// Create a tools-less Worker using runtime-local default resources. + pub fn tools_less(intent: WorkerIntent, profile: ProfileSelector) -> Self { + Self { + intent, + profile, + config_bundle: None, + requested_capabilities: Vec::new(), + workspace_refs: Vec::new(), + mount_refs: Vec::new(), + } + } +} + +/// Worker lifecycle status for the in-memory embedded runtime. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkerStatus { + Running, + Stopped, + Cancelled, +} + +impl WorkerStatus { + pub fn is_active(self) -> bool { + matches!(self, Self::Running) + } +} + +/// Lightweight catalog row. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkerSummary { + pub worker_ref: WorkerRef, + pub runtime_id: RuntimeId, + pub worker_id: WorkerId, + pub status: WorkerStatus, + pub intent: WorkerIntent, + pub profile: ProfileSelector, + pub requested_capability_count: usize, + pub has_config_bundle: bool, + pub transcript_len: usize, + pub last_event_id: u64, +} + +/// Full Worker catalog/lifecycle detail. 
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkerDetail { + pub worker_ref: WorkerRef, + pub runtime_id: RuntimeId, + pub worker_id: WorkerId, + pub status: WorkerStatus, + pub intent: WorkerIntent, + pub profile: ProfileSelector, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub config_bundle: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub requested_capabilities: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub workspace_refs: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub mount_refs: Vec, + pub transcript_len: usize, + pub last_event_id: u64, +} + +/// Acknowledgement returned by stop/cancel lifecycle operations. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkerLifecycleAck { + pub worker_ref: WorkerRef, + pub status: WorkerStatus, + pub event_id: u64, +} diff --git a/crates/worker-runtime/src/diagnostics.rs b/crates/worker-runtime/src/diagnostics.rs new file mode 100644 index 00000000..3140f0c8 --- /dev/null +++ b/crates/worker-runtime/src/diagnostics.rs @@ -0,0 +1,21 @@ +use crate::identity::WorkerRef; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum DiagnosticSeverity { + Info, + Warning, + Error, +} + +/// Runtime diagnostic emitted by memory-runtime operations. 
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeDiagnostic { + pub id: u64, + pub severity: DiagnosticSeverity, + pub code: String, + pub message: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub worker_ref: Option, +} diff --git a/crates/worker-runtime/src/error.rs b/crates/worker-runtime/src/error.rs new file mode 100644 index 00000000..41c2e7ec --- /dev/null +++ b/crates/worker-runtime/src/error.rs @@ -0,0 +1,38 @@ +use crate::identity::{RuntimeId, WorkerId}; + +/// Errors returned by the embedded Runtime API. +#[derive(Debug, thiserror::Error)] +pub enum RuntimeError { + #[error("runtime {runtime_id} is stopped")] + RuntimeStopped { runtime_id: RuntimeId }, + + #[error( + "worker {worker_id} belongs to runtime {actual_runtime_id}, not runtime {expected_runtime_id}" + )] + WrongRuntime { + expected_runtime_id: RuntimeId, + actual_runtime_id: RuntimeId, + worker_id: WorkerId, + }, + + #[error("cursor belongs to runtime {actual_runtime_id}, not runtime {expected_runtime_id}")] + WrongRuntimeCursor { + expected_runtime_id: RuntimeId, + actual_runtime_id: RuntimeId, + }, + + #[error("worker {worker_id} was not found in runtime {runtime_id}")] + WorkerNotFound { + runtime_id: RuntimeId, + worker_id: WorkerId, + }, + + #[error("limit {requested} exceeds maximum {max}")] + LimitTooLarge { requested: usize, max: usize }, + + #[error("invalid request: {0}")] + InvalidRequest(String), + + #[error("runtime state lock was poisoned")] + StatePoisoned, +} diff --git a/crates/worker-runtime/src/identity.rs b/crates/worker-runtime/src/identity.rs new file mode 100644 index 00000000..fd660b21 --- /dev/null +++ b/crates/worker-runtime/src/identity.rs @@ -0,0 +1,82 @@ +use serde::{Deserialize, Serialize}; +use std::fmt; + +/// Public Runtime identity. +/// +/// This is the first half of Worker authority. 
Runtime APIs that operate on a +/// Worker require this id alongside a [`WorkerId`]; socket paths, session paths, +/// and display names are deliberately not authority. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct RuntimeId(String); + +impl RuntimeId { + pub fn new(value: impl Into) -> Option { + let value = value.into(); + if value.trim().is_empty() { + None + } else { + Some(Self(value)) + } + } + + pub(crate) fn generated(sequence: u64) -> Self { + Self(format!("runtime-mem-{sequence:016x}")) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for RuntimeId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +/// Runtime-local Worker identity. +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct WorkerId(String); + +impl WorkerId { + pub fn new(value: impl Into) -> Option { + let value = value.into(); + if value.trim().is_empty() { + None + } else { + Some(Self(value)) + } + } + + pub(crate) fn generated(sequence: u64) -> Self { + Self(format!("worker-{sequence:08x}")) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for WorkerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +/// Complete public authority reference for Worker operations. 
+#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct WorkerRef { + pub runtime_id: RuntimeId, + pub worker_id: WorkerId, +} + +impl WorkerRef { + pub fn new(runtime_id: RuntimeId, worker_id: WorkerId) -> Self { + Self { + runtime_id, + worker_id, + } + } +} diff --git a/crates/worker-runtime/src/interaction.rs b/crates/worker-runtime/src/interaction.rs new file mode 100644 index 00000000..a9dd440d --- /dev/null +++ b/crates/worker-runtime/src/interaction.rs @@ -0,0 +1,44 @@ +use crate::catalog::WorkerStatus; +use crate::identity::WorkerRef; +use serde::{Deserialize, Serialize}; + +/// Input kind accepted by the embedded interaction API. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum WorkerInputKind { + User, + System, +} + +/// Worker input request. v0 stores the input in an in-memory transcript and +/// does not execute providers/tools. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkerInput { + pub kind: WorkerInputKind, + pub content: String, +} + +impl WorkerInput { + pub fn user(content: impl Into) -> Self { + Self { + kind: WorkerInputKind::User, + content: content.into(), + } + } + + pub fn system(content: impl Into) -> Self { + Self { + kind: WorkerInputKind::System, + content: content.into(), + } + } +} + +/// Acknowledgement returned after input is accepted into the in-memory Worker. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct WorkerInteractionAck { + pub worker_ref: WorkerRef, + pub status: WorkerStatus, + pub transcript_sequence: u64, + pub event_id: u64, +} diff --git a/crates/worker-runtime/src/lib.rs b/crates/worker-runtime/src/lib.rs new file mode 100644 index 00000000..96bcf3c1 --- /dev/null +++ b/crates/worker-runtime/src/lib.rs @@ -0,0 +1,17 @@ +//! Embedded Runtime domain API for Worker management. +//! +//! 
`worker-runtime` intentionally stays independent from HTTP/WebSocket servers, +//! filesystem persistence, provider execution, and the existing Worker host. It +//! defines the in-process Runtime authority surface that higher layers can later +//! adapt into registries or web APIs. + +pub mod catalog; +pub mod diagnostics; +pub mod error; +pub mod identity; +pub mod interaction; +pub mod management; +pub mod observation; +mod runtime; + +pub use runtime::Runtime; diff --git a/crates/worker-runtime/src/management.rs b/crates/worker-runtime/src/management.rs new file mode 100644 index 00000000..5fa2b5da --- /dev/null +++ b/crates/worker-runtime/src/management.rs @@ -0,0 +1,66 @@ +use crate::identity::RuntimeId; +use serde::{Deserialize, Serialize}; + +/// Runtime backend kind. v0 intentionally ships only an embedded memory backend. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum RuntimeBackendKind { + Memory, +} + +/// Runtime lifecycle state. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum RuntimeStatus { + Running, + Stopped, +} + +/// Guardrails for bounded observation/projection APIs. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeLimits { + pub max_transcript_projection_items: usize, + pub max_event_batch_items: usize, +} + +impl Default for RuntimeLimits { + fn default() -> Self { + Self { + max_transcript_projection_items: 256, + max_event_batch_items: 256, + } + } +} + +/// Options used to construct an embedded memory Runtime. 
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeOptions { + pub runtime_id: Option, + pub display_name: Option, + pub limits: RuntimeLimits, +} + +impl Default for RuntimeOptions { + fn default() -> Self { + Self { + runtime_id: None, + display_name: None, + limits: RuntimeLimits::default(), + } + } +} + +/// Management-plane summary for a Runtime. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeSummary { + pub runtime_id: RuntimeId, + pub display_name: Option, + pub backend: RuntimeBackendKind, + pub status: RuntimeStatus, + pub worker_count: usize, + pub active_worker_count: usize, + pub stopped_worker_count: usize, + pub cancelled_worker_count: usize, + pub diagnostic_count: usize, + pub limits: RuntimeLimits, +} diff --git a/crates/worker-runtime/src/observation.rs b/crates/worker-runtime/src/observation.rs new file mode 100644 index 00000000..50ee2318 --- /dev/null +++ b/crates/worker-runtime/src/observation.rs @@ -0,0 +1,95 @@ +use crate::identity::{RuntimeId, WorkerRef}; +use serde::{Deserialize, Serialize}; + +/// Transcript role used by bounded projection. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum TranscriptRole { + User, + System, +} + +/// One projected transcript item. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TranscriptEntry { + pub sequence: u64, + pub worker_ref: WorkerRef, + pub role: TranscriptRole, + pub content: String, + pub event_id: u64, +} + +/// Bounded transcript query. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TranscriptQuery { + pub start: usize, + pub limit: usize, +} + +impl TranscriptQuery { + pub fn new(start: usize, limit: usize) -> Self { + Self { start, limit } + } +} + +/// Bounded transcript projection response. 
+#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct TranscriptProjection { + pub worker_ref: WorkerRef, + pub start: usize, + pub limit: usize, + pub total_items: usize, + pub items: Vec, + pub next_start: Option, +} + +/// Event cursor. `next_event_id` is the first event id that should be returned +/// by the next poll. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct EventCursor { + pub runtime_id: RuntimeId, + pub next_event_id: u64, +} + +/// Placeholder subscription handle for future streaming APIs. v0 is explicit +/// poll-only so HTTP/WS/SSE dependencies are not pulled into this crate. +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct EventSubscription { + pub runtime_id: RuntimeId, + pub cursor: EventCursor, + pub mode: EventSubscriptionMode, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum EventSubscriptionMode { + PollOnly, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeEvent { + pub id: u64, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub worker_ref: Option, + pub kind: RuntimeEventKind, + pub message: String, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +pub enum RuntimeEventKind { + RuntimeStarted, + RuntimeStopped, + WorkerCreated, + WorkerInputAccepted, + WorkerStopped, + WorkerCancelled, +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct RuntimeEventBatch { + pub runtime_id: RuntimeId, + pub cursor: EventCursor, + pub events: Vec, + pub has_more: bool, +} diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs new file mode 100644 index 00000000..aee5b0b8 --- /dev/null +++ b/crates/worker-runtime/src/runtime.rs @@ -0,0 +1,762 @@ +use crate::catalog::{ + CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, 
WorkerStatus, WorkerSummary, +}; +use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic}; +use crate::error::RuntimeError; +use crate::identity::{RuntimeId, WorkerId, WorkerRef}; +use crate::interaction::{WorkerInput, WorkerInputKind, WorkerInteractionAck}; +use crate::management::{ + RuntimeBackendKind, RuntimeLimits, RuntimeOptions, RuntimeStatus, RuntimeSummary, +}; +use crate::observation::{ + EventCursor, EventSubscription, EventSubscriptionMode, RuntimeEvent, RuntimeEventBatch, + RuntimeEventKind, TranscriptEntry, TranscriptProjection, TranscriptQuery, TranscriptRole, +}; +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex, MutexGuard}; + +static NEXT_RUNTIME_SEQUENCE: AtomicU64 = AtomicU64::new(1); + +/// Concrete embedded Runtime domain entity. +/// +/// The current implementation is memory-backed and tools/provider-less by +/// design. It provides a typed API boundary that can later be adapted by +/// backend registries or web servers without making sockets, sessions, or paths +/// public authority. +#[derive(Clone, Debug)] +pub struct Runtime { + inner: Arc>, +} + +impl Runtime { + /// Create a memory-backed Runtime with generated identity and default limits. + pub fn new_memory() -> Self { + Self::with_options(RuntimeOptions::default()) + } + + /// Create a memory-backed Runtime with explicit options. + pub fn with_options(options: RuntimeOptions) -> Self { + let runtime_id = options.runtime_id.unwrap_or_else(|| { + RuntimeId::generated(NEXT_RUNTIME_SEQUENCE.fetch_add(1, Ordering::Relaxed)) + }); + let mut state = RuntimeState::new(runtime_id, options.display_name, options.limits); + state.push_event(None, RuntimeEventKind::RuntimeStarted, "runtime started"); + Self { + inner: Arc::new(Mutex::new(state)), + } + } + + /// Runtime id half of public Worker authority. + pub fn runtime_id(&self) -> Result { + Ok(self.lock()?.runtime_id.clone()) + } + + /// Management-plane summary. 
+ pub fn summary(&self) -> Result { + let state = self.lock()?; + let mut active_worker_count = 0; + let mut stopped_worker_count = 0; + let mut cancelled_worker_count = 0; + for worker in state.workers.values() { + match worker.status { + WorkerStatus::Running => active_worker_count += 1, + WorkerStatus::Stopped => stopped_worker_count += 1, + WorkerStatus::Cancelled => cancelled_worker_count += 1, + } + } + + Ok(RuntimeSummary { + runtime_id: state.runtime_id.clone(), + display_name: state.display_name.clone(), + backend: state.backend, + status: state.status, + worker_count: state.workers.len(), + active_worker_count, + stopped_worker_count, + cancelled_worker_count, + diagnostic_count: state.diagnostics.len(), + limits: state.limits.clone(), + }) + } + + /// Current Runtime lifecycle state. + pub fn status(&self) -> Result { + Ok(self.lock()?.status) + } + + /// Stop the Runtime. v0 keeps data readable after stop, but rejects new + /// create/send/worker lifecycle mutations. + pub fn stop_runtime(&self) -> Result { + let mut state = self.lock()?; + if state.status == RuntimeStatus::Stopped { + return Ok(state.last_event_id()); + } + state.status = RuntimeStatus::Stopped; + let runtime_id = state.runtime_id.clone(); + for worker in state.workers.values_mut() { + if worker.status.is_active() { + worker.status = WorkerStatus::Stopped; + } + } + Ok(state.push_event( + None, + RuntimeEventKind::RuntimeStopped, + format!("runtime {runtime_id} stopped"), + )) + } + + /// Create a Worker in the embedded catalog. 
+ pub fn create_worker( + &self, + request: CreateWorkerRequest, + ) -> Result { + let mut state = self.lock()?; + state.ensure_running()?; + validate_create_worker_request(&request)?; + + let worker_id = WorkerId::generated(state.next_worker_sequence); + state.next_worker_sequence += 1; + let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone()); + let event_id = state.push_event( + Some(worker_ref.clone()), + RuntimeEventKind::WorkerCreated, + format!("worker {worker_id} created"), + ); + + let record = WorkerRecord { + worker_ref, + worker_id: worker_id.clone(), + status: WorkerStatus::Running, + request, + transcript: Vec::new(), + next_transcript_sequence: 1, + last_event_id: event_id, + }; + let detail = record.detail(&state.runtime_id); + state.emit_create_diagnostics(&detail); + state.workers.insert(worker_id, record); + Ok(detail) + } + + /// List Workers known to this Runtime. + pub fn list_workers(&self) -> Result, RuntimeError> { + let state = self.lock()?; + Ok(state + .workers + .values() + .map(|worker| worker.summary(&state.runtime_id)) + .collect()) + } + + /// Fetch Worker detail. The supplied [`WorkerRef`] must match this Runtime. + pub fn worker_detail(&self, worker_ref: &WorkerRef) -> Result { + let state = self.lock()?; + let worker = state.worker(worker_ref)?; + Ok(worker.detail(&state.runtime_id)) + } + + /// Accept input into a Worker transcript. 
+ pub fn send_input( + &self, + worker_ref: &WorkerRef, + input: WorkerInput, + ) -> Result { + let mut state = self.lock()?; + state.ensure_running()?; + validate_worker_input(&input)?; + state.ensure_worker_ref(worker_ref)?; + { + let worker = state.worker(worker_ref)?; + if !worker.status.is_active() { + return Err(RuntimeError::InvalidRequest(format!( + "worker {} is not running", + worker_ref.worker_id + ))); + } + } + + let event_id = state.push_event( + Some(worker_ref.clone()), + RuntimeEventKind::WorkerInputAccepted, + "worker input accepted", + ); + let worker = state.worker_mut(worker_ref)?; + + let role = match input.kind { + WorkerInputKind::User => TranscriptRole::User, + WorkerInputKind::System => TranscriptRole::System, + }; + let transcript_sequence = worker.next_transcript_sequence; + worker.next_transcript_sequence += 1; + worker.last_event_id = event_id; + worker.transcript.push(TranscriptEntry { + sequence: transcript_sequence, + worker_ref: worker_ref.clone(), + role, + content: input.content, + event_id, + }); + + Ok(WorkerInteractionAck { + worker_ref: worker_ref.clone(), + status: worker.status, + transcript_sequence, + event_id, + }) + } + + /// Stop a Worker. Repeated stops are idempotent and return the last event id. + pub fn stop_worker( + &self, + worker_ref: &WorkerRef, + reason: Option, + ) -> Result { + self.transition_worker( + worker_ref, + WorkerStatus::Stopped, + RuntimeEventKind::WorkerStopped, + reason.unwrap_or_else(|| "worker stopped".to_string()), + ) + } + + /// Cancel a Worker. Repeated cancels are idempotent and return the last event id. + pub fn cancel_worker( + &self, + worker_ref: &WorkerRef, + reason: Option, + ) -> Result { + self.transition_worker( + worker_ref, + WorkerStatus::Cancelled, + RuntimeEventKind::WorkerCancelled, + reason.unwrap_or_else(|| "worker cancelled".to_string()), + ) + } + + /// Bounded transcript projection for a Worker. 
+ pub fn transcript_projection( + &self, + worker_ref: &WorkerRef, + query: TranscriptQuery, + ) -> Result { + let state = self.lock()?; + if query.limit > state.limits.max_transcript_projection_items { + return Err(RuntimeError::LimitTooLarge { + requested: query.limit, + max: state.limits.max_transcript_projection_items, + }); + } + let worker = state.worker(worker_ref)?; + let total_items = worker.transcript.len(); + let end = query.start.saturating_add(query.limit).min(total_items); + let items = if query.start >= total_items { + Vec::new() + } else { + worker.transcript[query.start..end].to_vec() + }; + let next_start = (end < total_items).then_some(end); + Ok(TranscriptProjection { + worker_ref: worker_ref.clone(), + start: query.start, + limit: query.limit, + total_items, + items, + next_start, + }) + } + + /// Cursor pointing to the beginning of Runtime events. + pub fn event_cursor_from_start(&self) -> Result { + let state = self.lock()?; + Ok(EventCursor { + runtime_id: state.runtime_id.clone(), + next_event_id: 1, + }) + } + + /// Cursor pointing after the current last event. + pub fn event_cursor_now(&self) -> Result { + let state = self.lock()?; + Ok(EventCursor { + runtime_id: state.runtime_id.clone(), + next_event_id: state.last_event_id() + 1, + }) + } + + /// Poll Runtime events from a cursor. 
+ pub fn read_events( + &self, + cursor: &EventCursor, + limit: usize, + ) -> Result { + let state = self.lock()?; + if cursor.runtime_id != state.runtime_id { + return Err(RuntimeError::WrongRuntimeCursor { + expected_runtime_id: state.runtime_id.clone(), + actual_runtime_id: cursor.runtime_id.clone(), + }); + } + if limit > state.limits.max_event_batch_items { + return Err(RuntimeError::LimitTooLarge { + requested: limit, + max: state.limits.max_event_batch_items, + }); + } + + let mut events = Vec::new(); + for event in state + .events + .iter() + .filter(|event| event.id >= cursor.next_event_id) + .take(limit) + { + events.push(event.clone()); + } + let next_event_id = events + .last() + .map(|event| event.id + 1) + .unwrap_or(cursor.next_event_id); + let has_more = state.events.iter().any(|event| event.id >= next_event_id); + Ok(RuntimeEventBatch { + runtime_id: state.runtime_id.clone(), + cursor: EventCursor { + runtime_id: state.runtime_id.clone(), + next_event_id, + }, + events, + has_more, + }) + } + + /// Create a poll-only placeholder subscription boundary for future streaming. + pub fn subscribe_events(&self, cursor: EventCursor) -> Result { + let state = self.lock()?; + if cursor.runtime_id != state.runtime_id { + return Err(RuntimeError::WrongRuntimeCursor { + expected_runtime_id: state.runtime_id.clone(), + actual_runtime_id: cursor.runtime_id, + }); + } + Ok(EventSubscription { + runtime_id: state.runtime_id.clone(), + cursor, + mode: EventSubscriptionMode::PollOnly, + }) + } + + /// Snapshot current diagnostics. 
+ /* Returns a clone of every diagnostic recorded so far; fails with RuntimeError::StatePoisoned when the state lock cannot be taken. NOTE(review): generic arguments look stripped from this text (this return type reads `Result, RuntimeError>`) - confirm against the real source. */ pub fn diagnostics(&self) -> Result, RuntimeError> { + Ok(self.lock()?.diagnostics.clone()) + } + + /* Sets the worker named by worker_ref to status and logs an event of event_kind with reason as its message. Errors when the runtime is stopped, the ref names another runtime, or the worker is unknown. The returned ack echoes worker_ref together with the resulting status and event id. */ fn transition_worker( + &self, + worker_ref: &WorkerRef, + status: WorkerStatus, + event_kind: RuntimeEventKind, + reason: String, + ) -> Result { + let mut state = self.lock()?; + state.ensure_running()?; + state.ensure_worker_ref(worker_ref)?; + + /* No-op path: a worker already in the requested status gets an ack built from its stored last_event_id, and no new event is pushed. */ let already_status = { + let worker = state.worker(worker_ref)?; + worker.status == status + }; + if already_status { + let worker = state.worker(worker_ref)?; + return Ok(WorkerLifecycleAck { + worker_ref: worker_ref.clone(), + status: worker.status, + event_id: worker.last_event_id, + }); + } + + /* Push the event first so that its id can be stored on the worker record. */ let event_id = state.push_event(Some(worker_ref.clone()), event_kind, reason); + let worker = state.worker_mut(worker_ref)?; + worker.status = status; + worker.last_event_id = event_id; + Ok(WorkerLifecycleAck { + worker_ref: worker_ref.clone(), + status: worker.status, + event_id, + }) + } + + /* Locks self.inner; any lock failure is reported as RuntimeError::StatePoisoned. */ fn lock(&self) -> Result, RuntimeError> { + self.inner.lock().map_err(|_| RuntimeError::StatePoisoned) + } +} + +/* Bookkeeping for one runtime: identity, backend, status, limits, the next ids to hand out, the worker map, and the event and diagnostic logs. */ #[derive(Debug)] +struct RuntimeState { + runtime_id: RuntimeId, + display_name: Option, + backend: RuntimeBackendKind, + status: RuntimeStatus, + limits: RuntimeLimits, + next_worker_sequence: u64, + next_event_id: u64, + next_diagnostic_id: u64, + workers: BTreeMap, + events: Vec, + diagnostics: Vec, +} + +impl RuntimeState { + /* Builds state with the Memory backend, Running status, every id counter at 1, and empty collections. */ fn new(runtime_id: RuntimeId, display_name: Option, limits: RuntimeLimits) -> Self { + Self { + runtime_id, + display_name, + backend: RuntimeBackendKind::Memory, + status: RuntimeStatus::Running, + limits, + next_worker_sequence: 1, + next_event_id: 1, + next_diagnostic_id: 1, + workers: BTreeMap::new(), + events: Vec::new(), + diagnostics: Vec::new(), + } + } + + /* Returns RuntimeStopped only when status is Stopped; any other status passes. */ fn ensure_running(&self) -> Result<(), RuntimeError> { + if self.status == RuntimeStatus::Stopped { + Err(RuntimeError::RuntimeStopped { + runtime_id: self.runtime_id.clone(), + }) + } else { + Ok(()) + } + } + + /* Checks that worker_ref names this runtime (else WrongRuntime) and a worker present in the map (else WorkerNotFound). */ fn
ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> { + if worker_ref.runtime_id != self.runtime_id { + return Err(RuntimeError::WrongRuntime { + expected_runtime_id: self.runtime_id.clone(), + actual_runtime_id: worker_ref.runtime_id.clone(), + worker_id: worker_ref.worker_id.clone(), + }); + } + if !self.workers.contains_key(&worker_ref.worker_id) { + return Err(RuntimeError::WorkerNotFound { + runtime_id: self.runtime_id.clone(), + worker_id: worker_ref.worker_id.clone(), + }); + } + Ok(()) + } + + /* Shared lookup of a worker record; worker_ref is validated with ensure_worker_ref before the map is read. */ fn worker(&self, worker_ref: &WorkerRef) -> Result<&WorkerRecord, RuntimeError> { + self.ensure_worker_ref(worker_ref)?; + self.workers + .get(&worker_ref.worker_id) + .ok_or_else(|| RuntimeError::WorkerNotFound { + runtime_id: self.runtime_id.clone(), + worker_id: worker_ref.worker_id.clone(), + }) + } + + /* Mutable counterpart of worker(): same validation, same WorkerNotFound error on a missing entry. */ fn worker_mut(&mut self, worker_ref: &WorkerRef) -> Result<&mut WorkerRecord, RuntimeError> { + self.ensure_worker_ref(worker_ref)?; + self.workers + .get_mut(&worker_ref.worker_id) + .ok_or_else(|| RuntimeError::WorkerNotFound { + runtime_id: self.runtime_id.clone(), + worker_id: worker_ref.worker_id.clone(), + }) + } + + /* Appends an event under the next event id, advances the counter, and returns the id that was used. */ fn push_event( + &mut self, + worker_ref: Option, + kind: RuntimeEventKind, + message: impl Into, + ) -> u64 { + let id = self.next_event_id; + self.next_event_id += 1; + self.events.push(RuntimeEvent { + id, + worker_ref, + kind, + message: message.into(), + }); + id + } + + /* Returns next_event_id minus one, saturating at 0; since new() starts the counter at 1 this is 0 until an event has been pushed. */ fn last_event_id(&self) -> u64 { + self.next_event_id.saturating_sub(1) + } + + /* Appends a diagnostic under the next diagnostic id and advances the counter; nothing is returned. */ fn push_diagnostic( + &mut self, + severity: DiagnosticSeverity, + code: impl Into, + message: impl Into, + worker_ref: Option, + ) { + let id = self.next_diagnostic_id; + self.next_diagnostic_id += 1; + self.diagnostics.push(RuntimeDiagnostic { + id, + severity, + code: code.into(), + message: message.into(), + worker_ref, + }); + } + + /* Records an Info diagnostic when detail has no config bundle and another when it has no requested capabilities; both are tagged with the worker ref from detail. */ fn emit_create_diagnostics(&mut self, detail: &WorkerDetail) { + if detail.config_bundle.is_none() { + self.push_diagnostic( +
DiagnosticSeverity::Info, + "runtime.local_default_resources", + "worker created without ConfigBundleRef; runtime-local defaults are assumed", + Some(detail.worker_ref.clone()), + ); + } + if detail.requested_capabilities.is_empty() { + self.push_diagnostic( + DiagnosticSeverity::Info, + "worker.tools_less", + "worker created without requested tool capabilities", + Some(detail.worker_ref.clone()), + ); + } + } +} + +/* Per-worker record: identity, status, the stored CreateWorkerRequest, the transcript with its next sequence number, and the id of the last event recorded for the worker. */ #[derive(Debug)] +struct WorkerRecord { + worker_ref: WorkerRef, + worker_id: WorkerId, + status: WorkerStatus, + request: CreateWorkerRequest, + transcript: Vec, + next_transcript_sequence: u64, + last_event_id: u64, +} + +impl WorkerRecord { + /* Projects the record into a WorkerSummary, reporting a capability count, a bundle-present flag, and the transcript length rather than the underlying data. */ fn summary(&self, runtime_id: &RuntimeId) -> WorkerSummary { + WorkerSummary { + worker_ref: self.worker_ref.clone(), + runtime_id: runtime_id.clone(), + worker_id: self.worker_id.clone(), + status: self.status, + intent: self.request.intent.clone(), + profile: self.request.profile.clone(), + requested_capability_count: self.request.requested_capabilities.len(), + has_config_bundle: self.request.config_bundle.is_some(), + transcript_len: self.transcript.len(), + last_event_id: self.last_event_id, + } + } + + /* Projects the record into a WorkerDetail, cloning the config bundle, capabilities, workspace refs, and mount refs out of the stored request. */ fn detail(&self, runtime_id: &RuntimeId) -> WorkerDetail { + WorkerDetail { + worker_ref: self.worker_ref.clone(), + runtime_id: runtime_id.clone(), + worker_id: self.worker_id.clone(), + status: self.status, + intent: self.request.intent.clone(), + profile: self.request.profile.clone(), + config_bundle: self.request.config_bundle.clone(), + requested_capabilities: self.request.requested_capabilities.clone(), + workspace_refs: self.request.workspace_refs.clone(), + mount_refs: self.request.mount_refs.clone(), + transcript_len: self.transcript.len(), + last_event_id: self.last_event_id, + } + } +} + +/* Rejects a Task intent whose objective is blank and any requested capability whose name is blank, both judged after trimming whitespace; other intents are not inspected. */ fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> { + if let crate::catalog::WorkerIntent::Task { objective } = &request.intent { + if objective.trim().is_empty() { +
return Err(RuntimeError::InvalidRequest( + "task objective must not be empty".to_string(), + )); + } + } + for capability in &request.requested_capabilities { + if capability.name.trim().is_empty() { + return Err(RuntimeError::InvalidRequest( + "capability name must not be empty".to_string(), + )); + } + } + Ok(()) +} + +/* Rejects worker input whose content is blank after trimming whitespace. */ fn validate_worker_input(input: &WorkerInput) -> Result<(), RuntimeError> { + if input.content.trim().is_empty() { + return Err(RuntimeError::InvalidRequest( + "worker input content must not be empty".to_string(), + )); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector, WorkerIntent}; + use crate::management::RuntimeLimits; + + /* Test fixture: a Task request with a builtin profile, a config bundle, and a single capability named read. */ fn task_request(objective: &str) -> CreateWorkerRequest { + CreateWorkerRequest { + intent: WorkerIntent::Task { + objective: objective.to_string(), + }, + profile: ProfileSelector::Builtin("builtin:coder".to_string()), + config_bundle: Some(ConfigBundleRef { + id: "bundle-1".to_string(), + }), + requested_capabilities: vec![CapabilityRequest::named("read")], + workspace_refs: Vec::new(), + mount_refs: Vec::new(), + } + } + + /* A created worker carries the creating runtime id, appears in the worker list, and can be fetched again by its ref. */ #[test] + fn create_list_and_detail_preserve_runtime_worker_authority() { + let runtime = Runtime::new_memory(); + let detail = runtime.create_worker(task_request("implement v0")).unwrap(); + + assert_eq!(detail.worker_ref.runtime_id, runtime.runtime_id().unwrap()); + assert_eq!(detail.status, WorkerStatus::Running); + assert!(detail.config_bundle.is_some()); + + let list = runtime.list_workers().unwrap(); + assert_eq!(list.len(), 1); + assert_eq!(list[0].worker_ref, detail.worker_ref); + assert_eq!(list[0].requested_capability_count, 1); + + let fetched = runtime.worker_detail(&detail.worker_ref).unwrap(); + assert_eq!(fetched.worker_id, detail.worker_id); + assert_eq!(fetched.intent, detail.intent); + } + + /* A worker ref created by one runtime is rejected by a different runtime with WrongRuntime. */ #[test] + fn rejects_worker_refs_from_another_runtime() { + let runtime_a = Runtime::new_memory(); + let
runtime_b = Runtime::new_memory(); + let detail = runtime_a.create_worker(task_request("runtime a")).unwrap(); + + let err = runtime_b.worker_detail(&detail.worker_ref).unwrap_err(); + assert!(matches!(err, RuntimeError::WrongRuntime { .. })); + } + + /* A worker created with no config bundle and no capabilities leaves two diagnostics carrying the expected codes. */ #[test] + fn tools_less_worker_without_config_bundle_uses_local_defaults_and_diagnostics() { + let runtime = Runtime::new_memory(); + let detail = runtime + .create_worker(CreateWorkerRequest::tools_less( + WorkerIntent::default(), + ProfileSelector::RuntimeDefault, + )) + .unwrap(); + + assert!(detail.config_bundle.is_none()); + assert!(detail.requested_capabilities.is_empty()); + let diagnostics = runtime.diagnostics().unwrap(); + assert_eq!(diagnostics.len(), 2); + assert!( + diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "runtime.local_default_resources") + ); + assert!( + diagnostics + .iter() + .any(|diagnostic| diagnostic.code == "worker.tools_less") + ); + } + + /* With a projection limit of 2, a query for 2 of the 3 transcript items succeeds and a query for 3 fails with LimitTooLarge. */ #[test] + fn send_input_and_project_bounded_transcript() { + let runtime = Runtime::with_options(RuntimeOptions { + limits: RuntimeLimits { + max_transcript_projection_items: 2, + max_event_batch_items: 16, + }, + ..RuntimeOptions::default() + }); + let detail = runtime.create_worker(task_request("chat")).unwrap(); + + let first = runtime + .send_input(&detail.worker_ref, WorkerInput::user("hello")) + .unwrap(); + assert_eq!(first.transcript_sequence, 1); + runtime + .send_input(&detail.worker_ref, WorkerInput::system("note")) + .unwrap(); + runtime + .send_input(&detail.worker_ref, WorkerInput::user("again")) + .unwrap(); + + let projection = runtime + .transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 2)) + .unwrap(); + assert_eq!(projection.total_items, 3); + assert_eq!(projection.items.len(), 2); + assert_eq!(projection.items[0].content, "hello"); + assert_eq!(projection.items[1].role, TranscriptRole::System); + assert_eq!(projection.next_start, Some(2)); + + let err = runtime +
.transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 3)) + .unwrap_err(); + assert!(matches!(err, RuntimeError::LimitTooLarge { .. })); + } + + /* Stopping one worker and cancelling another leaves the summary with 2 workers, none active, 1 stopped and 1 cancelled. */ #[test] + fn stop_and_cancel_workers_update_summary() { + let runtime = Runtime::new_memory(); + let stopped = runtime.create_worker(task_request("stop me")).unwrap(); + let cancelled = runtime.create_worker(task_request("cancel me")).unwrap(); + + let stop_ack = runtime + .stop_worker(&stopped.worker_ref, Some("done".to_string())) + .unwrap(); + assert_eq!(stop_ack.status, WorkerStatus::Stopped); + + let cancel_ack = runtime + .cancel_worker(&cancelled.worker_ref, Some("abort".to_string())) + .unwrap(); + assert_eq!(cancel_ack.status, WorkerStatus::Cancelled); + + let summary = runtime.summary().unwrap(); + assert_eq!(summary.worker_count, 2); + assert_eq!(summary.active_worker_count, 0); + assert_eq!(summary.stopped_worker_count, 1); + assert_eq!(summary.cancelled_worker_count, 1); + } + + /* The subscription mode is PollOnly, and the three events (RuntimeStarted, WorkerCreated, WorkerInputAccepted) are read from the start cursor in batches of at most 2. */ #[test] + fn event_cursor_and_poll_only_subscription_are_bounded_placeholders() { + let runtime = Runtime::new_memory(); + let cursor = runtime.event_cursor_from_start().unwrap(); + let subscription = runtime.subscribe_events(cursor.clone()).unwrap(); + assert_eq!(subscription.mode, EventSubscriptionMode::PollOnly); + + let worker = runtime.create_worker(task_request("events")).unwrap(); + runtime + .send_input(&worker.worker_ref, WorkerInput::user("eventful")) + .unwrap(); + + let batch = runtime.read_events(&cursor, 2).unwrap(); + assert_eq!(batch.events.len(), 2); + assert!(batch.has_more); + assert_eq!(batch.events[0].kind, RuntimeEventKind::RuntimeStarted); + assert_eq!(batch.events[1].kind, RuntimeEventKind::WorkerCreated); + + let next = runtime.read_events(&batch.cursor, 2).unwrap(); + assert_eq!(next.events.len(), 1); + assert_eq!(next.events[0].kind, RuntimeEventKind::WorkerInputAccepted); + assert!(!next.has_more); + } +}