feat: add memory worker runtime crate
This commit is contained in:
parent
f8d3b1cca9
commit
9b2cae32ea
8
Cargo.lock
generated
8
Cargo.lock
generated
|
|
@ -5901,6 +5901,14 @@ dependencies = [
|
||||||
"yoi-plugin-pdk",
|
"yoi-plugin-pdk",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "worker-runtime"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"serde",
|
||||||
|
"thiserror 2.0.18",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "workflow"
|
name = "workflow"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ members = [
|
||||||
"crates/manifest",
|
"crates/manifest",
|
||||||
"crates/mcp",
|
"crates/mcp",
|
||||||
"crates/worker",
|
"crates/worker",
|
||||||
|
"crates/worker-runtime",
|
||||||
"crates/plugin-pdk",
|
"crates/plugin-pdk",
|
||||||
"crates/yoi",
|
"crates/yoi",
|
||||||
"crates/pod-store",
|
"crates/pod-store",
|
||||||
|
|
@ -36,6 +37,7 @@ default-members = [
|
||||||
"crates/manifest",
|
"crates/manifest",
|
||||||
"crates/mcp",
|
"crates/mcp",
|
||||||
"crates/worker",
|
"crates/worker",
|
||||||
|
"crates/worker-runtime",
|
||||||
"crates/plugin-pdk",
|
"crates/plugin-pdk",
|
||||||
"crates/yoi",
|
"crates/yoi",
|
||||||
"crates/pod-store",
|
"crates/pod-store",
|
||||||
|
|
@ -70,6 +72,7 @@ memory = { path = "crates/memory" }
|
||||||
ticket = { path = "crates/ticket" }
|
ticket = { path = "crates/ticket" }
|
||||||
project-record = { path = "crates/project-record" }
|
project-record = { path = "crates/project-record" }
|
||||||
worker = { path = "crates/worker" }
|
worker = { path = "crates/worker" }
|
||||||
|
worker-runtime = { path = "crates/worker-runtime" }
|
||||||
yoi-plugin-pdk = { path = "crates/plugin-pdk" }
|
yoi-plugin-pdk = { path = "crates/plugin-pdk" }
|
||||||
yoi = { path = "crates/yoi" }
|
yoi = { path = "crates/yoi" }
|
||||||
pod-registry = { path = "crates/pod-registry" }
|
pod-registry = { path = "crates/pod-registry" }
|
||||||
|
|
|
||||||
10
crates/worker-runtime/Cargo.toml
Normal file
10
crates/worker-runtime/Cargo.toml
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
[package]
|
||||||
|
name = "worker-runtime"
|
||||||
|
description = "Embedded memory-backed Runtime API for Worker management"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition.workspace = true
|
||||||
|
license.workspace = true
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
serde = { workspace = true, features = ["derive"] }
|
||||||
|
thiserror = { workspace = true }
|
||||||
179
crates/worker-runtime/src/catalog.rs
Normal file
179
crates/worker-runtime/src/catalog.rs
Normal file
|
|
@ -0,0 +1,179 @@
|
||||||
|
use crate::identity::{RuntimeId, WorkerId, WorkerRef};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Intent supplied when a Worker is created.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||||
|
pub enum WorkerIntent {
|
||||||
|
Assistant {
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
purpose: Option<String>,
|
||||||
|
},
|
||||||
|
Task {
|
||||||
|
objective: String,
|
||||||
|
},
|
||||||
|
Role {
|
||||||
|
role: String,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
purpose: Option<String>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for WorkerIntent {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::Assistant { purpose: None }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Profile selector boundary. This is a selector, not a resolved config bundle.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
|
||||||
|
pub enum ProfileSelector {
|
||||||
|
RuntimeDefault,
|
||||||
|
Builtin(String),
|
||||||
|
Named(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ProfileSelector {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::RuntimeDefault
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Placeholder for future config-bundle synchronization.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct ConfigBundleRef {
|
||||||
|
pub id: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Requested capability name plus optional human-readable reason.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct CapabilityRequest {
|
||||||
|
pub name: String,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub reason: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CapabilityRequest {
|
||||||
|
pub fn named(name: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
name: name.into(),
|
||||||
|
reason: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Opaque workspace reference supplied by a caller.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct WorkspaceRef {
|
||||||
|
pub name: String,
|
||||||
|
pub reference: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Opaque mount reference supplied by a caller.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct MountRef {
|
||||||
|
pub name: String,
|
||||||
|
pub reference: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Worker creation request for the catalog/lifecycle API.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct CreateWorkerRequest {
|
||||||
|
pub intent: WorkerIntent,
|
||||||
|
pub profile: ProfileSelector,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub config_bundle: Option<ConfigBundleRef>,
|
||||||
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
pub requested_capabilities: Vec<CapabilityRequest>,
|
||||||
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
pub workspace_refs: Vec<WorkspaceRef>,
|
||||||
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
pub mount_refs: Vec<MountRef>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for CreateWorkerRequest {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
intent: WorkerIntent::default(),
|
||||||
|
profile: ProfileSelector::default(),
|
||||||
|
config_bundle: None,
|
||||||
|
requested_capabilities: Vec::new(),
|
||||||
|
workspace_refs: Vec::new(),
|
||||||
|
mount_refs: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CreateWorkerRequest {
|
||||||
|
/// Create a tools-less Worker using runtime-local default resources.
|
||||||
|
pub fn tools_less(intent: WorkerIntent, profile: ProfileSelector) -> Self {
|
||||||
|
Self {
|
||||||
|
intent,
|
||||||
|
profile,
|
||||||
|
config_bundle: None,
|
||||||
|
requested_capabilities: Vec::new(),
|
||||||
|
workspace_refs: Vec::new(),
|
||||||
|
mount_refs: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Worker lifecycle status for the in-memory embedded runtime.
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum WorkerStatus {
|
||||||
|
Running,
|
||||||
|
Stopped,
|
||||||
|
Cancelled,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WorkerStatus {
|
||||||
|
pub fn is_active(self) -> bool {
|
||||||
|
matches!(self, Self::Running)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Lightweight catalog row.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct WorkerSummary {
|
||||||
|
pub worker_ref: WorkerRef,
|
||||||
|
pub runtime_id: RuntimeId,
|
||||||
|
pub worker_id: WorkerId,
|
||||||
|
pub status: WorkerStatus,
|
||||||
|
pub intent: WorkerIntent,
|
||||||
|
pub profile: ProfileSelector,
|
||||||
|
pub requested_capability_count: usize,
|
||||||
|
pub has_config_bundle: bool,
|
||||||
|
pub transcript_len: usize,
|
||||||
|
pub last_event_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Full Worker catalog/lifecycle detail.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct WorkerDetail {
|
||||||
|
pub worker_ref: WorkerRef,
|
||||||
|
pub runtime_id: RuntimeId,
|
||||||
|
pub worker_id: WorkerId,
|
||||||
|
pub status: WorkerStatus,
|
||||||
|
pub intent: WorkerIntent,
|
||||||
|
pub profile: ProfileSelector,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub config_bundle: Option<ConfigBundleRef>,
|
||||||
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
pub requested_capabilities: Vec<CapabilityRequest>,
|
||||||
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
pub workspace_refs: Vec<WorkspaceRef>,
|
||||||
|
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||||
|
pub mount_refs: Vec<MountRef>,
|
||||||
|
pub transcript_len: usize,
|
||||||
|
pub last_event_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Acknowledgement returned by stop/cancel lifecycle operations.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct WorkerLifecycleAck {
|
||||||
|
pub worker_ref: WorkerRef,
|
||||||
|
pub status: WorkerStatus,
|
||||||
|
pub event_id: u64,
|
||||||
|
}
|
||||||
21
crates/worker-runtime/src/diagnostics.rs
Normal file
21
crates/worker-runtime/src/diagnostics.rs
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
use crate::identity::WorkerRef;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum DiagnosticSeverity {
|
||||||
|
Info,
|
||||||
|
Warning,
|
||||||
|
Error,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runtime diagnostic emitted by memory-runtime operations.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct RuntimeDiagnostic {
|
||||||
|
pub id: u64,
|
||||||
|
pub severity: DiagnosticSeverity,
|
||||||
|
pub code: String,
|
||||||
|
pub message: String,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub worker_ref: Option<WorkerRef>,
|
||||||
|
}
|
||||||
38
crates/worker-runtime/src/error.rs
Normal file
38
crates/worker-runtime/src/error.rs
Normal file
|
|
@ -0,0 +1,38 @@
|
||||||
|
use crate::identity::{RuntimeId, WorkerId};
|
||||||
|
|
||||||
|
/// Errors returned by the embedded Runtime API.
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum RuntimeError {
|
||||||
|
#[error("runtime {runtime_id} is stopped")]
|
||||||
|
RuntimeStopped { runtime_id: RuntimeId },
|
||||||
|
|
||||||
|
#[error(
|
||||||
|
"worker {worker_id} belongs to runtime {actual_runtime_id}, not runtime {expected_runtime_id}"
|
||||||
|
)]
|
||||||
|
WrongRuntime {
|
||||||
|
expected_runtime_id: RuntimeId,
|
||||||
|
actual_runtime_id: RuntimeId,
|
||||||
|
worker_id: WorkerId,
|
||||||
|
},
|
||||||
|
|
||||||
|
#[error("cursor belongs to runtime {actual_runtime_id}, not runtime {expected_runtime_id}")]
|
||||||
|
WrongRuntimeCursor {
|
||||||
|
expected_runtime_id: RuntimeId,
|
||||||
|
actual_runtime_id: RuntimeId,
|
||||||
|
},
|
||||||
|
|
||||||
|
#[error("worker {worker_id} was not found in runtime {runtime_id}")]
|
||||||
|
WorkerNotFound {
|
||||||
|
runtime_id: RuntimeId,
|
||||||
|
worker_id: WorkerId,
|
||||||
|
},
|
||||||
|
|
||||||
|
#[error("limit {requested} exceeds maximum {max}")]
|
||||||
|
LimitTooLarge { requested: usize, max: usize },
|
||||||
|
|
||||||
|
#[error("invalid request: {0}")]
|
||||||
|
InvalidRequest(String),
|
||||||
|
|
||||||
|
#[error("runtime state lock was poisoned")]
|
||||||
|
StatePoisoned,
|
||||||
|
}
|
||||||
82
crates/worker-runtime/src/identity.rs
Normal file
82
crates/worker-runtime/src/identity.rs
Normal file
|
|
@ -0,0 +1,82 @@
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
/// Public Runtime identity.
|
||||||
|
///
|
||||||
|
/// This is the first half of Worker authority. Runtime APIs that operate on a
|
||||||
|
/// Worker require this id alongside a [`WorkerId`]; socket paths, session paths,
|
||||||
|
/// and display names are deliberately not authority.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
|
||||||
|
#[serde(transparent)]
|
||||||
|
pub struct RuntimeId(String);
|
||||||
|
|
||||||
|
impl RuntimeId {
|
||||||
|
pub fn new(value: impl Into<String>) -> Option<Self> {
|
||||||
|
let value = value.into();
|
||||||
|
if value.trim().is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(Self(value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn generated(sequence: u64) -> Self {
|
||||||
|
Self(format!("runtime-mem-{sequence:016x}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn as_str(&self) -> &str {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for RuntimeId {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
self.0.fmt(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runtime-local Worker identity.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
|
||||||
|
#[serde(transparent)]
|
||||||
|
pub struct WorkerId(String);
|
||||||
|
|
||||||
|
impl WorkerId {
|
||||||
|
pub fn new(value: impl Into<String>) -> Option<Self> {
|
||||||
|
let value = value.into();
|
||||||
|
if value.trim().is_empty() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(Self(value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn generated(sequence: u64) -> Self {
|
||||||
|
Self(format!("worker-{sequence:08x}"))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn as_str(&self) -> &str {
|
||||||
|
&self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for WorkerId {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
self.0.fmt(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Complete public authority reference for Worker operations.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
|
||||||
|
pub struct WorkerRef {
|
||||||
|
pub runtime_id: RuntimeId,
|
||||||
|
pub worker_id: WorkerId,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WorkerRef {
|
||||||
|
pub fn new(runtime_id: RuntimeId, worker_id: WorkerId) -> Self {
|
||||||
|
Self {
|
||||||
|
runtime_id,
|
||||||
|
worker_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
44
crates/worker-runtime/src/interaction.rs
Normal file
44
crates/worker-runtime/src/interaction.rs
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
use crate::catalog::WorkerStatus;
|
||||||
|
use crate::identity::WorkerRef;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Input kind accepted by the embedded interaction API.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum WorkerInputKind {
|
||||||
|
User,
|
||||||
|
System,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Worker input request. v0 stores the input in an in-memory transcript and
|
||||||
|
/// does not execute providers/tools.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct WorkerInput {
|
||||||
|
pub kind: WorkerInputKind,
|
||||||
|
pub content: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WorkerInput {
|
||||||
|
pub fn user(content: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
kind: WorkerInputKind::User,
|
||||||
|
content: content.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn system(content: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
kind: WorkerInputKind::System,
|
||||||
|
content: content.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Acknowledgement returned after input is accepted into the in-memory Worker.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct WorkerInteractionAck {
|
||||||
|
pub worker_ref: WorkerRef,
|
||||||
|
pub status: WorkerStatus,
|
||||||
|
pub transcript_sequence: u64,
|
||||||
|
pub event_id: u64,
|
||||||
|
}
|
||||||
17
crates/worker-runtime/src/lib.rs
Normal file
17
crates/worker-runtime/src/lib.rs
Normal file
|
|
@ -0,0 +1,17 @@
|
||||||
|
//! Embedded Runtime domain API for Worker management.
|
||||||
|
//!
|
||||||
|
//! `worker-runtime` intentionally stays independent from HTTP/WebSocket servers,
|
||||||
|
//! filesystem persistence, provider execution, and the existing Worker host. It
|
||||||
|
//! defines the in-process Runtime authority surface that higher layers can later
|
||||||
|
//! adapt into registries or web APIs.
|
||||||
|
|
||||||
|
pub mod catalog;
|
||||||
|
pub mod diagnostics;
|
||||||
|
pub mod error;
|
||||||
|
pub mod identity;
|
||||||
|
pub mod interaction;
|
||||||
|
pub mod management;
|
||||||
|
pub mod observation;
|
||||||
|
mod runtime;
|
||||||
|
|
||||||
|
pub use runtime::Runtime;
|
||||||
66
crates/worker-runtime/src/management.rs
Normal file
66
crates/worker-runtime/src/management.rs
Normal file
|
|
@ -0,0 +1,66 @@
|
||||||
|
use crate::identity::RuntimeId;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Runtime backend kind. v0 intentionally ships only an embedded memory backend.
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum RuntimeBackendKind {
|
||||||
|
Memory,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runtime lifecycle state.
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum RuntimeStatus {
|
||||||
|
Running,
|
||||||
|
Stopped,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Guardrails for bounded observation/projection APIs.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct RuntimeLimits {
|
||||||
|
pub max_transcript_projection_items: usize,
|
||||||
|
pub max_event_batch_items: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for RuntimeLimits {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
max_transcript_projection_items: 256,
|
||||||
|
max_event_batch_items: 256,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Options used to construct an embedded memory Runtime.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct RuntimeOptions {
|
||||||
|
pub runtime_id: Option<RuntimeId>,
|
||||||
|
pub display_name: Option<String>,
|
||||||
|
pub limits: RuntimeLimits,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for RuntimeOptions {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
runtime_id: None,
|
||||||
|
display_name: None,
|
||||||
|
limits: RuntimeLimits::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Management-plane summary for a Runtime.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct RuntimeSummary {
|
||||||
|
pub runtime_id: RuntimeId,
|
||||||
|
pub display_name: Option<String>,
|
||||||
|
pub backend: RuntimeBackendKind,
|
||||||
|
pub status: RuntimeStatus,
|
||||||
|
pub worker_count: usize,
|
||||||
|
pub active_worker_count: usize,
|
||||||
|
pub stopped_worker_count: usize,
|
||||||
|
pub cancelled_worker_count: usize,
|
||||||
|
pub diagnostic_count: usize,
|
||||||
|
pub limits: RuntimeLimits,
|
||||||
|
}
|
||||||
95
crates/worker-runtime/src/observation.rs
Normal file
95
crates/worker-runtime/src/observation.rs
Normal file
|
|
@ -0,0 +1,95 @@
|
||||||
|
use crate::identity::{RuntimeId, WorkerRef};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Transcript role used by bounded projection.
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum TranscriptRole {
|
||||||
|
User,
|
||||||
|
System,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// One projected transcript item.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct TranscriptEntry {
|
||||||
|
pub sequence: u64,
|
||||||
|
pub worker_ref: WorkerRef,
|
||||||
|
pub role: TranscriptRole,
|
||||||
|
pub content: String,
|
||||||
|
pub event_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bounded transcript query.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct TranscriptQuery {
|
||||||
|
pub start: usize,
|
||||||
|
pub limit: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TranscriptQuery {
|
||||||
|
pub fn new(start: usize, limit: usize) -> Self {
|
||||||
|
Self { start, limit }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bounded transcript projection response.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct TranscriptProjection {
|
||||||
|
pub worker_ref: WorkerRef,
|
||||||
|
pub start: usize,
|
||||||
|
pub limit: usize,
|
||||||
|
pub total_items: usize,
|
||||||
|
pub items: Vec<TranscriptEntry>,
|
||||||
|
pub next_start: Option<usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Event cursor. `next_event_id` is the first event id that should be returned
|
||||||
|
/// by the next poll.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct EventCursor {
|
||||||
|
pub runtime_id: RuntimeId,
|
||||||
|
pub next_event_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Placeholder subscription handle for future streaming APIs. v0 is explicit
|
||||||
|
/// poll-only so HTTP/WS/SSE dependencies are not pulled into this crate.
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct EventSubscription {
|
||||||
|
pub runtime_id: RuntimeId,
|
||||||
|
pub cursor: EventCursor,
|
||||||
|
pub mode: EventSubscriptionMode,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(rename_all = "snake_case")]
|
||||||
|
pub enum EventSubscriptionMode {
|
||||||
|
PollOnly,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct RuntimeEvent {
|
||||||
|
pub id: u64,
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub worker_ref: Option<WorkerRef>,
|
||||||
|
pub kind: RuntimeEventKind,
|
||||||
|
pub message: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
#[serde(tag = "kind", rename_all = "snake_case")]
|
||||||
|
pub enum RuntimeEventKind {
|
||||||
|
RuntimeStarted,
|
||||||
|
RuntimeStopped,
|
||||||
|
WorkerCreated,
|
||||||
|
WorkerInputAccepted,
|
||||||
|
WorkerStopped,
|
||||||
|
WorkerCancelled,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
|
pub struct RuntimeEventBatch {
|
||||||
|
pub runtime_id: RuntimeId,
|
||||||
|
pub cursor: EventCursor,
|
||||||
|
pub events: Vec<RuntimeEvent>,
|
||||||
|
pub has_more: bool,
|
||||||
|
}
|
||||||
762
crates/worker-runtime/src/runtime.rs
Normal file
762
crates/worker-runtime/src/runtime.rs
Normal file
|
|
@ -0,0 +1,762 @@
|
||||||
|
use crate::catalog::{
|
||||||
|
CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, WorkerStatus, WorkerSummary,
|
||||||
|
};
|
||||||
|
use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic};
|
||||||
|
use crate::error::RuntimeError;
|
||||||
|
use crate::identity::{RuntimeId, WorkerId, WorkerRef};
|
||||||
|
use crate::interaction::{WorkerInput, WorkerInputKind, WorkerInteractionAck};
|
||||||
|
use crate::management::{
|
||||||
|
RuntimeBackendKind, RuntimeLimits, RuntimeOptions, RuntimeStatus, RuntimeSummary,
|
||||||
|
};
|
||||||
|
use crate::observation::{
|
||||||
|
EventCursor, EventSubscription, EventSubscriptionMode, RuntimeEvent, RuntimeEventBatch,
|
||||||
|
RuntimeEventKind, TranscriptEntry, TranscriptProjection, TranscriptQuery, TranscriptRole,
|
||||||
|
};
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
|
use std::sync::{Arc, Mutex, MutexGuard};
|
||||||
|
|
||||||
|
static NEXT_RUNTIME_SEQUENCE: AtomicU64 = AtomicU64::new(1);
|
||||||
|
|
||||||
|
/// Concrete embedded Runtime domain entity.
|
||||||
|
///
|
||||||
|
/// The current implementation is memory-backed and tools/provider-less by
|
||||||
|
/// design. It provides a typed API boundary that can later be adapted by
|
||||||
|
/// backend registries or web servers without making sockets, sessions, or paths
|
||||||
|
/// public authority.
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Runtime {
|
||||||
|
inner: Arc<Mutex<RuntimeState>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Runtime {
|
||||||
|
/// Create a memory-backed Runtime with generated identity and default limits.
|
||||||
|
pub fn new_memory() -> Self {
|
||||||
|
Self::with_options(RuntimeOptions::default())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a memory-backed Runtime with explicit options.
|
||||||
|
pub fn with_options(options: RuntimeOptions) -> Self {
|
||||||
|
let runtime_id = options.runtime_id.unwrap_or_else(|| {
|
||||||
|
RuntimeId::generated(NEXT_RUNTIME_SEQUENCE.fetch_add(1, Ordering::Relaxed))
|
||||||
|
});
|
||||||
|
let mut state = RuntimeState::new(runtime_id, options.display_name, options.limits);
|
||||||
|
state.push_event(None, RuntimeEventKind::RuntimeStarted, "runtime started");
|
||||||
|
Self {
|
||||||
|
inner: Arc::new(Mutex::new(state)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runtime id half of public Worker authority.
|
||||||
|
pub fn runtime_id(&self) -> Result<RuntimeId, RuntimeError> {
|
||||||
|
Ok(self.lock()?.runtime_id.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Management-plane summary.
|
||||||
|
pub fn summary(&self) -> Result<RuntimeSummary, RuntimeError> {
|
||||||
|
let state = self.lock()?;
|
||||||
|
let mut active_worker_count = 0;
|
||||||
|
let mut stopped_worker_count = 0;
|
||||||
|
let mut cancelled_worker_count = 0;
|
||||||
|
for worker in state.workers.values() {
|
||||||
|
match worker.status {
|
||||||
|
WorkerStatus::Running => active_worker_count += 1,
|
||||||
|
WorkerStatus::Stopped => stopped_worker_count += 1,
|
||||||
|
WorkerStatus::Cancelled => cancelled_worker_count += 1,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(RuntimeSummary {
|
||||||
|
runtime_id: state.runtime_id.clone(),
|
||||||
|
display_name: state.display_name.clone(),
|
||||||
|
backend: state.backend,
|
||||||
|
status: state.status,
|
||||||
|
worker_count: state.workers.len(),
|
||||||
|
active_worker_count,
|
||||||
|
stopped_worker_count,
|
||||||
|
cancelled_worker_count,
|
||||||
|
diagnostic_count: state.diagnostics.len(),
|
||||||
|
limits: state.limits.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Current Runtime lifecycle state.
|
||||||
|
pub fn status(&self) -> Result<RuntimeStatus, RuntimeError> {
|
||||||
|
Ok(self.lock()?.status)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stop the Runtime. v0 keeps data readable after stop, but rejects new
|
||||||
|
/// create/send/worker lifecycle mutations.
|
||||||
|
pub fn stop_runtime(&self) -> Result<u64, RuntimeError> {
|
||||||
|
let mut state = self.lock()?;
|
||||||
|
if state.status == RuntimeStatus::Stopped {
|
||||||
|
return Ok(state.last_event_id());
|
||||||
|
}
|
||||||
|
state.status = RuntimeStatus::Stopped;
|
||||||
|
let runtime_id = state.runtime_id.clone();
|
||||||
|
for worker in state.workers.values_mut() {
|
||||||
|
if worker.status.is_active() {
|
||||||
|
worker.status = WorkerStatus::Stopped;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(state.push_event(
|
||||||
|
None,
|
||||||
|
RuntimeEventKind::RuntimeStopped,
|
||||||
|
format!("runtime {runtime_id} stopped"),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a Worker in the embedded catalog.
|
||||||
|
pub fn create_worker(
|
||||||
|
&self,
|
||||||
|
request: CreateWorkerRequest,
|
||||||
|
) -> Result<WorkerDetail, RuntimeError> {
|
||||||
|
let mut state = self.lock()?;
|
||||||
|
state.ensure_running()?;
|
||||||
|
validate_create_worker_request(&request)?;
|
||||||
|
|
||||||
|
let worker_id = WorkerId::generated(state.next_worker_sequence);
|
||||||
|
state.next_worker_sequence += 1;
|
||||||
|
let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone());
|
||||||
|
let event_id = state.push_event(
|
||||||
|
Some(worker_ref.clone()),
|
||||||
|
RuntimeEventKind::WorkerCreated,
|
||||||
|
format!("worker {worker_id} created"),
|
||||||
|
);
|
||||||
|
|
||||||
|
let record = WorkerRecord {
|
||||||
|
worker_ref,
|
||||||
|
worker_id: worker_id.clone(),
|
||||||
|
status: WorkerStatus::Running,
|
||||||
|
request,
|
||||||
|
transcript: Vec::new(),
|
||||||
|
next_transcript_sequence: 1,
|
||||||
|
last_event_id: event_id,
|
||||||
|
};
|
||||||
|
let detail = record.detail(&state.runtime_id);
|
||||||
|
state.emit_create_diagnostics(&detail);
|
||||||
|
state.workers.insert(worker_id, record);
|
||||||
|
Ok(detail)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List Workers known to this Runtime.
|
||||||
|
pub fn list_workers(&self) -> Result<Vec<WorkerSummary>, RuntimeError> {
|
||||||
|
let state = self.lock()?;
|
||||||
|
Ok(state
|
||||||
|
.workers
|
||||||
|
.values()
|
||||||
|
.map(|worker| worker.summary(&state.runtime_id))
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Fetch Worker detail. The supplied [`WorkerRef`] must match this Runtime.
|
||||||
|
pub fn worker_detail(&self, worker_ref: &WorkerRef) -> Result<WorkerDetail, RuntimeError> {
|
||||||
|
let state = self.lock()?;
|
||||||
|
let worker = state.worker(worker_ref)?;
|
||||||
|
Ok(worker.detail(&state.runtime_id))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Accept input into a Worker transcript.
|
||||||
|
pub fn send_input(
|
||||||
|
&self,
|
||||||
|
worker_ref: &WorkerRef,
|
||||||
|
input: WorkerInput,
|
||||||
|
) -> Result<WorkerInteractionAck, RuntimeError> {
|
||||||
|
let mut state = self.lock()?;
|
||||||
|
state.ensure_running()?;
|
||||||
|
validate_worker_input(&input)?;
|
||||||
|
state.ensure_worker_ref(worker_ref)?;
|
||||||
|
{
|
||||||
|
let worker = state.worker(worker_ref)?;
|
||||||
|
if !worker.status.is_active() {
|
||||||
|
return Err(RuntimeError::InvalidRequest(format!(
|
||||||
|
"worker {} is not running",
|
||||||
|
worker_ref.worker_id
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let event_id = state.push_event(
|
||||||
|
Some(worker_ref.clone()),
|
||||||
|
RuntimeEventKind::WorkerInputAccepted,
|
||||||
|
"worker input accepted",
|
||||||
|
);
|
||||||
|
let worker = state.worker_mut(worker_ref)?;
|
||||||
|
|
||||||
|
let role = match input.kind {
|
||||||
|
WorkerInputKind::User => TranscriptRole::User,
|
||||||
|
WorkerInputKind::System => TranscriptRole::System,
|
||||||
|
};
|
||||||
|
let transcript_sequence = worker.next_transcript_sequence;
|
||||||
|
worker.next_transcript_sequence += 1;
|
||||||
|
worker.last_event_id = event_id;
|
||||||
|
worker.transcript.push(TranscriptEntry {
|
||||||
|
sequence: transcript_sequence,
|
||||||
|
worker_ref: worker_ref.clone(),
|
||||||
|
role,
|
||||||
|
content: input.content,
|
||||||
|
event_id,
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(WorkerInteractionAck {
|
||||||
|
worker_ref: worker_ref.clone(),
|
||||||
|
status: worker.status,
|
||||||
|
transcript_sequence,
|
||||||
|
event_id,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stop a Worker. Repeated stops are idempotent and return the last event id.
|
||||||
|
pub fn stop_worker(
|
||||||
|
&self,
|
||||||
|
worker_ref: &WorkerRef,
|
||||||
|
reason: Option<String>,
|
||||||
|
) -> Result<WorkerLifecycleAck, RuntimeError> {
|
||||||
|
self.transition_worker(
|
||||||
|
worker_ref,
|
||||||
|
WorkerStatus::Stopped,
|
||||||
|
RuntimeEventKind::WorkerStopped,
|
||||||
|
reason.unwrap_or_else(|| "worker stopped".to_string()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cancel a Worker. Repeated cancels are idempotent and return the last event id.
|
||||||
|
pub fn cancel_worker(
|
||||||
|
&self,
|
||||||
|
worker_ref: &WorkerRef,
|
||||||
|
reason: Option<String>,
|
||||||
|
) -> Result<WorkerLifecycleAck, RuntimeError> {
|
||||||
|
self.transition_worker(
|
||||||
|
worker_ref,
|
||||||
|
WorkerStatus::Cancelled,
|
||||||
|
RuntimeEventKind::WorkerCancelled,
|
||||||
|
reason.unwrap_or_else(|| "worker cancelled".to_string()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bounded transcript projection for a Worker.
|
||||||
|
pub fn transcript_projection(
|
||||||
|
&self,
|
||||||
|
worker_ref: &WorkerRef,
|
||||||
|
query: TranscriptQuery,
|
||||||
|
) -> Result<TranscriptProjection, RuntimeError> {
|
||||||
|
let state = self.lock()?;
|
||||||
|
if query.limit > state.limits.max_transcript_projection_items {
|
||||||
|
return Err(RuntimeError::LimitTooLarge {
|
||||||
|
requested: query.limit,
|
||||||
|
max: state.limits.max_transcript_projection_items,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let worker = state.worker(worker_ref)?;
|
||||||
|
let total_items = worker.transcript.len();
|
||||||
|
let end = query.start.saturating_add(query.limit).min(total_items);
|
||||||
|
let items = if query.start >= total_items {
|
||||||
|
Vec::new()
|
||||||
|
} else {
|
||||||
|
worker.transcript[query.start..end].to_vec()
|
||||||
|
};
|
||||||
|
let next_start = (end < total_items).then_some(end);
|
||||||
|
Ok(TranscriptProjection {
|
||||||
|
worker_ref: worker_ref.clone(),
|
||||||
|
start: query.start,
|
||||||
|
limit: query.limit,
|
||||||
|
total_items,
|
||||||
|
items,
|
||||||
|
next_start,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cursor pointing to the beginning of Runtime events.
|
||||||
|
pub fn event_cursor_from_start(&self) -> Result<EventCursor, RuntimeError> {
|
||||||
|
let state = self.lock()?;
|
||||||
|
Ok(EventCursor {
|
||||||
|
runtime_id: state.runtime_id.clone(),
|
||||||
|
next_event_id: 1,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Cursor pointing after the current last event.
|
||||||
|
pub fn event_cursor_now(&self) -> Result<EventCursor, RuntimeError> {
|
||||||
|
let state = self.lock()?;
|
||||||
|
Ok(EventCursor {
|
||||||
|
runtime_id: state.runtime_id.clone(),
|
||||||
|
next_event_id: state.last_event_id() + 1,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Poll Runtime events from a cursor.
|
||||||
|
pub fn read_events(
|
||||||
|
&self,
|
||||||
|
cursor: &EventCursor,
|
||||||
|
limit: usize,
|
||||||
|
) -> Result<RuntimeEventBatch, RuntimeError> {
|
||||||
|
let state = self.lock()?;
|
||||||
|
if cursor.runtime_id != state.runtime_id {
|
||||||
|
return Err(RuntimeError::WrongRuntimeCursor {
|
||||||
|
expected_runtime_id: state.runtime_id.clone(),
|
||||||
|
actual_runtime_id: cursor.runtime_id.clone(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if limit > state.limits.max_event_batch_items {
|
||||||
|
return Err(RuntimeError::LimitTooLarge {
|
||||||
|
requested: limit,
|
||||||
|
max: state.limits.max_event_batch_items,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut events = Vec::new();
|
||||||
|
for event in state
|
||||||
|
.events
|
||||||
|
.iter()
|
||||||
|
.filter(|event| event.id >= cursor.next_event_id)
|
||||||
|
.take(limit)
|
||||||
|
{
|
||||||
|
events.push(event.clone());
|
||||||
|
}
|
||||||
|
let next_event_id = events
|
||||||
|
.last()
|
||||||
|
.map(|event| event.id + 1)
|
||||||
|
.unwrap_or(cursor.next_event_id);
|
||||||
|
let has_more = state.events.iter().any(|event| event.id >= next_event_id);
|
||||||
|
Ok(RuntimeEventBatch {
|
||||||
|
runtime_id: state.runtime_id.clone(),
|
||||||
|
cursor: EventCursor {
|
||||||
|
runtime_id: state.runtime_id.clone(),
|
||||||
|
next_event_id,
|
||||||
|
},
|
||||||
|
events,
|
||||||
|
has_more,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a poll-only placeholder subscription boundary for future streaming.
|
||||||
|
pub fn subscribe_events(&self, cursor: EventCursor) -> Result<EventSubscription, RuntimeError> {
|
||||||
|
let state = self.lock()?;
|
||||||
|
if cursor.runtime_id != state.runtime_id {
|
||||||
|
return Err(RuntimeError::WrongRuntimeCursor {
|
||||||
|
expected_runtime_id: state.runtime_id.clone(),
|
||||||
|
actual_runtime_id: cursor.runtime_id,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(EventSubscription {
|
||||||
|
runtime_id: state.runtime_id.clone(),
|
||||||
|
cursor,
|
||||||
|
mode: EventSubscriptionMode::PollOnly,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Snapshot current diagnostics.
|
||||||
|
pub fn diagnostics(&self) -> Result<Vec<RuntimeDiagnostic>, RuntimeError> {
|
||||||
|
Ok(self.lock()?.diagnostics.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn transition_worker(
|
||||||
|
&self,
|
||||||
|
worker_ref: &WorkerRef,
|
||||||
|
status: WorkerStatus,
|
||||||
|
event_kind: RuntimeEventKind,
|
||||||
|
reason: String,
|
||||||
|
) -> Result<WorkerLifecycleAck, RuntimeError> {
|
||||||
|
let mut state = self.lock()?;
|
||||||
|
state.ensure_running()?;
|
||||||
|
state.ensure_worker_ref(worker_ref)?;
|
||||||
|
|
||||||
|
let already_status = {
|
||||||
|
let worker = state.worker(worker_ref)?;
|
||||||
|
worker.status == status
|
||||||
|
};
|
||||||
|
if already_status {
|
||||||
|
let worker = state.worker(worker_ref)?;
|
||||||
|
return Ok(WorkerLifecycleAck {
|
||||||
|
worker_ref: worker_ref.clone(),
|
||||||
|
status: worker.status,
|
||||||
|
event_id: worker.last_event_id,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let event_id = state.push_event(Some(worker_ref.clone()), event_kind, reason);
|
||||||
|
let worker = state.worker_mut(worker_ref)?;
|
||||||
|
worker.status = status;
|
||||||
|
worker.last_event_id = event_id;
|
||||||
|
Ok(WorkerLifecycleAck {
|
||||||
|
worker_ref: worker_ref.clone(),
|
||||||
|
status: worker.status,
|
||||||
|
event_id,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn lock(&self) -> Result<MutexGuard<'_, RuntimeState>, RuntimeError> {
|
||||||
|
self.inner.lock().map_err(|_| RuntimeError::StatePoisoned)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct RuntimeState {
|
||||||
|
runtime_id: RuntimeId,
|
||||||
|
display_name: Option<String>,
|
||||||
|
backend: RuntimeBackendKind,
|
||||||
|
status: RuntimeStatus,
|
||||||
|
limits: RuntimeLimits,
|
||||||
|
next_worker_sequence: u64,
|
||||||
|
next_event_id: u64,
|
||||||
|
next_diagnostic_id: u64,
|
||||||
|
workers: BTreeMap<WorkerId, WorkerRecord>,
|
||||||
|
events: Vec<RuntimeEvent>,
|
||||||
|
diagnostics: Vec<RuntimeDiagnostic>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RuntimeState {
|
||||||
|
fn new(runtime_id: RuntimeId, display_name: Option<String>, limits: RuntimeLimits) -> Self {
|
||||||
|
Self {
|
||||||
|
runtime_id,
|
||||||
|
display_name,
|
||||||
|
backend: RuntimeBackendKind::Memory,
|
||||||
|
status: RuntimeStatus::Running,
|
||||||
|
limits,
|
||||||
|
next_worker_sequence: 1,
|
||||||
|
next_event_id: 1,
|
||||||
|
next_diagnostic_id: 1,
|
||||||
|
workers: BTreeMap::new(),
|
||||||
|
events: Vec::new(),
|
||||||
|
diagnostics: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ensure_running(&self) -> Result<(), RuntimeError> {
|
||||||
|
if self.status == RuntimeStatus::Stopped {
|
||||||
|
Err(RuntimeError::RuntimeStopped {
|
||||||
|
runtime_id: self.runtime_id.clone(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> {
|
||||||
|
if worker_ref.runtime_id != self.runtime_id {
|
||||||
|
return Err(RuntimeError::WrongRuntime {
|
||||||
|
expected_runtime_id: self.runtime_id.clone(),
|
||||||
|
actual_runtime_id: worker_ref.runtime_id.clone(),
|
||||||
|
worker_id: worker_ref.worker_id.clone(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if !self.workers.contains_key(&worker_ref.worker_id) {
|
||||||
|
return Err(RuntimeError::WorkerNotFound {
|
||||||
|
runtime_id: self.runtime_id.clone(),
|
||||||
|
worker_id: worker_ref.worker_id.clone(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn worker(&self, worker_ref: &WorkerRef) -> Result<&WorkerRecord, RuntimeError> {
|
||||||
|
self.ensure_worker_ref(worker_ref)?;
|
||||||
|
self.workers
|
||||||
|
.get(&worker_ref.worker_id)
|
||||||
|
.ok_or_else(|| RuntimeError::WorkerNotFound {
|
||||||
|
runtime_id: self.runtime_id.clone(),
|
||||||
|
worker_id: worker_ref.worker_id.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn worker_mut(&mut self, worker_ref: &WorkerRef) -> Result<&mut WorkerRecord, RuntimeError> {
|
||||||
|
self.ensure_worker_ref(worker_ref)?;
|
||||||
|
self.workers
|
||||||
|
.get_mut(&worker_ref.worker_id)
|
||||||
|
.ok_or_else(|| RuntimeError::WorkerNotFound {
|
||||||
|
runtime_id: self.runtime_id.clone(),
|
||||||
|
worker_id: worker_ref.worker_id.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn push_event(
|
||||||
|
&mut self,
|
||||||
|
worker_ref: Option<WorkerRef>,
|
||||||
|
kind: RuntimeEventKind,
|
||||||
|
message: impl Into<String>,
|
||||||
|
) -> u64 {
|
||||||
|
let id = self.next_event_id;
|
||||||
|
self.next_event_id += 1;
|
||||||
|
self.events.push(RuntimeEvent {
|
||||||
|
id,
|
||||||
|
worker_ref,
|
||||||
|
kind,
|
||||||
|
message: message.into(),
|
||||||
|
});
|
||||||
|
id
|
||||||
|
}
|
||||||
|
|
||||||
|
fn last_event_id(&self) -> u64 {
|
||||||
|
self.next_event_id.saturating_sub(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn push_diagnostic(
|
||||||
|
&mut self,
|
||||||
|
severity: DiagnosticSeverity,
|
||||||
|
code: impl Into<String>,
|
||||||
|
message: impl Into<String>,
|
||||||
|
worker_ref: Option<WorkerRef>,
|
||||||
|
) {
|
||||||
|
let id = self.next_diagnostic_id;
|
||||||
|
self.next_diagnostic_id += 1;
|
||||||
|
self.diagnostics.push(RuntimeDiagnostic {
|
||||||
|
id,
|
||||||
|
severity,
|
||||||
|
code: code.into(),
|
||||||
|
message: message.into(),
|
||||||
|
worker_ref,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn emit_create_diagnostics(&mut self, detail: &WorkerDetail) {
|
||||||
|
if detail.config_bundle.is_none() {
|
||||||
|
self.push_diagnostic(
|
||||||
|
DiagnosticSeverity::Info,
|
||||||
|
"runtime.local_default_resources",
|
||||||
|
"worker created without ConfigBundleRef; runtime-local defaults are assumed",
|
||||||
|
Some(detail.worker_ref.clone()),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if detail.requested_capabilities.is_empty() {
|
||||||
|
self.push_diagnostic(
|
||||||
|
DiagnosticSeverity::Info,
|
||||||
|
"worker.tools_less",
|
||||||
|
"worker created without requested tool capabilities",
|
||||||
|
Some(detail.worker_ref.clone()),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct WorkerRecord {
|
||||||
|
worker_ref: WorkerRef,
|
||||||
|
worker_id: WorkerId,
|
||||||
|
status: WorkerStatus,
|
||||||
|
request: CreateWorkerRequest,
|
||||||
|
transcript: Vec<TranscriptEntry>,
|
||||||
|
next_transcript_sequence: u64,
|
||||||
|
last_event_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WorkerRecord {
|
||||||
|
fn summary(&self, runtime_id: &RuntimeId) -> WorkerSummary {
|
||||||
|
WorkerSummary {
|
||||||
|
worker_ref: self.worker_ref.clone(),
|
||||||
|
runtime_id: runtime_id.clone(),
|
||||||
|
worker_id: self.worker_id.clone(),
|
||||||
|
status: self.status,
|
||||||
|
intent: self.request.intent.clone(),
|
||||||
|
profile: self.request.profile.clone(),
|
||||||
|
requested_capability_count: self.request.requested_capabilities.len(),
|
||||||
|
has_config_bundle: self.request.config_bundle.is_some(),
|
||||||
|
transcript_len: self.transcript.len(),
|
||||||
|
last_event_id: self.last_event_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn detail(&self, runtime_id: &RuntimeId) -> WorkerDetail {
|
||||||
|
WorkerDetail {
|
||||||
|
worker_ref: self.worker_ref.clone(),
|
||||||
|
runtime_id: runtime_id.clone(),
|
||||||
|
worker_id: self.worker_id.clone(),
|
||||||
|
status: self.status,
|
||||||
|
intent: self.request.intent.clone(),
|
||||||
|
profile: self.request.profile.clone(),
|
||||||
|
config_bundle: self.request.config_bundle.clone(),
|
||||||
|
requested_capabilities: self.request.requested_capabilities.clone(),
|
||||||
|
workspace_refs: self.request.workspace_refs.clone(),
|
||||||
|
mount_refs: self.request.mount_refs.clone(),
|
||||||
|
transcript_len: self.transcript.len(),
|
||||||
|
last_event_id: self.last_event_id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> {
|
||||||
|
if let crate::catalog::WorkerIntent::Task { objective } = &request.intent {
|
||||||
|
if objective.trim().is_empty() {
|
||||||
|
return Err(RuntimeError::InvalidRequest(
|
||||||
|
"task objective must not be empty".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for capability in &request.requested_capabilities {
|
||||||
|
if capability.name.trim().is_empty() {
|
||||||
|
return Err(RuntimeError::InvalidRequest(
|
||||||
|
"capability name must not be empty".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validate_worker_input(input: &WorkerInput) -> Result<(), RuntimeError> {
|
||||||
|
if input.content.trim().is_empty() {
|
||||||
|
return Err(RuntimeError::InvalidRequest(
|
||||||
|
"worker input content must not be empty".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector, WorkerIntent};
|
||||||
|
use crate::management::RuntimeLimits;
|
||||||
|
|
||||||
|
fn task_request(objective: &str) -> CreateWorkerRequest {
|
||||||
|
CreateWorkerRequest {
|
||||||
|
intent: WorkerIntent::Task {
|
||||||
|
objective: objective.to_string(),
|
||||||
|
},
|
||||||
|
profile: ProfileSelector::Builtin("builtin:coder".to_string()),
|
||||||
|
config_bundle: Some(ConfigBundleRef {
|
||||||
|
id: "bundle-1".to_string(),
|
||||||
|
}),
|
||||||
|
requested_capabilities: vec![CapabilityRequest::named("read")],
|
||||||
|
workspace_refs: Vec::new(),
|
||||||
|
mount_refs: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn create_list_and_detail_preserve_runtime_worker_authority() {
|
||||||
|
let runtime = Runtime::new_memory();
|
||||||
|
let detail = runtime.create_worker(task_request("implement v0")).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(detail.worker_ref.runtime_id, runtime.runtime_id().unwrap());
|
||||||
|
assert_eq!(detail.status, WorkerStatus::Running);
|
||||||
|
assert!(detail.config_bundle.is_some());
|
||||||
|
|
||||||
|
let list = runtime.list_workers().unwrap();
|
||||||
|
assert_eq!(list.len(), 1);
|
||||||
|
assert_eq!(list[0].worker_ref, detail.worker_ref);
|
||||||
|
assert_eq!(list[0].requested_capability_count, 1);
|
||||||
|
|
||||||
|
let fetched = runtime.worker_detail(&detail.worker_ref).unwrap();
|
||||||
|
assert_eq!(fetched.worker_id, detail.worker_id);
|
||||||
|
assert_eq!(fetched.intent, detail.intent);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn rejects_worker_refs_from_another_runtime() {
|
||||||
|
let runtime_a = Runtime::new_memory();
|
||||||
|
let runtime_b = Runtime::new_memory();
|
||||||
|
let detail = runtime_a.create_worker(task_request("runtime a")).unwrap();
|
||||||
|
|
||||||
|
let err = runtime_b.worker_detail(&detail.worker_ref).unwrap_err();
|
||||||
|
assert!(matches!(err, RuntimeError::WrongRuntime { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn tools_less_worker_without_config_bundle_uses_local_defaults_and_diagnostics() {
|
||||||
|
let runtime = Runtime::new_memory();
|
||||||
|
let detail = runtime
|
||||||
|
.create_worker(CreateWorkerRequest::tools_less(
|
||||||
|
WorkerIntent::default(),
|
||||||
|
ProfileSelector::RuntimeDefault,
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(detail.config_bundle.is_none());
|
||||||
|
assert!(detail.requested_capabilities.is_empty());
|
||||||
|
let diagnostics = runtime.diagnostics().unwrap();
|
||||||
|
assert_eq!(diagnostics.len(), 2);
|
||||||
|
assert!(
|
||||||
|
diagnostics
|
||||||
|
.iter()
|
||||||
|
.any(|diagnostic| diagnostic.code == "runtime.local_default_resources")
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
diagnostics
|
||||||
|
.iter()
|
||||||
|
.any(|diagnostic| diagnostic.code == "worker.tools_less")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn send_input_and_project_bounded_transcript() {
|
||||||
|
let runtime = Runtime::with_options(RuntimeOptions {
|
||||||
|
limits: RuntimeLimits {
|
||||||
|
max_transcript_projection_items: 2,
|
||||||
|
max_event_batch_items: 16,
|
||||||
|
},
|
||||||
|
..RuntimeOptions::default()
|
||||||
|
});
|
||||||
|
let detail = runtime.create_worker(task_request("chat")).unwrap();
|
||||||
|
|
||||||
|
let first = runtime
|
||||||
|
.send_input(&detail.worker_ref, WorkerInput::user("hello"))
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(first.transcript_sequence, 1);
|
||||||
|
runtime
|
||||||
|
.send_input(&detail.worker_ref, WorkerInput::system("note"))
|
||||||
|
.unwrap();
|
||||||
|
runtime
|
||||||
|
.send_input(&detail.worker_ref, WorkerInput::user("again"))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let projection = runtime
|
||||||
|
.transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 2))
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(projection.total_items, 3);
|
||||||
|
assert_eq!(projection.items.len(), 2);
|
||||||
|
assert_eq!(projection.items[0].content, "hello");
|
||||||
|
assert_eq!(projection.items[1].role, TranscriptRole::System);
|
||||||
|
assert_eq!(projection.next_start, Some(2));
|
||||||
|
|
||||||
|
let err = runtime
|
||||||
|
.transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 3))
|
||||||
|
.unwrap_err();
|
||||||
|
assert!(matches!(err, RuntimeError::LimitTooLarge { .. }));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn stop_and_cancel_workers_update_summary() {
|
||||||
|
let runtime = Runtime::new_memory();
|
||||||
|
let stopped = runtime.create_worker(task_request("stop me")).unwrap();
|
||||||
|
let cancelled = runtime.create_worker(task_request("cancel me")).unwrap();
|
||||||
|
|
||||||
|
let stop_ack = runtime
|
||||||
|
.stop_worker(&stopped.worker_ref, Some("done".to_string()))
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(stop_ack.status, WorkerStatus::Stopped);
|
||||||
|
|
||||||
|
let cancel_ack = runtime
|
||||||
|
.cancel_worker(&cancelled.worker_ref, Some("abort".to_string()))
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(cancel_ack.status, WorkerStatus::Cancelled);
|
||||||
|
|
||||||
|
let summary = runtime.summary().unwrap();
|
||||||
|
assert_eq!(summary.worker_count, 2);
|
||||||
|
assert_eq!(summary.active_worker_count, 0);
|
||||||
|
assert_eq!(summary.stopped_worker_count, 1);
|
||||||
|
assert_eq!(summary.cancelled_worker_count, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_cursor_and_poll_only_subscription_are_bounded_placeholders() {
|
||||||
|
let runtime = Runtime::new_memory();
|
||||||
|
let cursor = runtime.event_cursor_from_start().unwrap();
|
||||||
|
let subscription = runtime.subscribe_events(cursor.clone()).unwrap();
|
||||||
|
assert_eq!(subscription.mode, EventSubscriptionMode::PollOnly);
|
||||||
|
|
||||||
|
let worker = runtime.create_worker(task_request("events")).unwrap();
|
||||||
|
runtime
|
||||||
|
.send_input(&worker.worker_ref, WorkerInput::user("eventful"))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let batch = runtime.read_events(&cursor, 2).unwrap();
|
||||||
|
assert_eq!(batch.events.len(), 2);
|
||||||
|
assert!(batch.has_more);
|
||||||
|
assert_eq!(batch.events[0].kind, RuntimeEventKind::RuntimeStarted);
|
||||||
|
assert_eq!(batch.events[1].kind, RuntimeEventKind::WorkerCreated);
|
||||||
|
|
||||||
|
let next = runtime.read_events(&batch.cursor, 2).unwrap();
|
||||||
|
assert_eq!(next.events.len(), 1);
|
||||||
|
assert_eq!(next.events[0].kind, RuntimeEventKind::WorkerInputAccepted);
|
||||||
|
assert!(!next.has_more);
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user