merge: runtime worker launch unification

This commit is contained in:
Keisuke Hirata 2026-06-29 03:37:45 +09:00
commit bdb339fab8
No known key found for this signature in database
10 changed files with 941 additions and 564 deletions

View File

@ -1,31 +1,8 @@
use crate::execution::WorkerExecutionStatus; use crate::execution::WorkerExecutionStatus;
use crate::identity::{RuntimeId, WorkerId, WorkerRef}; use crate::identity::{RuntimeId, WorkerId, WorkerRef};
use crate::interaction::WorkerInput;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Intent supplied when a Worker is created.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WorkerIntent {
Assistant {
#[serde(default, skip_serializing_if = "Option::is_none")]
purpose: Option<String>,
},
Task {
objective: String,
},
Role {
role: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
purpose: Option<String>,
},
}
impl Default for WorkerIntent {
fn default() -> Self {
Self::Assistant { purpose: None }
}
}
/// Profile selector boundary. This is a selector, not a resolved config bundle. /// Profile selector boundary. This is a selector, not a resolved config bundle.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")] #[serde(tag = "kind", content = "value", rename_all = "snake_case")]
@ -48,77 +25,21 @@ pub struct ConfigBundleRef {
pub digest: String, pub digest: String,
} }
/// Requested capability name plus optional human-readable reason. /// Canonical Runtime Worker creation request.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] ///
pub struct CapabilityRequest { /// Browser/product launch semantics are resolved by a backend before this
pub name: String, /// request is built. The request contains only durable Runtime identity inputs:
#[serde(default, skip_serializing_if = "Option::is_none")] /// a backend-decided profile selector, a previously synced ConfigBundle identity,
pub reason: Option<String>, /// and optional initial user input that is committed in the same transaction as
} /// Worker catalog/transcript persistence. It carries no cwd/workspace path, tool
/// scope, credential, socket/session path, raw config body, or execution binding
impl CapabilityRequest { /// internals.
pub fn named(name: impl Into<String>) -> Self {
Self {
name: name.into(),
reason: None,
}
}
}
/// Opaque workspace reference supplied by a caller.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkspaceRef {
pub name: String,
pub reference: String,
}
/// Opaque mount reference supplied by a caller.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct MountRef {
pub name: String,
pub reference: String,
}
/// Worker creation request for the catalog/lifecycle API.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CreateWorkerRequest { pub struct CreateWorkerRequest {
pub intent: WorkerIntent,
pub profile: ProfileSelector, pub profile: ProfileSelector,
pub config_bundle: ConfigBundleRef,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub config_bundle: Option<ConfigBundleRef>, pub initial_input: Option<WorkerInput>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub requested_capabilities: Vec<CapabilityRequest>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub workspace_refs: Vec<WorkspaceRef>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub mount_refs: Vec<MountRef>,
}
impl Default for CreateWorkerRequest {
fn default() -> Self {
Self {
intent: WorkerIntent::default(),
profile: ProfileSelector::default(),
config_bundle: None,
requested_capabilities: Vec::new(),
workspace_refs: Vec::new(),
mount_refs: Vec::new(),
}
}
}
impl CreateWorkerRequest {
/// Create a tools-less Worker using runtime-local default resources.
pub fn tools_less(intent: WorkerIntent, profile: ProfileSelector) -> Self {
Self {
intent,
profile,
config_bundle: None,
requested_capabilities: Vec::new(),
workspace_refs: Vec::new(),
mount_refs: Vec::new(),
}
}
} }
/// Worker lifecycle status for the in-memory embedded runtime. /// Worker lifecycle status for the in-memory embedded runtime.
@ -144,10 +65,8 @@ pub struct WorkerSummary {
pub worker_id: WorkerId, pub worker_id: WorkerId,
pub status: WorkerStatus, pub status: WorkerStatus,
pub execution: WorkerExecutionStatus, pub execution: WorkerExecutionStatus,
pub intent: WorkerIntent,
pub profile: ProfileSelector, pub profile: ProfileSelector,
pub requested_capability_count: usize, pub config_bundle: ConfigBundleRef,
pub has_config_bundle: bool,
pub transcript_len: usize, pub transcript_len: usize,
pub last_event_id: u64, pub last_event_id: u64,
} }
@ -160,16 +79,8 @@ pub struct WorkerDetail {
pub worker_id: WorkerId, pub worker_id: WorkerId,
pub status: WorkerStatus, pub status: WorkerStatus,
pub execution: WorkerExecutionStatus, pub execution: WorkerExecutionStatus,
pub intent: WorkerIntent,
pub profile: ProfileSelector, pub profile: ProfileSelector,
#[serde(default, skip_serializing_if = "Option::is_none")] pub config_bundle: ConfigBundleRef,
pub config_bundle: Option<ConfigBundleRef>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub requested_capabilities: Vec<CapabilityRequest>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub workspace_refs: Vec<WorkspaceRef>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub mount_refs: Vec<MountRef>,
pub transcript_len: usize, pub transcript_len: usize,
pub last_event_id: u64, pub last_event_id: u64,
} }

View File

@ -23,6 +23,9 @@ pub enum RuntimeError {
actual_runtime_id: RuntimeId, actual_runtime_id: RuntimeId,
}, },
#[error("initial worker input must be user input, got {kind}")]
InvalidInitialInputKind { kind: String },
#[error("worker {worker_id} was not found in runtime {runtime_id}")] #[error("worker {worker_id} was not found in runtime {runtime_id}")]
WorkerNotFound { WorkerNotFound {
runtime_id: RuntimeId, runtime_id: RuntimeId,
@ -35,6 +38,9 @@ pub enum RuntimeError {
message: String, message: String,
}, },
#[error("worker creation has no execution backend: {message}")]
ExecutionBackendUnavailable { message: String },
#[error("worker {worker_id} execution {operation:?} returned {outcome:?}: {message}")] #[error("worker {worker_id} execution {operation:?} returned {outcome:?}: {message}")]
WorkerExecutionRejected { WorkerExecutionRejected {
worker_id: WorkerId, worker_id: WorkerId,

View File

@ -18,9 +18,29 @@ use std::sync::Arc;
pub enum WorkerExecutionBackendKind { pub enum WorkerExecutionBackendKind {
#[default] #[default]
Unconnected, Unconnected,
/// A durable execution binding was restored, but no live handle was recovered.
Stale,
Connected, Connected,
} }
/// Durable, non-authority execution binding projection.
///
/// This records only enough identity to diagnose stale mappings after restore.
/// It is not a live handle and must not contain sockets, paths, credentials, or
/// provider-private authority.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct WorkerExecutionBindingIdentity {
pub backend_id: String,
}
impl WorkerExecutionBindingIdentity {
pub fn from_handle(handle: &WorkerExecutionHandle) -> Self {
Self {
backend_id: handle.backend_id.clone(),
}
}
}
/// Current execution-side run state for a Worker. /// Current execution-side run state for a Worker.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)] #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
@ -131,6 +151,8 @@ pub struct WorkerExecutionStatus {
pub backend: WorkerExecutionBackendKind, pub backend: WorkerExecutionBackendKind,
pub run_state: WorkerExecutionRunState, pub run_state: WorkerExecutionRunState,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub binding: Option<WorkerExecutionBindingIdentity>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_result: Option<WorkerExecutionResult>, pub last_result: Option<WorkerExecutionResult>,
} }
@ -143,10 +165,22 @@ impl WorkerExecutionStatus {
Self { Self {
backend: WorkerExecutionBackendKind::Connected, backend: WorkerExecutionBackendKind::Connected,
run_state, run_state,
binding: None,
last_result: None, last_result: None,
} }
} }
pub fn stale(mut previous: Self) -> Self {
previous.backend = WorkerExecutionBackendKind::Stale;
previous.run_state = WorkerExecutionRunState::Unconnected;
previous
}
pub fn with_binding(mut self, binding: WorkerExecutionBindingIdentity) -> Self {
self.binding = Some(binding);
self
}
pub fn with_result(mut self, result: WorkerExecutionResult) -> Self { pub fn with_result(mut self, result: WorkerExecutionResult) -> Self {
self.run_state = result.run_state; self.run_state = result.run_state;
self.last_result = Some(result); self.last_result = Some(result);

View File

@ -2,6 +2,7 @@ use crate::catalog::{CreateWorkerRequest, WorkerStatus};
use crate::config_bundle::ConfigBundle; use crate::config_bundle::ConfigBundle;
use crate::diagnostics::RuntimeDiagnostic; use crate::diagnostics::RuntimeDiagnostic;
use crate::error::RuntimeError; use crate::error::RuntimeError;
use crate::execution::WorkerExecutionStatus;
use crate::identity::{RuntimeId, WorkerId, WorkerRef}; use crate::identity::{RuntimeId, WorkerId, WorkerRef};
use crate::management::{RuntimeBackendKind, RuntimeLimits, RuntimeStatus}; use crate::management::{RuntimeBackendKind, RuntimeLimits, RuntimeStatus};
use crate::observation::{ use crate::observation::{
@ -376,6 +377,7 @@ pub(crate) struct PersistedWorkerRecord {
pub(crate) worker_id: WorkerId, pub(crate) worker_id: WorkerId,
pub(crate) status: WorkerStatus, pub(crate) status: WorkerStatus,
pub(crate) request: CreateWorkerRequest, pub(crate) request: CreateWorkerRequest,
pub(crate) execution: WorkerExecutionStatus,
pub(crate) transcript: Vec<TranscriptEntry>, pub(crate) transcript: Vec<TranscriptEntry>,
pub(crate) next_transcript_sequence: u64, pub(crate) next_transcript_sequence: u64,
pub(crate) last_event_id: u64, pub(crate) last_event_id: u64,
@ -473,6 +475,8 @@ struct WorkerSnapshot {
worker_id: WorkerId, worker_id: WorkerId,
status: WorkerStatus, status: WorkerStatus,
request: CreateWorkerRequest, request: CreateWorkerRequest,
#[serde(default = "WorkerExecutionStatus::unconnected")]
execution: WorkerExecutionStatus,
next_transcript_sequence: u64, next_transcript_sequence: u64,
last_event_id: u64, last_event_id: u64,
} }
@ -485,6 +489,7 @@ impl WorkerSnapshot {
worker_id: worker.worker_id.clone(), worker_id: worker.worker_id.clone(),
status: worker.status, status: worker.status,
request: worker.request.clone(), request: worker.request.clone(),
execution: worker.execution.clone(),
next_transcript_sequence: worker.next_transcript_sequence, next_transcript_sequence: worker.next_transcript_sequence,
last_event_id: worker.last_event_id, last_event_id: worker.last_event_id,
} }
@ -530,6 +535,7 @@ impl WorkerSnapshot {
worker_id: self.worker_id, worker_id: self.worker_id,
status: self.status, status: self.status,
request: self.request, request: self.request,
execution: self.execution,
transcript, transcript,
next_transcript_sequence: self.next_transcript_sequence, next_transcript_sequence: self.next_transcript_sequence,
last_event_id: self.last_event_id, last_event_id: self.last_event_id,

View File

@ -824,9 +824,11 @@ fn status_for_runtime_error(error: &RuntimeError) -> StatusCode {
} }
RuntimeError::RuntimeStopped { .. } RuntimeError::RuntimeStopped { .. }
| RuntimeError::WorkerExecutionUnavailable { .. } | RuntimeError::WorkerExecutionUnavailable { .. }
| RuntimeError::ExecutionBackendUnavailable { .. }
| RuntimeError::WorkerExecutionRejected { .. } => StatusCode::CONFLICT, | RuntimeError::WorkerExecutionRejected { .. } => StatusCode::CONFLICT,
RuntimeError::LimitTooLarge { .. } RuntimeError::LimitTooLarge { .. }
| RuntimeError::InvalidRequest(_) | RuntimeError::InvalidRequest(_)
| RuntimeError::InvalidInitialInputKind { .. }
| RuntimeError::ConfigBundleDigestMismatch { .. } | RuntimeError::ConfigBundleDigestMismatch { .. }
| RuntimeError::InvalidProfileSelector { .. } | RuntimeError::InvalidProfileSelector { .. }
| RuntimeError::UnsupportedConfigDeclaration { .. } | RuntimeError::UnsupportedConfigDeclaration { .. }
@ -846,9 +848,11 @@ fn code_for_runtime_error(error: &RuntimeError) -> &'static str {
RuntimeError::WrongRuntimeCursor { .. } => "wrong_runtime_cursor", RuntimeError::WrongRuntimeCursor { .. } => "wrong_runtime_cursor",
RuntimeError::WorkerNotFound { .. } => "worker_not_found", RuntimeError::WorkerNotFound { .. } => "worker_not_found",
RuntimeError::WorkerExecutionUnavailable { .. } => "worker_execution_unavailable", RuntimeError::WorkerExecutionUnavailable { .. } => "worker_execution_unavailable",
RuntimeError::ExecutionBackendUnavailable { .. } => "execution_backend_unavailable",
RuntimeError::WorkerExecutionRejected { .. } => "worker_execution_rejected", RuntimeError::WorkerExecutionRejected { .. } => "worker_execution_rejected",
RuntimeError::LimitTooLarge { .. } => "limit_too_large", RuntimeError::LimitTooLarge { .. } => "limit_too_large",
RuntimeError::InvalidRequest(_) => "invalid_request", RuntimeError::InvalidRequest(_) => "invalid_request",
RuntimeError::InvalidInitialInputKind { .. } => "invalid_initial_input_kind",
RuntimeError::ConfigBundleMissing { .. } => "config_bundle_missing", RuntimeError::ConfigBundleMissing { .. } => "config_bundle_missing",
RuntimeError::ConfigBundleDigestMismatch { .. } => "config_bundle_digest_mismatch", RuntimeError::ConfigBundleDigestMismatch { .. } => "config_bundle_digest_mismatch",
RuntimeError::InvalidProfileSelector { .. } => "invalid_profile_selector", RuntimeError::InvalidProfileSelector { .. } => "invalid_profile_selector",
@ -872,7 +876,10 @@ pub enum RuntimeHttpServerError {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::catalog::{CapabilityRequest, ProfileSelector, WorkerIntent}; use crate::catalog::{ConfigBundleRef, ProfileSelector};
use crate::config_bundle::{
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor,
};
use crate::execution::{ use crate::execution::{
WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation,
WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest,
@ -883,16 +890,38 @@ mod tests {
use axum::http::Method; use axum::http::Method;
use tower::ServiceExt; use tower::ServiceExt;
fn task_request(objective: &str) -> CreateWorkerRequest { fn test_bundle(profile: ProfileSelector) -> ConfigBundle {
CreateWorkerRequest { ConfigBundle {
intent: WorkerIntent::Task { metadata: ConfigBundleMetadata {
objective: objective.to_string(), id: "http-test-bundle".to_string(),
digest: String::new(),
revision: "test".to_string(),
workspace_id: "test-workspace".to_string(),
created_at: "test".to_string(),
provenance: ConfigBundleProvenance {
source: "test".to_string(),
detail: None,
}, },
profile: ProfileSelector::Builtin("builtin:coder".to_string()), },
config_bundle: None, profiles: vec![ConfigProfileDescriptor {
requested_capabilities: vec![CapabilityRequest::named("read")], selector: profile,
workspace_refs: Vec::new(), label: Some("test".to_string()),
mount_refs: Vec::new(), }],
declarations: Vec::new(),
}
.with_computed_digest()
}
fn task_request(_objective: &str) -> CreateWorkerRequest {
let profile = ProfileSelector::Builtin("builtin:coder".to_string());
let bundle = test_bundle(profile.clone());
CreateWorkerRequest {
profile,
config_bundle: ConfigBundleRef {
id: bundle.metadata.id,
digest: bundle.metadata.digest,
},
initial_input: None,
} }
} }
@ -969,6 +998,11 @@ mod tests {
let runtime = let runtime =
Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(AcceptingBackend)) Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(AcceptingBackend))
.unwrap(); .unwrap();
runtime
.store_config_bundle(test_bundle(ProfileSelector::Builtin(
"builtin:coder".to_string(),
)))
.unwrap();
let app = runtime_http_router(runtime.clone(), None); let app = runtime_http_router(runtime.clone(), None);
let response = json_request( let response = json_request(
@ -1091,15 +1125,89 @@ mod tests {
#[cfg(all(test, feature = "ws-server"))] #[cfg(all(test, feature = "ws-server"))]
mod ws_tests { mod ws_tests {
use super::*; use super::*;
use crate::catalog::{ConfigBundleRef, ProfileSelector};
use crate::config_bundle::{
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor,
};
use crate::execution::{
WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation,
WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest,
WorkerExecutionSpawnResult,
};
use crate::interaction::WorkerInput;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use std::sync::Arc;
use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
struct WsBackend;
impl WorkerExecutionBackend for WsBackend {
fn backend_id(&self) -> &str {
"ws-test"
}
fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult {
WorkerExecutionSpawnResult::Connected {
handle: WorkerExecutionHandle::new(request.worker_ref, self.backend_id()),
run_state: WorkerExecutionRunState::Idle,
}
}
fn dispatch_input(
&self,
_handle: &WorkerExecutionHandle,
_input: WorkerInput,
) -> WorkerExecutionResult {
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Input,
WorkerExecutionRunState::Idle,
)
}
}
fn ws_test_bundle(profile: ProfileSelector) -> ConfigBundle {
ConfigBundle {
metadata: ConfigBundleMetadata {
id: "ws-test-bundle".to_string(),
digest: String::new(),
revision: "test".to_string(),
workspace_id: "test".to_string(),
created_at: "test".to_string(),
provenance: ConfigBundleProvenance {
source: "test".to_string(),
detail: None,
},
},
profiles: vec![ConfigProfileDescriptor {
selector: profile,
label: Some("ws".to_string()),
}],
declarations: Vec::new(),
}
.with_computed_digest()
}
fn ws_create_request() -> CreateWorkerRequest {
let bundle = ws_test_bundle(ProfileSelector::RuntimeDefault);
CreateWorkerRequest {
profile: ProfileSelector::RuntimeDefault,
config_bundle: ConfigBundleRef {
id: bundle.metadata.id,
digest: bundle.metadata.digest,
},
initial_input: None,
}
}
async fn spawn_runtime_server() -> (Runtime, WorkerRef, String) { async fn spawn_runtime_server() -> (Runtime, WorkerRef, String) {
let runtime = Runtime::new_memory(); let runtime =
let worker = runtime Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(WsBackend))
.create_worker(CreateWorkerRequest::default())
.unwrap(); .unwrap();
runtime
.store_config_bundle(ws_test_bundle(ProfileSelector::RuntimeDefault))
.unwrap();
let worker = runtime.create_worker(ws_create_request()).unwrap();
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap(); let addr = listener.local_addr().unwrap();
tokio::spawn({ tokio::spawn({
@ -1169,9 +1277,7 @@ mod ws_tests {
#[tokio::test] #[tokio::test]
async fn runtime_ws_cursor_resume_is_duplicate_safe_and_filters_workers() { async fn runtime_ws_cursor_resume_is_duplicate_safe_and_filters_workers() {
let (runtime, worker_ref, url) = spawn_runtime_server().await; let (runtime, worker_ref, url) = spawn_runtime_server().await;
let other = runtime let other = runtime.create_worker(ws_create_request()).unwrap();
.create_worker(CreateWorkerRequest::default())
.unwrap();
let first = runtime let first = runtime
.observe_worker_event( .observe_worker_event(
&worker_ref, &worker_ref,

View File

@ -24,4 +24,5 @@ mod runtime;
#[cfg(feature = "fs-store")] #[cfg(feature = "fs-store")]
pub use fs_store::{FsRuntimeStore, FsRuntimeStoreOptions}; pub use fs_store::{FsRuntimeStore, FsRuntimeStoreOptions};
pub use management::{RuntimeLimits, RuntimeOptions};
pub use runtime::Runtime; pub use runtime::Runtime;

View File

@ -4,15 +4,17 @@ use crate::catalog::{
}; };
use crate::config_bundle::{ use crate::config_bundle::{
ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary, validate_config_bundle, ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary, validate_config_bundle,
validate_config_bundle_ref, validate_profile_selector, validate_config_bundle_ref,
}; };
use crate::diagnostics::{DiagnosticSeverity, RuntimeDiagnostic}; #[cfg(feature = "fs-store")]
use crate::diagnostics::DiagnosticSeverity;
use crate::diagnostics::RuntimeDiagnostic;
use crate::error::RuntimeError; use crate::error::RuntimeError;
use crate::execution::{ use crate::execution::{
WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef, WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef,
WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult, WorkerExecutionBindingIdentity, WorkerExecutionHandle, WorkerExecutionOperation,
WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, WorkerExecutionResult, WorkerExecutionRunState, WorkerExecutionSpawnRequest,
WorkerExecutionStatus, WorkerExecutionSpawnResult, WorkerExecutionStatus,
}; };
#[cfg(feature = "fs-store")] #[cfg(feature = "fs-store")]
use crate::fs_store::{ use crate::fs_store::{
@ -231,55 +233,112 @@ impl Runtime {
Ok(event_id) Ok(event_id)
} }
/// Create a Worker in the embedded catalog. /// Create a Worker through the canonical ConfigBundle + execution backend path.
pub fn create_worker( pub fn create_worker(
&self, &self,
request: CreateWorkerRequest, request: CreateWorkerRequest,
) -> Result<WorkerDetail, RuntimeError> { ) -> Result<WorkerDetail, RuntimeError> {
let (backend, worker_ref, spawn_request) = {
let mut state = self.lock()?; let mut state = self.lock()?;
state.ensure_running()?; state.ensure_running()?;
validate_create_worker_request(&request)?; validate_create_worker_request(&request)?;
state.validate_worker_config_boundary(&request)?; state.validate_worker_config_boundary(&request)?;
let backend = state.execution_backend.clone().ok_or_else(|| {
RuntimeError::ExecutionBackendUnavailable {
message: "worker creation requires an execution backend".to_string(),
}
})?;
let worker_id = WorkerId::generated(state.next_worker_sequence); let worker_id = WorkerId::generated(state.next_worker_sequence);
state.next_worker_sequence += 1; state.next_worker_sequence += 1;
let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone()); let worker_ref = WorkerRef::new(state.runtime_id.clone(), worker_id.clone());
let backend = state.execution_backend.clone();
let spawn_request = backend.as_ref().map(|_| WorkerExecutionSpawnRequest {
worker_ref: worker_ref.clone(),
request: request.clone(),
context: self.execution_context(worker_ref.clone()),
});
let event_id = state.push_event( let event_id = state.push_event(
Some(worker_ref.clone()), Some(worker_ref.clone()),
RuntimeEventKind::WorkerCreated, RuntimeEventKind::WorkerCreated,
format!("worker {worker_id} created"), format!("worker {worker_id} created"),
); );
let mut transcript = Vec::new();
let mut next_transcript_sequence = 1;
if let Some(input) = request.initial_input.clone() {
transcript.push(TranscriptEntry {
sequence: next_transcript_sequence,
worker_ref: worker_ref.clone(),
role: TranscriptRole::User,
content: input.content,
event_id,
});
next_transcript_sequence += 1;
}
let record = WorkerRecord { let record = WorkerRecord {
worker_ref: worker_ref.clone(), worker_ref: worker_ref.clone(),
worker_id: worker_id.clone(), worker_id: worker_id.clone(),
status: WorkerStatus::Running, status: WorkerStatus::Running,
request, request: request.clone(),
execution: WorkerExecutionStatus::unconnected(), execution: WorkerExecutionStatus::unconnected(),
execution_handle: None, execution_handle: None,
transcript: Vec::new(), transcript,
next_transcript_sequence: 1, next_transcript_sequence,
last_event_id: event_id, last_event_id: event_id,
}; };
let detail = record.detail(&state.runtime_id); state.workers.insert(worker_id, record);
state.emit_create_diagnostics(&detail); let spawn_request = WorkerExecutionSpawnRequest {
state.workers.insert(worker_id.clone(), record); worker_ref: worker_ref.clone(),
state.persist_runtime_snapshot()?; request,
state.persist_worker(&worker_id)?; context: self.execution_context(worker_ref.clone()),
state.persist_event_by_id(event_id)?; };
drop(state); (backend, worker_ref, spawn_request)
};
if let (Some(backend), Some(spawn_request)) = (backend, spawn_request) { let spawn_result = backend.spawn_worker(spawn_request);
let result = backend.spawn_worker(spawn_request); let (handle, run_state) = match spawn_result {
self.apply_spawn_result(&worker_ref, result) WorkerExecutionSpawnResult::Connected { handle, run_state } => (handle, run_state),
WorkerExecutionSpawnResult::Rejected(result)
| WorkerExecutionSpawnResult::Errored(result) => {
self.rollback_failed_create(&worker_ref)?;
return Err(RuntimeError::WorkerExecutionRejected {
worker_id: worker_ref.worker_id.clone(),
operation: result.operation,
outcome: result.outcome,
message: result.message_or_default(),
result,
});
}
};
if let Some(initial_input) = {
let state = self.lock()?;
state.worker(&worker_ref)?.request.initial_input.clone()
} {
let dispatch_result = backend.dispatch_input(&handle, initial_input);
if !dispatch_result.is_accepted() {
let _ = backend.stop_worker(&handle);
self.rollback_failed_create(&worker_ref)?;
return Err(RuntimeError::WorkerExecutionRejected {
worker_id: worker_ref.worker_id.clone(),
operation: dispatch_result.operation,
outcome: dispatch_result.outcome,
message: dispatch_result.message_or_default(),
result: dispatch_result,
});
}
self.commit_created_worker(
&worker_ref,
handle,
WorkerExecutionRunState::Busy,
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Input,
WorkerExecutionRunState::Busy,
),
)
} else { } else {
Ok(detail) self.commit_created_worker(
&worker_ref,
handle,
run_state,
WorkerExecutionResult::accepted(WorkerExecutionOperation::Spawn, run_state),
)
} }
} }
@ -328,7 +387,9 @@ impl Runtime {
"worker has no execution backend", "worker has no execution backend",
); );
let worker = state.worker_mut(worker_ref)?; let worker = state.worker_mut(worker_ref)?;
worker.execution = WorkerExecutionStatus::unconnected().with_result(result); let mut execution = WorkerExecutionStatus::unconnected().with_result(result);
execution.binding = worker.execution.binding.clone();
worker.execution = execution;
state.persist_worker(&worker_ref.worker_id)?; state.persist_worker(&worker_ref.worker_id)?;
return Err(RuntimeError::WorkerExecutionUnavailable { return Err(RuntimeError::WorkerExecutionUnavailable {
worker_id: worker_ref.worker_id.clone(), worker_id: worker_ref.worker_id.clone(),
@ -370,6 +431,7 @@ impl Runtime {
worker.execution = WorkerExecutionStatus { worker.execution = WorkerExecutionStatus {
backend: WorkerExecutionBackendKind::Connected, backend: WorkerExecutionBackendKind::Connected,
run_state: dispatch_result.run_state, run_state: dispatch_result.run_state,
binding: worker.execution.binding.clone(),
last_result: Some(dispatch_result), last_result: Some(dispatch_result),
}; };
worker.transcript.push(TranscriptEntry { worker.transcript.push(TranscriptEntry {
@ -414,38 +476,44 @@ impl Runtime {
}) })
} }
fn apply_spawn_result( fn commit_created_worker(
&self, &self,
worker_ref: &WorkerRef, worker_ref: &WorkerRef,
result: WorkerExecutionSpawnResult, handle: WorkerExecutionHandle,
run_state: WorkerExecutionRunState,
result: WorkerExecutionResult,
) -> Result<WorkerDetail, RuntimeError> { ) -> Result<WorkerDetail, RuntimeError> {
let mut state = self.lock()?; let mut state = self.lock()?;
let runtime_id = state.runtime_id.clone(); let runtime_id = state.runtime_id.clone();
let detail = { let detail = {
let binding = WorkerExecutionBindingIdentity::from_handle(&handle);
let worker = state.worker_mut(worker_ref)?; let worker = state.worker_mut(worker_ref)?;
match result {
WorkerExecutionSpawnResult::Connected { handle, run_state } => {
worker.execution_handle = Some(handle); worker.execution_handle = Some(handle);
worker.execution = WorkerExecutionStatus::connected(run_state).with_result( worker.execution = WorkerExecutionStatus::connected(run_state)
WorkerExecutionResult::accepted(WorkerExecutionOperation::Spawn, run_state), .with_binding(binding)
); .with_result(result);
}
WorkerExecutionSpawnResult::Rejected(result)
| WorkerExecutionSpawnResult::Errored(result) => {
worker.execution_handle = None;
worker.execution = WorkerExecutionStatus {
backend: WorkerExecutionBackendKind::Connected,
run_state: result.run_state,
last_result: Some(result),
};
}
}
worker.detail(&runtime_id) worker.detail(&runtime_id)
}; };
state.persist_runtime_snapshot()?;
state.persist_worker(&worker_ref.worker_id)?; state.persist_worker(&worker_ref.worker_id)?;
let worker = state.worker(worker_ref)?;
for entry in &worker.transcript {
state.persist_transcript_entry(&worker_ref.worker_id, entry.sequence)?;
}
state.persist_event_by_id(detail.last_event_id)?;
Ok(detail) Ok(detail)
} }
fn rollback_failed_create(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> {
let mut state = self.lock()?;
if let Some(record) = state.workers.remove(&worker_ref.worker_id) {
state.events.retain(|event| {
event.id != record.last_event_id || event.worker_ref.as_ref() != Some(worker_ref)
});
}
Ok(())
}
fn record_execution_result( fn record_execution_result(
&self, &self,
worker_ref: &WorkerRef, worker_ref: &WorkerRef,
@ -456,6 +524,7 @@ impl Runtime {
worker.execution = WorkerExecutionStatus { worker.execution = WorkerExecutionStatus {
backend: WorkerExecutionBackendKind::Connected, backend: WorkerExecutionBackendKind::Connected,
run_state: result.run_state, run_state: result.run_state,
binding: worker.execution.binding.clone(),
last_result: Some(result), last_result: Some(result),
}; };
state.persist_worker(&worker_ref.worker_id)?; state.persist_worker(&worker_ref.worker_id)?;
@ -827,6 +896,7 @@ struct RuntimeState {
execution_backend: Option<WorkerExecutionBackendRef>, execution_backend: Option<WorkerExecutionBackendRef>,
next_worker_sequence: u64, next_worker_sequence: u64,
next_event_id: u64, next_event_id: u64,
#[cfg(feature = "fs-store")]
next_diagnostic_id: u64, next_diagnostic_id: u64,
workers: BTreeMap<WorkerId, WorkerRecord>, workers: BTreeMap<WorkerId, WorkerRecord>,
config_bundles: BTreeMap<String, ConfigBundle>, config_bundles: BTreeMap<String, ConfigBundle>,
@ -852,6 +922,7 @@ impl RuntimeState {
execution_backend: None, execution_backend: None,
next_worker_sequence: 1, next_worker_sequence: 1,
next_event_id: 1, next_event_id: 1,
#[cfg(feature = "fs-store")]
next_diagnostic_id: 1, next_diagnostic_id: 1,
workers: BTreeMap::new(), workers: BTreeMap::new(),
config_bundles: BTreeMap::new(), config_bundles: BTreeMap::new(),
@ -883,6 +954,7 @@ impl RuntimeState {
execution_backend: None, execution_backend: None,
next_worker_sequence: 1, next_worker_sequence: 1,
next_event_id: 1, next_event_id: 1,
#[cfg(feature = "fs-store")]
next_diagnostic_id: 1, next_diagnostic_id: 1,
workers: BTreeMap::new(), workers: BTreeMap::new(),
config_bundles: BTreeMap::new(), config_bundles: BTreeMap::new(),
@ -915,7 +987,28 @@ impl RuntimeState {
} }
let mut workers = BTreeMap::new(); let mut workers = BTreeMap::new();
let mut diagnostics = persisted.diagnostics;
let mut next_diagnostic_id = persisted.next_diagnostic_id;
for (worker_id, worker) in persisted.workers { for (worker_id, worker) in persisted.workers {
let execution = if worker.execution.binding.is_some()
&& worker.execution.backend == WorkerExecutionBackendKind::Connected
{
let stale = WorkerExecutionStatus::stale(worker.execution);
diagnostics.push(RuntimeDiagnostic {
id: next_diagnostic_id,
severity: DiagnosticSeverity::Warning,
code: "worker_execution_mapping_stale".to_string(),
message: format!(
"worker {} has persisted execution binding identity but no live execution handle was restored",
worker.worker_id
),
worker_ref: Some(worker.worker_ref.clone()),
});
next_diagnostic_id += 1;
stale
} else {
worker.execution
};
workers.insert( workers.insert(
worker_id, worker_id,
WorkerRecord { WorkerRecord {
@ -923,7 +1016,7 @@ impl RuntimeState {
worker_id: worker.worker_id, worker_id: worker.worker_id,
status: worker.status, status: worker.status,
request: worker.request, request: worker.request,
execution: WorkerExecutionStatus::unconnected(), execution,
execution_handle: None, execution_handle: None,
transcript: worker.transcript, transcript: worker.transcript,
next_transcript_sequence: worker.next_transcript_sequence, next_transcript_sequence: worker.next_transcript_sequence,
@ -942,11 +1035,11 @@ impl RuntimeState {
execution_backend: None, execution_backend: None,
next_worker_sequence: persisted.next_worker_sequence, next_worker_sequence: persisted.next_worker_sequence,
next_event_id: persisted.next_event_id, next_event_id: persisted.next_event_id,
next_diagnostic_id: persisted.next_diagnostic_id, next_diagnostic_id,
workers, workers,
config_bundles: persisted.config_bundles, config_bundles: persisted.config_bundles,
events: persisted.events, events: persisted.events,
diagnostics: persisted.diagnostics, diagnostics,
#[cfg(feature = "ws-server")] #[cfg(feature = "ws-server")]
next_observation_sequence: 1, next_observation_sequence: 1,
#[cfg(feature = "ws-server")] #[cfg(feature = "ws-server")]
@ -1131,8 +1224,7 @@ impl RuntimeState {
&self, &self,
request: &CreateWorkerRequest, request: &CreateWorkerRequest,
) -> Result<(), RuntimeError> { ) -> Result<(), RuntimeError> {
match &request.config_bundle { let reference = &request.config_bundle;
Some(reference) => {
let availability = self.check_config_bundle_ref(reference)?; let availability = self.check_config_bundle_ref(reference)?;
let bundle = self let bundle = self
.config_bundles .config_bundles
@ -1144,24 +1236,11 @@ impl RuntimeState {
return Err(RuntimeError::InvalidProfileSelector { return Err(RuntimeError::InvalidProfileSelector {
profile: profile_label(&request.profile), profile: profile_label(&request.profile),
bundle_id: Some(reference.id.clone()), bundle_id: Some(reference.id.clone()),
message: "profile selector is not declared by synced config bundle" message: "profile selector is not declared by synced config bundle".to_string(),
.to_string(),
}); });
} }
Ok(()) Ok(())
} }
None => match &request.profile {
ProfileSelector::RuntimeDefault | ProfileSelector::Builtin(_) => {
validate_profile_selector(request.profile.clone(), None)
}
ProfileSelector::Named(_) => Err(RuntimeError::InvalidProfileSelector {
profile: profile_label(&request.profile),
bundle_id: None,
message: "named profiles require a synced config bundle reference".to_string(),
}),
},
}
}
fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> { fn ensure_worker_ref(&self, worker_ref: &WorkerRef) -> Result<(), RuntimeError> {
if worker_ref.runtime_id != self.runtime_id { if worker_ref.runtime_id != self.runtime_id {
@ -1373,43 +1452,6 @@ impl RuntimeState {
status.run_state = next_run_state; status.run_state = next_run_state;
true true
} }
fn push_diagnostic(
&mut self,
severity: DiagnosticSeverity,
code: impl Into<String>,
message: impl Into<String>,
worker_ref: Option<WorkerRef>,
) {
let id = self.next_diagnostic_id;
self.next_diagnostic_id += 1;
self.diagnostics.push(RuntimeDiagnostic {
id,
severity,
code: code.into(),
message: message.into(),
worker_ref,
});
}
fn emit_create_diagnostics(&mut self, detail: &WorkerDetail) {
if detail.config_bundle.is_none() {
self.push_diagnostic(
DiagnosticSeverity::Info,
"runtime.local_default_resources",
"worker created without ConfigBundleRef; runtime-local defaults are assumed",
Some(detail.worker_ref.clone()),
);
}
if detail.requested_capabilities.is_empty() {
self.push_diagnostic(
DiagnosticSeverity::Info,
"worker.tools_less",
"worker created without requested tool capabilities",
Some(detail.worker_ref.clone()),
);
}
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -1433,10 +1475,8 @@ impl WorkerRecord {
worker_id: self.worker_id.clone(), worker_id: self.worker_id.clone(),
status: self.status, status: self.status,
execution: self.execution.clone(), execution: self.execution.clone(),
intent: self.request.intent.clone(),
profile: self.request.profile.clone(), profile: self.request.profile.clone(),
requested_capability_count: self.request.requested_capabilities.len(), config_bundle: self.request.config_bundle.clone(),
has_config_bundle: self.request.config_bundle.is_some(),
transcript_len: self.transcript.len(), transcript_len: self.transcript.len(),
last_event_id: self.last_event_id, last_event_id: self.last_event_id,
} }
@ -1449,12 +1489,8 @@ impl WorkerRecord {
worker_id: self.worker_id.clone(), worker_id: self.worker_id.clone(),
status: self.status, status: self.status,
execution: self.execution.clone(), execution: self.execution.clone(),
intent: self.request.intent.clone(),
profile: self.request.profile.clone(), profile: self.request.profile.clone(),
config_bundle: self.request.config_bundle.clone(), config_bundle: self.request.config_bundle.clone(),
requested_capabilities: self.request.requested_capabilities.clone(),
workspace_refs: self.request.workspace_refs.clone(),
mount_refs: self.request.mount_refs.clone(),
transcript_len: self.transcript.len(), transcript_len: self.transcript.len(),
last_event_id: self.last_event_id, last_event_id: self.last_event_id,
} }
@ -1467,6 +1503,7 @@ impl WorkerRecord {
worker_id: self.worker_id.clone(), worker_id: self.worker_id.clone(),
status: self.status, status: self.status,
request: self.request.clone(), request: self.request.clone(),
execution: self.execution.clone(),
transcript: self.transcript.clone(), transcript: self.transcript.clone(),
next_transcript_sequence: self.next_transcript_sequence, next_transcript_sequence: self.next_transcript_sequence,
last_event_id: self.last_event_id, last_event_id: self.last_event_id,
@ -1483,17 +1520,25 @@ fn profile_label(selector: &ProfileSelector) -> String {
} }
fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> { fn validate_create_worker_request(request: &CreateWorkerRequest) -> Result<(), RuntimeError> {
if let crate::catalog::WorkerIntent::Task { objective } = &request.intent { if request.config_bundle.id.trim().is_empty() {
if objective.trim().is_empty() {
return Err(RuntimeError::InvalidRequest( return Err(RuntimeError::InvalidRequest(
"task objective must not be empty".to_string(), "config_bundle.id must not be empty".to_string(),
)); ));
} }
} if request.config_bundle.digest.trim().is_empty() {
for capability in &request.requested_capabilities {
if capability.name.trim().is_empty() {
return Err(RuntimeError::InvalidRequest( return Err(RuntimeError::InvalidRequest(
"capability name must not be empty".to_string(), "config_bundle.digest must not be empty".to_string(),
));
}
if let Some(input) = &request.initial_input {
if input.kind != WorkerInputKind::User {
return Err(RuntimeError::InvalidInitialInputKind {
kind: format!("{:?}", input.kind),
});
}
if input.content.trim().is_empty() {
return Err(RuntimeError::InvalidRequest(
"initial_input.content must not be empty".to_string(),
)); ));
} }
} }
@ -1512,7 +1557,7 @@ fn validate_worker_input(input: &WorkerInput) -> Result<(), RuntimeError> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector, WorkerIntent}; use crate::catalog::{ConfigBundleRef, ProfileSelector};
use crate::config_bundle::{ use crate::config_bundle::{
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigDeclaration, ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigDeclaration,
ConfigDeclarationKind, ConfigProfileDescriptor, ConfigDeclarationKind, ConfigProfileDescriptor,
@ -1524,19 +1569,45 @@ mod tests {
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
fn task_request(objective: &str) -> CreateWorkerRequest { fn task_request(_objective: &str) -> CreateWorkerRequest {
let profile = ProfileSelector::Builtin("builtin:coder".to_string());
let bundle = test_bundle_for_profile(profile.clone());
CreateWorkerRequest { CreateWorkerRequest {
intent: WorkerIntent::Task { profile,
objective: objective.to_string(), config_bundle: ConfigBundleRef {
id: bundle.metadata.id,
digest: bundle.metadata.digest,
}, },
profile: ProfileSelector::Builtin("builtin:coder".to_string()), initial_input: None,
config_bundle: None,
requested_capabilities: vec![CapabilityRequest::named("read")],
workspace_refs: Vec::new(),
mount_refs: Vec::new(),
} }
} }
fn test_bundle_for_profile(profile: ProfileSelector) -> ConfigBundle {
ConfigBundle {
metadata: ConfigBundleMetadata {
id: "bundle-1".to_string(),
digest: String::new(),
revision: "rev-1".to_string(),
workspace_id: "workspace-1".to_string(),
created_at: "2026-06-26T00:00:00Z".to_string(),
provenance: ConfigBundleProvenance {
source: "workspace-backend".to_string(),
detail: Some("profile-sync".to_string()),
},
},
profiles: vec![ConfigProfileDescriptor {
selector: profile,
label: Some("Coder".to_string()),
}],
declarations: vec![ConfigDeclaration {
kind: ConfigDeclarationKind::CapabilityGrant,
name: "read".to_string(),
reference: "capability:read".to_string(),
}],
}
.with_computed_digest()
}
#[derive(Default)] #[derive(Default)]
struct TestExecutionBackend { struct TestExecutionBackend {
dispatch_result: Mutex<Option<WorkerExecutionResult>>, dispatch_result: Mutex<Option<WorkerExecutionResult>>,
@ -1609,77 +1680,58 @@ mod tests {
} }
fn runtime_with_backend() -> Runtime { fn runtime_with_backend() -> Runtime {
Runtime::with_execution_backend( let runtime = Runtime::with_execution_backend(
RuntimeOptions::default(), RuntimeOptions::default(),
Arc::new(TestExecutionBackend::default()), Arc::new(TestExecutionBackend::default()),
) )
.unwrap() .unwrap();
runtime.store_config_bundle(test_bundle()).unwrap();
runtime
} }
fn runtime_and_backend() -> (Runtime, Arc<TestExecutionBackend>) { fn runtime_and_backend() -> (Runtime, Arc<TestExecutionBackend>) {
let backend = Arc::new(TestExecutionBackend::default()); let backend = Arc::new(TestExecutionBackend::default());
let runtime = let runtime =
Runtime::with_execution_backend(RuntimeOptions::default(), backend.clone()).unwrap(); Runtime::with_execution_backend(RuntimeOptions::default(), backend.clone()).unwrap();
runtime.store_config_bundle(test_bundle()).unwrap();
(runtime, backend) (runtime, backend)
} }
fn test_bundle() -> ConfigBundle { fn test_bundle() -> ConfigBundle {
ConfigBundle { test_bundle_for_profile(ProfileSelector::Builtin("builtin:coder".to_string()))
metadata: ConfigBundleMetadata {
id: "bundle-1".to_string(),
digest: String::new(),
revision: "rev-1".to_string(),
workspace_id: "workspace-1".to_string(),
created_at: "2026-06-26T00:00:00Z".to_string(),
provenance: ConfigBundleProvenance {
source: "workspace-backend".to_string(),
detail: Some("profile-sync".to_string()),
},
},
profiles: vec![ConfigProfileDescriptor {
selector: ProfileSelector::Builtin("builtin:coder".to_string()),
label: Some("Coder".to_string()),
}],
declarations: vec![ConfigDeclaration {
kind: ConfigDeclarationKind::CapabilityGrant,
name: "read".to_string(),
reference: "capability:read".to_string(),
}],
}
.with_computed_digest()
} }
fn bundled_task_request(objective: &str, bundle: &ConfigBundle) -> CreateWorkerRequest { fn bundled_task_request(objective: &str, bundle: &ConfigBundle) -> CreateWorkerRequest {
let mut request = task_request(objective); let mut request = task_request(objective);
request.config_bundle = Some(ConfigBundleRef { request.config_bundle = ConfigBundleRef {
id: bundle.metadata.id.clone(), id: bundle.metadata.id.clone(),
digest: bundle.metadata.digest.clone(), digest: bundle.metadata.digest.clone(),
}); };
request request
} }
#[test] #[test]
fn create_list_and_detail_preserve_runtime_worker_authority() { fn create_list_and_detail_preserve_runtime_worker_authority() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let detail = runtime.create_worker(task_request("implement v0")).unwrap(); let detail = runtime.create_worker(task_request("implement v0")).unwrap();
assert_eq!(detail.worker_ref.runtime_id, runtime.runtime_id().unwrap()); assert_eq!(detail.worker_ref.runtime_id, runtime.runtime_id().unwrap());
assert_eq!(detail.status, WorkerStatus::Running); assert_eq!(detail.status, WorkerStatus::Running);
assert!(detail.config_bundle.is_none()); assert_eq!(detail.config_bundle.id, "bundle-1");
let list = runtime.list_workers().unwrap(); let list = runtime.list_workers().unwrap();
assert_eq!(list.len(), 1); assert_eq!(list.len(), 1);
assert_eq!(list[0].worker_ref, detail.worker_ref); assert_eq!(list[0].worker_ref, detail.worker_ref);
assert_eq!(list[0].requested_capability_count, 1); assert_eq!(list[0].config_bundle, detail.config_bundle);
let fetched = runtime.worker_detail(&detail.worker_ref).unwrap(); let fetched = runtime.worker_detail(&detail.worker_ref).unwrap();
assert_eq!(fetched.worker_id, detail.worker_id); assert_eq!(fetched.worker_id, detail.worker_id);
assert_eq!(fetched.intent, detail.intent); assert_eq!(fetched.profile, detail.profile);
} }
#[test] #[test]
fn synced_config_bundle_is_stored_checked_and_used_for_worker_creation() { fn synced_config_bundle_is_stored_checked_and_used_for_worker_creation() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let bundle = test_bundle(); let bundle = test_bundle();
let availability = runtime.store_config_bundle(bundle.clone()).unwrap(); let availability = runtime.store_config_bundle(bundle.clone()).unwrap();
assert_eq!(availability.reference.id, "bundle-1"); assert_eq!(availability.reference.id, "bundle-1");
@ -1697,7 +1749,7 @@ mod tests {
let detail = runtime let detail = runtime
.create_worker(bundled_task_request("synced", &bundle)) .create_worker(bundled_task_request("synced", &bundle))
.unwrap(); .unwrap();
assert_eq!(detail.config_bundle, Some(availability.reference)); assert_eq!(detail.config_bundle, availability.reference);
} }
#[test] #[test]
@ -1746,8 +1798,8 @@ mod tests {
#[test] #[test]
fn rejects_worker_refs_from_another_runtime() { fn rejects_worker_refs_from_another_runtime() {
let runtime_a = Runtime::new_memory(); let runtime_a = runtime_with_backend();
let runtime_b = Runtime::new_memory(); let runtime_b = runtime_with_backend();
let detail = runtime_a.create_worker(task_request("runtime a")).unwrap(); let detail = runtime_a.create_worker(task_request("runtime a")).unwrap();
let err = runtime_b.worker_detail(&detail.worker_ref).unwrap_err(); let err = runtime_b.worker_detail(&detail.worker_ref).unwrap_err();
@ -1755,52 +1807,50 @@ mod tests {
} }
#[test] #[test]
fn tools_less_worker_without_config_bundle_uses_local_defaults_and_diagnostics() { fn create_worker_rejects_system_initial_input_without_persisting_worker() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let detail = runtime let mut request = task_request("system initial input");
.create_worker(CreateWorkerRequest::tools_less( request.initial_input = Some(WorkerInput::system("role/system belongs in config bundle"));
WorkerIntent::default(),
ProfileSelector::RuntimeDefault,
))
.unwrap();
assert!(detail.config_bundle.is_none()); let error = runtime.create_worker(request).unwrap_err();
assert!(detail.requested_capabilities.is_empty()); assert!(matches!(
let diagnostics = runtime.diagnostics().unwrap(); error,
assert_eq!(diagnostics.len(), 2); RuntimeError::InvalidInitialInputKind { .. }
));
assert!(runtime.list_workers().unwrap().is_empty());
let events = runtime
.read_events(&runtime.event_cursor_from_start().unwrap(), 16)
.unwrap();
assert!( assert!(
diagnostics events
.events
.iter() .iter()
.any(|diagnostic| diagnostic.code == "runtime.local_default_resources") .all(|event| event.kind != RuntimeEventKind::WorkerCreated)
);
assert!(
diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "worker.tools_less")
); );
} }
#[test] #[test]
fn backend_unconnected_worker_input_is_rejected_and_not_transcribed() { fn create_worker_without_execution_backend_is_rejected_and_not_persisted() {
let runtime = Runtime::new_memory(); let runtime = Runtime::new_memory();
let detail = runtime.create_worker(task_request("placeholder")).unwrap(); runtime.store_config_bundle(test_bundle()).unwrap();
assert_eq!( let error = runtime
detail.execution.backend, .create_worker(task_request("no backend"))
WorkerExecutionBackendKind::Unconnected
);
let err = runtime
.send_input(&detail.worker_ref, WorkerInput::user("must reject"))
.unwrap_err(); .unwrap_err();
assert!(matches!( assert!(matches!(
err, error,
RuntimeError::WorkerExecutionUnavailable { .. } RuntimeError::ExecutionBackendUnavailable { .. }
)); ));
assert!(runtime.list_workers().unwrap().is_empty());
}
let projection = runtime #[test]
.transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 1)) fn create_worker_missing_config_bundle_is_rejected_before_backend() {
.unwrap(); let runtime = Runtime::new_memory();
assert_eq!(projection.total_items, 0); let error = runtime
.create_worker(task_request("missing bundle"))
.unwrap_err();
assert!(matches!(error, RuntimeError::ConfigBundleMissing { .. }));
assert!(runtime.list_workers().unwrap().is_empty());
} }
#[test] #[test]
@ -1885,6 +1935,7 @@ mod tests {
let runtime = let runtime =
Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(InputOnlyBackend)) Runtime::with_execution_backend(RuntimeOptions::default(), Arc::new(InputOnlyBackend))
.unwrap(); .unwrap();
runtime.store_config_bundle(test_bundle()).unwrap();
let detail = runtime.create_worker(task_request("no stop")).unwrap(); let detail = runtime.create_worker(task_request("no stop")).unwrap();
let err = runtime let err = runtime
@ -1916,6 +1967,7 @@ mod tests {
Arc::new(TestExecutionBackend::default()), Arc::new(TestExecutionBackend::default()),
) )
.unwrap(); .unwrap();
runtime.store_config_bundle(test_bundle()).unwrap();
let detail = runtime.create_worker(task_request("chat")).unwrap(); let detail = runtime.create_worker(task_request("chat")).unwrap();
let first = runtime let first = runtime
@ -1946,7 +1998,7 @@ mod tests {
#[test] #[test]
fn stop_and_cancel_workers_update_summary() { fn stop_and_cancel_workers_update_summary() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let stopped = runtime.create_worker(task_request("stop me")).unwrap(); let stopped = runtime.create_worker(task_request("stop me")).unwrap();
let cancelled = runtime.create_worker(task_request("cancel me")).unwrap(); let cancelled = runtime.create_worker(task_request("cancel me")).unwrap();
@ -1969,7 +2021,7 @@ mod tests {
#[test] #[test]
fn stop_then_cancel_preserves_stopped_terminal_state() { fn stop_then_cancel_preserves_stopped_terminal_state() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let cursor = runtime.event_cursor_from_start().unwrap(); let cursor = runtime.event_cursor_from_start().unwrap();
let worker = runtime let worker = runtime
.create_worker(task_request("stable stopped")) .create_worker(task_request("stable stopped"))
@ -2014,7 +2066,7 @@ mod tests {
#[test] #[test]
fn cancel_then_stop_preserves_cancelled_terminal_state() { fn cancel_then_stop_preserves_cancelled_terminal_state() {
let runtime = Runtime::new_memory(); let runtime = runtime_with_backend();
let cursor = runtime.event_cursor_from_start().unwrap(); let cursor = runtime.event_cursor_from_start().unwrap();
let worker = runtime let worker = runtime
.create_worker(task_request("stable cancelled")) .create_worker(task_request("stable cancelled"))
@ -2126,6 +2178,7 @@ mod tests {
runtime.summary().unwrap().backend, runtime.summary().unwrap().backend,
RuntimeBackendKind::FsStore RuntimeBackendKind::FsStore
); );
runtime.store_config_bundle(test_bundle()).unwrap();
let worker = runtime.create_worker(task_request("persist me")).unwrap(); let worker = runtime.create_worker(task_request("persist me")).unwrap();
runtime runtime
@ -2149,6 +2202,28 @@ mod tests {
.unwrap(); .unwrap();
let restored_worker = restored.worker_detail(&worker.worker_ref).unwrap(); let restored_worker = restored.worker_detail(&worker.worker_ref).unwrap();
assert_eq!(restored_worker.status, WorkerStatus::Stopped); assert_eq!(restored_worker.status, WorkerStatus::Stopped);
assert_eq!(
restored_worker.execution.backend,
WorkerExecutionBackendKind::Stale
);
assert_eq!(
restored_worker
.execution
.binding
.as_ref()
.map(|binding| binding.backend_id.as_str()),
Some("test-execution-backend")
);
assert!(
restored
.diagnostics()
.unwrap()
.iter()
.any(
|diagnostic| diagnostic.code == "worker_execution_mapping_stale"
&& diagnostic.worker_ref.as_ref() == Some(&worker.worker_ref)
)
);
assert_eq!(restored_worker.transcript_len, 2); assert_eq!(restored_worker.transcript_len, 2);
let projection = restored let projection = restored
@ -2224,13 +2299,17 @@ mod tests {
let missing_root = fs_store_root("missing"); let missing_root = fs_store_root("missing");
let missing_runtime_id = RuntimeId::new("runtime-missing").unwrap(); let missing_runtime_id = RuntimeId::new("runtime-missing").unwrap();
let missing_runtime = Runtime::with_fs_store(crate::fs_store::FsRuntimeStoreOptions { let missing_runtime = Runtime::with_fs_store_and_execution_backend(
crate::fs_store::FsRuntimeStoreOptions {
root: missing_root.clone(), root: missing_root.clone(),
runtime_id: Some(missing_runtime_id.clone()), runtime_id: Some(missing_runtime_id.clone()),
display_name: None, display_name: None,
limits: RuntimeLimits::default(), limits: RuntimeLimits::default(),
}) },
Arc::new(TestExecutionBackend::default()),
)
.unwrap(); .unwrap();
missing_runtime.store_config_bundle(test_bundle()).unwrap();
missing_runtime missing_runtime
.create_worker(task_request("missing worker snapshot")) .create_worker(task_request("missing worker snapshot"))
.unwrap(); .unwrap();

View File

@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
use chrono::Utc; use chrono::Utc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use worker_runtime::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector}; use worker_runtime::catalog::ProfileSelector;
use worker_runtime::config_bundle::{ use worker_runtime::config_bundle::{
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor, ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor,
}; };
@ -367,10 +367,6 @@ fn spawn_companion_worker(runtime: &RuntimeRegistry) -> CompanionWorkerState {
let selector = companion_profile_selector(); let selector = companion_profile_selector();
let mut diagnostics = Vec::new(); let mut diagnostics = Vec::new();
let config_bundle = companion_config_bundle(); let config_bundle = companion_config_bundle();
let config_ref = ConfigBundleRef {
id: config_bundle.metadata.id.clone(),
digest: config_bundle.metadata.digest.clone(),
};
match runtime.sync_config_bundle(COMPANION_RUNTIME_ID, config_bundle) { match runtime.sync_config_bundle(COMPANION_RUNTIME_ID, config_bundle) {
Ok(result) => diagnostics.extend(result.diagnostics), Ok(result) => diagnostics.extend(result.diagnostics),
@ -390,8 +386,7 @@ fn spawn_companion_worker(runtime: &RuntimeRegistry) -> CompanionWorkerState {
expected_segments: 0, expected_segments: 0,
}, },
profile: Some(selector), profile: Some(selector),
config_bundle: Some(config_ref), initial_input: None,
requested_capabilities: vec![CapabilityRequest::named("worker.input.user")],
}, },
); );
@ -611,36 +606,37 @@ mod tests {
} }
#[test] #[test]
fn companion_spawns_worker_with_companion_profile_and_diagnostic_when_not_input_capable() { fn companion_spawns_worker_with_companion_profile_through_runtime_backend() {
let registry = let registry = RuntimeRegistry::for_workspace(
RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory("local:test")); EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test",
Arc::new(DeterministicExecutionBackend::default()),
)
.unwrap(),
);
let registry = Arc::new(registry); let registry = Arc::new(registry);
let companion = CompanionConsole::new(registry.clone()); let companion = CompanionConsole::new(registry.clone());
let status = companion.status(); let status = companion.status();
let worker = status.worker.clone().expect("companion worker"); let worker = status.worker.clone().expect("companion worker");
assert_eq!(worker.runtime_id, COMPANION_RUNTIME_ID); assert_eq!(worker.runtime_id, COMPANION_RUNTIME_ID);
assert_eq!(worker.role.as_deref(), Some("workspace_companion")); assert_eq!(worker.role.as_deref(), Some(COMPANION_PROFILE_ID));
assert!(!worker.capabilities.can_accept_input); assert!(worker.capabilities.can_accept_input);
assert_eq!(status.transport.completion, "not_input_capable"); assert_eq!(status.transport.completion, "connected");
assert!( assert!(status.diagnostics.is_empty());
status
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "companion_worker_not_input_capable")
);
let response = companion.send_message(CompanionMessageRequest { let response = companion.send_message(CompanionMessageRequest {
content: "hello".to_string(), content: "hello".to_string(),
}); });
assert_eq!(response.state, CompanionState::Rejected); assert_eq!(response.state, CompanionState::Accepted);
assert!(response.diagnostics.is_empty());
assert!( assert!(
!response response
.diagnostics .transcript
.items
.iter() .iter()
.any(|diagnostic| diagnostic.code == "companion_llm_not_connected") .any(|entry| entry.role == "user" && entry.content == "hello")
); );
assert!(response.transcript.items.is_empty());
let worker_detail = registry let worker_detail = registry
.worker(COMPANION_RUNTIME_ID, &worker.worker_id) .worker(COMPANION_RUNTIME_ID, &worker.worker_id)

View File

@ -8,12 +8,15 @@ use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use worker_runtime::catalog::{ use worker_runtime::catalog::{
CapabilityRequest, ConfigBundleRef, CreateWorkerRequest, ProfileSelector, ConfigBundleRef, CreateWorkerRequest, ProfileSelector, WorkerDetail as EmbeddedWorkerDetail,
WorkerDetail as EmbeddedWorkerDetail, WorkerIntent, WorkerStatus as EmbeddedWorkerStatus, WorkerStatus as EmbeddedWorkerStatus,
};
use worker_runtime::config_bundle::{
ConfigBundle, ConfigBundleAvailability, ConfigBundleMetadata, ConfigBundleProvenance,
ConfigBundleSummary, ConfigProfileDescriptor,
}; };
use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary};
use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; use worker_runtime::error::RuntimeError as EmbeddedRuntimeError;
use worker_runtime::execution::WorkerExecutionRunState; use worker_runtime::execution::{WorkerExecutionRunState, WorkerExecutionStatus};
use worker_runtime::http_server::{ use worker_runtime::http_server::{
RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest, RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest,
RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse, RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse,
@ -236,11 +239,12 @@ pub struct WorkerLookupResult {
/// Browser-safe worker spawn request shape. /// Browser-safe worker spawn request shape.
/// ///
/// The request intentionally carries only workspace policy intents, stable /// The request carries Browser-facing launch semantics only: workspace intent,
/// worker identifiers, optional profile selectors, config bundle refs, and /// optional display identity, acceptance policy, optional profile selector, and
/// requested capability names. Raw workspace roots, child cwd, executable path, /// optional initial input. Runtime execution authority is resolved by the host
/// Runtime endpoints/credentials, raw bundle storage paths, and host-local /// into a synced ConfigBundle before the canonical Runtime create request is
/// resolved WorkerSpec content are resolved by the runtime service and never /// built. Raw workspace roots, child cwd, executable paths, tool scope,
/// credentials, raw config stores, sockets, sessions, and storage paths are not
/// accepted from Workspace API callers. /// accepted from Workspace API callers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkerSpawnRequest { pub struct WorkerSpawnRequest {
@ -251,9 +255,7 @@ pub struct WorkerSpawnRequest {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub profile: Option<ProfileSelector>, pub profile: Option<ProfileSelector>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub config_bundle: Option<ConfigBundleRef>, pub initial_input: Option<EmbeddedWorkerInput>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub requested_capabilities: Vec<CapabilityRequest>,
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
@ -964,11 +966,7 @@ impl EmbeddedWorkerRuntime {
status: EmbeddedWorkerStatus, status: EmbeddedWorkerStatus,
execution: &worker_runtime::execution::WorkerExecutionStatus, execution: &worker_runtime::execution::WorkerExecutionStatus,
) -> bool { ) -> bool {
self.execution_enabled runtime_worker_can_accept_input(self.execution_enabled, status, execution)
&& status == EmbeddedWorkerStatus::Running
&& execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected
&& execution.run_state == WorkerExecutionRunState::Idle
&& !execution_last_result_blocks_control(execution)
} }
fn can_stop_embedded_worker( fn can_stop_embedded_worker(
@ -976,16 +974,7 @@ impl EmbeddedWorkerRuntime {
status: EmbeddedWorkerStatus, status: EmbeddedWorkerStatus,
execution: &worker_runtime::execution::WorkerExecutionStatus, execution: &worker_runtime::execution::WorkerExecutionStatus,
) -> bool { ) -> bool {
self.execution_enabled runtime_worker_can_stop(self.execution_enabled, status, execution)
&& status == EmbeddedWorkerStatus::Running
&& execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected
&& !matches!(
execution.run_state,
WorkerExecutionRunState::Rejected
| WorkerExecutionRunState::Errored
| WorkerExecutionRunState::Unconnected
)
&& !execution_last_result_blocks_control(execution)
} }
fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary { fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary {
@ -994,14 +983,14 @@ impl EmbeddedWorkerRuntime {
worker_id: summary.worker_ref.worker_id.as_str().to_string(), worker_id: summary.worker_ref.worker_id.as_str().to_string(),
host_id: self.host_id.clone(), host_id: self.host_id.clone(),
label: safe_display_hint(summary.worker_ref.worker_id.as_str()), label: safe_display_hint(summary.worker_ref.worker_id.as_str()),
role: embedded_intent_label(&summary.intent), role: embedded_profile_label(&summary.profile),
profile: embedded_profile_label(&summary.profile), profile: embedded_profile_label(&summary.profile),
workspace: WorkerWorkspaceSummary { workspace: WorkerWorkspaceSummary {
visibility: "backend_internal".to_string(), visibility: "backend_internal".to_string(),
identity: "runtime_registry_worker".to_string(), identity: "runtime_registry_worker".to_string(),
}, },
state: embedded_worker_status_label(summary.status).to_string(), state: embedded_worker_status_label(summary.status).to_string(),
status: embedded_worker_execution_status_label(summary.status, summary.execution.run_state) status: embedded_worker_execution_status_label(summary.status, &summary.execution)
.to_string(), .to_string(),
last_seen_at: None, last_seen_at: None,
implementation: WorkerImplementationSummary { implementation: WorkerImplementationSummary {
@ -1027,14 +1016,14 @@ impl EmbeddedWorkerRuntime {
worker_id: detail.worker_id.as_str().to_string(), worker_id: detail.worker_id.as_str().to_string(),
host_id: self.host_id.clone(), host_id: self.host_id.clone(),
label: safe_display_hint(detail.worker_id.as_str()), label: safe_display_hint(detail.worker_id.as_str()),
role: embedded_intent_label(&detail.intent), role: embedded_profile_label(&detail.profile),
profile: embedded_profile_label(&detail.profile), profile: embedded_profile_label(&detail.profile),
workspace: WorkerWorkspaceSummary { workspace: WorkerWorkspaceSummary {
visibility: "backend_internal".to_string(), visibility: "backend_internal".to_string(),
identity: "runtime_registry_worker".to_string(), identity: "runtime_registry_worker".to_string(),
}, },
state: embedded_worker_status_label(detail.status).to_string(), state: embedded_worker_status_label(detail.status).to_string(),
status: embedded_worker_execution_status_label(detail.status, detail.execution.run_state) status: embedded_worker_execution_status_label(detail.status, &detail.execution)
.to_string(), .to_string(),
last_seen_at: None, last_seen_at: None,
implementation: WorkerImplementationSummary { implementation: WorkerImplementationSummary {
@ -1195,26 +1184,35 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
if matches!(request.acceptance, WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments } if expected_segments > 0) if matches!(request.acceptance, WorkerSpawnAcceptanceRequirement::RunAccepted { expected_segments } if expected_segments > 0)
{ {
diagnostics.push(diagnostic( diagnostics.push(diagnostic(
"embedded_runtime_tools_less", "embedded_runtime_acceptance_projection",
DiagnosticSeverity::Info, DiagnosticSeverity::Info,
"Embedded Runtime v0 creates a tools-less catalog Worker and does not spawn provider segments".to_string(), "Embedded Runtime accepts creation through a runtime execution backend; provider segment counts are observed after execution, not faked at create time".to_string(),
)); ));
} }
let create_request = CreateWorkerRequest { let profile = request
intent: embedded_create_intent(&request.intent),
profile: request
.profile .profile
.clone() .clone()
.unwrap_or_else(|| embedded_profile_selector(&request.intent)), .unwrap_or_else(|| embedded_profile_selector(&request.intent));
config_bundle: request.config_bundle.clone(), let config_bundle = match self
requested_capabilities: if request.requested_capabilities.is_empty() { .runtime
vec![CapabilityRequest::named("read")] .store_config_bundle(default_embedded_config_bundle(&profile))
} else { {
request.requested_capabilities.clone() Ok(availability) => availability.reference,
}, Err(error) => {
workspace_refs: Vec::new(), diagnostics.push(embedded_runtime_diagnostic(&error));
mount_refs: Vec::new(), return WorkerSpawnResult {
state: WorkerOperationState::Rejected,
worker: None,
acceptance_evidence: Vec::new(),
diagnostics,
};
}
};
let create_request = CreateWorkerRequest {
profile,
config_bundle,
initial_input: request.initial_input.clone(),
}; };
match self.runtime.create_worker(create_request) { match self.runtime.create_worker(create_request) {
Ok(detail) => { Ok(detail) => {
@ -1675,22 +1673,23 @@ impl RemoteWorkerRuntime {
worker_id: summary.worker_ref.worker_id.as_str().to_string(), worker_id: summary.worker_ref.worker_id.as_str().to_string(),
host_id: self.host_id.clone(), host_id: self.host_id.clone(),
label: safe_display_hint(summary.worker_ref.worker_id.as_str()), label: safe_display_hint(summary.worker_ref.worker_id.as_str()),
role: embedded_intent_label(&summary.intent), role: None,
profile: embedded_profile_label(&summary.profile), profile: embedded_profile_label(&summary.profile),
workspace: WorkerWorkspaceSummary { workspace: WorkerWorkspaceSummary {
visibility: "remote_runtime".to_string(), visibility: "remote_runtime".to_string(),
identity: "runtime_registry_worker".to_string(), identity: "runtime_registry_worker".to_string(),
}, },
state: embedded_worker_status_label(summary.status).to_string(), state: embedded_worker_status_label(summary.status).to_string(),
status: embedded_worker_status_label(summary.status).to_string(), status: embedded_worker_execution_status_label(summary.status, &summary.execution)
.to_string(),
last_seen_at: None, last_seen_at: None,
implementation: WorkerImplementationSummary { implementation: WorkerImplementationSummary {
kind: "remote_worker_runtime".to_string(), kind: "remote_worker_runtime".to_string(),
display_hint: "Backend-proxied remote worker-runtime Worker".to_string(), display_hint: "Backend-proxied remote worker-runtime Worker".to_string(),
}, },
capabilities: WorkerCapabilitySummary { capabilities: WorkerCapabilitySummary {
can_accept_input: true, can_accept_input: runtime_worker_can_accept_input(true, summary.status, &summary.execution),
can_stop: true, can_stop: runtime_worker_can_stop(true, summary.status, &summary.execution),
can_spawn_followup: false, can_spawn_followup: false,
}, },
diagnostics: vec![diagnostic( diagnostics: vec![diagnostic(
@ -1707,22 +1706,23 @@ impl RemoteWorkerRuntime {
worker_id: detail.worker_id.as_str().to_string(), worker_id: detail.worker_id.as_str().to_string(),
host_id: self.host_id.clone(), host_id: self.host_id.clone(),
label: safe_display_hint(detail.worker_id.as_str()), label: safe_display_hint(detail.worker_id.as_str()),
role: embedded_intent_label(&detail.intent), role: None,
profile: embedded_profile_label(&detail.profile), profile: embedded_profile_label(&detail.profile),
workspace: WorkerWorkspaceSummary { workspace: WorkerWorkspaceSummary {
visibility: "remote_runtime".to_string(), visibility: "remote_runtime".to_string(),
identity: "runtime_registry_worker".to_string(), identity: "runtime_registry_worker".to_string(),
}, },
state: embedded_worker_status_label(detail.status).to_string(), state: embedded_worker_status_label(detail.status).to_string(),
status: embedded_worker_status_label(detail.status).to_string(), status: embedded_worker_execution_status_label(detail.status, &detail.execution)
.to_string(),
last_seen_at: None, last_seen_at: None,
implementation: WorkerImplementationSummary { implementation: WorkerImplementationSummary {
kind: "remote_worker_runtime".to_string(), kind: "remote_worker_runtime".to_string(),
display_hint: "Backend-proxied remote worker-runtime Worker".to_string(), display_hint: "Backend-proxied remote worker-runtime Worker".to_string(),
}, },
capabilities: WorkerCapabilitySummary { capabilities: WorkerCapabilitySummary {
can_accept_input: true, can_accept_input: runtime_worker_can_accept_input(true, detail.status, &detail.execution),
can_stop: true, can_stop: runtime_worker_can_stop(true, detail.status, &detail.execution),
can_spawn_followup: false, can_spawn_followup: false,
}, },
diagnostics: vec![diagnostic( diagnostics: vec![diagnostic(
@ -1875,20 +1875,24 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime {
)], )],
}; };
} }
let create = CreateWorkerRequest { let profile = request
intent: embedded_create_intent(&request.intent),
profile: request
.profile .profile
.clone() .clone()
.unwrap_or_else(|| embedded_profile_selector(&request.intent)), .unwrap_or_else(|| embedded_profile_selector(&request.intent));
config_bundle: request.config_bundle.clone(), let sync = self.sync_config_bundle(default_embedded_config_bundle(&profile));
requested_capabilities: if request.requested_capabilities.is_empty() { let Some(config_bundle) = sync.availability.map(|availability| availability.reference)
vec![CapabilityRequest::named("read")] else {
} else { return WorkerSpawnResult {
request.requested_capabilities.clone() state: WorkerOperationState::Rejected,
}, worker: None,
workspace_refs: Vec::new(), acceptance_evidence: Vec::new(),
mount_refs: Vec::new(), diagnostics: sync.diagnostics,
};
};
let create = CreateWorkerRequest {
profile,
config_bundle,
initial_input: request.initial_input.clone(),
}; };
match self.post_json::<_, RuntimeHttpWorkerResponse>("/v1/workers", &create) { match self.post_json::<_, RuntimeHttpWorkerResponse>("/v1/workers", &create) {
Ok(response) => WorkerSpawnResult { Ok(response) => WorkerSpawnResult {
@ -2099,9 +2103,37 @@ fn embedded_spawn_execution_failure_diagnostic(
)) ))
} }
fn execution_last_result_blocks_control( fn runtime_worker_can_accept_input(
execution: &worker_runtime::execution::WorkerExecutionStatus, execution_enabled: bool,
status: EmbeddedWorkerStatus,
execution: &WorkerExecutionStatus,
) -> bool { ) -> bool {
execution_enabled
&& status == EmbeddedWorkerStatus::Running
&& execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected
&& execution.run_state == WorkerExecutionRunState::Idle
&& !execution_last_result_blocks_control(execution)
}
fn runtime_worker_can_stop(
execution_enabled: bool,
status: EmbeddedWorkerStatus,
execution: &WorkerExecutionStatus,
) -> bool {
execution_enabled
&& status == EmbeddedWorkerStatus::Running
&& execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected
&& !matches!(
execution.run_state,
WorkerExecutionRunState::Stopped
| WorkerExecutionRunState::Rejected
| WorkerExecutionRunState::Errored
| WorkerExecutionRunState::Unconnected
)
&& !execution_last_result_blocks_control(execution)
}
fn execution_last_result_blocks_control(execution: &WorkerExecutionStatus) -> bool {
execution.last_result.as_ref().is_some_and(|result| { execution.last_result.as_ref().is_some_and(|result| {
matches!( matches!(
result.outcome, result.outcome,
@ -2122,37 +2154,53 @@ fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str {
fn embedded_worker_execution_status_label( fn embedded_worker_execution_status_label(
status: EmbeddedWorkerStatus, status: EmbeddedWorkerStatus,
run_state: WorkerExecutionRunState, execution: &WorkerExecutionStatus,
) -> &'static str { ) -> &'static str {
match status { match status {
EmbeddedWorkerStatus::Stopped => "stopped", EmbeddedWorkerStatus::Stopped => "stopped",
EmbeddedWorkerStatus::Cancelled => "cancelled", EmbeddedWorkerStatus::Cancelled => "cancelled",
EmbeddedWorkerStatus::Running => match run_state { EmbeddedWorkerStatus::Running => {
if execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Stale {
return "stale";
}
match execution.run_state {
WorkerExecutionRunState::Idle => "idle", WorkerExecutionRunState::Idle => "idle",
WorkerExecutionRunState::Busy => "running", WorkerExecutionRunState::Busy => "running",
WorkerExecutionRunState::Stopped => "stopped", WorkerExecutionRunState::Stopped => "stopped",
WorkerExecutionRunState::Rejected => "rejected", WorkerExecutionRunState::Rejected => "rejected",
WorkerExecutionRunState::Errored => "errored", WorkerExecutionRunState::Errored => "errored",
WorkerExecutionRunState::Unconnected => "unconnected", WorkerExecutionRunState::Unconnected => "unconnected",
}, }
}
} }
} }
fn embedded_create_intent(intent: &WorkerSpawnIntent) -> WorkerIntent { fn default_embedded_config_bundle(profile: &ProfileSelector) -> ConfigBundle {
match intent { let id = format!(
WorkerSpawnIntent::WorkspaceCompanion => WorkerIntent::Role { "workspace-runtime-{}",
role: "workspace_companion".to_string(), embedded_profile_label(profile)
purpose: Some("workspace backend internal companion".to_string()), .unwrap_or_else(|| "default".to_string())
.replace([':', '/', ' '], "-")
);
ConfigBundle {
metadata: ConfigBundleMetadata {
id,
digest: String::new(),
revision: "workspace-runtime-v0".to_string(),
workspace_id: "workspace-server".to_string(),
created_at: "runtime-generated".to_string(),
provenance: ConfigBundleProvenance {
source: "workspace-server".to_string(),
detail: Some("backend-resolved launch bundle".to_string()),
}, },
WorkerSpawnIntent::WorkspaceOrchestrator => WorkerIntent::Role {
role: "workspace_orchestrator".to_string(),
purpose: Some("workspace backend internal orchestration".to_string()),
},
WorkerSpawnIntent::TicketRole { ticket_id, role } => WorkerIntent::Role {
role: ticket_role_profile_slug(role).to_string(),
purpose: Some(format!("ticket {ticket_id}")),
}, },
profiles: vec![ConfigProfileDescriptor {
selector: profile.clone(),
label: embedded_profile_label(profile),
}],
declarations: Vec::new(),
} }
.with_computed_digest()
} }
fn embedded_profile_selector(intent: &WorkerSpawnIntent) -> ProfileSelector { fn embedded_profile_selector(intent: &WorkerSpawnIntent) -> ProfileSelector {
@ -2176,16 +2224,6 @@ fn ticket_role_profile_slug(role: &TicketWorkerRole) -> &'static str {
} }
} }
fn embedded_intent_label(intent: &WorkerIntent) -> Option<String> {
match intent {
WorkerIntent::Assistant { purpose } => {
purpose.clone().or_else(|| Some("assistant".to_string()))
}
WorkerIntent::Task { objective } => Some(safe_display_hint(objective)),
WorkerIntent::Role { role, .. } => Some(safe_display_hint(role)),
}
}
fn embedded_profile_label(profile: &ProfileSelector) -> Option<String> { fn embedded_profile_label(profile: &ProfileSelector) -> Option<String> {
Some(match profile { Some(match profile {
ProfileSelector::RuntimeDefault => "runtime_default".to_string(), ProfileSelector::RuntimeDefault => "runtime_default".to_string(),
@ -2324,7 +2362,8 @@ fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnosti
DiagnosticSeverity::Warning, DiagnosticSeverity::Warning,
"Embedded Runtime worker was not found".to_string(), "Embedded Runtime worker was not found".to_string(),
), ),
EmbeddedRuntimeError::WorkerExecutionUnavailable { .. } => diagnostic( EmbeddedRuntimeError::WorkerExecutionUnavailable { .. }
| EmbeddedRuntimeError::ExecutionBackendUnavailable { .. } => diagnostic(
"embedded_worker_execution_unavailable", "embedded_worker_execution_unavailable",
DiagnosticSeverity::Warning, DiagnosticSeverity::Warning,
"Embedded Worker has no execution backend attached".to_string(), "Embedded Worker has no execution backend attached".to_string(),
@ -2339,6 +2378,11 @@ fn embedded_runtime_diagnostic(error: &EmbeddedRuntimeError) -> RuntimeDiagnosti
DiagnosticSeverity::Warning, DiagnosticSeverity::Warning,
format!("Requested limit {requested} exceeds embedded Runtime maximum {max}"), format!("Requested limit {requested} exceeds embedded Runtime maximum {max}"),
), ),
EmbeddedRuntimeError::InvalidInitialInputKind { .. } => diagnostic(
"embedded_worker_initial_input_kind_invalid",
DiagnosticSeverity::Warning,
error.to_string(),
),
EmbeddedRuntimeError::InvalidRequest(_) EmbeddedRuntimeError::InvalidRequest(_)
| EmbeddedRuntimeError::ConfigBundleMissing { .. } | EmbeddedRuntimeError::ConfigBundleMissing { .. }
| EmbeddedRuntimeError::ConfigBundleDigestMismatch { .. } | EmbeddedRuntimeError::ConfigBundleDigestMismatch { .. }
@ -2962,8 +3006,7 @@ mod tests {
expected_segments: 0, expected_segments: 0,
}, },
profile: None, profile: None,
config_bundle: None, initial_input: None,
requested_capabilities: Vec::new(),
} }
} }
@ -2978,13 +3021,35 @@ mod tests {
assert_eq!(spawned.state, WorkerOperationState::Rejected); assert_eq!(spawned.state, WorkerOperationState::Rejected);
assert!(spawned.acceptance_evidence.is_empty()); assert!(spawned.acceptance_evidence.is_empty());
assert!(spawned.diagnostics.iter().any(|diagnostic| { assert!(spawned.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "embedded_worker_execution_spawn_errored" diagnostic.code == "embedded_worker_execution_rejected"
&& !diagnostic.message.contains("/tmp/secret-provider-config") && !diagnostic.message.contains("/tmp/secret-provider-config")
})); }));
let worker = spawned.worker.expect("failed execution is still projected"); assert!(spawned.worker.is_none());
assert_eq!(worker.status, "errored"); }
assert!(!worker.capabilities.can_accept_input);
assert!(!worker.capabilities.can_stop); #[test]
fn embedded_runtime_rejects_system_initial_input_without_worker_projection() {
let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test",
Arc::new(AcceptingExecutionBackend::default()),
)
.expect("test backend should connect");
let mut request = embedded_spawn_request();
request.initial_input = Some(EmbeddedWorkerInput {
kind: EmbeddedWorkerInputKind::System,
content: "system/role instruction belongs in profile".to_string(),
});
let spawned = runtime.spawn_worker(request);
assert_eq!(spawned.state, WorkerOperationState::Rejected);
assert!(spawned.worker.is_none());
assert!(spawned.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "embedded_worker_initial_input_kind_invalid"
&& diagnostic
.message
.contains("initial worker input must be user input")
}));
assert!(runtime.list_workers(10).items.is_empty());
} }
#[test] #[test]
@ -3035,8 +3100,13 @@ mod tests {
#[test] #[test]
fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() { fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() {
let registry = let registry = RuntimeRegistry::for_workspace(
RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory("local:test")); EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test",
Arc::new(AcceptingExecutionBackend::default()),
)
.expect("test backend should connect"),
);
let runtimes = registry.list_runtimes(10); let runtimes = registry.list_runtimes(10);
let embedded_summary = runtimes let embedded_summary = runtimes
@ -3050,7 +3120,7 @@ mod tests {
); );
assert_eq!(embedded_summary.source.status, RuntimeSourceStatus::Active); assert_eq!(embedded_summary.source.status, RuntimeSourceStatus::Active);
assert!(embedded_summary.capabilities.can_spawn_worker); assert!(embedded_summary.capabilities.can_spawn_worker);
assert!(!embedded_summary.capabilities.can_accept_input); assert!(embedded_summary.capabilities.can_accept_input);
let spawned = registry let spawned = registry
.spawn_worker( .spawn_worker(
@ -3065,8 +3135,7 @@ mod tests {
expected_segments: 0, expected_segments: 0,
}, },
profile: None, profile: None,
config_bundle: None, initial_input: None,
requested_capabilities: Vec::new(),
}, },
) )
.unwrap(); .unwrap();
@ -3083,7 +3152,7 @@ mod tests {
assert_eq!(worker.workspace.identity, "runtime_registry_worker"); assert_eq!(worker.workspace.identity, "runtime_registry_worker");
assert_eq!(worker.implementation.kind, "embedded_worker_runtime"); assert_eq!(worker.implementation.kind, "embedded_worker_runtime");
assert_eq!(worker.profile.as_deref(), Some("builtin:coder")); assert_eq!(worker.profile.as_deref(), Some("builtin:coder"));
assert!(!worker.capabilities.can_accept_input); assert!(worker.capabilities.can_accept_input);
let input = registry let input = registry
.send_input( .send_input(
@ -3095,21 +3164,20 @@ mod tests {
}, },
) )
.unwrap(); .unwrap();
assert_eq!(input.state, WorkerOperationState::Rejected); assert_eq!(input.state, WorkerOperationState::Accepted);
assert_eq!(input.runtime_id, EMBEDDED_RUNTIME_ID); assert_eq!(input.runtime_id, EMBEDDED_RUNTIME_ID);
assert_eq!(input.worker_id, worker.worker_id); assert_eq!(input.worker_id, worker.worker_id);
assert!(
input
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "embedded_worker_execution_unavailable")
);
let transcript = registry let transcript = registry
.transcript(EMBEDDED_RUNTIME_ID, &worker.worker_id, 0, 10) .transcript(EMBEDDED_RUNTIME_ID, &worker.worker_id, 0, 10)
.unwrap(); .unwrap();
assert_eq!(transcript.state, WorkerOperationState::Accepted); assert_eq!(transcript.state, WorkerOperationState::Accepted);
assert!(transcript.items.is_empty()); assert!(
transcript
.items
.iter()
.any(|entry| entry.role == "user" && entry.content == "hello embedded runtime")
);
let json = serde_json::to_string(&(embedded_summary, worker, transcript)).unwrap(); let json = serde_json::to_string(&(embedded_summary, worker, transcript)).unwrap();
for forbidden in [ for forbidden in [
@ -3132,9 +3200,13 @@ mod tests {
#[test] #[test]
fn embedded_backend_syncs_config_bundle_and_spawns_with_bundle_ref() { fn embedded_backend_syncs_config_bundle_and_spawns_with_bundle_ref() {
let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( let registry = RuntimeRegistry::new(vec![Arc::new(
EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test", "local:test",
))]); Arc::new(AcceptingExecutionBackend::default()),
)
.unwrap(),
)]);
let bundle = test_config_bundle(); let bundle = test_config_bundle();
let sync = registry let sync = registry
.sync_config_bundle(EMBEDDED_RUNTIME_ID, bundle.clone()) .sync_config_bundle(EMBEDDED_RUNTIME_ID, bundle.clone())
@ -3162,8 +3234,7 @@ mod tests {
expected_segments: 0, expected_segments: 0,
}, },
profile: Some(ProfileSelector::Builtin("builtin:coder".to_string())), profile: Some(ProfileSelector::Builtin("builtin:coder".to_string())),
config_bundle: Some(reference), initial_input: None,
requested_capabilities: vec![CapabilityRequest::named("read")],
}, },
) )
.unwrap(); .unwrap();
@ -3176,9 +3247,13 @@ mod tests {
#[test] #[test]
fn embedded_runtime_rejects_socket_ready_acceptance_without_socket_identity() { fn embedded_runtime_rejects_socket_ready_acceptance_without_socket_identity() {
let registry = RuntimeRegistry::new(vec![Arc::new(EmbeddedWorkerRuntime::new_memory( let registry = RuntimeRegistry::new(vec![Arc::new(
EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test", "local:test",
))]); Arc::new(AcceptingExecutionBackend::default()),
)
.unwrap(),
)]);
let result = registry let result = registry
.spawn_worker( .spawn_worker(
EMBEDDED_RUNTIME_ID, EMBEDDED_RUNTIME_ID,
@ -3187,8 +3262,7 @@ mod tests {
requested_worker_name: None, requested_worker_name: None,
acceptance: WorkerSpawnAcceptanceRequirement::SocketReady, acceptance: WorkerSpawnAcceptanceRequirement::SocketReady,
profile: None, profile: None,
config_bundle: None, initial_input: None,
requested_capabilities: Vec::new(),
}, },
) )
.unwrap(); .unwrap();
@ -3275,6 +3349,8 @@ mod tests {
workers.items[0].workspace.identity, workers.items[0].workspace.identity,
"runtime_registry_worker" "runtime_registry_worker"
); );
assert!(workers.items[0].capabilities.can_accept_input);
assert!(workers.items[0].capabilities.can_stop);
let input = registry let input = registry
.send_input( .send_input(
@ -3304,6 +3380,101 @@ mod tests {
assert!(browser_payload.contains("worker_id")); assert!(browser_payload.contains("worker_id"));
} }
#[test]
fn remote_runtime_projection_blocks_stale_and_unconnected_execution_input() {
let (base_url, server) = serve_mock_http(vec![
mock_response(
"GET",
"/v1/workers",
true,
200,
json!({
"workers": [
worker_json_with_execution(
"embedded-worker-runtime",
"worker-stale",
"stale",
"unconnected",
None,
),
worker_json_with_execution(
"embedded-worker-runtime",
"worker-unconnected",
"unconnected",
"unconnected",
None,
),
worker_json_with_execution(
"embedded-worker-runtime",
"worker-rejected",
"connected",
"rejected",
Some("rejected"),
),
worker_json_with_execution(
"embedded-worker-runtime",
"worker-errored",
"connected",
"errored",
Some("errored"),
)
]
})
.to_string(),
),
mock_response(
"GET",
"/v1/workers/worker-stale",
true,
200,
json!({
"worker": worker_json_with_execution(
"embedded-worker-runtime",
"worker-stale",
"stale",
"unconnected",
None,
)})
.to_string(),
),
]);
let registry = RuntimeRegistry::new(vec![Arc::new(
RemoteWorkerRuntime::new(RemoteRuntimeConfig::new(
"remote:primary",
"Remote Primary",
base_url,
Some("secret-token-do-not-leak".to_string()),
))
.unwrap(),
)]);
let workers = registry.list_workers(10);
assert_eq!(workers.items.len(), 4);
for worker in &workers.items {
assert!(
!worker.capabilities.can_accept_input,
"{} should not be input-capable",
worker.worker_id
);
assert!(
!worker.capabilities.can_stop,
"{} should not be stoppable",
worker.worker_id
);
}
assert_eq!(workers.items[0].status, "stale");
assert_eq!(workers.items[1].status, "unconnected");
assert_eq!(workers.items[2].status, "rejected");
assert_eq!(workers.items[3].status, "errored");
let stale_detail = registry.worker("remote:primary", "worker-stale").unwrap();
assert!(!stale_detail.capabilities.can_accept_input);
assert!(!stale_detail.capabilities.can_stop);
assert_eq!(stale_detail.status, "stale");
server.join().expect("mock remote server finished");
}
#[test] #[test]
fn remote_config_bundle_sync_and_check_diagnostics_are_sanitized_and_path_safe() { fn remote_config_bundle_sync_and_check_diagnostics_are_sanitized_and_path_safe() {
let leaked_store_path = "/var/lib/yoi/runtime/bundles/bundle-1.json"; let leaked_store_path = "/var/lib/yoi/runtime/bundles/bundle-1.json";
@ -3472,20 +3643,33 @@ mod tests {
} }
fn worker_json(runtime_id: &str, worker_id: &str) -> serde_json::Value { fn worker_json(runtime_id: &str, worker_id: &str) -> serde_json::Value {
worker_json_with_execution(runtime_id, worker_id, "connected", "idle", None)
}
fn worker_json_with_execution(
runtime_id: &str,
worker_id: &str,
backend: &str,
run_state: &str,
last_outcome: Option<&str>,
) -> serde_json::Value {
let last_result = last_outcome.map(|outcome| {
json!({
"operation": "input",
"outcome": outcome,
"run_state": run_state,
"message": format!("{outcome} result")
})
});
json!({ json!({
"worker_ref": { "runtime_id": runtime_id, "worker_id": worker_id }, "worker_ref": { "runtime_id": runtime_id, "worker_id": worker_id },
"runtime_id": runtime_id, "runtime_id": runtime_id,
"worker_id": worker_id, "worker_id": worker_id,
"status": "running", "status": "running",
"execution": { "backend": "connected", "run_state": "idle" }, "execution": { "backend": backend, "run_state": run_state, "last_result": last_result },
"intent": { "kind": "role", "role": "coder", "purpose": "remote test" }, "intent": { "kind": "role", "role": "coder", "purpose": "remote test" },
"profile": { "kind": "builtin", "value": "coder" }, "profile": { "kind": "builtin", "value": "coder" },
"config_bundle": null, "config_bundle": { "id": "remote-bundle", "digest": "remote-digest" },
"requested_capabilities": [],
"workspace_refs": [],
"mount_refs": [],
"requested_capability_count": 0,
"has_config_bundle": false,
"transcript_len": 0, "transcript_len": 0,
"last_event_id": 0 "last_event_id": 0
}) })

View File

@ -1063,6 +1063,7 @@ mod tests {
use axum::http::Request; use axum::http::Request;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use serde_json::{Value, json}; use serde_json::{Value, json};
use std::sync::Arc;
use tokio_tungstenite::connect_async; use tokio_tungstenite::connect_async;
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
use tower::ServiceExt; use tower::ServiceExt;
@ -1140,6 +1141,51 @@ mod tests {
} }
} }
fn runtime_test_bundle() -> worker_runtime::config_bundle::ConfigBundle {
worker_runtime::config_bundle::ConfigBundle {
metadata: worker_runtime::config_bundle::ConfigBundleMetadata {
id: "server-test-bundle".to_string(),
digest: String::new(),
revision: "test".to_string(),
workspace_id: "test".to_string(),
created_at: "test".to_string(),
provenance: worker_runtime::config_bundle::ConfigBundleProvenance {
source: "test".to_string(),
detail: None,
},
},
profiles: vec![worker_runtime::config_bundle::ConfigProfileDescriptor {
selector: worker_runtime::catalog::ProfileSelector::RuntimeDefault,
label: Some("server-test".to_string()),
}],
declarations: Vec::new(),
}
.with_computed_digest()
}
fn runtime_create_request() -> worker_runtime::catalog::CreateWorkerRequest {
let bundle = runtime_test_bundle();
worker_runtime::catalog::CreateWorkerRequest {
profile: worker_runtime::catalog::ProfileSelector::RuntimeDefault,
config_bundle: worker_runtime::catalog::ConfigBundleRef {
id: bundle.metadata.id,
digest: bundle.metadata.digest,
},
initial_input: None,
}
}
fn runtime_with_worker() -> (worker_runtime::Runtime, worker_runtime::identity::WorkerRef) {
let runtime = worker_runtime::Runtime::with_execution_backend(
worker_runtime::RuntimeOptions::default(),
Arc::new(DeterministicExecutionBackend::default()),
)
.unwrap();
runtime.store_config_bundle(runtime_test_bundle()).unwrap();
let worker = runtime.create_worker(runtime_create_request()).unwrap();
(runtime, worker.worker_ref)
}
#[tokio::test] #[tokio::test]
async fn serves_bounded_read_apis_and_static_spa_separately() { async fn serves_bounded_read_apis_and_static_spa_separately() {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
@ -1153,7 +1199,13 @@ mod tests {
let store = SqliteWorkspaceStore::in_memory().unwrap(); let store = SqliteWorkspaceStore::in_memory().unwrap();
let mut config = ServerConfig::local_dev(dir.path(), test_identity()); let mut config = ServerConfig::local_dev(dir.path(), test_identity());
config.static_assets_dir = Some(static_dir); config.static_assets_dir = Some(static_dir);
let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(store),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.unwrap();
let app = build_router(api); let app = build_router(api);
let workspace = get_json(app.clone(), "/api/workspace").await; let workspace = get_json(app.clone(), "/api/workspace").await;
@ -1260,7 +1312,7 @@ mod tests {
let worker_items = workers["items"].as_array().unwrap(); let worker_items = workers["items"].as_array().unwrap();
let companion_worker = worker_items let companion_worker = worker_items
.iter() .iter()
.find(|worker| worker["role"] == "workspace_companion") .find(|worker| worker["role"] == "builtin:companion")
.expect("companion worker is visible through runtime worker API"); .expect("companion worker is visible through runtime worker API");
assert_eq!(companion_worker["runtime_id"], "embedded-worker-runtime"); assert_eq!(companion_worker["runtime_id"], "embedded-worker-runtime");
assert!(companion_worker["capabilities"]["can_stop"].is_boolean()); assert!(companion_worker["capabilities"]["can_stop"].is_boolean());
@ -1270,7 +1322,7 @@ mod tests {
companion_status["state"].as_str(), companion_status["state"].as_str(),
Some("ready") | Some("error") Some("ready") | Some("error")
)); ));
assert_eq!(companion_status["worker"]["role"], "workspace_companion"); assert_eq!(companion_status["worker"]["role"], "builtin:companion");
assert_eq!( assert_eq!(
companion_status["transport"]["kind"], companion_status["transport"]["kind"],
"embedded_worker_runtime" "embedded_worker_runtime"
@ -1305,7 +1357,7 @@ mod tests {
.as_array() .as_array()
.unwrap() .unwrap()
.iter() .iter()
.any(|worker| worker["role"] == "workspace_companion") .any(|worker| worker["role"] == "builtin:companion")
); );
let runs_response = app let runs_response = app
@ -1477,7 +1529,13 @@ mod tests {
let dir = tempfile::tempdir().unwrap(); let dir = tempfile::tempdir().unwrap();
let store = SqliteWorkspaceStore::in_memory().unwrap(); let store = SqliteWorkspaceStore::in_memory().unwrap();
let config = ServerConfig::local_dev(dir.path(), test_identity()); let config = ServerConfig::local_dev(dir.path(), test_identity());
let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(store),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.unwrap();
let app = build_router(api); let app = build_router(api);
let runtimes = get_json(app.clone(), "/api/runtimes").await; let runtimes = get_json(app.clone(), "/api/runtimes").await;
@ -1515,20 +1573,14 @@ mod tests {
}), }),
) )
.await; .await;
assert_eq!(spawned["state"], "rejected"); assert_eq!(spawned["state"], "accepted");
assert!( let diagnostics = spawned["diagnostics"].as_array().unwrap();
spawned["diagnostics"] assert!(diagnostics.iter().all(|diagnostic| {
.as_array() !diagnostic["message"]
.unwrap()
.iter()
.any(|diagnostic| {
diagnostic["code"] == "embedded_worker_execution_spawn_errored"
&& !diagnostic["message"]
.as_str() .as_str()
.unwrap() .unwrap_or_default()
.contains("/workspace/demo") .contains("/workspace/demo")
}) }));
);
let worker_id = spawned["worker"]["worker_id"].as_str().unwrap().to_string(); let worker_id = spawned["worker"]["worker_id"].as_str().unwrap().to_string();
assert_eq!(spawned["worker"]["runtime_id"], "embedded-worker-runtime"); assert_eq!(spawned["worker"]["runtime_id"], "embedded-worker-runtime");
assert_eq!( assert_eq!(
@ -1557,16 +1609,10 @@ mod tests {
}), }),
) )
.await; .await;
assert_eq!(accepted["state"], "rejected"); assert_eq!(accepted["state"], "accepted");
assert_eq!(accepted["runtime_id"], "embedded-worker-runtime"); assert_eq!(accepted["runtime_id"], "embedded-worker-runtime");
assert_eq!(accepted["worker_id"], worker_id); assert_eq!(accepted["worker_id"], worker_id);
assert!( assert!(accepted["diagnostics"].as_array().unwrap().is_empty());
accepted["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "embedded_worker_execution_unavailable")
);
let transcript = get_json( let transcript = get_json(
app.clone(), app.clone(),
@ -1574,7 +1620,9 @@ mod tests {
) )
.await; .await;
assert_eq!(transcript["state"], "accepted"); assert_eq!(transcript["state"], "accepted");
assert!(transcript["items"].as_array().unwrap().is_empty()); assert!(transcript["items"].as_array().unwrap().iter().any(
|item| item["role"] == "user" && item["content"] == "hello from browser-facing api"
));
let wrong_runtime = app let wrong_runtime = app
.clone() .clone()
@ -1620,10 +1668,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn proxies_worker_observation_ws_with_backend_cursors_and_diagnostics() { async fn proxies_worker_observation_ws_with_backend_cursors_and_diagnostics() {
let runtime = worker_runtime::Runtime::new_memory(); let (runtime, worker_ref) = runtime_with_worker();
let worker = runtime
.create_worker(worker_runtime::catalog::CreateWorkerRequest::default())
.unwrap();
let runtime_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let runtime_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let runtime_addr = runtime_listener.local_addr().unwrap(); let runtime_addr = runtime_listener.local_addr().unwrap();
tokio::spawn({ tokio::spawn({
@ -1645,11 +1690,17 @@ mod tests {
worker_id: "worker-a".into(), worker_id: "worker-a".into(),
endpoint: format!( endpoint: format!(
"ws://{runtime_addr}/v1/workers/{}/events/ws", "ws://{runtime_addr}/v1/workers/{}/events/ws",
worker.worker_ref.worker_id worker_ref.worker_id
), ),
bearer_token: None, bearer_token: None,
}); });
let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(store),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.unwrap();
let app_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let app_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let app_addr = app_listener.local_addr().unwrap(); let app_addr = app_listener.local_addr().unwrap();
tokio::spawn(async move { axum::serve(app_listener, build_router(api)).await.unwrap() }); tokio::spawn(async move { axum::serve(app_listener, build_router(api)).await.unwrap() });
@ -1666,7 +1717,7 @@ mod tests {
runtime runtime
.observe_worker_event( .observe_worker_event(
&worker.worker_ref, &worker_ref,
protocol::Event::TextDelta { protocol::Event::TextDelta {
text: "live".into(), text: "live".into(),
}, },
@ -1686,7 +1737,7 @@ mod tests {
let _snapshot = next_client_frame(&mut resumed).await; let _snapshot = next_client_frame(&mut resumed).await;
runtime runtime
.observe_worker_event( .observe_worker_event(
&worker.worker_ref, &worker_ref,
protocol::Event::TextDone { protocol::Event::TextDone {
text: "done".into(), text: "done".into(),
}, },
@ -1822,10 +1873,7 @@ mod tests {
worker_runtime::identity::WorkerRef, worker_runtime::identity::WorkerRef,
String, String,
) { ) {
let runtime = worker_runtime::Runtime::new_memory(); let (runtime, worker_ref) = runtime_with_worker();
let worker = runtime
.create_worker(worker_runtime::catalog::CreateWorkerRequest::default())
.unwrap();
let runtime_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let runtime_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let runtime_addr = runtime_listener.local_addr().unwrap(); let runtime_addr = runtime_listener.local_addr().unwrap();
tokio::spawn({ tokio::spawn({
@ -1838,9 +1886,9 @@ mod tests {
}); });
let endpoint = format!( let endpoint = format!(
"ws://{runtime_addr}/v1/workers/{}/events/ws", "ws://{runtime_addr}/v1/workers/{}/events/ws",
worker.worker_ref.worker_id worker_ref.worker_id
); );
(runtime, worker.worker_ref, endpoint) (runtime, worker_ref, endpoint)
} }
async fn spawn_workspace_proxy( async fn spawn_workspace_proxy(
@ -1852,7 +1900,13 @@ mod tests {
let runtime_id = source.runtime_id.clone(); let runtime_id = source.runtime_id.clone();
let worker_id = source.worker_id.clone(); let worker_id = source.worker_id.clone();
config.runtime_event_sources.push(source); config.runtime_event_sources.push(source);
let api = WorkspaceApi::new(config, Arc::new(store)).await.unwrap(); let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(store),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.unwrap();
let app_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let app_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let app_addr = app_listener.local_addr().unwrap(); let app_addr = app_listener.local_addr().unwrap();
tokio::spawn(async move { axum::serve(app_listener, build_router(api)).await.unwrap() }); tokio::spawn(async move { axum::serve(app_listener, build_router(api)).await.unwrap() });