use crate::catalog::{ CreateWorkerRequest, WorkerDetail, WorkerLifecycleAck, WorkerStatus, WorkerSummary, }; use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic}; use crate::error::RuntimeError; #[cfg(feature = "fs-store")] use crate::fs_store::{ FsRuntimeStore, FsRuntimeStoreOptions, PersistedRuntimeState, PersistedWorkerRecord, }; use crate::identity::{RuntimeId, WorkerId, WorkerRef}; use crate::interaction::{WorkerInput, WorkerInputKind, WorkerInteractionAck}; use crate::management::{ RuntimeBackendKind, RuntimeLimits, RuntimeOptions, RuntimeStatus, RuntimeSummary, }; use crate::observation::{ EventCursor, EventSubscription, EventSubscriptionMode, RuntimeEvent, RuntimeEventBatch, RuntimeEventKind, TranscriptEntry, TranscriptProjection, TranscriptQuery, TranscriptRole, }; use std::collections::BTreeMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; static NEXT_RUNTIME_SEQUENCE: AtomicU64 = AtomicU64::new(1); /// Concrete embedded Runtime domain entity. /// /// The default implementation is memory-backed and tools/provider-less by /// design. An optional `fs-store` feature adds filesystem persistence while /// preserving the same typed authority boundary. It can later be adapted by /// backend registries or web servers without making sockets, sessions, or paths /// public authority. #[derive(Clone, Debug)] pub struct Runtime { inner: Arc>, } impl Runtime { /// Create a memory-backed Runtime with generated identity and default limits. pub fn new_memory() -> Self { Self::with_options(RuntimeOptions::default()) } /// Create a memory-backed Runtime with explicit options. pub fn with_options(options: RuntimeOptions) -> Self { let runtime_id = options.runtime_id.unwrap_or_else(|| { RuntimeId::generated(NEXT_RUNTIME_SEQUENCE.fetch_add(1, Ordering::Relaxed)) }); let mut state = RuntimeState::new(runtime_id, options.display_name, options.limits); state.push_event(None, RuntimeEventKind::RuntimeStarted, "runtime started"); Self { inner: Arc::new(Mutex::new(state)), } } /// Create or restore a filesystem-backed Runtime. /// /// The store is scoped by typed Runtime identity under `options.root`; if the /// Runtime directory already exists, persisted state is loaded and validated. /// If it does not exist, a fresh Runtime is initialized and durable files are /// created before the Runtime is returned. #[cfg(feature = "fs-store")] pub fn with_fs_store(options: FsRuntimeStoreOptions) -> Result { let runtime_id = options.runtime_id.unwrap_or_else(|| { RuntimeId::generated(NEXT_RUNTIME_SEQUENCE.fetch_add(1, Ordering::Relaxed)) }); let opened = FsRuntimeStore::open_or_create(options.root, runtime_id.clone())?; let state = if let Some(persisted) = opened.state { RuntimeState::from_persisted(persisted, opened.store)? } else { let mut state = RuntimeState::new_fs_backed( runtime_id, options.display_name, options.limits, opened.store, ); let event_id = state.push_event(None, RuntimeEventKind::RuntimeStarted, "runtime started"); state.persist_runtime_snapshot()?; state.persist_event_by_id(event_id)?; state }; Ok(Self { inner: Arc::new(Mutex::new(state)), }) } /// Runtime id half of public Worker authority. pub fn runtime_id(&self) -> Result { Ok(self.lock()?.runtime_id.clone()) } /// Management-plane summary. pub fn summary(&self) -> Result { let state = self.lock()?; let mut active_worker_count = 0; let mut stopped_worker_count = 0; let mut cancelled_worker_count = 0; for worker in state.workers.values() { match worker.status { WorkerStatus::Running => active_worker_count += 1, WorkerStatus::Stopped => stopped_worker_count += 1, WorkerStatus::Cancelled => cancelled_worker_count += 1, } } Ok(RuntimeSummary { runtime_id: state.runtime_id.clone(), display_name: state.display_name.clone(), backend: state.backend, status: state.status, worker_count: state.workers.len(), active_worker_count, stopped_worker_count, cancelled_worker_count, diagnostic_count: state.diagnostics.len(), limits: state.limits.clone(), }) } /// Current Runtime lifecycle state. pub fn status(&self) -> Result { Ok(self.lock()?.status) } /// Stop the Runtime. v0 keeps data readable after stop, but rejects new /// create/send/worker lifecycle mutations. pub fn stop_runtime(&self) -> Result { let mut state = self.lock()?; if state.status == RuntimeStatus::Stopped { return Ok(state.last_event_id()); } state.status = RuntimeStatus::Stopped; let runtime_id = state.runtime_id.clone(); for worker in state.workers.values_mut() { if worker.status.is_active() { worker.status = WorkerStatus::Stopped; } } let event_id = state.push_event( None, RuntimeEventKind::RuntimeStopped, format!("runtime {runtime_id} stopped"), ); state.persist_runtime_snapshot()?; state.persist_workers()?; state.persist_event_by_id(event_id)?; Ok(event_id) } /// Create a Worker in the embedded catalog. pub fn create_worker( &self, request: CreateWorkerRequest, ) -> Result { let mut state = self.lock()?; state.ensure_running()?; validate_create_worker_request(&request)?; let worker_id = WorkerId::generated(state.next_worker_sequence); state.next_worker_sequence += 1; let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone()); let event_id = state.push_event( Some(worker_ref.clone()), RuntimeEventKind::WorkerCreated, format!("worker {worker_id} created"), ); let record = WorkerRecord { worker_ref, worker_id: worker_id.clone(), status: WorkerStatus::Running, request, transcript: Vec::new(), next_transcript_sequence: 1, last_event_id: event_id, }; let detail = record.detail(&state.runtime_id); state.emit_create_diagnostics(&detail); state.workers.insert(worker_id.clone(), record); state.persist_runtime_snapshot()?; state.persist_worker(&worker_id)?; state.persist_event_by_id(event_id)?; Ok(detail) } /// List Workers known to this Runtime. pub fn list_workers(&self) -> Result, RuntimeError> { let state = self.lock()?; Ok(state .workers .values() .map(|worker| worker.summary(&state.runtime_id)) .collect()) } /// Fetch Worker detail. The supplied [`WorkerRef`] must match this Runtime. pub fn worker_detail(&self, worker_ref: &WorkerRef) -> Result { let state = self.lock()?; let worker = state.worker(worker_ref)?; Ok(worker.detail(&state.runtime_id)) } /// Accept input into a Worker transcript. pub fn send_input( &self, worker_ref: &WorkerRef, input: WorkerInput, ) -> Result { let mut state = self.lock()?; state.ensure_running()?; validate_worker_input(&input)?; state.ensure_worker_ref(worker_ref)?; { let worker = state.worker(worker_ref)?; if !worker.status.is_active() { return Err(RuntimeError::InvalidRequest(format!( "worker {} is not running", worker_ref.worker_id ))); } } let event_id = state.push_event( Some(worker_ref.clone()), RuntimeEventKind::WorkerInputAccepted, "worker input accepted", ); let worker = state.worker_mut(worker_ref)?; let role = match input.kind { WorkerInputKind::User => TranscriptRole::User, WorkerInputKind::System => TranscriptRole::System, }; let transcript_sequence = worker.next_transcript_sequence; worker.next_transcript_sequence += 1; worker.last_event_id = event_id; worker.transcript.push(TranscriptEntry { sequence: transcript_sequence, worker_ref: worker_ref.clone(), role, content: input.content, event_id, }); let status = worker.status; state.persist_runtime_snapshot()?; state.persist_worker(&worker_ref.worker_id)?; state.persist_event_by_id(event_id)?; state.persist_transcript_entry(&worker_ref.worker_id, transcript_sequence)?; Ok(WorkerInteractionAck { worker_ref: worker_ref.clone(), status, transcript_sequence, event_id, }) } /// Stop a Worker. Repeated stops are idempotent and return the last event id. pub fn stop_worker( &self, worker_ref: &WorkerRef, reason: Option, ) -> Result { self.transition_worker( worker_ref, WorkerStatus::Stopped, RuntimeEventKind::WorkerStopped, reason.unwrap_or_else(|| "worker stopped".to_string()), ) } /// Cancel a Worker. Repeated cancels are idempotent and return the last event id. pub fn cancel_worker( &self, worker_ref: &WorkerRef, reason: Option, ) -> Result { self.transition_worker( worker_ref, WorkerStatus::Cancelled, RuntimeEventKind::WorkerCancelled, reason.unwrap_or_else(|| "worker cancelled".to_string()), ) } /// Bounded transcript projection for a Worker. pub fn transcript_projection( &self, worker_ref: &WorkerRef, query: TranscriptQuery, ) -> Result { let state = self.lock()?; if query.limit > state.limits.max_transcript_projection_items { return Err(RuntimeError::LimitTooLarge { requested: query.limit, max: state.limits.max_transcript_projection_items, }); } let worker = state.worker(worker_ref)?; let total_items = worker.transcript.len(); let end = query.start.saturating_add(query.limit).min(total_items); let items = if query.start >= total_items { Vec::new() } else { worker.transcript[query.start..end].to_vec() }; let next_start = (end < total_items).then_some(end); Ok(TranscriptProjection { worker_ref: worker_ref.clone(), start: query.start, limit: query.limit, total_items, items, next_start, }) } /// Cursor pointing to the beginning of Runtime events. pub fn event_cursor_from_start(&self) -> Result { let state = self.lock()?; Ok(EventCursor { runtime_id: state.runtime_id.clone(), next_event_id: 1, }) } /// Cursor pointing after the current last event. pub fn event_cursor_now(&self) -> Result { let state = self.lock()?; Ok(EventCursor { runtime_id: state.runtime_id.clone(), next_event_id: state.last_event_id() + 1, }) } /// Poll Runtime events from a cursor. pub fn read_events( &self, cursor: &EventCursor, limit: usize, ) -> Result { let state = self.lock()?; if cursor.runtime_id != state.runtime_id { return Err(RuntimeError::WrongRuntimeCursor { expected_runtime_id: state.runtime_id.clone(), actual_runtime_id: cursor.runtime_id.clone(), }); } if limit > state.limits.max_event_batch_items { return Err(RuntimeError::LimitTooLarge { requested: limit, max: state.limits.max_event_batch_items, }); } let mut events = Vec::new(); for event in state .events .iter() .filter(|event| event.id >= cursor.next_event_id) .take(limit) { events.push(event.clone()); } let next_event_id = events .last() .map(|event| event.id + 1) .unwrap_or(cursor.next_event_id); let has_more = state.events.iter().any(|event| event.id >= next_event_id); Ok(RuntimeEventBatch { runtime_id: state.runtime_id.clone(), cursor: EventCursor { runtime_id: state.runtime_id.clone(), next_event_id, }, events, has_more, }) } /// Create a poll-only placeholder subscription boundary for future streaming. pub fn subscribe_events(&self, cursor: EventCursor) -> Result { let state = self.lock()?; if cursor.runtime_id != state.runtime_id { return Err(RuntimeError::WrongRuntimeCursor { expected_runtime_id: state.runtime_id.clone(), actual_runtime_id: cursor.runtime_id, }); } Ok(EventSubscription { runtime_id: state.runtime_id.clone(), cursor, mode: EventSubscriptionMode::PollOnly, }) } /// Snapshot current diagnostics. pub fn diagnostics(&self) -> Result, RuntimeError> { Ok(self.lock()?.diagnostics.clone()) } fn transition_worker( &self, worker_ref: &WorkerRef, status: WorkerStatus, event_kind: RuntimeEventKind, reason: String, ) -> Result { let mut state = self.lock()?; state.ensure_running()?; state.ensure_worker_ref(worker_ref)?; { let worker = state.worker(worker_ref)?; if !worker.status.is_active() { return Ok(WorkerLifecycleAck { worker_ref: worker_ref.clone(), status: worker.status, event_id: worker.last_event_id, }); } } let event_id = state.push_event(Some(worker_ref.clone()), event_kind, reason); let worker = state.worker_mut(worker_ref)?; worker.status = status; worker.last_event_id = event_id; let status = worker.status; state.persist_runtime_snapshot()?; state.persist_worker(&worker_ref.worker_id)?; state.persist_event_by_id(event_id)?; Ok(WorkerLifecycleAck { worker_ref: worker_ref.clone(), status, event_id, }) } fn lock(&self) -> Result, RuntimeError> { self.inner.lock().map_err(|_| RuntimeError::StatePoisoned) } } #[cfg_attr(not(feature = "fs-store"), allow(dead_code))] #[derive(Clone, Debug)] enum RuntimePersistence { Memory, #[cfg(feature = "fs-store")] Fs(FsRuntimeStore), } #[derive(Debug)] struct RuntimeState { runtime_id: RuntimeId, display_name: Option, backend: RuntimeBackendKind, #[cfg_attr(not(feature = "fs-store"), allow(dead_code))] persistence: RuntimePersistence, status: RuntimeStatus, limits: RuntimeLimits, next_worker_sequence: u64, next_event_id: u64, next_diagnostic_id: u64, workers: BTreeMap, events: Vec, diagnostics: Vec, } impl RuntimeState { fn new(runtime_id: RuntimeId, display_name: Option, limits: RuntimeLimits) -> Self { Self { runtime_id, display_name, backend: RuntimeBackendKind::Memory, persistence: RuntimePersistence::Memory, status: RuntimeStatus::Running, limits, next_worker_sequence: 1, next_event_id: 1, next_diagnostic_id: 1, workers: BTreeMap::new(), events: Vec::new(), diagnostics: Vec::new(), } } #[cfg(feature = "fs-store")] fn new_fs_backed( runtime_id: RuntimeId, display_name: Option, limits: RuntimeLimits, store: FsRuntimeStore, ) -> Self { Self { runtime_id, display_name, backend: RuntimeBackendKind::FsStore, persistence: RuntimePersistence::Fs(store), status: RuntimeStatus::Running, limits, next_worker_sequence: 1, next_event_id: 1, next_diagnostic_id: 1, workers: BTreeMap::new(), events: Vec::new(), diagnostics: Vec::new(), } } #[cfg(feature = "fs-store")] fn from_persisted( persisted: PersistedRuntimeState, store: FsRuntimeStore, ) -> Result { if persisted.runtime_id != *store.runtime_id() { return Err(RuntimeError::StoreCorrupt { operation: "restore runtime state", path: store.runtime_dir().to_path_buf(), message: format!( "persisted runtime id {} does not match store runtime {}", persisted.runtime_id, store.runtime_id() ), }); } let mut workers = BTreeMap::new(); for (worker_id, worker) in persisted.workers { workers.insert( worker_id, WorkerRecord { worker_ref: worker.worker_ref, worker_id: worker.worker_id, status: worker.status, request: worker.request, transcript: worker.transcript, next_transcript_sequence: worker.next_transcript_sequence, last_event_id: worker.last_event_id, }, ); } Ok(Self { runtime_id: persisted.runtime_id, display_name: persisted.display_name, backend: RuntimeBackendKind::FsStore, persistence: RuntimePersistence::Fs(store), status: persisted.status, limits: persisted.limits, next_worker_sequence: persisted.next_worker_sequence, next_event_id: persisted.next_event_id, next_diagnostic_id: persisted.next_diagnostic_id, workers, events: persisted.events, diagnostics: persisted.diagnostics, }) } #[cfg(feature = "fs-store")] fn persisted_state(&self) -> PersistedRuntimeState { PersistedRuntimeState { runtime_id: self.runtime_id.clone(), display_name: self.display_name.clone(), status: self.status, limits: self.limits.clone(), next_worker_sequence: self.next_worker_sequence, next_event_id: self.next_event_id, next_diagnostic_id: self.next_diagnostic_id, workers: self .workers .iter() .map(|(worker_id, worker)| (worker_id.clone(), worker.persisted_record())) .collect(), events: self.events.clone(), diagnostics: self.diagnostics.clone(), } } #[cfg(feature = "fs-store")] fn fs_store(&self) -> Option<&FsRuntimeStore> { match &self.persistence { RuntimePersistence::Memory => None, RuntimePersistence::Fs(store) => Some(store), } } #[cfg(feature = "fs-store")] fn persist_runtime_snapshot(&self) -> Result<(), RuntimeError> { if let Some(store) = self.fs_store() { store.write_runtime_snapshot(&self.persisted_state())?; } Ok(()) } #[cfg(feature = "fs-store")] fn persist_worker(&self, worker_id: &WorkerId) -> Result<(), RuntimeError> { if let Some(store) = self.fs_store() { let worker = self.workers .get(worker_id) .ok_or_else(|| RuntimeError::WorkerNotFound { runtime_id: self.runtime_id.clone(), worker_id: worker_id.clone(), })?; store.write_worker_snapshot(&worker.persisted_record())?; } Ok(()) } #[cfg(feature = "fs-store")] fn persist_event_by_id(&self, event_id: u64) -> Result<(), RuntimeError> { if let Some(store) = self.fs_store() { let event = self .events .iter() .find(|event| event.id == event_id) .ok_or_else(|| RuntimeError::StoreCorrupt { operation: "persist event", path: store.runtime_dir().to_path_buf(), message: format!("event {event_id} is missing from runtime state"), })?; store.append_event(event)?; } Ok(()) } #[cfg(feature = "fs-store")] fn persist_transcript_entry( &self, worker_id: &WorkerId, sequence: u64, ) -> Result<(), RuntimeError> { if let Some(store) = self.fs_store() { let worker = self.workers .get(worker_id) .ok_or_else(|| RuntimeError::WorkerNotFound { runtime_id: self.runtime_id.clone(), worker_id: worker_id.clone(), })?; let entry = worker .transcript .iter() .find(|entry| entry.sequence == sequence) .ok_or_else(|| RuntimeError::StoreCorrupt { operation: "persist transcript", path: store.runtime_dir().to_path_buf(), message: format!( "transcript sequence {sequence} is missing from worker {worker_id}" ), })?; store.append_transcript_entry(entry)?; } Ok(()) } #[cfg(feature = "fs-store")] fn persist_workers(&self) -> Result<(), RuntimeError> { if self.fs_store().is_some() { for worker_id in self.workers.keys() { self.persist_worker(worker_id)?; } } Ok(()) } #[cfg(not(feature = "fs-store"))] fn persist_runtime_snapshot(&self) -> Result<(), RuntimeError> { Ok(()) } #[cfg(not(feature = "fs-store"))] fn persist_worker(&self, _worker_id: &WorkerId) -> Result<(), RuntimeError> { Ok(()) } #[cfg(not(feature = "fs-store"))] fn persist_event_by_id(&self, _event_id: u64) -> Result<(), RuntimeError> { Ok(()) } #[cfg(not(feature = "fs-store"))] fn persist_transcript_entry( &self, _worker_id: &WorkerId, _sequence: u64, ) -> Result<(), RuntimeError> { Ok(()) } #[cfg(not(feature = "fs-store"))] fn persist_workers(&self) -> Result<(), RuntimeError> { Ok(()) } fn ensure_running(&self) -> Result<(), RuntimeError> { if self.status == RuntimeStatus::Stopped { Err(RuntimeError::RuntimeStopped { runtime_id: self.runtime_id.clone(), }) } else { Ok(()) } } fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> { if worker_ref.runtime_id != self.runtime_id { return Err(RuntimeError::WrongRuntime { expected_runtime_id: self.runtime_id.clone(), actual_runtime_id: worker_ref.runtime_id.clone(), worker_id: worker_ref.worker_id.clone(), }); } if !self.workers.contains_key(&worker_ref.worker_id) { return Err(RuntimeError::WorkerNotFound { runtime_id: self.runtime_id.clone(), worker_id: worker_ref.worker_id.clone(), }); } Ok(()) } fn worker(&self, worker_ref: &WorkerRef) -> Result<&WorkerRecord, RuntimeError> { self.ensure_worker_ref(worker_ref)?; self.workers .get(&worker_ref.worker_id) .ok_or_else(|| RuntimeError::WorkerNotFound { runtime_id: self.runtime_id.clone(), worker_id: worker_ref.worker_id.clone(), }) } fn worker_mut(&mut self, worker_ref: &WorkerRef) -> Result<&mut WorkerRecord, RuntimeError> { self.ensure_worker_ref(worker_ref)?; self.workers .get_mut(&worker_ref.worker_id) .ok_or_else(|| RuntimeError::WorkerNotFound { runtime_id: self.runtime_id.clone(), worker_id: worker_ref.worker_id.clone(), }) } fn push_event( &mut self, worker_ref: Option, kind: RuntimeEventKind, message: impl Into, ) -> u64 { let id = self.next_event_id; self.next_event_id += 1; self.events.push(RuntimeEvent { id, worker_ref, kind, message: message.into(), }); id } fn last_event_id(&self) -> u64 { self.next_event_id.saturating_sub(1) } fn push_diagnostic( &mut self, severity: DiagnosticSeverity, code: impl Into, message: impl Into, worker_ref: Option, ) { let id = self.next_diagnostic_id; self.next_diagnostic_id += 1; self.diagnostics.push(RuntimeDiagnostic { id, severity, code: code.into(), message: message.into(), worker_ref, }); } fn emit_create_diagnostics(&mut self, detail: &WorkerDetail) { if detail.config_bundle.is_none() { self.push_diagnostic( DiagnosticSeverity::Info, "runtime.local_default_resources", "worker created without ConfigBundleRef; runtime-local defaults are assumed", Some(detail.worker_ref.clone()), ); } if detail.requested_capabilities.is_empty() { self.push_diagnostic( DiagnosticSeverity::Info, "worker.tools_less", "worker created without requested tool capabilities", Some(detail.worker_ref.clone()), ); } } } #[derive(Debug)] struct WorkerRecord { worker_ref: WorkerRef, worker_id: WorkerId, status: WorkerStatus, request: CreateWorkerRequest, transcript: Vec, next_transcript_sequence: u64, last_event_id: u64, } impl WorkerRecord { fn summary(&self, runtime_id: &RuntimeId) -> WorkerSummary { WorkerSummary { worker_ref: self.worker_ref.clone(), runtime_id: runtime_id.clone(), worker_id: self.worker_id.clone(), status: self.status, intent: self.request.intent.clone(), profile: self.request.profile.clone(), requested_capability_count: self.request.requested_capabilities.len(), has_config_bundle: self.request.config_bundle.is_some(), transcript_len: self.transcript.len(), last_event_id: self.last_event_id, } } fn detail(&self, runtime_id: &RuntimeId) -> WorkerDetail { WorkerDetail { worker_ref: self.worker_ref.clone(), runtime_id: runtime_id.clone(), worker_id: self.worker_id.clone(), status: self.status, intent: self.request.intent.clone(), profile: self.request.profile.clone(), config_bundle: self.request.config_bundle.clone(), requested_capabilities: self.request.requested_capabilities.clone(), workspace_refs: self.request.workspace_refs.clone(), mount_refs: self.request.mount_refs.clone(), transcript_len: self.transcript.len(), last_event_id: self.last_event_id, } } #[cfg(feature = "fs-store")] fn persisted_record(&self) -> PersistedWorkerRecord { PersistedWorkerRecord { worker_ref: self.worker_ref.clone(), worker_id: self.worker_id.clone(), status: self.status, request: self.request.clone(), transcript: self.transcript.clone(), next_transcript_sequence: self.next_transcript_sequence, last_event_id: self.last_event_id, } } } fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> { if let crate::catalog::WorkerIntent::Task { objective } = &request.intent { if objective.trim().is_empty() { return Err(RuntimeError::InvalidRequest( "task objective must not be empty".to_string(), )); } } for capability in &request.requested_capabilities { if capability.name.trim().is_empty() { return Err(RuntimeError::InvalidRequest( "capability name must not be empty".to_string(), )); } } Ok(()) } fn validate_worker_input(input: &WorkerInput) -> Result<(), RuntimeError> { if input.content.trim().is_empty() { return Err(RuntimeError::InvalidRequest( "worker input content must not be empty".to_string(), )); } Ok(()) } #[cfg(test)] mod tests { use super::*; use crate::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector, WorkerIntent}; use crate::management::RuntimeLimits; fn task_request(objective: &str) -> CreateWorkerRequest { CreateWorkerRequest { intent: WorkerIntent::Task { objective: objective.to_string(), }, profile: ProfileSelector::Builtin("builtin:coder".to_string()), config_bundle: Some(ConfigBundleRef { id: "bundle-1".to_string(), }), requested_capabilities: vec![CapabilityRequest::named("read")], workspace_refs: Vec::new(), mount_refs: Vec::new(), } } #[test] fn create_list_and_detail_preserve_runtime_worker_authority() { let runtime = Runtime::new_memory(); let detail = runtime.create_worker(task_request("implement v0")).unwrap(); assert_eq!(detail.worker_ref.runtime_id, runtime.runtime_id().unwrap()); assert_eq!(detail.status, WorkerStatus::Running); assert!(detail.config_bundle.is_some()); let list = runtime.list_workers().unwrap(); assert_eq!(list.len(), 1); assert_eq!(list[0].worker_ref, detail.worker_ref); assert_eq!(list[0].requested_capability_count, 1); let fetched = runtime.worker_detail(&detail.worker_ref).unwrap(); assert_eq!(fetched.worker_id, detail.worker_id); assert_eq!(fetched.intent, detail.intent); } #[test] fn rejects_worker_refs_from_another_runtime() { let runtime_a = Runtime::new_memory(); let runtime_b = Runtime::new_memory(); let detail = runtime_a.create_worker(task_request("runtime a")).unwrap(); let err = runtime_b.worker_detail(&detail.worker_ref).unwrap_err(); assert!(matches!(err, RuntimeError::WrongRuntime { .. })); } #[test] fn tools_less_worker_without_config_bundle_uses_local_defaults_and_diagnostics() { let runtime = Runtime::new_memory(); let detail = runtime .create_worker(CreateWorkerRequest::tools_less( WorkerIntent::default(), ProfileSelector::RuntimeDefault, )) .unwrap(); assert!(detail.config_bundle.is_none()); assert!(detail.requested_capabilities.is_empty()); let diagnostics = runtime.diagnostics().unwrap(); assert_eq!(diagnostics.len(), 2); assert!( diagnostics .iter() .any(|diagnostic| diagnostic.code == "runtime.local_default_resources") ); assert!( diagnostics .iter() .any(|diagnostic| diagnostic.code == "worker.tools_less") ); } #[test] fn send_input_and_project_bounded_transcript() { let runtime = Runtime::with_options(RuntimeOptions { limits: RuntimeLimits { max_transcript_projection_items: 2, max_event_batch_items: 16, }, ..RuntimeOptions::default() }); let detail = runtime.create_worker(task_request("chat")).unwrap(); let first = runtime .send_input(&detail.worker_ref, WorkerInput::user("hello")) .unwrap(); assert_eq!(first.transcript_sequence, 1); runtime .send_input(&detail.worker_ref, WorkerInput::system("note")) .unwrap(); runtime .send_input(&detail.worker_ref, WorkerInput::user("again")) .unwrap(); let projection = runtime .transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 2)) .unwrap(); assert_eq!(projection.total_items, 3); assert_eq!(projection.items.len(), 2); assert_eq!(projection.items[0].content, "hello"); assert_eq!(projection.items[1].role, TranscriptRole::System); assert_eq!(projection.next_start, Some(2)); let err = runtime .transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 3)) .unwrap_err(); assert!(matches!(err, RuntimeError::LimitTooLarge { .. })); } #[test] fn stop_and_cancel_workers_update_summary() { let runtime = Runtime::new_memory(); let stopped = runtime.create_worker(task_request("stop me")).unwrap(); let cancelled = runtime.create_worker(task_request("cancel me")).unwrap(); let stop_ack = runtime .stop_worker(&stopped.worker_ref, Some("done".to_string())) .unwrap(); assert_eq!(stop_ack.status, WorkerStatus::Stopped); let cancel_ack = runtime .cancel_worker(&cancelled.worker_ref, Some("abort".to_string())) .unwrap(); assert_eq!(cancel_ack.status, WorkerStatus::Cancelled); let summary = runtime.summary().unwrap(); assert_eq!(summary.worker_count, 2); assert_eq!(summary.active_worker_count, 0); assert_eq!(summary.stopped_worker_count, 1); assert_eq!(summary.cancelled_worker_count, 1); } #[test] fn stop_then_cancel_preserves_stopped_terminal_state() { let runtime = Runtime::new_memory(); let cursor = runtime.event_cursor_from_start().unwrap(); let worker = runtime .create_worker(task_request("stable stopped")) .unwrap(); let stop_ack = runtime .stop_worker(&worker.worker_ref, Some("done".to_string())) .unwrap(); let cancel_ack = runtime .cancel_worker(&worker.worker_ref, Some("late cancel".to_string())) .unwrap(); assert_eq!(stop_ack.status, WorkerStatus::Stopped); assert_eq!(cancel_ack.status, WorkerStatus::Stopped); assert_eq!(cancel_ack.event_id, stop_ack.event_id); assert_eq!( runtime.worker_detail(&worker.worker_ref).unwrap().status, WorkerStatus::Stopped ); let summary = runtime.summary().unwrap(); assert_eq!(summary.active_worker_count, 0); assert_eq!(summary.stopped_worker_count, 1); assert_eq!(summary.cancelled_worker_count, 0); let events = runtime.read_events(&cursor, 10).unwrap().events; assert_eq!( events .iter() .filter(|event| event.kind == RuntimeEventKind::WorkerStopped) .count(), 1 ); assert_eq!( events .iter() .filter(|event| event.kind == RuntimeEventKind::WorkerCancelled) .count(), 0 ); } #[test] fn cancel_then_stop_preserves_cancelled_terminal_state() { let runtime = Runtime::new_memory(); let cursor = runtime.event_cursor_from_start().unwrap(); let worker = runtime .create_worker(task_request("stable cancelled")) .unwrap(); let cancel_ack = runtime .cancel_worker(&worker.worker_ref, Some("abort".to_string())) .unwrap(); let stop_ack = runtime .stop_worker(&worker.worker_ref, Some("late stop".to_string())) .unwrap(); assert_eq!(cancel_ack.status, WorkerStatus::Cancelled); assert_eq!(stop_ack.status, WorkerStatus::Cancelled); assert_eq!(stop_ack.event_id, cancel_ack.event_id); assert_eq!( runtime.worker_detail(&worker.worker_ref).unwrap().status, WorkerStatus::Cancelled ); let summary = runtime.summary().unwrap(); assert_eq!(summary.active_worker_count, 0); assert_eq!(summary.stopped_worker_count, 0); assert_eq!(summary.cancelled_worker_count, 1); let events = runtime.read_events(&cursor, 10).unwrap().events; assert_eq!( events .iter() .filter(|event| event.kind == RuntimeEventKind::WorkerCancelled) .count(), 1 ); assert_eq!( events .iter() .filter(|event| event.kind == RuntimeEventKind::WorkerStopped) .count(), 0 ); } #[test] fn event_cursor_and_poll_only_subscription_are_bounded_placeholders() { let runtime = Runtime::new_memory(); let cursor = runtime.event_cursor_from_start().unwrap(); let subscription = runtime.subscribe_events(cursor.clone()).unwrap(); assert_eq!(subscription.mode, EventSubscriptionMode::PollOnly); let worker = runtime.create_worker(task_request("events")).unwrap(); runtime .send_input(&worker.worker_ref, WorkerInput::user("eventful")) .unwrap(); let batch = runtime.read_events(&cursor, 2).unwrap(); assert_eq!(batch.events.len(), 2); assert!(batch.has_more); assert_eq!(batch.events[0].kind, RuntimeEventKind::RuntimeStarted); assert_eq!(batch.events[1].kind, RuntimeEventKind::WorkerCreated); let next = runtime.read_events(&batch.cursor, 2).unwrap(); assert_eq!(next.events.len(), 1); assert_eq!(next.events[0].kind, RuntimeEventKind::WorkerInputAccepted); assert!(!next.has_more); } #[cfg(feature = "fs-store")] static NEXT_FS_TEST_ROOT: AtomicU64 = AtomicU64::new(1); #[cfg(feature = "fs-store")] fn fs_store_root(label: &str) -> std::path::PathBuf { let sequence = NEXT_FS_TEST_ROOT.fetch_add(1, Ordering::Relaxed); let root = std::env::temp_dir().join(format!( "worker-runtime-fs-store-{label}-{}-{sequence}", std::process::id() )); let _ = std::fs::remove_dir_all(&root); root } #[cfg(feature = "fs-store")] fn runtime_store(runtime: &Runtime) -> FsRuntimeStore { let state = runtime.lock().unwrap(); match &state.persistence { RuntimePersistence::Fs(store) => store.clone(), RuntimePersistence::Memory => panic!("expected fs-backed runtime"), } } #[cfg(feature = "fs-store")] #[test] fn fs_store_restores_workers_events_and_transcripts() { let root = fs_store_root("restore"); let runtime_id = RuntimeId::new("runtime-fs-authority").unwrap(); let runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { root: root.clone(), runtime_id: Some(runtime_id.clone()), display_name: Some("filesystem runtime".to_string()), limits: RuntimeLimits { max_transcript_projection_items: 2, max_event_batch_items: 2, }, }) .unwrap(); assert_eq!( runtime.summary().unwrap().backend, RuntimeBackendKind::FsStore ); let worker = runtime.create_worker(task_request("persist me")).unwrap(); runtime .send_input(&worker.worker_ref, WorkerInput::user("first")) .unwrap(); runtime .send_input(&worker.worker_ref, WorkerInput::system("second")) .unwrap(); runtime .stop_worker(&worker.worker_ref, Some("finished".to_string())) .unwrap(); let store = runtime_store(&runtime); drop(runtime); let restored = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { root: root.clone(), runtime_id: Some(runtime_id.clone()), display_name: None, limits: RuntimeLimits::default(), }) .unwrap(); let restored_worker = restored.worker_detail(&worker.worker_ref).unwrap(); assert_eq!(restored_worker.status, WorkerStatus::Stopped); assert_eq!(restored_worker.transcript_len, 2); let projection = restored .transcript_projection(&worker.worker_ref, TranscriptQuery::new(0, 1)) .unwrap(); assert_eq!(projection.total_items, 2); assert_eq!(projection.items[0].content, "first"); assert_eq!(projection.next_start, Some(1)); let cursor = restored.event_cursor_from_start().unwrap(); let batch = restored.read_events(&cursor, 2).unwrap(); assert_eq!(batch.events.len(), 2); assert!(batch.has_more); assert_eq!(batch.events[0].kind, RuntimeEventKind::RuntimeStarted); assert_eq!(batch.events[1].kind, RuntimeEventKind::WorkerCreated); let direct_events = store.read_events(&cursor, 2, 2).unwrap(); assert_eq!(direct_events.events, batch.events); let direct_transcript = store .read_transcript(&worker.worker_ref, TranscriptQuery::new(1, 1), 2) .unwrap(); assert_eq!(direct_transcript.items[0].content, "second"); let _ = std::fs::remove_dir_all(root); } #[cfg(feature = "fs-store")] #[test] fn fs_store_reports_corrupt_and_missing_data() { let corrupt_root = fs_store_root("corrupt"); let corrupt_runtime_id = RuntimeId::new("runtime-corrupt").unwrap(); let corrupt_runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { root: corrupt_root.clone(), runtime_id: Some(corrupt_runtime_id.clone()), display_name: None, limits: RuntimeLimits::default(), }) .unwrap(); let corrupt_store = runtime_store(&corrupt_runtime); std::fs::write( corrupt_store.runtime_dir().join("runtime.json"), b"not json", ) .unwrap(); drop(corrupt_runtime); let err = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { root: corrupt_root.clone(), runtime_id: Some(corrupt_runtime_id), display_name: None, limits: RuntimeLimits::default(), }) .unwrap_err(); assert!(matches!(err, RuntimeError::StoreCorrupt { .. })); let _ = std::fs::remove_dir_all(corrupt_root); let missing_root = fs_store_root("missing"); let missing_runtime_id = RuntimeId::new("runtime-missing").unwrap(); let missing_runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { root: missing_root.clone(), runtime_id: Some(missing_runtime_id.clone()), display_name: None, limits: RuntimeLimits::default(), }) .unwrap(); missing_runtime .create_worker(task_request("missing worker snapshot")) .unwrap(); let missing_store = runtime_store(&missing_runtime); let mut worker_dirs = std::fs::read_dir(missing_store.runtime_dir().join("workers")) .unwrap() .collect::, _>>() .unwrap(); worker_dirs.sort_by_key(|entry| entry.path()); std::fs::remove_file(worker_dirs[0].path().join("worker.json")).unwrap(); drop(missing_runtime); let err = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { root: missing_root.clone(), runtime_id: Some(missing_runtime_id), display_name: None, limits: RuntimeLimits::default(), }) .unwrap_err(); assert!(matches!(err, RuntimeError::StoreMissing { .. })); let _ = std::fs::remove_dir_all(missing_root); } }