merge: worker runtime execution backend

This commit is contained in:
Keisuke Hirata 2026-06-28 04:48:38 +09:00
commit 0753e155a5
No known key found for this signature in database
8 changed files with 901 additions and 30 deletions

View File

@ -1,3 +1,4 @@
use crate::execution::WorkerExecutionStatus;
use crate::identity::{RuntimeId, WorkerId, WorkerRef};
use serde::{Deserialize, Serialize};
@ -142,6 +143,7 @@ pub struct WorkerSummary {
pub runtime_id: RuntimeId,
pub worker_id: WorkerId,
pub status: WorkerStatus,
pub execution: WorkerExecutionStatus,
pub intent: WorkerIntent,
pub profile: ProfileSelector,
pub requested_capability_count: usize,
@ -157,6 +159,7 @@ pub struct WorkerDetail {
pub runtime_id: RuntimeId,
pub worker_id: WorkerId,
pub status: WorkerStatus,
pub execution: WorkerExecutionStatus,
pub intent: WorkerIntent,
pub profile: ProfileSelector,
#[serde(default, skip_serializing_if = "Option::is_none")]

View File

@ -1,3 +1,4 @@
use crate::execution::WorkerExecutionResult;
use crate::identity::{RuntimeId, WorkerId};
use std::path::PathBuf;
@ -28,6 +29,21 @@ pub enum RuntimeError {
worker_id: WorkerId,
},
#[error("worker {worker_id} has no execution backend: {message}")]
WorkerExecutionUnavailable {
worker_id: WorkerId,
message: String,
},
#[error("worker {worker_id} execution {operation:?} returned {outcome:?}: {message}")]
WorkerExecutionRejected {
worker_id: WorkerId,
operation: crate::execution::WorkerExecutionOperation,
outcome: crate::execution::WorkerExecutionOutcome,
message: String,
result: WorkerExecutionResult,
},
#[error("limit {requested} exceeds maximum {max}")]
LimitTooLarge { requested: usize, max: usize },

View File

@ -0,0 +1,337 @@
use crate::catalog::CreateWorkerRequest;
use crate::error::RuntimeError;
use crate::identity::WorkerRef;
use crate::interaction::WorkerInput;
#[cfg(feature = "ws-server")]
use crate::observation::WorkerObservationEvent;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::sync::Arc;
/// Coarse execution attachment visible through Worker catalog/detail responses.
///
/// This deliberately does not expose backend handles, process paths, sockets,
/// credentials, session files, or manifest paths. It only says whether Runtime
/// has an execution backend attached for the Worker.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerExecutionBackendKind {
#[default]
Unconnected,
Connected,
}
/// Current execution-side run state for a Worker.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerExecutionRunState {
#[default]
Unconnected,
Idle,
Busy,
Rejected,
Errored,
Stopped,
}
/// Execution operation that produced a result.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerExecutionOperation {
Spawn,
Input,
Stop,
Cancel,
}
/// Typed execution result class.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerExecutionOutcome {
Accepted,
Busy,
Rejected,
Errored,
Unsupported,
}
/// Backend result for a Worker execution operation.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerExecutionResult {
pub operation: WorkerExecutionOperation,
pub outcome: WorkerExecutionOutcome,
pub run_state: WorkerExecutionRunState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
}
impl WorkerExecutionResult {
pub fn accepted(
operation: WorkerExecutionOperation,
run_state: WorkerExecutionRunState,
) -> Self {
Self {
operation,
outcome: WorkerExecutionOutcome::Accepted,
run_state,
message: None,
}
}
pub fn busy(operation: WorkerExecutionOperation, message: impl Into<String>) -> Self {
Self {
operation,
outcome: WorkerExecutionOutcome::Busy,
run_state: WorkerExecutionRunState::Busy,
message: Some(message.into()),
}
}
pub fn rejected(operation: WorkerExecutionOperation, message: impl Into<String>) -> Self {
Self {
operation,
outcome: WorkerExecutionOutcome::Rejected,
run_state: WorkerExecutionRunState::Rejected,
message: Some(message.into()),
}
}
pub fn errored(operation: WorkerExecutionOperation, message: impl Into<String>) -> Self {
Self {
operation,
outcome: WorkerExecutionOutcome::Errored,
run_state: WorkerExecutionRunState::Errored,
message: Some(message.into()),
}
}
pub fn unsupported(operation: WorkerExecutionOperation, message: impl Into<String>) -> Self {
Self {
operation,
outcome: WorkerExecutionOutcome::Unsupported,
run_state: WorkerExecutionRunState::Rejected,
message: Some(message.into()),
}
}
pub fn is_accepted(&self) -> bool {
self.outcome == WorkerExecutionOutcome::Accepted
}
pub fn message_or_default(&self) -> String {
self.message
.clone()
.unwrap_or_else(|| format!("{:?} {:?}", self.operation, self.outcome))
}
}
/// Execution status surfaced in Worker summary/detail responses.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerExecutionStatus {
pub backend: WorkerExecutionBackendKind,
pub run_state: WorkerExecutionRunState,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_result: Option<WorkerExecutionResult>,
}
impl WorkerExecutionStatus {
pub fn unconnected() -> Self {
Self::default()
}
pub fn connected(run_state: WorkerExecutionRunState) -> Self {
Self {
backend: WorkerExecutionBackendKind::Connected,
run_state,
last_result: None,
}
}
pub fn with_result(mut self, result: WorkerExecutionResult) -> Self {
self.run_state = result.run_state;
self.last_result = Some(result);
self
}
}
/// Opaque per-Worker execution handle returned by a backend.
///
/// The handle is a typed token for routing calls back into the same backend. It
/// intentionally contains no socket path, process id, credential, manifest path,
/// or session path.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerExecutionHandle {
worker_ref: WorkerRef,
backend_id: String,
}
impl WorkerExecutionHandle {
pub fn new(worker_ref: WorkerRef, backend_id: impl Into<String>) -> Self {
Self {
worker_ref,
backend_id: backend_id.into(),
}
}
pub fn worker_ref(&self) -> &WorkerRef {
&self.worker_ref
}
pub fn backend_id(&self) -> &str {
&self.backend_id
}
}
/// Runtime hooks available to an execution backend for one Worker.
#[derive(Clone)]
pub struct WorkerExecutionContext {
worker_ref: WorkerRef,
#[cfg(feature = "ws-server")]
observation_publisher: Arc<
dyn Fn(WorkerRef, protocol::Event) -> Result<WorkerObservationEvent, RuntimeError>
+ Send
+ Sync,
>,
}
impl WorkerExecutionContext {
#[cfg(feature = "ws-server")]
pub(crate) fn new(
worker_ref: WorkerRef,
observation_publisher: Arc<
dyn Fn(WorkerRef, protocol::Event) -> Result<WorkerObservationEvent, RuntimeError>
+ Send
+ Sync,
>,
) -> Self {
Self {
worker_ref,
observation_publisher,
}
}
#[cfg(not(feature = "ws-server"))]
pub(crate) fn new(worker_ref: WorkerRef) -> Self {
Self { worker_ref }
}
pub fn worker_ref(&self) -> &WorkerRef {
&self.worker_ref
}
/// Publish a protocol event into the Runtime observation bus.
#[cfg(feature = "ws-server")]
pub fn publish_protocol_event(
&self,
payload: protocol::Event,
) -> Result<WorkerObservationEvent, RuntimeError> {
(self.observation_publisher)(self.worker_ref.clone(), payload)
}
}
impl fmt::Debug for WorkerExecutionContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WorkerExecutionContext")
.field("worker_ref", &self.worker_ref)
.finish_non_exhaustive()
}
}
/// Spawn/initialization request passed to an execution backend.
#[derive(Clone, Debug)]
pub struct WorkerExecutionSpawnRequest {
pub worker_ref: WorkerRef,
pub request: CreateWorkerRequest,
pub context: WorkerExecutionContext,
}
/// Result of backend Worker spawn/initialization.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum WorkerExecutionSpawnResult {
Connected {
handle: WorkerExecutionHandle,
run_state: WorkerExecutionRunState,
},
Rejected(WorkerExecutionResult),
Errored(WorkerExecutionResult),
}
/// Backend boundary for Worker execution.
///
/// Runtime owns Worker catalog, transcript, observation, and lifecycle state. A
/// backend owns concrete execution. The default Runtime has no backend, so input
/// to those Workers is rejected instead of producing providerless responses.
pub trait WorkerExecutionBackend: Send + Sync + 'static {
fn backend_id(&self) -> &str;
fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult;
fn dispatch_input(
&self,
handle: &WorkerExecutionHandle,
input: WorkerInput,
) -> WorkerExecutionResult;
fn stop_worker(&self, _handle: &WorkerExecutionHandle) -> WorkerExecutionResult {
WorkerExecutionResult::unsupported(
WorkerExecutionOperation::Stop,
"execution backend does not support stopping workers",
)
}
fn cancel_worker(&self, _handle: &WorkerExecutionHandle) -> WorkerExecutionResult {
WorkerExecutionResult::unsupported(
WorkerExecutionOperation::Cancel,
"execution backend does not support cancelling workers",
)
}
}
#[derive(Clone)]
pub(crate) struct WorkerExecutionBackendRef {
id: String,
backend: Arc<dyn WorkerExecutionBackend>,
}
impl WorkerExecutionBackendRef {
pub(crate) fn new(backend: Arc<dyn WorkerExecutionBackend>) -> Result<Self, RuntimeError> {
let id = backend.backend_id().trim().to_string();
if id.is_empty() {
return Err(RuntimeError::InvalidRequest(
"execution backend id must not be empty".to_string(),
));
}
Ok(Self { id, backend })
}
pub(crate) fn spawn_worker(
&self,
request: WorkerExecutionSpawnRequest,
) -> WorkerExecutionSpawnResult {
self.backend.spawn_worker(request)
}
pub(crate) fn dispatch_input(
&self,
handle: &WorkerExecutionHandle,
input: WorkerInput,
) -> WorkerExecutionResult {
self.backend.dispatch_input(handle, input)
}
pub(crate) fn stop_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult {
self.backend.stop_worker(handle)
}
pub(crate) fn cancel_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult {
self.backend.cancel_worker(handle)
}
}
impl fmt::Debug for WorkerExecutionBackendRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("WorkerExecutionBackendRef")
.field("id", &self.id)
.finish_non_exhaustive()
}
}

View File

@ -822,7 +822,9 @@ fn status_for_runtime_error(error: &RuntimeError) -> StatusCode {
RuntimeError::WorkerNotFound { .. } | RuntimeError::ConfigBundleMissing { .. } => {
StatusCode::NOT_FOUND
}
RuntimeError::RuntimeStopped { .. } => StatusCode::CONFLICT,
RuntimeError::RuntimeStopped { .. }
| RuntimeError::WorkerExecutionUnavailable { .. }
| RuntimeError::WorkerExecutionRejected { .. } => StatusCode::CONFLICT,
RuntimeError::LimitTooLarge { .. }
| RuntimeError::InvalidRequest(_)
| RuntimeError::ConfigBundleDigestMismatch { .. }
@ -843,6 +845,8 @@ fn code_for_runtime_error(error: &RuntimeError) -> &'static str {
RuntimeError::WrongRuntime { .. } => "wrong_runtime",
RuntimeError::WrongRuntimeCursor { .. } => "wrong_runtime_cursor",
RuntimeError::WorkerNotFound { .. } => "worker_not_found",
RuntimeError::WorkerExecutionUnavailable { .. } => "worker_execution_unavailable",
RuntimeError::WorkerExecutionRejected { .. } => "worker_execution_rejected",
RuntimeError::LimitTooLarge { .. } => "limit_too_large",
RuntimeError::InvalidRequest(_) => "invalid_request",
RuntimeError::ConfigBundleMissing { .. } => "config_bundle_missing",
@ -869,6 +873,12 @@ pub enum RuntimeHttpServerError {
mod tests {
use super::*;
use crate::catalog::{CapabilityRequest, ProfileSelector, WorkerIntent};
use crate::execution::{
WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation,
WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest,
WorkerExecutionSpawnResult,
};
use crate::management::RuntimeOptions;
use axum::body::to_bytes;
use axum::http::Method;
use tower::ServiceExt;
@ -886,6 +896,39 @@ mod tests {
}
}
struct AcceptingBackend;
impl WorkerExecutionBackend for AcceptingBackend {
fn backend_id(&self) -> &str {
"http-test"
}
fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult {
WorkerExecutionSpawnResult::Connected {
handle: WorkerExecutionHandle::new(request.worker_ref, self.backend_id()),
run_state: WorkerExecutionRunState::Idle,
}
}
fn dispatch_input(
&self,
_handle: &WorkerExecutionHandle,
_input: WorkerInput,
) -> WorkerExecutionResult {
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Input,
WorkerExecutionRunState::Idle,
)
}
fn stop_worker(&self, _handle: &WorkerExecutionHandle) -> WorkerExecutionResult {
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Stop,
WorkerExecutionRunState::Stopped,
)
}
}
async fn json_request<T: Serialize>(
app: Router,
method: Method,
@ -923,7 +966,9 @@ mod tests {
#[tokio::test]
async fn rest_command_api_delegates_to_runtime() {
let runtime = Runtime::new_memory();
let runtime =
Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(AcceptingBackend))
.unwrap();
let app = runtime_http_router(runtime.clone(), None);
let response = json_request(

View File

@ -11,6 +11,7 @@ pub mod catalog;
pub mod config_bundle;
pub mod diagnostics;
pub mod error;
pub mod execution;
#[cfg(feature = "fs-store")]
pub mod fs_store;
#[cfg(feature = "http-server")]

View File

@ -8,6 +8,11 @@ use crate::config_bundle::{
};
use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic};
use crate::error::RuntimeError;
use crate::execution::{
WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef,
WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult,
WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, WorkerExecutionStatus,
};
#[cfg(feature = "fs-store")]
use crate::fs_store::{
FsRuntimeStore, FsRuntimeStoreOptions, PersistedRuntimeState, PersistedWorkerRecord,
@ -63,6 +68,16 @@ impl Runtime {
}
}
/// Create a memory-backed Runtime with an attached execution backend.
pub fn with_execution_backend(
options: RuntimeOptions,
backend: Arc<dyn WorkerExecutionBackend>,
) -> Result<Self, RuntimeError> {
let runtime = Self::with_options(options);
runtime.install_execution_backend(backend)?;
Ok(runtime)
}
/// Create or restore a filesystem-backed Runtime.
///
/// The store is scoped by typed Runtime identity under `options.root`; if the
@ -71,11 +86,28 @@ impl Runtime {
/// created before the Runtime is returned.
#[cfg(feature = "fs-store")]
pub fn with_fs_store(options: FsRuntimeStoreOptions) -> Result<Self, RuntimeError> {
Self::with_fs_store_inner(options, None)
}
/// Create or restore a filesystem-backed Runtime with an execution backend.
#[cfg(feature = "fs-store")]
pub fn with_fs_store_and_execution_backend(
options: FsRuntimeStoreOptions,
backend: Arc<dyn WorkerExecutionBackend>,
) -> Result<Self, RuntimeError> {
Self::with_fs_store_inner(options, Some(WorkerExecutionBackendRef::new(backend)?))
}
#[cfg(feature = "fs-store")]
fn with_fs_store_inner(
options: FsRuntimeStoreOptions,
execution_backend: Option<WorkerExecutionBackendRef>,
) -> Result<Self, RuntimeError> {
let runtime_id = options.runtime_id.unwrap_or_else(|| {
RuntimeId::generated(NEXT_RUNTIME_SEQUENCE.fetch_add(1, Ordering::Relaxed))
});
let opened = FsRuntimeStore::open_or_create(options.root, runtime_id.clone())?;
let state = if let Some(persisted) = opened.state {
let mut state = if let Some(persisted) = opened.state {
RuntimeState::from_persisted(persisted, opened.store)?
} else {
let mut state = RuntimeState::new_fs_backed(
@ -90,6 +122,7 @@ impl Runtime {
state.persist_event_by_id(event_id)?;
state
};
state.execution_backend = execution_backend;
Ok(Self {
inner: Arc::new(Mutex::new(state)),
})
@ -210,6 +243,12 @@ impl Runtime {
let worker_id = WorkerId::generated(state.next_worker_sequence);
state.next_worker_sequence += 1;
let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone());
let backend = state.execution_backend.clone();
let spawn_request = backend.as_ref().map(|_| WorkerExecutionSpawnRequest {
worker_ref: worker_ref.clone(),
request: request.clone(),
context: self.execution_context(worker_ref.clone()),
});
let event_id = state.push_event(
Some(worker_ref.clone()),
RuntimeEventKind::WorkerCreated,
@ -217,10 +256,12 @@ impl Runtime {
);
let record = WorkerRecord {
worker_ref,
worker_ref: worker_ref.clone(),
worker_id: worker_id.clone(),
status: WorkerStatus::Running,
request,
execution: WorkerExecutionStatus::unconnected(),
execution_handle: None,
transcript: Vec::new(),
next_transcript_sequence: 1,
last_event_id: event_id,
@ -231,8 +272,15 @@ impl Runtime {
state.persist_runtime_snapshot()?;
state.persist_worker(&worker_id)?;
state.persist_event_by_id(event_id)?;
drop(state);
if let (Some(backend), Some(spawn_request)) = (backend, spawn_request) {
let result = backend.spawn_worker(spawn_request);
self.apply_spawn_result(&worker_ref, result)
} else {
Ok(detail)
}
}
/// List Workers known to this Runtime.
pub fn list_workers(&self) -> Result<Vec<WorkerSummary>, RuntimeError> {
@ -257,11 +305,11 @@ impl Runtime {
worker_ref: &WorkerRef,
input: WorkerInput,
) -> Result<WorkerInteractionAck, RuntimeError> {
let (backend, handle) = {
let mut state = self.lock()?;
state.ensure_running()?;
validate_worker_input(&input)?;
state.ensure_worker_ref(worker_ref)?;
{
let worker = state.worker(worker_ref)?;
if !worker.status.is_active() {
return Err(RuntimeError::InvalidRequest(format!(
@ -269,8 +317,40 @@ impl Runtime {
worker_ref.worker_id
)));
}
let backend = state.execution_backend.clone();
let handle = worker.execution_handle.clone();
match (backend, handle) {
(Some(backend), Some(handle)) => (backend, handle),
_ => {
let result = WorkerExecutionResult::rejected(
WorkerExecutionOperation::Input,
"worker has no execution backend",
);
let worker = state.worker_mut(worker_ref)?;
worker.execution = WorkerExecutionStatus::unconnected().with_result(result);
state.persist_worker(&worker_ref.worker_id)?;
return Err(RuntimeError::WorkerExecutionUnavailable {
worker_id: worker_ref.worker_id.clone(),
message: "worker has no execution backend".to_string(),
});
}
}
};
let dispatch_result = backend.dispatch_input(&handle, input.clone());
if !dispatch_result.is_accepted() {
self.record_execution_result(worker_ref, dispatch_result.clone())?;
return Err(RuntimeError::WorkerExecutionRejected {
worker_id: worker_ref.worker_id.clone(),
operation: dispatch_result.operation,
outcome: dispatch_result.outcome,
message: dispatch_result.message_or_default(),
result: dispatch_result,
});
}
let mut state = self.lock()?;
state.ensure_running()?;
let event_id = state.push_event(
Some(worker_ref.clone()),
RuntimeEventKind::WorkerInputAccepted,
@ -286,6 +366,11 @@ impl Runtime {
let transcript_sequence = worker.next_transcript_sequence;
worker.next_transcript_sequence += 1;
worker.last_event_id = event_id;
worker.execution = WorkerExecutionStatus {
backend: WorkerExecutionBackendKind::Connected,
run_state: dispatch_result.run_state,
last_result: Some(dispatch_result),
};
worker.transcript.push(TranscriptEntry {
sequence: transcript_sequence,
worker_ref: worker_ref.clone(),
@ -328,12 +413,103 @@ impl Runtime {
})
}
fn apply_spawn_result(
&self,
worker_ref: &WorkerRef,
result: WorkerExecutionSpawnResult,
) -> Result<WorkerDetail, RuntimeError> {
let mut state = self.lock()?;
let runtime_id = state.runtime_id.clone();
let detail = {
let worker = state.worker_mut(worker_ref)?;
match result {
WorkerExecutionSpawnResult::Connected { handle, run_state } => {
worker.execution_handle = Some(handle);
worker.execution = WorkerExecutionStatus::connected(run_state).with_result(
WorkerExecutionResult::accepted(WorkerExecutionOperation::Spawn, run_state),
);
}
WorkerExecutionSpawnResult::Rejected(result)
| WorkerExecutionSpawnResult::Errored(result) => {
worker.execution_handle = None;
worker.execution = WorkerExecutionStatus {
backend: WorkerExecutionBackendKind::Connected,
run_state: result.run_state,
last_result: Some(result),
};
}
}
worker.detail(&runtime_id)
};
state.persist_worker(&worker_ref.worker_id)?;
Ok(detail)
}
fn record_execution_result(
&self,
worker_ref: &WorkerRef,
result: WorkerExecutionResult,
) -> Result<(), RuntimeError> {
let mut state = self.lock()?;
let worker = state.worker_mut(worker_ref)?;
worker.execution = WorkerExecutionStatus {
backend: WorkerExecutionBackendKind::Connected,
run_state: result.run_state,
last_result: Some(result),
};
state.persist_worker(&worker_ref.worker_id)?;
Ok(())
}
fn dispatch_lifecycle_to_backend(
&self,
worker_ref: &WorkerRef,
operation: WorkerExecutionOperation,
) -> Result<(), RuntimeError> {
let Some((backend, handle)) = ({
let state = self.lock()?;
state.ensure_worker_ref(worker_ref)?;
let worker = state.worker(worker_ref)?;
if !worker.status.is_active() {
return Ok(());
}
match (
state.execution_backend.clone(),
worker.execution_handle.clone(),
) {
(Some(backend), Some(handle)) => Some((backend, handle)),
_ => None,
}
}) else {
return Ok(());
};
let result = match operation {
WorkerExecutionOperation::Stop => backend.stop_worker(&handle),
WorkerExecutionOperation::Cancel => backend.cancel_worker(&handle),
WorkerExecutionOperation::Spawn | WorkerExecutionOperation::Input => return Ok(()),
};
if result.is_accepted() {
self.record_execution_result(worker_ref, result)?;
return Ok(());
}
self.record_execution_result(worker_ref, result.clone())?;
Err(RuntimeError::WorkerExecutionRejected {
worker_id: worker_ref.worker_id.clone(),
operation: result.operation,
outcome: result.outcome,
message: result.message_or_default(),
result,
})
}
/// Stop a Worker. Repeated stops are idempotent and return the last event id.
pub fn stop_worker(
&self,
worker_ref: &WorkerRef,
reason: Option<String>,
) -> Result<WorkerLifecycleAck, RuntimeError> {
self.dispatch_lifecycle_to_backend(worker_ref, WorkerExecutionOperation::Stop)?;
self.transition_worker(
worker_ref,
WorkerStatus::Stopped,
@ -348,6 +524,7 @@ impl Runtime {
worker_ref: &WorkerRef,
reason: Option<String>,
) -> Result<WorkerLifecycleAck, RuntimeError> {
self.dispatch_lifecycle_to_backend(worker_ref, WorkerExecutionOperation::Cancel)?;
self.transition_worker(
worker_ref,
WorkerStatus::Cancelled,
@ -591,6 +768,30 @@ impl Runtime {
})
}
fn install_execution_backend(
&self,
backend: Arc<dyn WorkerExecutionBackend>,
) -> Result<(), RuntimeError> {
let backend = WorkerExecutionBackendRef::new(backend)?;
let mut state = self.lock()?;
state.execution_backend = Some(backend);
Ok(())
}
#[cfg(feature = "ws-server")]
fn execution_context(&self, worker_ref: WorkerRef) -> crate::execution::WorkerExecutionContext {
let runtime = self.clone();
crate::execution::WorkerExecutionContext::new(
worker_ref,
Arc::new(move |worker_ref, payload| runtime.observe_worker_event(&worker_ref, payload)),
)
}
#[cfg(not(feature = "ws-server"))]
fn execution_context(&self, worker_ref: WorkerRef) -> crate::execution::WorkerExecutionContext {
crate::execution::WorkerExecutionContext::new(worker_ref)
}
fn lock(&self) -> Result<MutexGuard<'_, RuntimeState>, RuntimeError> {
self.inner.lock().map_err(|_| RuntimeError::StatePoisoned)
}
@ -613,6 +814,7 @@ struct RuntimeState {
persistence: RuntimePersistence,
status: RuntimeStatus,
limits: RuntimeLimits,
execution_backend: Option<WorkerExecutionBackendRef>,
next_worker_sequence: u64,
next_event_id: u64,
next_diagnostic_id: u64,
@ -637,6 +839,7 @@ impl RuntimeState {
persistence: RuntimePersistence::Memory,
status: RuntimeStatus::Running,
limits,
execution_backend: None,
next_worker_sequence: 1,
next_event_id: 1,
next_diagnostic_id: 1,
@ -667,6 +870,7 @@ impl RuntimeState {
persistence: RuntimePersistence::Fs(store),
status: RuntimeStatus::Running,
limits,
execution_backend: None,
next_worker_sequence: 1,
next_event_id: 1,
next_diagnostic_id: 1,
@ -709,6 +913,8 @@ impl RuntimeState {
worker_id: worker.worker_id,
status: worker.status,
request: worker.request,
execution: WorkerExecutionStatus::unconnected(),
execution_handle: None,
transcript: worker.transcript,
next_transcript_sequence: worker.next_transcript_sequence,
last_event_id: worker.last_event_id,
@ -723,6 +929,7 @@ impl RuntimeState {
persistence: RuntimePersistence::Fs(store),
status: persisted.status,
limits: persisted.limits,
execution_backend: None,
next_worker_sequence: persisted.next_worker_sequence,
next_event_id: persisted.next_event_id,
next_diagnostic_id: persisted.next_diagnostic_id,
@ -730,6 +937,12 @@ impl RuntimeState {
config_bundles: persisted.config_bundles,
events: persisted.events,
diagnostics: persisted.diagnostics,
#[cfg(feature = "ws-server")]
next_observation_sequence: 1,
#[cfg(feature = "ws-server")]
observation_events: VecDeque::new(),
#[cfg(feature = "ws-server")]
observation_tx: broadcast::channel(256).0,
})
}
@ -1090,6 +1303,8 @@ struct WorkerRecord {
worker_id: WorkerId,
status: WorkerStatus,
request: CreateWorkerRequest,
execution: WorkerExecutionStatus,
execution_handle: Option<WorkerExecutionHandle>,
transcript: Vec<TranscriptEntry>,
next_transcript_sequence: u64,
last_event_id: u64,
@ -1102,6 +1317,7 @@ impl WorkerRecord {
runtime_id: runtime_id.clone(),
worker_id: self.worker_id.clone(),
status: self.status,
execution: self.execution.clone(),
intent: self.request.intent.clone(),
profile: self.request.profile.clone(),
requested_capability_count: self.request.requested_capabilities.len(),
@ -1117,6 +1333,7 @@ impl WorkerRecord {
runtime_id: runtime_id.clone(),
worker_id: self.worker_id.clone(),
status: self.status,
execution: self.execution.clone(),
intent: self.request.intent.clone(),
profile: self.request.profile.clone(),
config_bundle: self.request.config_bundle.clone(),
@ -1185,7 +1402,12 @@ mod tests {
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigDeclaration,
ConfigDeclarationKind, ConfigProfileDescriptor,
};
use crate::management::RuntimeLimits;
use crate::execution::{
WorkerExecutionBackend, WorkerExecutionContext, WorkerExecutionHandle,
WorkerExecutionRunState,
};
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
fn task_request(objective: &str) -> CreateWorkerRequest {
CreateWorkerRequest {
@ -1200,6 +1422,92 @@ mod tests {
}
}
#[derive(Default)]
struct TestExecutionBackend {
dispatch_result: Mutex<Option<WorkerExecutionResult>>,
contexts: Mutex<BTreeMap<WorkerId, WorkerExecutionContext>>,
}
impl TestExecutionBackend {
fn set_dispatch_result(&self, result: WorkerExecutionResult) {
*self.dispatch_result.lock().unwrap() = Some(result);
}
#[cfg(feature = "ws-server")]
fn publish_text_delta(
&self,
worker_ref: &WorkerRef,
text: &str,
) -> Result<crate::observation::WorkerObservationEvent, RuntimeError> {
let contexts = self.contexts.lock().unwrap();
let context = contexts.get(&worker_ref.worker_id).expect("context stored");
context.publish_protocol_event(protocol::Event::TextDelta { text: text.into() })
}
}
impl WorkerExecutionBackend for TestExecutionBackend {
fn backend_id(&self) -> &str {
"test-execution-backend"
}
fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult {
self.contexts
.lock()
.unwrap()
.insert(request.worker_ref.worker_id.clone(), request.context);
WorkerExecutionSpawnResult::Connected {
handle: WorkerExecutionHandle::new(request.worker_ref, self.backend_id()),
run_state: WorkerExecutionRunState::Idle,
}
}
fn dispatch_input(
&self,
_handle: &WorkerExecutionHandle,
_input: WorkerInput,
) -> WorkerExecutionResult {
self.dispatch_result
.lock()
.unwrap()
.clone()
.unwrap_or_else(|| {
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Input,
WorkerExecutionRunState::Idle,
)
})
}
fn stop_worker(&self, _handle: &WorkerExecutionHandle) -> WorkerExecutionResult {
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Stop,
WorkerExecutionRunState::Stopped,
)
}
fn cancel_worker(&self, _handle: &WorkerExecutionHandle) -> WorkerExecutionResult {
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Cancel,
WorkerExecutionRunState::Stopped,
)
}
}
fn runtime_with_backend() -> Runtime {
Runtime::with_execution_backend(
RuntimeOptions::default(),
Arc::new(TestExecutionBackend::default()),
)
.unwrap()
}
fn runtime_and_backend() -> (Runtime, Arc<TestExecutionBackend>) {
let backend = Arc::new(TestExecutionBackend::default());
let runtime =
Runtime::with_execution_backend(RuntimeOptions::default(), backend.clone()).unwrap();
(runtime, backend)
}
fn test_bundle() -> ConfigBundle {
ConfigBundle {
metadata: ConfigBundleMetadata {
@ -1357,15 +1665,142 @@ mod tests {
);
}
#[test]
fn backend_unconnected_worker_input_is_rejected_and_not_transcribed() {
let runtime = Runtime::new_memory();
let detail = runtime.create_worker(task_request("placeholder")).unwrap();
assert_eq!(
detail.execution.backend,
WorkerExecutionBackendKind::Unconnected
);
let err = runtime
.send_input(&detail.worker_ref, WorkerInput::user("must reject"))
.unwrap_err();
assert!(matches!(
err,
RuntimeError::WorkerExecutionUnavailable { .. }
));
let projection = runtime
.transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 1))
.unwrap();
assert_eq!(projection.total_items, 0);
}
#[test]
fn connected_backend_busy_dispatch_is_typed_and_not_transcribed() {
let (runtime, backend) = runtime_and_backend();
backend.set_dispatch_result(WorkerExecutionResult::busy(
WorkerExecutionOperation::Input,
"worker is already running",
));
let detail = runtime.create_worker(task_request("busy")).unwrap();
let err = runtime
.send_input(&detail.worker_ref, WorkerInput::user("wait"))
.unwrap_err();
assert!(matches!(
err,
RuntimeError::WorkerExecutionRejected {
outcome: crate::execution::WorkerExecutionOutcome::Busy,
..
}
));
let refreshed = runtime.worker_detail(&detail.worker_ref).unwrap();
assert_eq!(refreshed.execution.run_state, WorkerExecutionRunState::Busy);
assert_eq!(
runtime
.transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 1))
.unwrap()
.total_items,
0
);
}
#[cfg(feature = "ws-server")]
#[test]
fn backend_protocol_publish_hook_writes_observation_bus() {
let (runtime, backend) = runtime_and_backend();
let detail = runtime.create_worker(task_request("observe")).unwrap();
let observation = backend
.publish_text_delta(&detail.worker_ref, "from backend")
.unwrap();
assert_eq!(observation.worker_ref, detail.worker_ref);
let observations = runtime
.read_worker_observation_events(&detail.worker_ref, WorkerObservationCursor::zero())
.unwrap();
assert_eq!(observations.len(), 1);
assert!(matches!(
observations[0].payload,
protocol::Event::TextDelta { .. }
));
}
struct InputOnlyBackend;
impl WorkerExecutionBackend for InputOnlyBackend {
fn backend_id(&self) -> &str {
"input-only"
}
fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult {
WorkerExecutionSpawnResult::Connected {
handle: WorkerExecutionHandle::new(request.worker_ref, self.backend_id()),
run_state: WorkerExecutionRunState::Idle,
}
}
fn dispatch_input(
&self,
_handle: &WorkerExecutionHandle,
_input: WorkerInput,
) -> WorkerExecutionResult {
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Input,
WorkerExecutionRunState::Idle,
)
}
}
#[test]
fn connected_backend_stop_unsupported_is_typed_rejection() {
let runtime =
Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(InputOnlyBackend))
.unwrap();
let detail = runtime.create_worker(task_request("no stop")).unwrap();
let err = runtime
.stop_worker(&detail.worker_ref, Some("stop".to_string()))
.unwrap_err();
assert!(matches!(
err,
RuntimeError::WorkerExecutionRejected {
outcome: crate::execution::WorkerExecutionOutcome::Unsupported,
..
}
));
assert_eq!(
runtime.worker_detail(&detail.worker_ref).unwrap().status,
WorkerStatus::Running
);
}
#[test]
fn send_input_and_project_bounded_transcript() {
let runtime = Runtime::with_options(RuntimeOptions {
let runtime = Runtime::with_execution_backend(
RuntimeOptions {
limits: RuntimeLimits {
max_transcript_projection_items: 2,
max_event_batch_items: 16,
},
..RuntimeOptions::default()
});
},
Arc::new(TestExecutionBackend::default()),
)
.unwrap();
let detail = runtime.create_worker(task_request("chat")).unwrap();
let first = runtime
@ -1509,7 +1944,7 @@ mod tests {
#[test]
fn event_cursor_and_poll_only_subscription_are_bounded_placeholders() {
let runtime = Runtime::new_memory();
let runtime = runtime_with_backend();
let cursor = runtime.event_cursor_from_start().unwrap();
let subscription = runtime.subscribe_events(cursor.clone()).unwrap();
assert_eq!(subscription.mode, EventSubscriptionMode::PollOnly);
@ -1559,7 +1994,8 @@ mod tests {
fn fs_store_restores_workers_events_and_transcripts() {
let root = fs_store_root("restore");
let runtime_id = RuntimeId::new("runtime-fs-authority").unwrap();
let runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions {
let runtime = Runtime::with_fs_store_and_execution_backend(
crate::fs_store::FsRuntimeStoreOptions {
root: root.clone(),
runtime_id: Some(runtime_id.clone()),
display_name: Some("filesystem runtime".to_string()),
@ -1567,7 +2003,9 @@ mod tests {
max_transcript_projection_items: 2,
max_event_batch_items: 2,
},
})
},
Arc::new(TestExecutionBackend::default()),
)
.unwrap();
assert_eq!(
runtime.summary().unwrap().backend,
@ -1619,6 +2057,24 @@ mod tests {
.unwrap();
assert_eq!(direct_transcript.items[0].content, "second");
#[cfg(feature = "ws-server")]
{
let observation = restored
.observe_worker_event(
&worker.worker_ref,
protocol::Event::TextDelta {
text: "restored observation bus".to_string(),
},
)
.unwrap();
assert_eq!(observation.sequence, 1);
let observations = restored
.read_worker_observation_events(&worker.worker_ref, WorkerObservationCursor::zero())
.unwrap();
assert_eq!(observations.len(), 1);
assert_eq!(observations[0].cursor, observation.cursor);
}
let _ = std::fs::remove_dir_all(root);
}

View File

@ -1260,9 +1260,9 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_llm_not_connected",
"embedded_worker_execution_unavailable",
DiagnosticSeverity::Error,
"Embedded Worker input is disabled until actual Worker/LLM execution is connected"
"Embedded Worker input is disabled until an execution backend is connected"
.to_string(),
),
)
@ -2054,6 +2054,16 @@ fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnosti
DiagnosticSeverity::Warning,
"Embedded Runtime worker was not found".to_string(),
),
EmbeddedRuntimeError::WorkerExecutionUnavailable { .. } => diagnostic(
"embedded_worker_execution_unavailable",
DiagnosticSeverity::Warning,
"Embedded Worker has no execution backend attached".to_string(),
),
EmbeddedRuntimeError::WorkerExecutionRejected { .. } => diagnostic(
"embedded_worker_execution_rejected",
DiagnosticSeverity::Warning,
"Embedded Worker execution backend rejected the operation".to_string(),
),
EmbeddedRuntimeError::LimitTooLarge { requested, max } => diagnostic(
"embedded_runtime_limit_too_large",
DiagnosticSeverity::Warning,
@ -2640,7 +2650,7 @@ mod tests {
input
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "embedded_worker_llm_not_connected")
.any(|diagnostic| diagnostic.code == "embedded_worker_execution_unavailable")
);
let transcript = registry
@ -2658,6 +2668,8 @@ mod tests {
"token",
"credential",
"provider",
"can_stream_events",
"can_read_bounded_transcript",
] {
assert!(
!json.contains(forbidden),
@ -3013,6 +3025,7 @@ mod tests {
"runtime_id": runtime_id,
"worker_id": worker_id,
"status": "running",
"execution": { "backend": "connected", "run_state": "idle" },
"intent": { "kind": "role", "role": "coder", "purpose": "remote test" },
"profile": { "kind": "builtin", "value": "coder" },
"config_bundle": null,

View File

@ -1337,7 +1337,7 @@ mod tests {
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "embedded_worker_llm_not_connected")
.any(|diagnostic| diagnostic["code"] == "embedded_worker_execution_unavailable")
);
let transcript = get_json(