yoi/crates/worker-runtime/src/runtime.rs

1443 lines
51 KiB
Rust

use crate::catalog::{
CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, WorkerStatus, WorkerSummary,
};
use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic};
use crate::error::RuntimeError;
#[cfg(feature = "fs-store")]
use crate::fs_store::{
FsRuntimeStore, FsRuntimeStoreOptions, PersistedRuntimeState, PersistedWorkerRecord,
};
use crate::identity::{RuntimeId, WorkerId, WorkerRef};
use crate::interaction::{WorkerInput, WorkerInputKind, WorkerInteractionAck};
use crate::management::{
RuntimeBackendKind, RuntimeLimits, RuntimeOptions, RuntimeStatus, RuntimeSummary,
};
use crate::observation::{
EventCursor, EventSubscription, EventSubscriptionMode, RuntimeEvent, RuntimeEventBatch,
RuntimeEventKind, TranscriptEntry, TranscriptProjection, TranscriptQuery, TranscriptRole,
};
#[cfg(feature = "ws-server")]
use crate::observation::{WorkerObservationCursor, WorkerObservationEvent};
use std::collections::BTreeMap;
#[cfg(feature = "ws-server")]
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
#[cfg(feature = "ws-server")]
use tokio::sync::broadcast;
/// Process-wide counter used to generate a Runtime id when the caller does not
/// supply one.
///
/// Starts at 1; the `Runtime` constructors advance it with `fetch_add(1, Relaxed)`.
static NEXT_RUNTIME_SEQUENCE: AtomicU64 = AtomicU64::new(1);
/// Concrete embedded Runtime domain entity.
///
/// The default implementation is memory-backed and tools/provider-less by
/// design. An optional `fs-store` feature adds filesystem persistence while
/// preserving the same typed authority boundary. It can later be adapted by
/// backend registries or web servers without making sockets, sessions, or paths
/// public authority.
///
/// Cloning a `Runtime` clones the inner `Arc`, so every clone operates on the
/// same mutex-guarded state.
#[derive(Clone, Debug)]
pub struct Runtime {
// Shared state; every public method acquires this mutex through `lock()`.
inner: Arc<Mutex<RuntimeState>>,
}
impl Runtime {
/// Build a memory-backed Runtime from `RuntimeOptions::default()`.
///
/// This is a convenience wrapper around [`Runtime::with_options`].
pub fn new_memory() -> Self {
    let defaults = RuntimeOptions::default();
    Self::with_options(defaults)
}
/// Build a memory-backed Runtime from explicit options.
///
/// `options.runtime_id` is used when present; otherwise an id is generated
/// from the process-wide sequence counter. A `RuntimeStarted` event is
/// recorded before the Runtime is handed back.
pub fn with_options(options: RuntimeOptions) -> Self {
    let runtime_id = match options.runtime_id {
        Some(id) => id,
        None => RuntimeId::generated(NEXT_RUNTIME_SEQUENCE.fetch_add(1, Ordering::Relaxed)),
    };
    let mut initial = RuntimeState::new(runtime_id, options.display_name, options.limits);
    initial.push_event(None, RuntimeEventKind::RuntimeStarted, "runtime started");
    let inner = Arc::new(Mutex::new(initial));
    Self { inner }
}
/// Open an existing filesystem-backed Runtime, or initialize a new one.
///
/// The store is opened under `options.root` for the resolved Runtime id
/// (explicit, or generated from the process-wide counter). When the store
/// yields persisted state, that state is restored via
/// `RuntimeState::from_persisted`. Otherwise a fresh state is created, a
/// `RuntimeStarted` event is recorded, and both the runtime snapshot and that
/// event are written before the Runtime is returned.
///
/// # Errors
/// Propagates any [`RuntimeError`] from opening the store, restoring the
/// persisted state, or writing the initial snapshot/event.
#[cfg(feature = "fs-store")]
pub fn with_fs_store(options: FsRuntimeStoreOptions) -> Result<Self, RuntimeError> {
    let runtime_id = match options.runtime_id {
        Some(id) => id,
        None => RuntimeId::generated(NEXT_RUNTIME_SEQUENCE.fetch_add(1, Ordering::Relaxed)),
    };
    let opened = FsRuntimeStore::open_or_create(options.root, runtime_id.clone())?;
    let state = match opened.state {
        Some(persisted) => RuntimeState::from_persisted(persisted, opened.store)?,
        None => {
            let mut fresh = RuntimeState::new_fs_backed(
                runtime_id,
                options.display_name,
                options.limits,
                opened.store,
            );
            let started_event_id =
                fresh.push_event(None, RuntimeEventKind::RuntimeStarted, "runtime started");
            fresh.persist_runtime_snapshot()?;
            fresh.persist_event_by_id(started_event_id)?;
            fresh
        }
    };
    let inner = Arc::new(Mutex::new(state));
    Ok(Self { inner })
}
/// Return a clone of this Runtime's id (the Runtime half of a [`WorkerRef`]).
///
/// # Errors
/// Returns [`RuntimeError::StatePoisoned`] when the state mutex is poisoned.
pub fn runtime_id(&self) -> Result<RuntimeId, RuntimeError> {
    let state = self.lock()?;
    Ok(state.runtime_id.clone())
}
/// Management-plane summary of the Runtime.
///
/// Tallies Workers by status in one pass over the catalog and copies the
/// Runtime's id, display name, backend, status, diagnostic count, and limits.
pub fn summary(&self) -> Result<RuntimeSummary, RuntimeError> {
    let state = self.lock()?;
    let (mut running, mut stopped, mut cancelled) = (0, 0, 0);
    for status in state.workers.values().map(|worker| worker.status) {
        match status {
            WorkerStatus::Running => running += 1,
            WorkerStatus::Stopped => stopped += 1,
            WorkerStatus::Cancelled => cancelled += 1,
        }
    }
    let summary = RuntimeSummary {
        runtime_id: state.runtime_id.clone(),
        display_name: state.display_name.clone(),
        backend: state.backend,
        status: state.status,
        worker_count: state.workers.len(),
        active_worker_count: running,
        stopped_worker_count: stopped,
        cancelled_worker_count: cancelled,
        diagnostic_count: state.diagnostics.len(),
        limits: state.limits.clone(),
    };
    Ok(summary)
}
/// Current lifecycle status of the Runtime.
pub fn status(&self) -> Result<RuntimeStatus, RuntimeError> {
    let state = self.lock()?;
    Ok(state.status)
}
/// Stop the Runtime. v0 keeps data readable after stop, but rejects new
/// create/send/worker lifecycle mutations.
///
/// Stopping an already-stopped Runtime is a no-op that returns the id of the
/// most recent event. Otherwise every still-active Worker is marked
/// `Stopped`, a `RuntimeStopped` event is recorded, and the runtime
/// snapshot, all workers, and the new event are persisted.
///
/// Returns the id of the `RuntimeStopped` event (or the last event id when
/// already stopped).
pub fn stop_runtime(&self) -> Result<u64, RuntimeError> {
    let mut state = self.lock()?;
    if state.status == RuntimeStatus::Stopped {
        return Ok(state.last_event_id());
    }

    state.status = RuntimeStatus::Stopped;
    state
        .workers
        .values_mut()
        .filter(|worker| worker.status.is_active())
        .for_each(|worker| worker.status = WorkerStatus::Stopped);

    let message = {
        let runtime_id = &state.runtime_id;
        format!("runtime {runtime_id} stopped")
    };
    let stopped_event_id = state.push_event(None, RuntimeEventKind::RuntimeStopped, message);

    state.persist_runtime_snapshot()?;
    state.persist_workers()?;
    state.persist_event_by_id(stopped_event_id)?;
    Ok(stopped_event_id)
}
/// Create a Worker in the embedded catalog.
///
/// The Runtime must be running and the request must pass
/// `validate_create_worker_request`. The new Worker starts in
/// `WorkerStatus::Running` with an empty transcript; a `WorkerCreated` event
/// and any creation diagnostics are recorded, then the runtime snapshot, the
/// worker, and the event are persisted.
///
/// # Errors
/// Returns an error when the Runtime is stopped, the request is invalid, the
/// state mutex is poisoned, or persistence fails.
pub fn create_worker(
    &self,
    request: CreateWorkerRequest,
) -> Result<WorkerDetail, RuntimeError> {
    let mut state = self.lock()?;
    state.ensure_running()?;
    validate_create_worker_request(&request)?;

    // Reserve the next worker sequence number for this Worker's id.
    let sequence = state.next_worker_sequence;
    state.next_worker_sequence += 1;
    let worker_id = WorkerId::generated(sequence);
    let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone());

    let created_event_id = state.push_event(
        Some(worker_ref.clone()),
        RuntimeEventKind::WorkerCreated,
        format!("worker {worker_id} created"),
    );

    let record = WorkerRecord {
        worker_ref,
        worker_id: worker_id.clone(),
        status: WorkerStatus::Running,
        request,
        transcript: Vec::new(),
        next_transcript_sequence: 1,
        last_event_id: created_event_id,
    };
    let detail = record.detail(&state.runtime_id);
    state.emit_create_diagnostics(&detail);
    state.workers.insert(worker_id.clone(), record);

    state.persist_runtime_snapshot()?;
    state.persist_worker(&worker_id)?;
    state.persist_event_by_id(created_event_id)?;
    Ok(detail)
}
/// Summaries of every Worker in this Runtime's catalog, in catalog
/// (`BTreeMap` key) order.
pub fn list_workers(&self) -> Result<Vec<WorkerSummary>, RuntimeError> {
    let state = self.lock()?;
    let runtime_id = &state.runtime_id;
    let mut summaries = Vec::with_capacity(state.workers.len());
    for record in state.workers.values() {
        summaries.push(record.summary(runtime_id));
    }
    Ok(summaries)
}
/// Fetch the detail view of one Worker.
///
/// The supplied [`WorkerRef`] must name this Runtime and an existing Worker;
/// otherwise the lookup error is returned.
pub fn worker_detail(&self, worker_ref: &WorkerRef) -> Result<WorkerDetail, RuntimeError> {
    let state = self.lock()?;
    state
        .worker(worker_ref)
        .map(|record| record.detail(&state.runtime_id))
}
/// Accept input into a Worker transcript.
///
/// Checks run in this order: Runtime is running, input is valid, the
/// [`WorkerRef`] resolves, the Worker is active. On success a
/// `WorkerInputAccepted` event is recorded, a transcript entry carrying the
/// input content is appended with the next transcript sequence number, and
/// the snapshot, worker, event, and transcript entry are persisted.
///
/// # Errors
/// Returns [`RuntimeError::InvalidRequest`] when the Worker is not active,
/// plus any error from the checks above or from persistence.
pub fn send_input(
    &self,
    worker_ref: &WorkerRef,
    input: WorkerInput,
) -> Result<WorkerInteractionAck, RuntimeError> {
    let mut state = self.lock()?;
    state.ensure_running()?;
    validate_worker_input(&input)?;
    state.ensure_worker_ref(worker_ref)?;
    if !state.worker(worker_ref)?.status.is_active() {
        return Err(RuntimeError::InvalidRequest(format!(
            "worker {} is not running",
            worker_ref.worker_id
        )));
    }

    let accepted_event_id = state.push_event(
        Some(worker_ref.clone()),
        RuntimeEventKind::WorkerInputAccepted,
        "worker input accepted",
    );

    // Scope the mutable worker borrow so `state` is free again for persistence.
    let (status, transcript_sequence) = {
        let record = state.worker_mut(worker_ref)?;
        let role = match input.kind {
            WorkerInputKind::User => TranscriptRole::User,
            WorkerInputKind::System => TranscriptRole::System,
        };
        let sequence = record.next_transcript_sequence;
        record.next_transcript_sequence += 1;
        record.last_event_id = accepted_event_id;
        record.transcript.push(TranscriptEntry {
            sequence,
            worker_ref: worker_ref.clone(),
            role,
            content: input.content,
            event_id: accepted_event_id,
        });
        (record.status, sequence)
    };

    state.persist_runtime_snapshot()?;
    state.persist_worker(&worker_ref.worker_id)?;
    state.persist_event_by_id(accepted_event_id)?;
    state.persist_transcript_entry(&worker_ref.worker_id, transcript_sequence)?;

    Ok(WorkerInteractionAck {
        worker_ref: worker_ref.clone(),
        status,
        transcript_sequence,
        event_id: accepted_event_id,
    })
}
/// Stop a Worker. Repeated stops are idempotent and return the last event id.
///
/// `reason` becomes the event message; when absent, `"worker stopped"` is used.
pub fn stop_worker(
    &self,
    worker_ref: &WorkerRef,
    reason: Option<String>,
) -> Result<WorkerLifecycleAck, RuntimeError> {
    let message = match reason {
        Some(text) => text,
        None => "worker stopped".to_string(),
    };
    self.transition_worker(
        worker_ref,
        WorkerStatus::Stopped,
        RuntimeEventKind::WorkerStopped,
        message,
    )
}
/// Cancel a Worker. Repeated cancels are idempotent and return the last event id.
///
/// `reason` becomes the event message; when absent, `"worker cancelled"` is used.
pub fn cancel_worker(
    &self,
    worker_ref: &WorkerRef,
    reason: Option<String>,
) -> Result<WorkerLifecycleAck, RuntimeError> {
    let message = match reason {
        Some(text) => text,
        None => "worker cancelled".to_string(),
    };
    self.transition_worker(
        worker_ref,
        WorkerStatus::Cancelled,
        RuntimeEventKind::WorkerCancelled,
        message,
    )
}
/// Bounded window over a Worker's transcript.
///
/// Returns up to `query.limit` entries starting at index `query.start`.
/// A start at or beyond the transcript length yields an empty window.
/// `next_start` is `Some(end)` when entries remain after the window.
///
/// # Errors
/// Returns [`RuntimeError::LimitTooLarge`] when `query.limit` exceeds the
/// configured `max_transcript_projection_items`, or the lookup error when
/// `worker_ref` does not resolve.
pub fn transcript_projection(
    &self,
    worker_ref: &WorkerRef,
    query: TranscriptQuery,
) -> Result<TranscriptProjection, RuntimeError> {
    let state = self.lock()?;
    let max = state.limits.max_transcript_projection_items;
    if query.limit > max {
        return Err(RuntimeError::LimitTooLarge {
            requested: query.limit,
            max,
        });
    }

    let record = state.worker(worker_ref)?;
    let total_items = record.transcript.len();
    let end = query.start.saturating_add(query.limit).min(total_items);
    // `get` yields `None` only when `start > end`, i.e. start is past the
    // transcript; `start == total_items` yields an empty slice.
    let items = match record.transcript.get(query.start..end) {
        Some(window) => window.to_vec(),
        None => Vec::new(),
    };
    let next_start = if end < total_items { Some(end) } else { None };

    Ok(TranscriptProjection {
        worker_ref: worker_ref.clone(),
        start: query.start,
        limit: query.limit,
        total_items,
        items,
        next_start,
    })
}
/// Cursor positioned at the first Runtime event (`next_event_id == 1`).
pub fn event_cursor_from_start(&self) -> Result<EventCursor, RuntimeError> {
    let runtime_id = self.lock()?.runtime_id.clone();
    Ok(EventCursor {
        runtime_id,
        next_event_id: 1,
    })
}
/// Cursor positioned just past the most recent Runtime event, so a read
/// from it returns only events recorded afterwards.
pub fn event_cursor_now(&self) -> Result<EventCursor, RuntimeError> {
    let state = self.lock()?;
    let next_event_id = state.last_event_id() + 1;
    let runtime_id = state.runtime_id.clone();
    Ok(EventCursor {
        runtime_id,
        next_event_id,
    })
}
/// Poll Runtime events from a cursor.
pub fn read_events(
&self,
cursor: &EventCursor,
limit: usize,
) -> Result<RuntimeEventBatch, RuntimeError> {
let state = self.lock()?;
if cursor.runtime_id != state.runtime_id {
return Err(RuntimeError::WrongRuntimeCursor {
expected_runtime_id: state.runtime_id.clone(),
actual_runtime_id: cursor.runtime_id.clone(),
});
}
if limit > state.limits.max_event_batch_items {
return Err(RuntimeError::LimitTooLarge {
requested: limit,
max: state.limits.max_event_batch_items,
});
}
let mut events = Vec::new();
for event in state
.events
.iter()
.filter(|event| event.id >= cursor.next_event_id)
.take(limit)
{
events.push(event.clone());
}
let next_event_id = events
.last()
.map(|event| event.id + 1)
.unwrap_or(cursor.next_event_id);
let has_more = state.events.iter().any(|event| event.id >= next_event_id);
Ok(RuntimeEventBatch {
runtime_id: state.runtime_id.clone(),
cursor: EventCursor {
runtime_id: state.runtime_id.clone(),
next_event_id,
},
events,
has_more,
})
}
/// Create a poll-only placeholder subscription boundary for future streaming.
///
/// The cursor must belong to this Runtime; the returned subscription simply
/// wraps it with `EventSubscriptionMode::PollOnly`.
///
/// # Errors
/// Returns [`RuntimeError::WrongRuntimeCursor`] for a foreign cursor.
pub fn subscribe_events(&self, cursor: EventCursor) -> Result<EventSubscription, RuntimeError> {
    let state = self.lock()?;
    if cursor.runtime_id == state.runtime_id {
        Ok(EventSubscription {
            runtime_id: state.runtime_id.clone(),
            cursor,
            mode: EventSubscriptionMode::PollOnly,
        })
    } else {
        Err(RuntimeError::WrongRuntimeCursor {
            expected_runtime_id: state.runtime_id.clone(),
            actual_runtime_id: cursor.runtime_id,
        })
    }
}
/// Cursor positioned at the newest retained observation event for a Worker.
///
/// Scans the backlog from the back for the latest event belonging to
/// `worker_ref`; when none is retained the cursor sequence is `0`.
#[cfg(feature = "ws-server")]
pub fn worker_observation_cursor_now(
    &self,
    worker_ref: &WorkerRef,
) -> Result<WorkerObservationCursor, RuntimeError> {
    let state = self.lock()?;
    state.ensure_worker_ref(worker_ref)?;
    let latest_sequence = state
        .observation_events
        .iter()
        .rfind(|event| &event.worker_ref == worker_ref)
        .map_or(0, |event| event.sequence);
    Ok(WorkerObservationCursor::new(latest_sequence))
}
/// Build the Worker Snapshot event used as the first observation frame.
///
/// After confirming `worker_ref` resolves, this returns a fixed snapshot:
/// no entries, an `Idle` status, no in-flight blocks, and a greeting whose
/// only Worker-specific value is the worker id used as `worker_name`.
#[cfg(feature = "ws-server")]
pub fn worker_observation_snapshot(
    &self,
    worker_ref: &WorkerRef,
) -> Result<protocol::Event, RuntimeError> {
    let state = self.lock()?;
    // Lookup is only for validation; the record's contents are not used.
    state.worker(worker_ref)?;

    let greeting = protocol::Greeting {
        worker_name: worker_ref.worker_id.to_string(),
        cwd: String::new(),
        provider: "worker-runtime".to_string(),
        model: "worker-runtime".to_string(),
        scope_summary: "runtime worker observation".to_string(),
        tools: Vec::new(),
        context_window: 0,
        context_tokens: 0,
    };
    let in_flight = protocol::InFlightSnapshot { blocks: Vec::new() };

    Ok(protocol::Event::Snapshot {
        entries: Vec::new(),
        greeting,
        status: protocol::WorkerStatus::Idle,
        in_flight,
    })
}
/// Replay retained observation events for a Worker that come after a cursor.
///
/// The reference and cursor are validated first; the result contains clones
/// of every retained event for `worker_ref` whose sequence is strictly
/// greater than `cursor.sequence`, in backlog order.
#[cfg(feature = "ws-server")]
pub fn read_worker_observation_events(
    &self,
    worker_ref: &WorkerRef,
    cursor: WorkerObservationCursor,
) -> Result<Vec<WorkerObservationEvent>, RuntimeError> {
    let state = self.lock()?;
    state.ensure_worker_ref(worker_ref)?;
    state.validate_worker_observation_cursor(worker_ref, cursor)?;

    let mut replay = Vec::new();
    for event in &state.observation_events {
        if &event.worker_ref == worker_ref && event.sequence > cursor.sequence {
            replay.push(event.clone());
        }
    }
    Ok(replay)
}
/// Subscribe to live protocol observation events.
///
/// Returns a new receiver on the Runtime's broadcast channel; the receiver
/// is not filtered by Worker.
#[cfg(feature = "ws-server")]
pub fn subscribe_worker_observation(
    &self,
) -> Result<broadcast::Receiver<WorkerObservationEvent>, RuntimeError> {
    let state = self.lock()?;
    let receiver = state.observation_tx.subscribe();
    Ok(receiver)
}
/// Append a Worker protocol event to the observation bus.
///
/// The event is sequenced, added to the retained backlog, and broadcast to
/// live subscribers; the stored event is returned.
#[cfg(feature = "ws-server")]
pub fn observe_worker_event(
    &self,
    worker_ref: &WorkerRef,
    payload: protocol::Event,
) -> Result<WorkerObservationEvent, RuntimeError> {
    let mut state = self.lock()?;
    state.ensure_worker_ref(worker_ref)?;
    Ok(state.push_worker_observation_event(worker_ref.clone(), payload))
}
/// Return a cloned snapshot of all diagnostics recorded so far.
pub fn diagnostics(&self) -> Result<Vec<RuntimeDiagnostic>, RuntimeError> {
    let state = self.lock()?;
    Ok(state.diagnostics.clone())
}
/// Move an active Worker into a terminal `status`, recording `event_kind`
/// with `reason` as the event message.
///
/// A Worker that is already inactive is left untouched: its current status
/// and last event id are returned and no event is recorded. This is what
/// makes stop/cancel idempotent and keeps the first terminal state.
fn transition_worker(
    &self,
    worker_ref: &WorkerRef,
    status: WorkerStatus,
    event_kind: RuntimeEventKind,
    reason: String,
) -> Result<WorkerLifecycleAck, RuntimeError> {
    let mut state = self.lock()?;
    state.ensure_running()?;
    state.ensure_worker_ref(worker_ref)?;

    let (current_status, previous_event_id) = {
        let record = state.worker(worker_ref)?;
        (record.status, record.last_event_id)
    };
    if !current_status.is_active() {
        return Ok(WorkerLifecycleAck {
            worker_ref: worker_ref.clone(),
            status: current_status,
            event_id: previous_event_id,
        });
    }

    let event_id = state.push_event(Some(worker_ref.clone()), event_kind, reason);
    let record = state.worker_mut(worker_ref)?;
    record.status = status;
    record.last_event_id = event_id;

    state.persist_runtime_snapshot()?;
    state.persist_worker(&worker_ref.worker_id)?;
    state.persist_event_by_id(event_id)?;

    Ok(WorkerLifecycleAck {
        worker_ref: worker_ref.clone(),
        status,
        event_id,
    })
}
/// Acquire the state mutex, mapping a poisoned lock to
/// [`RuntimeError::StatePoisoned`].
fn lock(&self) -> Result<MutexGuard<'_, RuntimeState>, RuntimeError> {
    match self.inner.lock() {
        Ok(guard) => Ok(guard),
        Err(_) => Err(RuntimeError::StatePoisoned),
    }
}
}
/// Where a Runtime's state is persisted.
///
/// Without the `fs-store` feature only `Memory` exists, hence the
/// conditional `dead_code` allowance.
#[cfg_attr(not(feature = "fs-store"), allow(dead_code))]
#[derive(Clone, Debug)]
enum RuntimePersistence {
// No durable storage; the `persist_*` helpers do nothing for this variant.
Memory,
// Filesystem-backed store that the `persist_*` helpers write through.
#[cfg(feature = "fs-store")]
Fs(FsRuntimeStore),
}
/// Mutable Runtime state held behind the mutex in `Runtime`.
#[derive(Debug)]
struct RuntimeState {
// Identity of this Runtime; worker refs and event cursors are checked against it.
runtime_id: RuntimeId,
// Optional human-readable name, surfaced in the summary.
display_name: Option<String>,
// Backend kind reported in the summary (`Memory` or `FsStore`).
backend: RuntimeBackendKind,
// Persistence target consulted by the `persist_*` helpers.
#[cfg_attr(not(feature = "fs-store"), allow(dead_code))]
persistence: RuntimePersistence,
// Lifecycle status; mutations are rejected once this is `Stopped`.
status: RuntimeStatus,
// Bounds applied to transcript projections and event batches.
limits: RuntimeLimits,
// Sequence number handed to `WorkerId::generated` for the next Worker.
next_worker_sequence: u64,
// Id assigned to the next pushed event.
next_event_id: u64,
// Id assigned to the next pushed diagnostic.
next_diagnostic_id: u64,
// Worker catalog keyed by worker id.
workers: BTreeMap<WorkerId, WorkerRecord>,
// Append-only Runtime event log.
events: Vec<RuntimeEvent>,
// Diagnostics recorded so far.
diagnostics: Vec<RuntimeDiagnostic>,
// Sequence assigned to the next observation event (shared across Workers).
#[cfg(feature = "ws-server")]
next_observation_sequence: u64,
// Bounded backlog of observation events kept for replay.
#[cfg(feature = "ws-server")]
observation_events: VecDeque<WorkerObservationEvent>,
// Broadcast sender used to fan observation events out to live subscribers.
#[cfg(feature = "ws-server")]
observation_tx: broadcast::Sender<WorkerObservationEvent>,
}
impl RuntimeState {
/// Fresh memory-backed state: running, no workers, events, or diagnostics,
/// and every counter starting at 1.
fn new(runtime_id: RuntimeId, display_name: Option<String>, limits: RuntimeLimits) -> Self {
    // Only the sender is kept; receivers are created on demand via `subscribe`.
    #[cfg(feature = "ws-server")]
    let (observation_tx, _) = broadcast::channel(256);

    Self {
        runtime_id,
        display_name,
        limits,
        backend: RuntimeBackendKind::Memory,
        persistence: RuntimePersistence::Memory,
        status: RuntimeStatus::Running,
        workers: BTreeMap::new(),
        events: Vec::new(),
        diagnostics: Vec::new(),
        next_worker_sequence: 1,
        next_event_id: 1,
        next_diagnostic_id: 1,
        #[cfg(feature = "ws-server")]
        next_observation_sequence: 1,
        #[cfg(feature = "ws-server")]
        observation_events: VecDeque::new(),
        #[cfg(feature = "ws-server")]
        observation_tx,
    }
}
/// Fresh filesystem-backed state.
///
/// Identical to [`RuntimeState::new`] except that the backend is `FsStore`
/// and persistence goes through `store`.
#[cfg(feature = "fs-store")]
fn new_fs_backed(
    runtime_id: RuntimeId,
    display_name: Option<String>,
    limits: RuntimeLimits,
    store: FsRuntimeStore,
) -> Self {
    Self {
        backend: RuntimeBackendKind::FsStore,
        persistence: RuntimePersistence::Fs(store),
        ..Self::new(runtime_id, display_name, limits)
    }
}
/// Rebuild in-memory state from a snapshot loaded by `store`.
///
/// Worker records, counters, events, and diagnostics are taken from the
/// snapshot; the backend is `FsStore` and persistence goes through `store`.
///
/// Observation state (`ws-server` feature) is not part of the persisted
/// snapshot, so a restored Runtime starts with an empty backlog, sequence 1,
/// and a new broadcast channel, exactly as [`RuntimeState::new`] does.
/// Without these fields the struct literal is incomplete when `fs-store`
/// and `ws-server` are enabled together.
///
/// # Errors
/// Returns [`RuntimeError::StoreCorrupt`] when the snapshot's runtime id
/// does not match the store's runtime id.
#[cfg(feature = "fs-store")]
fn from_persisted(
    persisted: PersistedRuntimeState,
    store: FsRuntimeStore,
) -> Result<Self, RuntimeError> {
    if persisted.runtime_id != *store.runtime_id() {
        return Err(RuntimeError::StoreCorrupt {
            operation: "restore runtime state",
            path: store.runtime_dir().to_path_buf(),
            message: format!(
                "persisted runtime id {} does not match store runtime {}",
                persisted.runtime_id,
                store.runtime_id()
            ),
        });
    }
    let mut workers = BTreeMap::new();
    for (worker_id, worker) in persisted.workers {
        workers.insert(
            worker_id,
            WorkerRecord {
                worker_ref: worker.worker_ref,
                worker_id: worker.worker_id,
                status: worker.status,
                request: worker.request,
                transcript: worker.transcript,
                next_transcript_sequence: worker.next_transcript_sequence,
                last_event_id: worker.last_event_id,
            },
        );
    }
    Ok(Self {
        runtime_id: persisted.runtime_id,
        display_name: persisted.display_name,
        backend: RuntimeBackendKind::FsStore,
        persistence: RuntimePersistence::Fs(store),
        status: persisted.status,
        limits: persisted.limits,
        next_worker_sequence: persisted.next_worker_sequence,
        next_event_id: persisted.next_event_id,
        next_diagnostic_id: persisted.next_diagnostic_id,
        workers,
        events: persisted.events,
        diagnostics: persisted.diagnostics,
        #[cfg(feature = "ws-server")]
        next_observation_sequence: 1,
        #[cfg(feature = "ws-server")]
        observation_events: VecDeque::new(),
        #[cfg(feature = "ws-server")]
        observation_tx: broadcast::channel(256).0,
    })
}
/// Build the persistable snapshot of this state by cloning identity,
/// status, limits, counters, workers, events, and diagnostics.
#[cfg(feature = "fs-store")]
fn persisted_state(&self) -> PersistedRuntimeState {
    let workers = self
        .workers
        .iter()
        .map(|(worker_id, record)| (worker_id.clone(), record.persisted_record()))
        .collect();

    PersistedRuntimeState {
        runtime_id: self.runtime_id.clone(),
        display_name: self.display_name.clone(),
        status: self.status,
        limits: self.limits.clone(),
        next_worker_sequence: self.next_worker_sequence,
        next_event_id: self.next_event_id,
        next_diagnostic_id: self.next_diagnostic_id,
        workers,
        events: self.events.clone(),
        diagnostics: self.diagnostics.clone(),
    }
}
/// The filesystem store, if this state is filesystem-backed.
#[cfg(feature = "fs-store")]
fn fs_store(&self) -> Option<&FsRuntimeStore> {
    if let RuntimePersistence::Fs(store) = &self.persistence {
        Some(store)
    } else {
        None
    }
}
/// Write the full runtime snapshot to the filesystem store; a no-op for
/// memory-backed state.
#[cfg(feature = "fs-store")]
fn persist_runtime_snapshot(&self) -> Result<(), RuntimeError> {
    let store = match self.fs_store() {
        Some(store) => store,
        None => return Ok(()),
    };
    let snapshot = self.persisted_state();
    store.write_runtime_snapshot(&snapshot)?;
    Ok(())
}
/// Write one Worker's snapshot to the filesystem store; a no-op for
/// memory-backed state.
///
/// # Errors
/// Returns [`RuntimeError::WorkerNotFound`] when a store is present but
/// `worker_id` is not in the catalog, or the store's write error.
#[cfg(feature = "fs-store")]
fn persist_worker(&self, worker_id: &WorkerId) -> Result<(), RuntimeError> {
    let store = match self.fs_store() {
        Some(store) => store,
        None => return Ok(()),
    };
    let record = match self.workers.get(worker_id) {
        Some(record) => record,
        None => {
            return Err(RuntimeError::WorkerNotFound {
                runtime_id: self.runtime_id.clone(),
                worker_id: worker_id.clone(),
            });
        }
    };
    store.write_worker_snapshot(&record.persisted_record())?;
    Ok(())
}
/// Append the in-memory event with id `event_id` to the filesystem store;
/// a no-op for memory-backed state.
///
/// # Errors
/// Returns [`RuntimeError::StoreCorrupt`] when a store is present but no
/// in-memory event has that id, or the store's append error.
#[cfg(feature = "fs-store")]
fn persist_event_by_id(&self, event_id: u64) -> Result<(), RuntimeError> {
    let store = match self.fs_store() {
        Some(store) => store,
        None => return Ok(()),
    };
    let found = self.events.iter().find(|event| event.id == event_id);
    let event = match found {
        Some(event) => event,
        None => {
            return Err(RuntimeError::StoreCorrupt {
                operation: "persist event",
                path: store.runtime_dir().to_path_buf(),
                message: format!("event {event_id} is missing from runtime state"),
            });
        }
    };
    store.append_event(event)?;
    Ok(())
}
/// Append one transcript entry of a Worker to the filesystem store; a no-op
/// for memory-backed state.
///
/// # Errors
/// With a store present: [`RuntimeError::WorkerNotFound`] when the Worker is
/// unknown, [`RuntimeError::StoreCorrupt`] when the Worker has no entry
/// with `sequence`, or the store's append error.
#[cfg(feature = "fs-store")]
fn persist_transcript_entry(
    &self,
    worker_id: &WorkerId,
    sequence: u64,
) -> Result<(), RuntimeError> {
    let store = match self.fs_store() {
        Some(store) => store,
        None => return Ok(()),
    };
    let record = match self.workers.get(worker_id) {
        Some(record) => record,
        None => {
            return Err(RuntimeError::WorkerNotFound {
                runtime_id: self.runtime_id.clone(),
                worker_id: worker_id.clone(),
            });
        }
    };
    let found = record
        .transcript
        .iter()
        .find(|entry| entry.sequence == sequence);
    let entry = match found {
        Some(entry) => entry,
        None => {
            return Err(RuntimeError::StoreCorrupt {
                operation: "persist transcript",
                path: store.runtime_dir().to_path_buf(),
                message: format!(
                    "transcript sequence {sequence} is missing from worker {worker_id}"
                ),
            });
        }
    };
    store.append_transcript_entry(entry)?;
    Ok(())
}
/// Write every Worker's snapshot to the filesystem store, stopping at the
/// first failure; a no-op for memory-backed state.
#[cfg(feature = "fs-store")]
fn persist_workers(&self) -> Result<(), RuntimeError> {
    if self.fs_store().is_none() {
        return Ok(());
    }
    self.workers
        .keys()
        .try_for_each(|worker_id| self.persist_worker(worker_id))
}
// The five functions below are the `fs-store`-disabled counterparts of the
// persistence helpers. They keep the same signatures so callers need no
// feature gates, and they always succeed without doing anything.

/// No-op runtime snapshot persistence (built without `fs-store`).
#[cfg(not(feature = "fs-store"))]
fn persist_runtime_snapshot(&self) -> Result<(), RuntimeError> {
Ok(())
}
/// No-op worker persistence (built without `fs-store`).
#[cfg(not(feature = "fs-store"))]
fn persist_worker(&self, _worker_id: &WorkerId) -> Result<(), RuntimeError> {
Ok(())
}
/// No-op event persistence (built without `fs-store`).
#[cfg(not(feature = "fs-store"))]
fn persist_event_by_id(&self, _event_id: u64) -> Result<(), RuntimeError> {
Ok(())
}
/// No-op transcript entry persistence (built without `fs-store`).
#[cfg(not(feature = "fs-store"))]
fn persist_transcript_entry(
&self,
_worker_id: &WorkerId,
_sequence: u64,
) -> Result<(), RuntimeError> {
Ok(())
}
/// No-op bulk worker persistence (built without `fs-store`).
#[cfg(not(feature = "fs-store"))]
fn persist_workers(&self) -> Result<(), RuntimeError> {
Ok(())
}
/// Guard for mutating operations: fails with
/// [`RuntimeError::RuntimeStopped`] once the Runtime has been stopped.
fn ensure_running(&self) -> Result<(), RuntimeError> {
    if self.status != RuntimeStatus::Stopped {
        return Ok(());
    }
    Err(RuntimeError::RuntimeStopped {
        runtime_id: self.runtime_id.clone(),
    })
}
/// Check that `worker_ref` names this Runtime and a Worker in its catalog.
///
/// # Errors
/// [`RuntimeError::WrongRuntime`] when the runtime id differs (checked
/// first), [`RuntimeError::WorkerNotFound`] when the worker id is unknown.
fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> {
    if worker_ref.runtime_id != self.runtime_id {
        Err(RuntimeError::WrongRuntime {
            expected_runtime_id: self.runtime_id.clone(),
            actual_runtime_id: worker_ref.runtime_id.clone(),
            worker_id: worker_ref.worker_id.clone(),
        })
    } else if !self.workers.contains_key(&worker_ref.worker_id) {
        Err(RuntimeError::WorkerNotFound {
            runtime_id: self.runtime_id.clone(),
            worker_id: worker_ref.worker_id.clone(),
        })
    } else {
        Ok(())
    }
}
/// Resolve `worker_ref` to a shared reference to its record, after
/// validating the reference with `ensure_worker_ref`.
fn worker(&self, worker_ref: &WorkerRef) -> Result<&WorkerRecord, RuntimeError> {
    self.ensure_worker_ref(worker_ref)?;
    match self.workers.get(&worker_ref.worker_id) {
        Some(record) => Ok(record),
        None => Err(RuntimeError::WorkerNotFound {
            runtime_id: self.runtime_id.clone(),
            worker_id: worker_ref.worker_id.clone(),
        }),
    }
}
/// Resolve `worker_ref` to a mutable reference to its record, after
/// validating the reference with `ensure_worker_ref`.
fn worker_mut(&mut self, worker_ref: &WorkerRef) -> Result<&mut WorkerRecord, RuntimeError> {
    self.ensure_worker_ref(worker_ref)?;
    match self.workers.get_mut(&worker_ref.worker_id) {
        Some(record) => Ok(record),
        None => Err(RuntimeError::WorkerNotFound {
            runtime_id: self.runtime_id.clone(),
            worker_id: worker_ref.worker_id.clone(),
        }),
    }
}
/// Append an event to the in-memory log and return its id.
///
/// The id is taken from `next_event_id`, which is then advanced by one.
/// This does not persist anything; callers persist the event separately.
fn push_event(
    &mut self,
    worker_ref: Option<WorkerRef>,
    kind: RuntimeEventKind,
    message: impl Into<String>,
) -> u64 {
    let assigned_id = self.next_event_id;
    self.next_event_id += 1;
    let event = RuntimeEvent {
        id: assigned_id,
        worker_ref,
        kind,
        message: message.into(),
    };
    self.events.push(event);
    assigned_id
}
/// Id of the most recently assigned event: `next_event_id - 1`, saturating
/// at 0.
fn last_event_id(&self) -> u64 {
    match self.next_event_id.checked_sub(1) {
        Some(previous) => previous,
        None => 0,
    }
}
/// Validate a replay cursor against the retained observation backlog.
///
/// A non-zero cursor older than the first retained event for `worker_ref`
/// is rejected as expired. Any cursor at or beyond the next sequence to be
/// assigned is rejected as unknown. Both failures are
/// [`RuntimeError::InvalidRequest`].
#[cfg(feature = "ws-server")]
fn validate_worker_observation_cursor(
    &self,
    worker_ref: &WorkerRef,
    cursor: WorkerObservationCursor,
) -> Result<(), RuntimeError> {
    let first_retained = self
        .observation_events
        .iter()
        .find(|event| &event.worker_ref == worker_ref)
        .map(|event| event.sequence);
    let expired = match first_retained {
        Some(first) => cursor.sequence != 0 && cursor.sequence < first,
        None => false,
    };
    if expired {
        return Err(RuntimeError::InvalidRequest(format!(
            "worker observation cursor {} is expired for worker {}",
            cursor.encode(),
            worker_ref.worker_id
        )));
    }

    let known = cursor.sequence < self.next_observation_sequence;
    if !known {
        return Err(RuntimeError::InvalidRequest(format!(
            "worker observation cursor {} is unknown for worker {}",
            cursor.encode(),
            worker_ref.worker_id
        )));
    }
    Ok(())
}
/// Sequence an observation event, retain it in the bounded backlog, and
/// broadcast it to live subscribers.
///
/// The backlog keeps at most `MAX_OBSERVATION_BACKLOG` events; the oldest
/// are dropped first. A failed broadcast send is deliberately ignored.
#[cfg(feature = "ws-server")]
fn push_worker_observation_event(
    &mut self,
    worker_ref: WorkerRef,
    payload: protocol::Event,
) -> WorkerObservationEvent {
    const MAX_OBSERVATION_BACKLOG: usize = 1024;

    let sequence = self.next_observation_sequence;
    self.next_observation_sequence += 1;

    let event = WorkerObservationEvent::new(sequence, worker_ref, payload);
    self.observation_events.push_back(event.clone());

    let overflow = self
        .observation_events
        .len()
        .saturating_sub(MAX_OBSERVATION_BACKLOG);
    for _ in 0..overflow {
        self.observation_events.pop_front();
    }

    let _ = self.observation_tx.send(event.clone());
    event
}
/// Record a diagnostic, assigning it the next diagnostic id.
fn push_diagnostic(
    &mut self,
    severity: DiagnosticSeverity,
    code: impl Into<String>,
    message: impl Into<String>,
    worker_ref: Option<WorkerRef>,
) {
    let assigned_id = self.next_diagnostic_id;
    self.next_diagnostic_id += 1;
    let diagnostic = RuntimeDiagnostic {
        id: assigned_id,
        severity,
        code: code.into(),
        message: message.into(),
        worker_ref,
    };
    self.diagnostics.push(diagnostic);
}
/// Record informational diagnostics about a newly created Worker.
///
/// One is emitted when the Worker has no config bundle and another when it
/// requested no capabilities, in that order.
fn emit_create_diagnostics(&mut self, detail: &WorkerDetail) {
    let lacks_config_bundle = detail.config_bundle.is_none();
    let lacks_capabilities = detail.requested_capabilities.is_empty();

    if lacks_config_bundle {
        self.push_diagnostic(
            DiagnosticSeverity::Info,
            "runtime.local_default_resources",
            "worker created without ConfigBundleRef; runtime-local defaults are assumed",
            Some(detail.worker_ref.clone()),
        );
    }
    if lacks_capabilities {
        self.push_diagnostic(
            DiagnosticSeverity::Info,
            "worker.tools_less",
            "worker created without requested tool capabilities",
            Some(detail.worker_ref.clone()),
        );
    }
}
}
/// In-memory catalog entry for one Worker.
#[derive(Debug)]
struct WorkerRecord {
// Full (runtime id, worker id) reference for this Worker.
worker_ref: WorkerRef,
// Worker id; also the key under which the record is stored.
worker_id: WorkerId,
// Current lifecycle status.
status: WorkerStatus,
// The request the Worker was created from.
request: CreateWorkerRequest,
// Accepted inputs, in order of acceptance.
transcript: Vec<TranscriptEntry>,
// Sequence number the next transcript entry will receive (starts at 1).
next_transcript_sequence: u64,
// Id of the last event recorded against this Worker's record.
last_event_id: u64,
}
impl WorkerRecord {
    /// Compact list view of this Worker, stamped with the owning `runtime_id`.
    fn summary(&self, runtime_id: &RuntimeId) -> WorkerSummary {
        let request = &self.request;
        WorkerSummary {
            worker_ref: self.worker_ref.clone(),
            runtime_id: runtime_id.clone(),
            worker_id: self.worker_id.clone(),
            status: self.status,
            intent: request.intent.clone(),
            profile: request.profile.clone(),
            requested_capability_count: request.requested_capabilities.len(),
            has_config_bundle: request.config_bundle.is_some(),
            transcript_len: self.transcript.len(),
            last_event_id: self.last_event_id,
        }
    }

    /// Full detail view of this Worker, stamped with the owning `runtime_id`.
    fn detail(&self, runtime_id: &RuntimeId) -> WorkerDetail {
        let request = &self.request;
        WorkerDetail {
            worker_ref: self.worker_ref.clone(),
            runtime_id: runtime_id.clone(),
            worker_id: self.worker_id.clone(),
            status: self.status,
            intent: request.intent.clone(),
            profile: request.profile.clone(),
            config_bundle: request.config_bundle.clone(),
            requested_capabilities: request.requested_capabilities.clone(),
            workspace_refs: request.workspace_refs.clone(),
            mount_refs: request.mount_refs.clone(),
            transcript_len: self.transcript.len(),
            last_event_id: self.last_event_id,
        }
    }

    /// Clone this record into its persistable form.
    #[cfg(feature = "fs-store")]
    fn persisted_record(&self) -> PersistedWorkerRecord {
        let Self {
            worker_ref,
            worker_id,
            status,
            request,
            transcript,
            next_transcript_sequence,
            last_event_id,
        } = self;
        PersistedWorkerRecord {
            worker_ref: worker_ref.clone(),
            worker_id: worker_id.clone(),
            status: *status,
            request: request.clone(),
            transcript: transcript.clone(),
            next_transcript_sequence: *next_transcript_sequence,
            last_event_id: *last_event_id,
        }
    }
}
/// Validate a create-worker request.
///
/// Rejects a `Task` intent whose objective is blank after trimming, then
/// any requested capability whose name is blank after trimming. Both
/// failures are [`RuntimeError::InvalidRequest`].
fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> {
    let blank_objective = matches!(
        &request.intent,
        crate::catalog::WorkerIntent::Task { objective } if objective.trim().is_empty()
    );
    if blank_objective {
        return Err(RuntimeError::InvalidRequest(
            "task objective must not be empty".to_string(),
        ));
    }

    let blank_capability = request
        .requested_capabilities
        .iter()
        .any(|capability| capability.name.trim().is_empty());
    if blank_capability {
        return Err(RuntimeError::InvalidRequest(
            "capability name must not be empty".to_string(),
        ));
    }
    Ok(())
}
/// Reject worker input whose content is blank after trimming.
fn validate_worker_input(input: &WorkerInput) -> Result<(), RuntimeError> {
    let has_content = !input.content.trim().is_empty();
    if has_content {
        Ok(())
    } else {
        Err(RuntimeError::InvalidRequest(
            "worker input content must not be empty".to_string(),
        ))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector, WorkerIntent};
use crate::management::RuntimeLimits;
/// Test helper: a `Task` request with a builtin profile, one config bundle,
/// and a single `read` capability.
fn task_request(objective: &str) -> CreateWorkerRequest {
    let intent = WorkerIntent::Task {
        objective: objective.to_string(),
    };
    let config_bundle = ConfigBundleRef {
        id: "bundle-1".to_string(),
    };
    CreateWorkerRequest {
        intent,
        profile: ProfileSelector::Builtin("builtin:coder".to_string()),
        config_bundle: Some(config_bundle),
        requested_capabilities: vec![CapabilityRequest::named("read")],
        workspace_refs: Vec::new(),
        mount_refs: Vec::new(),
    }
}
/// A created Worker carries the owning Runtime's id and is visible, with
/// matching data, through both the list and detail views.
#[test]
fn create_list_and_detail_preserve_runtime_worker_authority() {
    let runtime = Runtime::new_memory();
    let created = runtime.create_worker(task_request("implement v0")).unwrap();

    assert_eq!(created.worker_ref.runtime_id, runtime.runtime_id().unwrap());
    assert_eq!(created.status, WorkerStatus::Running);
    assert!(created.config_bundle.is_some());

    let summaries = runtime.list_workers().unwrap();
    assert_eq!(summaries.len(), 1);
    let only = &summaries[0];
    assert_eq!(only.worker_ref, created.worker_ref);
    assert_eq!(only.requested_capability_count, 1);

    let looked_up = runtime.worker_detail(&created.worker_ref).unwrap();
    assert_eq!(looked_up.worker_id, created.worker_id);
    assert_eq!(looked_up.intent, created.intent);
}
/// A `WorkerRef` minted by one Runtime is rejected by another.
#[test]
fn rejects_worker_refs_from_another_runtime() {
    let owner = Runtime::new_memory();
    let stranger = Runtime::new_memory();
    let created = owner.create_worker(task_request("runtime a")).unwrap();

    let outcome = stranger.worker_detail(&created.worker_ref);

    let err = outcome.unwrap_err();
    assert!(matches!(err, RuntimeError::WrongRuntime { .. }));
}
/// Creating a tools-less Worker without a config bundle records exactly the
/// two informational diagnostics.
#[test]
fn tools_less_worker_without_config_bundle_uses_local_defaults_and_diagnostics() {
    let runtime = Runtime::new_memory();
    let request =
        CreateWorkerRequest::tools_less(WorkerIntent::default(), ProfileSelector::RuntimeDefault);
    let created = runtime.create_worker(request).unwrap();

    assert!(created.config_bundle.is_none());
    assert!(created.requested_capabilities.is_empty());

    let diagnostics = runtime.diagnostics().unwrap();
    assert_eq!(diagnostics.len(), 2);
    let codes: Vec<&str> = diagnostics
        .iter()
        .map(|diagnostic| diagnostic.code.as_str())
        .collect();
    assert!(codes.contains(&"runtime.local_default_resources"));
    assert!(codes.contains(&"worker.tools_less"));
}
/// Inputs land in the transcript in order, projections honour the window,
/// and a limit above the configured maximum is rejected.
#[test]
fn send_input_and_project_bounded_transcript() {
    let limits = RuntimeLimits {
        max_transcript_projection_items: 2,
        max_event_batch_items: 16,
    };
    let runtime = Runtime::with_options(RuntimeOptions {
        limits,
        ..RuntimeOptions::default()
    });
    let worker = runtime.create_worker(task_request("chat")).unwrap();
    let target = &worker.worker_ref;

    let first_ack = runtime
        .send_input(target, WorkerInput::user("hello"))
        .unwrap();
    assert_eq!(first_ack.transcript_sequence, 1);
    runtime
        .send_input(target, WorkerInput::system("note"))
        .unwrap();
    runtime
        .send_input(target, WorkerInput::user("again"))
        .unwrap();

    // Window of two out of three entries: more remain at index 2.
    let window = runtime
        .transcript_projection(target, TranscriptQuery::new(0, 2))
        .unwrap();
    assert_eq!(window.total_items, 3);
    assert_eq!(window.items.len(), 2);
    assert_eq!(window.items[0].content, "hello");
    assert_eq!(window.items[1].role, TranscriptRole::System);
    assert_eq!(window.next_start, Some(2));

    // Asking for three exceeds the configured maximum of two.
    let too_large = runtime
        .transcript_projection(target, TranscriptQuery::new(0, 3))
        .unwrap_err();
    assert!(matches!(too_large, RuntimeError::LimitTooLarge { .. }));
}
/// Stopping one Worker and cancelling another is reflected in the acks and
/// in the per-status summary counts.
#[test]
fn stop_and_cancel_workers_update_summary() {
    let runtime = Runtime::new_memory();
    let to_stop = runtime.create_worker(task_request("stop me")).unwrap();
    let to_cancel = runtime.create_worker(task_request("cancel me")).unwrap();

    let stop_ack = runtime
        .stop_worker(&to_stop.worker_ref, Some("done".to_string()))
        .unwrap();
    let cancel_ack = runtime
        .cancel_worker(&to_cancel.worker_ref, Some("abort".to_string()))
        .unwrap();
    assert_eq!(stop_ack.status, WorkerStatus::Stopped);
    assert_eq!(cancel_ack.status, WorkerStatus::Cancelled);

    let totals = runtime.summary().unwrap();
    assert_eq!(totals.worker_count, 2);
    assert_eq!(totals.active_worker_count, 0);
    assert_eq!(totals.stopped_worker_count, 1);
    assert_eq!(totals.cancelled_worker_count, 1);
}
/// A cancel issued after a stop changes nothing: the Worker stays stopped,
/// the cancel ack echoes the stop event id, and no cancel event is logged.
#[test]
fn stop_then_cancel_preserves_stopped_terminal_state() {
    let runtime = Runtime::new_memory();
    let from_start = runtime.event_cursor_from_start().unwrap();
    let created = runtime
        .create_worker(task_request("stable stopped"))
        .unwrap();
    let target = &created.worker_ref;

    let stop_ack = runtime
        .stop_worker(target, Some("done".to_string()))
        .unwrap();
    let cancel_ack = runtime
        .cancel_worker(target, Some("late cancel".to_string()))
        .unwrap();

    assert_eq!(stop_ack.status, WorkerStatus::Stopped);
    assert_eq!(cancel_ack.status, WorkerStatus::Stopped);
    assert_eq!(cancel_ack.event_id, stop_ack.event_id);
    assert_eq!(
        runtime.worker_detail(target).unwrap().status,
        WorkerStatus::Stopped
    );

    let totals = runtime.summary().unwrap();
    assert_eq!(totals.active_worker_count, 0);
    assert_eq!(totals.stopped_worker_count, 1);
    assert_eq!(totals.cancelled_worker_count, 0);

    let events = runtime.read_events(&from_start, 10).unwrap().events;
    let count_of = |kind: RuntimeEventKind| {
        events.iter().filter(|event| event.kind == kind).count()
    };
    assert_eq!(count_of(RuntimeEventKind::WorkerStopped), 1);
    assert_eq!(count_of(RuntimeEventKind::WorkerCancelled), 0);
}
#[test]
fn cancel_then_stop_preserves_cancelled_terminal_state() {
    // A stop issued after a cancel must not overwrite the `Cancelled` state,
    // must echo the original cancel event, and must not emit a stop event.
    let runtime = Runtime::new_memory();
    let start_cursor = runtime.event_cursor_from_start().unwrap();
    let worker = runtime
        .create_worker(task_request("stable cancelled"))
        .unwrap();
    let worker_ref = &worker.worker_ref;

    let cancel_ack = runtime
        .cancel_worker(worker_ref, Some(String::from("abort")))
        .unwrap();
    let stop_ack = runtime
        .stop_worker(worker_ref, Some(String::from("late stop")))
        .unwrap();

    // The late stop acknowledges with the cancel's status and event id.
    assert_eq!(cancel_ack.status, WorkerStatus::Cancelled);
    assert_eq!(stop_ack.status, WorkerStatus::Cancelled);
    assert_eq!(stop_ack.event_id, cancel_ack.event_id);

    let detail = runtime.worker_detail(worker_ref).unwrap();
    assert_eq!(detail.status, WorkerStatus::Cancelled);

    let totals = runtime.summary().unwrap();
    assert_eq!(totals.active_worker_count, 0);
    assert_eq!(totals.stopped_worker_count, 0);
    assert_eq!(totals.cancelled_worker_count, 1);

    // Exactly one cancel event and no stop event were recorded.
    let recorded = runtime.read_events(&start_cursor, 10).unwrap().events;
    let count_of = |kind: RuntimeEventKind| {
        recorded
            .iter()
            .filter(|event| event.kind == kind)
            .count()
    };
    assert_eq!(count_of(RuntimeEventKind::WorkerCancelled), 1);
    assert_eq!(count_of(RuntimeEventKind::WorkerStopped), 0);
}
#[test]
fn event_cursor_and_poll_only_subscription_are_bounded_placeholders() {
    // Checks that subscriptions report poll-only mode and that event reads
    // are paged: each batch honours the requested size and hands back a
    // cursor that resumes where the previous batch ended.
    let runtime = Runtime::new_memory();
    let start_cursor = runtime.event_cursor_from_start().unwrap();

    let subscription = runtime.subscribe_events(start_cursor.clone()).unwrap();
    assert_eq!(subscription.mode, EventSubscriptionMode::PollOnly);

    // Produce two more events on top of the runtime-start event.
    let worker = runtime.create_worker(task_request("events")).unwrap();
    runtime
        .send_input(&worker.worker_ref, WorkerInput::user("eventful"))
        .unwrap();

    // First page: two of the three events, with more pending.
    let first_page = runtime.read_events(&start_cursor, 2).unwrap();
    assert_eq!(first_page.events.len(), 2);
    assert!(first_page.has_more);
    assert_eq!(first_page.events[0].kind, RuntimeEventKind::RuntimeStarted);
    assert_eq!(first_page.events[1].kind, RuntimeEventKind::WorkerCreated);

    // Second page: the remaining event, after which the log is drained.
    let second_page = runtime.read_events(&first_page.cursor, 2).unwrap();
    assert_eq!(second_page.events.len(), 1);
    assert_eq!(
        second_page.events[0].kind,
        RuntimeEventKind::WorkerInputAccepted
    );
    assert!(!second_page.has_more);
}
/// Counter, starting at 1, that `fs_store_root` increments on every call and
/// embeds in the directory name it returns.
#[cfg(feature = "fs-store")]
static NEXT_FS_TEST_ROOT: AtomicU64 = AtomicU64::new(1);
/// Build a fresh, per-call directory path under the system temp directory for
/// a filesystem-store test.
///
/// The name combines `label`, the current process id, and a per-call sequence
/// number. Any directory already present at that path is removed
/// (best-effort); the directory itself is not created here.
#[cfg(feature = "fs-store")]
fn fs_store_root(label: &str) -> std::path::PathBuf {
    let sequence = NEXT_FS_TEST_ROOT.fetch_add(1, Ordering::Relaxed);
    let mut test_root = std::env::temp_dir();
    let dir_name = format!(
        "worker-runtime-fs-store-{label}-{}-{sequence}",
        std::process::id()
    );
    test_root.push(dir_name);
    // Ignore the result: the path usually does not exist yet.
    let _ = std::fs::remove_dir_all(&test_root);
    test_root
}
/// Return a clone of the filesystem store backing `runtime`.
///
/// Panics if the runtime state cannot be locked or if the runtime is
/// memory-backed.
#[cfg(feature = "fs-store")]
fn runtime_store(runtime: &Runtime) -> FsRuntimeStore {
    let guard = runtime.lock().unwrap();
    // Kept as an exhaustive match so a new persistence variant forces this
    // helper to be revisited.
    let fs_store = match &guard.persistence {
        RuntimePersistence::Memory => panic!("expected fs-backed runtime"),
        RuntimePersistence::Fs(fs_store) => fs_store,
    };
    fs_store.clone()
}
#[cfg(feature = "fs-store")]
#[test]
fn fs_store_restores_workers_events_and_transcripts() {
    // Verifies that a filesystem-backed runtime can be dropped and reopened
    // from the same root with its worker status, transcript, and event log
    // intact, and that direct store reads agree with the runtime's view.

    // Deletes the test root when dropped, so the directory is cleaned up even
    // if an `unwrap` or assertion below panics.
    struct RootCleanup(std::path::PathBuf);

    impl Drop for RootCleanup {
        fn drop(&mut self) {
            // Best-effort: a missing directory is not an error for a test.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    let root = fs_store_root("restore");
    // Declared before any runtime/store handle: locals drop in reverse
    // declaration order, so the directory is removed only after every handle
    // that may reference it has been dropped.
    let _cleanup = RootCleanup(root.clone());

    let runtime_id = RuntimeId::new("runtime-fs-authority").unwrap();
    let runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions {
        root: root.clone(),
        runtime_id: Some(runtime_id.clone()),
        display_name: Some("filesystem runtime".to_string()),
        limits: RuntimeLimits {
            max_transcript_projection_items: 2,
            max_event_batch_items: 2,
        },
    })
    .unwrap();
    assert_eq!(
        runtime.summary().unwrap().backend,
        RuntimeBackendKind::FsStore
    );

    // Populate state worth persisting: one worker, two inputs, then a stop.
    let worker = runtime.create_worker(task_request("persist me")).unwrap();
    runtime
        .send_input(&worker.worker_ref, WorkerInput::user("first"))
        .unwrap();
    runtime
        .send_input(&worker.worker_ref, WorkerInput::system("second"))
        .unwrap();
    runtime
        .stop_worker(&worker.worker_ref, Some("finished".to_string()))
        .unwrap();

    // Keep a store handle for direct reads, then drop the original runtime
    // before reopening the same root.
    let store = runtime_store(&runtime);
    drop(runtime);

    let restored = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions {
        root,
        runtime_id: Some(runtime_id),
        display_name: None,
        limits: RuntimeLimits::default(),
    })
    .unwrap();

    // Worker status and transcript length survive the reopen.
    let restored_worker = restored.worker_detail(&worker.worker_ref).unwrap();
    assert_eq!(restored_worker.status, WorkerStatus::Stopped);
    assert_eq!(restored_worker.transcript_len, 2);

    let projection = restored
        .transcript_projection(&worker.worker_ref, TranscriptQuery::new(0, 1))
        .unwrap();
    assert_eq!(projection.total_items, 2);
    assert_eq!(projection.items[0].content, "first");
    assert_eq!(projection.next_start, Some(1));

    // The event log is replayable from the start through the restored runtime.
    let cursor = restored.event_cursor_from_start().unwrap();
    let batch = restored.read_events(&cursor, 2).unwrap();
    assert_eq!(batch.events.len(), 2);
    assert!(batch.has_more);
    assert_eq!(batch.events[0].kind, RuntimeEventKind::RuntimeStarted);
    assert_eq!(batch.events[1].kind, RuntimeEventKind::WorkerCreated);

    // Reads that go straight to the store agree with the runtime's view.
    let direct_events = store.read_events(&cursor, 2, 2).unwrap();
    assert_eq!(direct_events.events, batch.events);
    let direct_transcript = store
        .read_transcript(&worker.worker_ref, TranscriptQuery::new(1, 1), 2)
        .unwrap();
    assert_eq!(direct_transcript.items[0].content, "second");
}
#[cfg(feature = "fs-store")]
#[test]
fn fs_store_reports_corrupt_and_missing_data() {
    // Verifies that reopening a damaged store fails with a typed error:
    // `StoreCorrupt` when `runtime.json` is not valid JSON, and `StoreMissing`
    // when a worker's `worker.json` snapshot has been deleted.

    // Deletes a test root when dropped, so the directory is cleaned up even
    // if an `unwrap` or assertion below panics.
    struct RootCleanup(std::path::PathBuf);

    impl Drop for RootCleanup {
        fn drop(&mut self) {
            // Best-effort: a missing directory is not an error for a test.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    // Scenario 1: corrupt runtime snapshot. Scoped so that all handles are
    // dropped, and the root removed, before the second scenario starts.
    {
        let corrupt_root = fs_store_root("corrupt");
        // Declared first so it is dropped after every handle created below.
        let _cleanup = RootCleanup(corrupt_root.clone());
        let corrupt_runtime_id = RuntimeId::new("runtime-corrupt").unwrap();
        let corrupt_runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions {
            root: corrupt_root.clone(),
            runtime_id: Some(corrupt_runtime_id.clone()),
            display_name: None,
            limits: RuntimeLimits::default(),
        })
        .unwrap();
        let corrupt_store = runtime_store(&corrupt_runtime);
        // Overwrite the runtime snapshot with bytes that are not JSON.
        std::fs::write(
            corrupt_store.runtime_dir().join("runtime.json"),
            b"not json",
        )
        .unwrap();
        drop(corrupt_runtime);

        let err = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions {
            root: corrupt_root.clone(),
            runtime_id: Some(corrupt_runtime_id),
            display_name: None,
            limits: RuntimeLimits::default(),
        })
        .unwrap_err();
        assert!(matches!(err, RuntimeError::StoreCorrupt { .. }));
    }

    // Scenario 2: a worker directory exists but its snapshot file is gone.
    {
        let missing_root = fs_store_root("missing");
        // Declared first so it is dropped after every handle created below.
        let _cleanup = RootCleanup(missing_root.clone());
        let missing_runtime_id = RuntimeId::new("runtime-missing").unwrap();
        let missing_runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions {
            root: missing_root.clone(),
            runtime_id: Some(missing_runtime_id.clone()),
            display_name: None,
            limits: RuntimeLimits::default(),
        })
        .unwrap();
        missing_runtime
            .create_worker(task_request("missing worker snapshot"))
            .unwrap();
        let missing_store = runtime_store(&missing_runtime);

        // Sort the entries so the chosen worker directory is deterministic.
        let mut worker_dirs = std::fs::read_dir(missing_store.runtime_dir().join("workers"))
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        worker_dirs.sort_by_key(|entry| entry.path());
        let first_worker_dir = worker_dirs
            .first()
            .expect("creating a worker should produce a worker directory")
            .path();
        std::fs::remove_file(first_worker_dir.join("worker.json")).unwrap();
        drop(missing_runtime);

        let err = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions {
            root: missing_root.clone(),
            runtime_id: Some(missing_runtime_id),
            display_name: None,
            limits: RuntimeLimits::default(),
        })
        .unwrap_err();
        assert!(matches!(err, RuntimeError::StoreMissing { .. }));
    }
}
}