merge: worker runtime worker adapter

This commit is contained in:
Keisuke Hirata 2026-06-28 06:42:32 +09:00
commit c3ed223dfd
No known key found for this signature in database
10 changed files with 1383 additions and 49 deletions

2
Cargo.lock generated
View File

@ -5932,6 +5932,7 @@ dependencies = [
"uuid",
"wasmtime",
"wat",
"worker-runtime",
"workflow",
"yoi-plugin-pdk",
]
@ -6054,6 +6055,7 @@ dependencies = [
"tower",
"tracing",
"uuid",
"worker",
"worker-runtime",
]

View File

@ -11,7 +11,8 @@ use crate::error::RuntimeError;
use crate::execution::{
WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef,
WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult,
WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, WorkerExecutionStatus,
WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult,
WorkerExecutionStatus,
};
#[cfg(feature = "fs-store")]
use crate::fs_store::{
@ -722,7 +723,16 @@ impl Runtime {
) -> Result<WorkerObservationEvent, RuntimeError> {
let mut state = self.lock()?;
state.ensure_worker_ref(worker_ref)?;
let transcript_sequence = state.project_protocol_event_to_transcript(worker_ref, &payload);
let execution_state_changed =
state.project_protocol_event_to_execution(worker_ref, &payload);
let event = state.push_worker_observation_event(worker_ref.clone(), payload);
if transcript_sequence.is_some() || execution_state_changed {
state.persist_worker(&worker_ref.worker_id)?;
}
if let Some(sequence) = transcript_sequence {
state.persist_transcript_entry(&worker_ref.worker_id, sequence)?;
}
Ok(event)
}
@ -1259,6 +1269,111 @@ impl RuntimeState {
event
}
#[cfg(feature = "ws-server")]
fn append_worker_transcript_entry(
&mut self,
worker_ref: &WorkerRef,
role: TranscriptRole,
content: impl Into<String>,
) -> Option<u64> {
let content = content.into();
if content.trim().is_empty() {
return None;
}
let event_id = self.last_event_id();
let worker = self.workers.get_mut(&worker_ref.worker_id)?;
let sequence = worker.next_transcript_sequence;
worker.next_transcript_sequence += 1;
worker.transcript.push(TranscriptEntry {
sequence,
worker_ref: worker_ref.clone(),
role,
content,
event_id,
});
Some(sequence)
}
#[cfg(feature = "ws-server")]
fn project_protocol_event_to_transcript(
&mut self,
worker_ref: &WorkerRef,
event: &protocol::Event,
) -> Option<u64> {
match event {
protocol::Event::TextDone { text, .. } => self.append_worker_transcript_entry(
worker_ref,
TranscriptRole::Assistant,
text.clone(),
),
protocol::Event::Error { message, .. } => self.append_worker_transcript_entry(
worker_ref,
TranscriptRole::System,
format!("error: {message}"),
),
protocol::Event::ToolResult {
id,
summary,
is_error,
..
} => self.append_worker_transcript_entry(
worker_ref,
TranscriptRole::System,
format!(
"tool result {id}: {}{}",
if *is_error { "error: " } else { "" },
summary
),
),
_ => None,
}
}
#[cfg(feature = "ws-server")]
fn project_protocol_event_to_execution(
&mut self,
worker_ref: &WorkerRef,
event: &protocol::Event,
) -> bool {
let Some(worker) = self.workers.get_mut(&worker_ref.worker_id) else {
return false;
};
let status = &mut worker.execution;
let next_run_state = match event {
protocol::Event::Status {
status: protocol::WorkerStatus::Running,
} => Some(WorkerExecutionRunState::Busy),
protocol::Event::Status {
status: protocol::WorkerStatus::Idle,
} => Some(WorkerExecutionRunState::Idle),
protocol::Event::Status {
status: protocol::WorkerStatus::Paused,
} => Some(WorkerExecutionRunState::Busy),
protocol::Event::Snapshot { status, .. } => match status {
protocol::WorkerStatus::Running => Some(WorkerExecutionRunState::Busy),
protocol::WorkerStatus::Idle => Some(WorkerExecutionRunState::Idle),
protocol::WorkerStatus::Paused => Some(WorkerExecutionRunState::Busy),
},
protocol::Event::RunEnd { result } => match result {
protocol::RunResult::Finished | protocol::RunResult::RolledBack => {
Some(WorkerExecutionRunState::Idle)
}
protocol::RunResult::Paused => Some(WorkerExecutionRunState::Busy),
protocol::RunResult::LimitReached => Some(WorkerExecutionRunState::Errored),
},
protocol::Event::Error { .. } => Some(WorkerExecutionRunState::Errored),
_ => None,
};
let Some(next_run_state) = next_run_state else {
return false;
};
if status.run_state == next_run_state {
return false;
}
status.run_state = next_run_state;
true
}
fn push_diagnostic(
&mut self,
severity: DiagnosticSeverity,

View File

@ -5,6 +5,10 @@ edition.workspace = true
license.workspace = true
autobins = false
[features]
default = []
runtime-adapter = ["dep:worker-runtime"]
[dependencies]
async-trait = { workspace = true }
clap = { version = "4.6.0", features = ["derive"] }
@ -17,6 +21,7 @@ protocol = { workspace = true }
provider = { workspace = true }
client = { workspace = true }
pod-registry = { workspace = true }
worker-runtime = { workspace = true, features = ["ws-server"], optional = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
reqwest = { version = "0.13", default-features = false, features = ["blocking", "native-tls"] }

View File

@ -225,6 +225,21 @@ fn load_profile(
Ok((resolved.manifest, PromptLoader::builtins_only()))
}
#[cfg(feature = "runtime-adapter")]
pub(crate) fn resolve_runtime_profile_manifest(
profile: Option<&str>,
workspace_root: &Path,
worker_name: &str,
) -> Result<(WorkerManifest, PromptLoader), String> {
let selector = profile
.map(ProfileSelector::parse_cli)
.unwrap_or(ProfileSelector::Default);
let (mut manifest, loader) = load_profile(&selector, workspace_root, worker_name)?;
apply_profile_launch_policy(&mut manifest, workspace_root, None)?;
apply_plugin_resolution_plan(&mut manifest, workspace_root);
Ok((manifest, loader))
}
fn load_single_manifest(
path: &Path,
explicit_worker_name: Option<&str>,

View File

@ -10,6 +10,8 @@ pub(crate) mod in_flight;
pub mod ipc;
pub mod prompt;
pub mod runtime;
#[cfg(feature = "runtime-adapter")]
pub mod runtime_adapter;
pub mod segment_log_sink;
pub mod shared_state;
mod shutdown_after_idle;

View File

@ -0,0 +1,712 @@
//! Adapter from `worker-runtime` execution backend boundary to the real
//! `worker` crate controller/run lifecycle.
//!
//! The adapter intentionally owns real `WorkerHandle`s internally and exposes
//! only the opaque `worker-runtime` execution handle to callers. Browser/API
//! projections therefore keep the existing runtime redaction boundary: no raw
//! socket paths, session paths, manifests, credentials, or handles leave this
//! module.
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, mpsc};
use std::time::Duration;
use async_trait::async_trait;
use manifest::paths;
use pod_store::{CombinedStore, FsWorkerStore};
use protocol::{Method, Segment, WorkerStatus};
use session_store::FsStore;
use tokio::runtime::Runtime;
use tokio::sync::broadcast;
use worker_runtime::execution::{
WorkerExecutionBackend, WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult,
WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult,
};
use worker_runtime::interaction::{WorkerInput, WorkerInputKind};
use crate::{Worker, WorkerController, WorkerHandle};
const DEFAULT_BACKEND_ID: &str = "worker-crate";
const RUNTIME_TASK_TIMEOUT: Duration = Duration::from_secs(10);
/// Factory seam used by [`WorkerRuntimeExecutionBackend`] to construct a real
/// controller-backed Worker for a Runtime catalog entry.
#[async_trait]
pub trait RuntimeWorkerFactory: Send + Sync + 'static {
async fn spawn_controller(
&self,
request: WorkerExecutionSpawnRequest,
) -> Result<WorkerHandle, String>;
}
/// Production factory that resolves a normal Worker profile and spawns it under
/// `WorkerController`.
#[derive(Debug, Clone)]
pub struct ProfileRuntimeWorkerFactory {
workspace_root: PathBuf,
cwd: PathBuf,
store_dir: Option<PathBuf>,
pod_store_dir: Option<PathBuf>,
profile: Option<String>,
runtime_base_dir: Option<PathBuf>,
}
impl ProfileRuntimeWorkerFactory {
pub fn new(workspace_root: impl Into<PathBuf>) -> Self {
let workspace_root = workspace_root.into();
Self {
cwd: workspace_root.clone(),
workspace_root,
store_dir: None,
pod_store_dir: None,
profile: None,
runtime_base_dir: None,
}
}
pub fn with_cwd(mut self, cwd: impl Into<PathBuf>) -> Self {
self.cwd = cwd.into();
self
}
pub fn with_store_dir(mut self, store_dir: impl Into<PathBuf>) -> Self {
self.store_dir = Some(store_dir.into());
self
}
pub fn with_pod_store_dir(mut self, pod_store_dir: impl Into<PathBuf>) -> Self {
self.pod_store_dir = Some(pod_store_dir.into());
self
}
/// Set the profile selector used for Runtime-created Workers. When unset,
/// normal default profile discovery is used.
pub fn with_profile(mut self, profile: impl Into<String>) -> Self {
self.profile = Some(profile.into());
self
}
pub fn with_runtime_base_dir(mut self, runtime_base_dir: impl Into<PathBuf>) -> Self {
self.runtime_base_dir = Some(runtime_base_dir.into());
self
}
fn store_dir(&self) -> Result<PathBuf, String> {
self.store_dir
.clone()
.or_else(paths::sessions_dir)
.ok_or_else(|| {
"could not resolve sessions directory (set YOI_HOME, YOI_DATA_DIR, or HOME)"
.to_string()
})
}
fn pod_store_dir(&self, store_dir: &std::path::Path) -> PathBuf {
self.pod_store_dir
.clone()
.or_else(|| paths::data_dir().map(|data_dir| data_dir.join("pods")))
.or_else(|| store_dir.parent().map(|parent| parent.join("pods")))
.unwrap_or_else(|| PathBuf::from("workers"))
}
fn runtime_base_dir(&self) -> Result<PathBuf, String> {
self.runtime_base_dir
.clone()
.or_else(|| crate::runtime::dir::default_base().ok())
.ok_or_else(|| "could not resolve worker runtime directory".to_string())
}
fn runtime_worker_name(request: &WorkerExecutionSpawnRequest) -> String {
request.worker_ref.worker_id.to_string()
}
fn runtime_profile_value(
profile: &worker_runtime::catalog::ProfileSelector,
) -> Option<std::borrow::Cow<'_, str>> {
match profile {
worker_runtime::catalog::ProfileSelector::RuntimeDefault => None,
worker_runtime::catalog::ProfileSelector::Named(name) => {
Some(std::borrow::Cow::Borrowed(name.as_str()))
}
worker_runtime::catalog::ProfileSelector::Builtin(name) => {
if name.starts_with("builtin:") {
Some(std::borrow::Cow::Borrowed(name.as_str()))
} else {
Some(std::borrow::Cow::Owned(format!("builtin:{name}")))
}
}
}
}
fn runtime_profile<'a>(
&'a self,
request: &'a WorkerExecutionSpawnRequest,
) -> Option<std::borrow::Cow<'a, str>> {
if let Some(profile) = self.profile.as_deref() {
return Some(std::borrow::Cow::Borrowed(profile));
}
Self::runtime_profile_value(&request.request.profile)
}
}
#[async_trait]
impl RuntimeWorkerFactory for ProfileRuntimeWorkerFactory {
async fn spawn_controller(
&self,
request: WorkerExecutionSpawnRequest,
) -> Result<WorkerHandle, String> {
let worker_name = Self::runtime_worker_name(&request);
let profile = self.runtime_profile(&request);
let (mut manifest, loader) = crate::entrypoint::resolve_runtime_profile_manifest(
profile.as_deref(),
&self.workspace_root,
&worker_name,
)?;
manifest.worker.name = worker_name;
let store_dir = self.store_dir()?;
let session_store = FsStore::new(&store_dir).map_err(|err| {
format!(
"failed to initialize session store at {}: {err}",
store_dir.display()
)
})?;
let pod_store_dir = self.pod_store_dir(&store_dir);
let pod_store = FsWorkerStore::new(&pod_store_dir).map_err(|err| {
format!(
"failed to initialize worker metadata store at {}: {err}",
pod_store_dir.display()
)
})?;
let store = CombinedStore::new(session_store, pod_store);
let worker = Worker::from_manifest_with_context(
manifest,
store,
loader,
self.workspace_root.clone(),
self.cwd.clone(),
)
.await
.map_err(|err| format!("failed to create Worker from profile: {err}"))?;
let runtime_base = self.runtime_base_dir()?;
let (handle, _shutdown_rx) = WorkerController::spawn(worker, &runtime_base)
.await
.map_err(|err| format!("failed to spawn Worker controller: {err}"))?;
Ok(handle)
}
}
struct RuntimeWorkerExecution {
handle: WorkerHandle,
busy: Arc<AtomicBool>,
}
/// `worker-runtime` execution backend backed by real `worker` crate Workers.
pub struct WorkerRuntimeExecutionBackend<F = ProfileRuntimeWorkerFactory> {
backend_id: String,
factory: Arc<F>,
runtime: Mutex<Option<Runtime>>,
workers: Mutex<HashMap<worker_runtime::identity::WorkerRef, RuntimeWorkerExecution>>,
}
impl WorkerRuntimeExecutionBackend<ProfileRuntimeWorkerFactory> {
pub fn from_workspace(workspace_root: impl Into<PathBuf>) -> Result<Self, String> {
Self::new(ProfileRuntimeWorkerFactory::new(workspace_root))
}
}
impl<F> WorkerRuntimeExecutionBackend<F>
where
F: RuntimeWorkerFactory,
{
pub fn new(factory: F) -> Result<Self, String> {
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name("yoi-runtime-worker-adapter")
.enable_all()
.build()
.map_err(|err| format!("failed to build worker adapter runtime: {err}"))?;
Ok(Self {
backend_id: DEFAULT_BACKEND_ID.to_string(),
factory: Arc::new(factory),
runtime: Mutex::new(Some(runtime)),
workers: Mutex::new(HashMap::new()),
})
}
pub fn with_backend_id(mut self, backend_id: impl Into<String>) -> Self {
self.backend_id = backend_id.into();
self
}
fn wait_for_runtime_task<T>(receiver: mpsc::Receiver<Result<T, String>>) -> Result<T, String> {
receiver
.recv_timeout(RUNTIME_TASK_TIMEOUT)
.map_err(|err| format!("worker adapter task did not complete: {err}"))?
}
fn spawn_on_adapter_runtime<Fut>(&self, task: Fut) -> Result<(), String>
where
Fut: Future<Output = ()> + Send + 'static,
{
let runtime = self
.runtime
.lock()
.map_err(|_| "worker adapter runtime lock is poisoned".to_string())?;
let runtime = runtime
.as_ref()
.ok_or_else(|| "worker adapter runtime is shutting down".to_string())?;
runtime.spawn(task);
Ok(())
}
fn run_on_adapter_runtime<T, Fut>(&self, task: Fut) -> Result<T, String>
where
T: Send + 'static,
Fut: Future<Output = Result<T, String>> + Send + 'static,
{
let (tx, rx) = mpsc::sync_channel(1);
self.spawn_on_adapter_runtime(async move {
let _ = tx.send(task.await);
})?;
Self::wait_for_runtime_task(rx)
}
fn get_execution(
&self,
handle: &WorkerExecutionHandle,
) -> Result<(WorkerHandle, Arc<AtomicBool>), WorkerExecutionResult> {
if handle.backend_id() != self.backend_id() {
return Err(WorkerExecutionResult::rejected(
WorkerExecutionOperation::Input,
format!(
"execution handle belongs to backend {}, not {}",
handle.backend_id(),
self.backend_id()
),
));
}
let workers = self.workers.lock().map_err(|_| {
WorkerExecutionResult::errored(
WorkerExecutionOperation::Input,
"worker adapter registry lock is poisoned",
)
})?;
workers
.get(handle.worker_ref())
.map(|execution| (execution.handle.clone(), execution.busy.clone()))
.ok_or_else(|| {
WorkerExecutionResult::rejected(
WorkerExecutionOperation::Input,
"execution handle does not reference a live Worker",
)
})
}
fn send_method(
&self,
operation: WorkerExecutionOperation,
worker: WorkerHandle,
method: Method,
accepted_run_state: WorkerExecutionRunState,
) -> WorkerExecutionResult {
self.run_on_adapter_runtime(async move {
worker
.send(method)
.await
.map_err(|err| format!("failed to send Worker method: {err}"))
})
.map(|_| WorkerExecutionResult::accepted(operation, accepted_run_state))
.unwrap_or_else(|message| WorkerExecutionResult::errored(operation, message))
}
}
impl<F> Drop for WorkerRuntimeExecutionBackend<F> {
fn drop(&mut self) {
if let Ok(mut runtime) = self.runtime.lock()
&& let Some(runtime) = runtime.take()
{
let _ = std::thread::spawn(move || drop(runtime)).join();
}
}
}
impl<F> WorkerExecutionBackend for WorkerRuntimeExecutionBackend<F>
where
F: RuntimeWorkerFactory,
{
fn backend_id(&self) -> &str {
&self.backend_id
}
fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult {
if self
.workers
.lock()
.map(|workers| workers.contains_key(&request.worker_ref))
.unwrap_or(false)
{
return WorkerExecutionSpawnResult::Rejected(WorkerExecutionResult::busy(
WorkerExecutionOperation::Spawn,
"Worker is already connected to execution backend",
));
}
let factory = self.factory.clone();
let bridge_context = request.context.clone();
let worker_ref = request.worker_ref.clone();
let spawn_result =
self.run_on_adapter_runtime(async move { factory.spawn_controller(request).await });
let handle = match spawn_result {
Ok(handle) => handle,
Err(message) => {
return WorkerExecutionSpawnResult::Errored(WorkerExecutionResult::errored(
WorkerExecutionOperation::Spawn,
message,
));
}
};
let mut events = handle.subscribe();
let bridge_handle = handle.clone();
let busy = Arc::new(AtomicBool::new(false));
let bridge_busy = busy.clone();
if let Err(message) = self.spawn_on_adapter_runtime(async move {
loop {
match events.recv().await {
Ok(event) => {
let _ = bridge_context.publish_protocol_event(event);
if bridge_handle.shared_state.get_status() == WorkerStatus::Idle {
bridge_busy.store(false, Ordering::SeqCst);
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
}) {
return WorkerExecutionSpawnResult::Errored(WorkerExecutionResult::errored(
WorkerExecutionOperation::Spawn,
message,
));
}
let mut workers = match self.workers.lock() {
Ok(workers) => workers,
Err(_) => {
return WorkerExecutionSpawnResult::Errored(WorkerExecutionResult::errored(
WorkerExecutionOperation::Spawn,
"worker adapter registry lock is poisoned",
));
}
};
workers.insert(worker_ref.clone(), RuntimeWorkerExecution { handle, busy });
WorkerExecutionSpawnResult::Connected {
handle: WorkerExecutionHandle::new(worker_ref, self.backend_id()),
run_state: WorkerExecutionRunState::Idle,
}
}
fn dispatch_input(
&self,
handle: &WorkerExecutionHandle,
input: WorkerInput,
) -> WorkerExecutionResult {
let (worker, busy) = match self.get_execution(handle) {
Ok(execution) => execution,
Err(mut result) => {
result.operation = WorkerExecutionOperation::Input;
return result;
}
};
if worker.shared_state.get_status() != WorkerStatus::Idle
|| busy
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return WorkerExecutionResult::busy(
WorkerExecutionOperation::Input,
"Worker is already running; runtime adapter v0 does not queue input",
);
}
let WorkerInputKind::User = input.kind else {
busy.store(false, Ordering::SeqCst);
return WorkerExecutionResult::unsupported(
WorkerExecutionOperation::Input,
"runtime adapter currently dispatches user input only",
);
};
let content = input.content.trim().to_string();
if content.is_empty() {
busy.store(false, Ordering::SeqCst);
return WorkerExecutionResult::rejected(
WorkerExecutionOperation::Input,
"runtime adapter rejects empty user input",
);
}
let result = self.send_method(
WorkerExecutionOperation::Input,
worker,
Method::Run {
input: vec![Segment::text(content)],
},
WorkerExecutionRunState::Busy,
);
if result.outcome != worker_runtime::execution::WorkerExecutionOutcome::Accepted {
busy.store(false, Ordering::SeqCst);
}
result
}
fn stop_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult {
let (worker, _busy) = match self.get_execution(handle) {
Ok(execution) => execution,
Err(mut result) => {
result.operation = WorkerExecutionOperation::Stop;
return result;
}
};
self.send_method(
WorkerExecutionOperation::Stop,
worker,
Method::Shutdown,
WorkerExecutionRunState::Stopped,
)
}
fn cancel_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult {
let (worker, _busy) = match self.get_execution(handle) {
Ok(execution) => execution,
Err(mut result) => {
result.operation = WorkerExecutionOperation::Cancel;
return result;
}
};
self.send_method(
WorkerExecutionOperation::Cancel,
worker,
Method::Cancel,
WorkerExecutionRunState::Idle,
)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use async_trait::async_trait;
use futures::Stream;
use llm_engine::Engine;
use llm_engine::llm_client::event::{Event as LlmEvent, ResponseStatus, StatusEvent};
use llm_engine::llm_client::{ClientError, LlmClient, Request};
use manifest::{Scope, WorkerManifest};
use worker_runtime::Runtime as EmbeddedRuntime;
use worker_runtime::catalog::{CreateWorkerRequest, ProfileSelector, WorkerIntent};
use worker_runtime::identity::RuntimeId;
use worker_runtime::management::RuntimeOptions;
use worker_runtime::observation::{TranscriptQuery, TranscriptRole};
#[derive(Clone)]
struct MockClient {
responses: Arc<Vec<Vec<LlmEvent>>>,
call_count: Arc<AtomicUsize>,
captured: Arc<Mutex<Vec<Request>>>,
}
impl MockClient {
fn new(events: Vec<LlmEvent>) -> Self {
Self {
responses: Arc::new(vec![events]),
call_count: Arc::new(AtomicUsize::new(0)),
captured: Arc::new(Mutex::new(Vec::new())),
}
}
}
#[async_trait]
impl LlmClient for MockClient {
fn clone_boxed(&self) -> Box<dyn LlmClient> {
Box::new(self.clone())
}
async fn stream(
&self,
request: Request,
) -> Result<Pin<Box<dyn Stream<Item = Result<LlmEvent, ClientError>> + Send>>, ClientError>
{
self.captured.lock().unwrap().push(request);
let idx = self.call_count.fetch_add(1, Ordering::SeqCst);
let events = self.responses.get(idx).cloned().unwrap_or_default();
Ok(Box::pin(futures::stream::iter(events.into_iter().map(Ok))))
}
}
struct MockFactory {
client: MockClient,
runtime_base: PathBuf,
cwd: PathBuf,
store_dir: PathBuf,
pod_store_dir: PathBuf,
}
#[async_trait]
impl RuntimeWorkerFactory for MockFactory {
async fn spawn_controller(
&self,
_request: WorkerExecutionSpawnRequest,
) -> Result<WorkerHandle, String> {
let manifest = WorkerManifest::from_toml(
r#"
[worker]
name = "runtime-adapter-test"
pwd = "./"
[model]
scheme = "anthropic"
model_id = "test-model"
[engine]
max_tokens = 100
[[scope.allow]]
target = "./"
permission = "write"
"#,
)
.map_err(|err| err.to_string())?;
let store = CombinedStore::new(
FsStore::new(&self.store_dir).map_err(|err| err.to_string())?,
FsWorkerStore::new(&self.pod_store_dir).map_err(|err| err.to_string())?,
);
let scope = Scope::writable(&self.cwd).map_err(|err| err.to_string())?;
let worker = Worker::new(
manifest,
Engine::new(self.client.clone()),
store,
self.cwd.clone(),
scope,
)
.await
.map_err(|err| err.to_string())?;
let (handle, _shutdown_rx) = WorkerController::spawn(worker, &self.runtime_base)
.await
.map_err(|err| err.to_string())?;
Ok(handle)
}
}
fn simple_text_events() -> Vec<LlmEvent> {
vec![
LlmEvent::text_block_start(0),
LlmEvent::text_delta(0, "hello"),
LlmEvent::text_delta(0, " from worker"),
LlmEvent::text_block_stop(0, None),
LlmEvent::Status(StatusEvent {
status: ResponseStatus::Completed,
}),
]
}
fn create_request(name: &str) -> CreateWorkerRequest {
CreateWorkerRequest {
intent: WorkerIntent::Role {
role: name.to_string(),
purpose: None,
},
profile: ProfileSelector::RuntimeDefault,
config_bundle: None,
requested_capabilities: Vec::new(),
workspace_refs: Vec::new(),
mount_refs: Vec::new(),
}
}
#[test]
fn builtin_profile_selector_is_not_double_prefixed() {
assert_eq!(
ProfileRuntimeWorkerFactory::runtime_profile_value(
&worker_runtime::catalog::ProfileSelector::Builtin("coder".to_string())
)
.as_deref(),
Some("builtin:coder")
);
assert_eq!(
ProfileRuntimeWorkerFactory::runtime_profile_value(
&worker_runtime::catalog::ProfileSelector::Builtin("builtin:coder".to_string())
)
.as_deref(),
Some("builtin:coder")
);
}
#[test]
fn adapter_dispatches_user_input_through_worker_run_lifecycle() {
let client = MockClient::new(simple_text_events());
let runtime_base = tempfile::tempdir().unwrap();
let cwd = tempfile::tempdir().unwrap();
let store = tempfile::tempdir().unwrap();
let factory = MockFactory {
client: client.clone(),
runtime_base: runtime_base.path().to_path_buf(),
cwd: cwd.path().to_path_buf(),
store_dir: store.path().join("sessions"),
pod_store_dir: store.path().join("pods"),
};
let backend = WorkerRuntimeExecutionBackend::new(factory).unwrap();
let runtime = EmbeddedRuntime::with_execution_backend(
RuntimeOptions {
runtime_id: RuntimeId::new("embedded"),
..RuntimeOptions::default()
},
Arc::new(backend),
)
.unwrap();
let detail = runtime.create_worker(create_request("chat")).unwrap();
runtime
.send_input(&detail.worker_ref, WorkerInput::user("say hello"))
.unwrap();
let deadline = std::time::Instant::now() + Duration::from_secs(5);
loop {
let projection = runtime
.transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 10))
.unwrap();
if projection.items.iter().any(|item| {
item.role == TranscriptRole::Assistant && item.content == "hello from worker"
}) {
break;
}
assert!(
std::time::Instant::now() < deadline,
"timed out waiting for assistant transcript projection"
);
std::thread::sleep(Duration::from_millis(20));
}
assert_eq!(client.captured.lock().unwrap().len(), 1);
let observations = runtime
.read_worker_observation_events(
&detail.worker_ref,
worker_runtime::observation::WorkerObservationCursor::zero(),
)
.unwrap();
assert!(
observations
.iter()
.any(|event| matches!(event.payload, protocol::Event::TextDone { .. }))
);
}
}

View File

@ -22,6 +22,7 @@ thiserror.workspace = true
ticket.workspace = true
tokio = { workspace = true, features = ["fs", "macros", "net", "rt-multi-thread", "sync"] }
tokio-tungstenite.workspace = true
worker = { workspace = true, features = ["runtime-adapter"] }
worker-runtime = { workspace = true, features = ["ws-server"] }
toml.workspace = true
tracing.workspace = true

View File

@ -13,6 +13,7 @@ use worker_runtime::catalog::{
};
use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary};
use worker_runtime::error::RuntimeError as EmbeddedRuntimeError;
use worker_runtime::execution::WorkerExecutionRunState;
use worker_runtime::http_server::{
RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest,
RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse,
@ -903,6 +904,7 @@ pub struct EmbeddedWorkerRuntime {
runtime_id: String,
host_id: String,
runtime: worker_runtime::Runtime,
execution_enabled: bool,
}
impl EmbeddedWorkerRuntime {
@ -917,6 +919,25 @@ impl EmbeddedWorkerRuntime {
Self::from_runtime(workspace_id, runtime)
}
pub fn new_memory_with_execution_backend(
workspace_id: impl AsRef<str>,
backend: std::sync::Arc<dyn worker_runtime::execution::WorkerExecutionBackend>,
) -> Result<Self, worker_runtime::error::RuntimeError> {
let runtime_id = EmbeddedRuntimeId::new(EMBEDDED_RUNTIME_ID)
.expect("embedded runtime id is a non-empty literal");
let runtime = worker_runtime::Runtime::with_execution_backend(
EmbeddedRuntimeOptions {
runtime_id: Some(runtime_id),
display_name: Some("Workspace backend embedded Runtime".to_string()),
..EmbeddedRuntimeOptions::default()
},
backend,
)?;
let mut embedded = Self::from_runtime(workspace_id, runtime);
embedded.execution_enabled = true;
Ok(embedded)
}
pub fn from_runtime(workspace_id: impl AsRef<str>, runtime: worker_runtime::Runtime) -> Self {
let runtime_id = runtime
.runtime_id()
@ -927,6 +948,7 @@ impl EmbeddedWorkerRuntime {
runtime_id,
host_id: host_id_for_embedded_workspace(workspace_id.as_ref()),
runtime,
execution_enabled: false,
}
}
@ -937,6 +959,35 @@ impl EmbeddedWorkerRuntime {
))
}
fn can_accept_embedded_input(
&self,
status: EmbeddedWorkerStatus,
execution: &worker_runtime::execution::WorkerExecutionStatus,
) -> bool {
self.execution_enabled
&& status == EmbeddedWorkerStatus::Running
&& execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected
&& execution.run_state == WorkerExecutionRunState::Idle
&& !execution_last_result_blocks_control(execution)
}
fn can_stop_embedded_worker(
&self,
status: EmbeddedWorkerStatus,
execution: &worker_runtime::execution::WorkerExecutionStatus,
) -> bool {
self.execution_enabled
&& status == EmbeddedWorkerStatus::Running
&& execution.backend == worker_runtime::execution::WorkerExecutionBackendKind::Connected
&& !matches!(
execution.run_state,
WorkerExecutionRunState::Rejected
| WorkerExecutionRunState::Errored
| WorkerExecutionRunState::Unconnected
)
&& !execution_last_result_blocks_control(execution)
}
fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary {
WorkerSummary {
runtime_id: self.runtime_id.clone(),
@ -950,15 +1001,16 @@ impl EmbeddedWorkerRuntime {
identity: "runtime_registry_worker".to_string(),
},
state: embedded_worker_status_label(summary.status).to_string(),
status: embedded_worker_status_label(summary.status).to_string(),
status: embedded_worker_execution_status_label(summary.status, summary.execution.run_state)
.to_string(),
last_seen_at: None,
implementation: WorkerImplementationSummary {
kind: "embedded_worker_runtime".to_string(),
display_hint: "backend-internal worker-runtime Worker".to_string(),
},
capabilities: WorkerCapabilitySummary {
can_accept_input: false,
can_stop: false,
can_accept_input: self.can_accept_embedded_input(summary.status, &summary.execution),
can_stop: self.can_stop_embedded_worker(summary.status, &summary.execution),
can_spawn_followup: false,
},
diagnostics: vec![diagnostic(
@ -982,15 +1034,16 @@ impl EmbeddedWorkerRuntime {
identity: "runtime_registry_worker".to_string(),
},
state: embedded_worker_status_label(detail.status).to_string(),
status: embedded_worker_status_label(detail.status).to_string(),
status: embedded_worker_execution_status_label(detail.status, detail.execution.run_state)
.to_string(),
last_seen_at: None,
implementation: WorkerImplementationSummary {
kind: "embedded_worker_runtime".to_string(),
display_hint: "backend-internal worker-runtime Worker".to_string(),
},
capabilities: WorkerCapabilitySummary {
can_accept_input: false,
can_stop: false,
can_accept_input: self.can_accept_embedded_input(detail.status, &detail.execution),
can_stop: self.can_stop_embedded_worker(detail.status, &detail.execution),
can_spawn_followup: false,
},
diagnostics: vec![diagnostic(
@ -1020,7 +1073,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
status: "unavailable".to_string(),
source: RuntimeSourceSummary::embedded_worker_runtime(),
host_ids: Vec::new(),
capabilities: embedded_runtime_capabilities(limit, false),
capabilities: embedded_runtime_capabilities(limit, false, false),
diagnostics,
};
}
@ -1040,7 +1093,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
} else {
vec![self.host_id.clone()]
},
capabilities: embedded_runtime_capabilities(limit, true),
capabilities: embedded_runtime_capabilities(limit, true, self.execution_enabled),
diagnostics,
}
}
@ -1058,7 +1111,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
status: "available".to_string(),
observed_at: Utc::now().to_rfc3339(),
last_seen_at: None,
capabilities: embedded_runtime_capabilities(limit, true),
capabilities: embedded_runtime_capabilities(limit, true, self.execution_enabled),
diagnostics: vec![diagnostic(
"embedded_runtime_host_boundary",
DiagnosticSeverity::Info,
@ -1164,24 +1217,38 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
mount_refs: Vec::new(),
};
match self.runtime.create_worker(create_request) {
Ok(detail) => WorkerSpawnResult {
Ok(detail) => {
let execution_failure =
embedded_spawn_execution_failure_diagnostic(&detail.execution);
if let Some(diagnostic) = execution_failure {
diagnostics.push(diagnostic);
WorkerSpawnResult {
state: WorkerOperationState::Rejected,
worker: Some(self.map_worker_detail(detail)),
acceptance_evidence: Vec::new(),
diagnostics,
}
} else {
WorkerSpawnResult {
state: WorkerOperationState::Accepted,
worker: Some(self.map_worker_detail(detail)),
acceptance_evidence: vec![
WorkerSpawnAcceptanceEvidence {
kind: "embedded_runtime_worker_created".to_string(),
detail:
"worker-runtime catalog accepted a backend-internal tools-less Worker"
detail: "worker-runtime catalog accepted a backend-internal tools-less Worker"
.to_string(),
},
WorkerSpawnAcceptanceEvidence {
kind: "embedded_runtime_backend_internal_projection".to_string(),
detail: "only runtime_id plus worker_id backend projections were exposed"
detail:
"only runtime_id plus worker_id backend projections were exposed"
.to_string(),
},
],
diagnostics,
},
}
}
}
Err(err) => {
diagnostics.push(embedded_runtime_diagnostic(&err));
WorkerSpawnResult {
@ -1237,6 +1304,94 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
}
}
fn stop_worker(
&self,
worker_id: &str,
request: WorkerLifecycleRequest,
) -> WorkerLifecycleResult {
if !self.execution_enabled {
return embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_execution_unavailable",
DiagnosticSeverity::Info,
format!("worker stop for '{worker_id}' requires an embedded execution backend"),
),
);
}
let Some(worker_ref) = self.worker_ref(worker_id) else {
return embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_id_invalid",
DiagnosticSeverity::Warning,
"Worker id was empty and cannot be resolved".to_string(),
),
);
};
match self.runtime.stop_worker(&worker_ref, request.reason) {
Ok(ack) => WorkerLifecycleResult {
state: WorkerOperationState::Accepted,
runtime_id: self.runtime_id.clone(),
worker_id: worker_id.to_string(),
event_id: Some(ack.event_id),
diagnostics: Vec::new(),
},
Err(error) => embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
embedded_runtime_diagnostic(&error),
),
}
}
fn cancel_worker(
&self,
worker_id: &str,
request: WorkerLifecycleRequest,
) -> WorkerLifecycleResult {
if !self.execution_enabled {
return embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_execution_unavailable",
DiagnosticSeverity::Info,
format!(
"worker cancel for '{worker_id}' requires an embedded execution backend"
),
),
);
}
let Some(worker_ref) = self.worker_ref(worker_id) else {
return embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_id_invalid",
DiagnosticSeverity::Warning,
"Worker id was empty and cannot be resolved".to_string(),
),
);
};
match self.runtime.cancel_worker(&worker_ref, request.reason) {
Ok(ack) => WorkerLifecycleResult {
state: WorkerOperationState::Accepted,
runtime_id: self.runtime_id.clone(),
worker_id: worker_id.to_string(),
event_id: Some(ack.event_id),
diagnostics: Vec::new(),
},
Err(error) => embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
embedded_runtime_diagnostic(&error),
),
}
}
fn observation_source(
&self,
worker_id: &str,
@ -1255,17 +1410,53 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
))
}
fn send_input(&self, worker_id: &str, _request: WorkerInputRequest) -> WorkerInputResult {
embedded_input_rejected(
fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult {
if !self.execution_enabled {
return embedded_input_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_execution_unavailable",
DiagnosticSeverity::Error,
"Embedded Worker input is disabled until an execution backend is connected"
.to_string(),
DiagnosticSeverity::Info,
format!(
"worker input for '{worker_id}' requires an embedded execution backend"
),
)
),
);
}
let Some(worker_ref) = self.worker_ref(worker_id) else {
return embedded_input_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_id_invalid",
DiagnosticSeverity::Warning,
"Worker id was empty and cannot be resolved".to_string(),
),
);
};
let input = EmbeddedWorkerInput {
kind: match request.kind {
WorkerInputKind::User => EmbeddedWorkerInputKind::User,
WorkerInputKind::System => EmbeddedWorkerInputKind::System,
},
content: request.content,
};
match self.runtime.send_input(&worker_ref, input) {
Ok(ack) => WorkerInputResult {
state: WorkerOperationState::Accepted,
runtime_id: self.runtime_id.clone(),
worker_id: worker_id.to_string(),
transcript_sequence: Some(ack.transcript_sequence),
event_id: Some(ack.event_id),
diagnostics: Vec::new(),
},
Err(error) => embedded_input_rejected(
&self.runtime_id,
worker_id,
embedded_runtime_diagnostic(&error),
),
}
}
fn transcript(
@ -1848,14 +2039,18 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime {
}
}
fn embedded_runtime_capabilities(limit: usize, available: bool) -> RuntimeCapabilitySummary {
fn embedded_runtime_capabilities(
limit: usize,
available: bool,
execution_enabled: bool,
) -> RuntimeCapabilitySummary {
RuntimeCapabilitySummary {
can_list_hosts: true,
can_list_workers: available,
can_get_worker: available,
can_spawn_worker: available,
can_stop_worker: false,
can_accept_input: false,
can_stop_worker: available && execution_enabled,
can_accept_input: available && execution_enabled,
has_workspace_fs: false,
has_shell: false,
has_git: false,
@ -1875,6 +2070,48 @@ fn embedded_runtime_status_label(status: RuntimeStatus) -> &'static str {
}
}
fn embedded_spawn_execution_failure_diagnostic(
execution: &worker_runtime::execution::WorkerExecutionStatus,
) -> Option<RuntimeDiagnostic> {
let result = execution.last_result.as_ref()?;
let severity = match result.outcome {
worker_runtime::execution::WorkerExecutionOutcome::Accepted => return None,
worker_runtime::execution::WorkerExecutionOutcome::Rejected
| worker_runtime::execution::WorkerExecutionOutcome::Busy
| worker_runtime::execution::WorkerExecutionOutcome::Unsupported => {
DiagnosticSeverity::Warning
}
worker_runtime::execution::WorkerExecutionOutcome::Errored => DiagnosticSeverity::Error,
};
let status = match result.outcome {
worker_runtime::execution::WorkerExecutionOutcome::Accepted => "accepted",
worker_runtime::execution::WorkerExecutionOutcome::Rejected => "rejected",
worker_runtime::execution::WorkerExecutionOutcome::Busy => "busy",
worker_runtime::execution::WorkerExecutionOutcome::Unsupported => "unsupported",
worker_runtime::execution::WorkerExecutionOutcome::Errored => "errored",
};
Some(diagnostic(
format!("embedded_worker_execution_spawn_{status}"),
severity,
format!(
"Embedded Worker execution spawn was {status} during setup; check runtime configuration"
),
))
}
fn execution_last_result_blocks_control(
execution: &worker_runtime::execution::WorkerExecutionStatus,
) -> bool {
execution.last_result.as_ref().is_some_and(|result| {
matches!(
result.outcome,
worker_runtime::execution::WorkerExecutionOutcome::Rejected
| worker_runtime::execution::WorkerExecutionOutcome::Errored
| worker_runtime::execution::WorkerExecutionOutcome::Unsupported
)
})
}
fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str {
match status {
EmbeddedWorkerStatus::Running => "running",
@ -1883,6 +2120,24 @@ fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str {
}
}
fn embedded_worker_execution_status_label(
status: EmbeddedWorkerStatus,
run_state: WorkerExecutionRunState,
) -> &'static str {
match status {
EmbeddedWorkerStatus::Stopped => "stopped",
EmbeddedWorkerStatus::Cancelled => "cancelled",
EmbeddedWorkerStatus::Running => match run_state {
WorkerExecutionRunState::Idle => "idle",
WorkerExecutionRunState::Busy => "running",
WorkerExecutionRunState::Stopped => "stopped",
WorkerExecutionRunState::Rejected => "rejected",
WorkerExecutionRunState::Errored => "errored",
WorkerExecutionRunState::Unconnected => "unconnected",
},
}
}
fn embedded_create_intent(intent: &WorkerSpawnIntent) -> WorkerIntent {
match intent {
WorkerSpawnIntent::WorkspaceCompanion => WorkerIntent::Role {
@ -1967,6 +2222,20 @@ fn remote_input_rejected(
}
}
fn embedded_lifecycle_rejected(
runtime_id: &str,
worker_id: &str,
diagnostic: RuntimeDiagnostic,
) -> WorkerLifecycleResult {
WorkerLifecycleResult {
state: WorkerOperationState::Rejected,
runtime_id: runtime_id.to_string(),
worker_id: worker_id.to_string(),
event_id: None,
diagnostics: vec![diagnostic],
}
}
fn remote_lifecycle_rejected(
runtime_id: &str,
worker_id: &str,
@ -2377,9 +2646,10 @@ pub fn placeholder_spawn_response(host_id: impl Into<String>) -> WorkerSpawnResu
mod tests {
use super::*;
use serde_json::json;
use std::collections::HashMap;
use std::io::{Read as _, Write as _};
use std::net::TcpListener;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread;
fn test_config_bundle() -> ConfigBundle {
@ -2408,6 +2678,105 @@ mod tests {
.with_computed_digest()
}
struct FailingSpawnBackend;
impl worker_runtime::execution::WorkerExecutionBackend for FailingSpawnBackend {
fn backend_id(&self) -> &str {
"workspace-server-failing-spawn-backend"
}
fn spawn_worker(
&self,
_request: worker_runtime::execution::WorkerExecutionSpawnRequest,
) -> worker_runtime::execution::WorkerExecutionSpawnResult {
worker_runtime::execution::WorkerExecutionSpawnResult::Errored(
worker_runtime::execution::WorkerExecutionResult::errored(
worker_runtime::execution::WorkerExecutionOperation::Spawn,
"provider setup failed at /tmp/secret-provider-config",
),
)
}
fn dispatch_input(
&self,
_handle: &worker_runtime::execution::WorkerExecutionHandle,
_input: EmbeddedWorkerInput,
) -> worker_runtime::execution::WorkerExecutionResult {
worker_runtime::execution::WorkerExecutionResult::rejected(
worker_runtime::execution::WorkerExecutionOperation::Input,
"spawn failed before input could be dispatched",
)
}
}
#[derive(Default)]
struct AcceptingExecutionBackend {
contexts:
Mutex<HashMap<EmbeddedWorkerRef, worker_runtime::execution::WorkerExecutionContext>>,
}
impl worker_runtime::execution::WorkerExecutionBackend for AcceptingExecutionBackend {
fn backend_id(&self) -> &str {
"workspace-server-test-backend"
}
fn spawn_worker(
&self,
request: worker_runtime::execution::WorkerExecutionSpawnRequest,
) -> worker_runtime::execution::WorkerExecutionSpawnResult {
self.contexts
.lock()
.unwrap()
.insert(request.worker_ref.clone(), request.context);
worker_runtime::execution::WorkerExecutionSpawnResult::Connected {
handle: worker_runtime::execution::WorkerExecutionHandle::new(
request.worker_ref,
self.backend_id(),
),
run_state: WorkerExecutionRunState::Idle,
}
}
fn dispatch_input(
&self,
handle: &worker_runtime::execution::WorkerExecutionHandle,
input: EmbeddedWorkerInput,
) -> worker_runtime::execution::WorkerExecutionResult {
let context = self
.contexts
.lock()
.unwrap()
.get(handle.worker_ref())
.cloned();
let Some(context) = context else {
return worker_runtime::execution::WorkerExecutionResult::rejected(
worker_runtime::execution::WorkerExecutionOperation::Input,
"missing test context",
);
};
let content = input.content;
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(10));
let _ = context.publish_protocol_event(protocol::Event::Status {
status: protocol::WorkerStatus::Running,
});
let _ = context.publish_protocol_event(protocol::Event::TextDone {
text: format!("echo: {content}"),
});
let _ = context.publish_protocol_event(protocol::Event::RunEnd {
result: protocol::RunResult::Finished,
});
let _ = context.publish_protocol_event(protocol::Event::Status {
status: protocol::WorkerStatus::Idle,
});
});
worker_runtime::execution::WorkerExecutionResult::accepted(
worker_runtime::execution::WorkerExecutionOperation::Input,
WorkerExecutionRunState::Busy,
)
}
}
#[derive(Clone)]
struct FixtureRuntime {
runtime_id: String,
@ -2581,6 +2950,88 @@ mod tests {
));
}
fn embedded_spawn_request() -> WorkerSpawnRequest {
WorkerSpawnRequest {
intent: WorkerSpawnIntent::TicketRole {
ticket_id: "00001KVZSGT0Q".to_string(),
role: TicketWorkerRole::Coder,
},
requested_worker_name: None,
acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted {
expected_segments: 0,
},
profile: None,
config_bundle: None,
requested_capabilities: Vec::new(),
}
}
#[test]
fn embedded_runtime_spawn_execution_failure_is_rejected_and_not_input_capable() {
let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test",
Arc::new(FailingSpawnBackend),
)
.expect("test backend should connect");
let spawned = runtime.spawn_worker(embedded_spawn_request());
assert_eq!(spawned.state, WorkerOperationState::Rejected);
assert!(spawned.acceptance_evidence.is_empty());
assert!(spawned.diagnostics.iter().any(|diagnostic| {
diagnostic.code == "embedded_worker_execution_spawn_errored"
&& !diagnostic.message.contains("/tmp/secret-provider-config")
}));
let worker = spawned.worker.expect("failed execution is still projected");
assert_eq!(worker.status, "errored");
assert!(!worker.capabilities.can_accept_input);
assert!(!worker.capabilities.can_stop);
}
#[test]
fn embedded_runtime_with_execution_backend_routes_input_and_projects_transcript() {
let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test",
Arc::new(AcceptingExecutionBackend::default()),
)
.expect("test backend should connect");
let spawned = runtime.spawn_worker(embedded_spawn_request());
assert_eq!(spawned.state, WorkerOperationState::Accepted);
let worker = spawned.worker.expect("created embedded worker");
assert!(worker.capabilities.can_accept_input);
assert!(worker.capabilities.can_stop);
let input = runtime.send_input(
&worker.worker_id,
WorkerInputRequest {
kind: WorkerInputKind::User,
content: "hello".to_string(),
},
);
assert_eq!(input.state, WorkerOperationState::Accepted);
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
let detail = runtime
.worker(&worker.worker_id)
.worker
.expect("worker detail");
let transcript = runtime.transcript(&worker.worker_id, 0, 10);
if detail.status == "idle"
&& transcript
.items
.iter()
.any(|entry| entry.role == "assistant" && entry.content == "echo: hello")
{
assert!(detail.capabilities.can_accept_input);
break;
}
assert!(
std::time::Instant::now() < deadline,
"timed out waiting for embedded execution projection"
);
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
#[test]
fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() {
let registry =

View File

@ -11,6 +11,7 @@ use axum::{Json, Router};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::net::TcpListener;
use worker::runtime_adapter::WorkerRuntimeExecutionBackend;
use crate::companion::{
CompanionCancelRequest, CompanionConsole, CompanionMessageRequest, CompanionMessageResponse,
@ -97,9 +98,23 @@ impl WorkspaceApi {
updated_at: config.workspace_created_at.clone(),
})
.await?;
let mut runtime = RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory(
let execution_backend = WorkerRuntimeExecutionBackend::from_workspace(
config.workspace_root.clone(),
)
.map_err(|err| {
crate::Error::Store(format!(
"failed to initialize embedded Worker backend: {err}"
))
})?;
let mut runtime = RuntimeRegistry::for_workspace(
EmbeddedWorkerRuntime::new_memory_with_execution_backend(
config.workspace_id.clone(),
));
Arc::new(execution_backend),
)
.map_err(|err| {
crate::Error::Store(format!("invalid embedded Worker backend: {err}"))
})?,
);
for remote_config in config.remote_runtime_sources.iter().cloned() {
runtime
.register(RemoteWorkerRuntime::new(remote_config).map_err(|err| err.into_error())?);
@ -1134,10 +1149,13 @@ mod tests {
.find(|worker| worker["role"] == "workspace_companion")
.expect("companion worker is visible through runtime worker API");
assert_eq!(companion_worker["runtime_id"], "embedded-worker-runtime");
assert_eq!(companion_worker["capabilities"]["can_stop"], false);
assert!(companion_worker["capabilities"]["can_stop"].is_boolean());
let companion_status = get_json(app.clone(), "/api/companion/status").await;
assert_eq!(companion_status["state"], "ready");
assert!(matches!(
companion_status["state"].as_str(),
Some("ready") | Some("error")
));
assert_eq!(companion_status["worker"]["role"], "workspace_companion");
assert_eq!(
companion_status["transport"]["kind"],
@ -1300,7 +1318,20 @@ mod tests {
}),
)
.await;
assert_eq!(spawned["state"], "accepted");
assert_eq!(spawned["state"], "rejected");
assert!(
spawned["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| {
diagnostic["code"] == "embedded_worker_execution_spawn_errored"
&& !diagnostic["message"]
.as_str()
.unwrap()
.contains("/workspace/demo")
})
);
let worker_id = spawned["worker"]["worker_id"].as_str().unwrap().to_string();
assert_eq!(spawned["worker"]["runtime_id"], "embedded-worker-runtime");
assert_eq!(

View File

@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec {
filter = sourceFilter;
};
cargoHash = "sha256-TPXVkDfy61HCWTfSr0boLKlFbvc6zdpRKQRUDXuPppU=";
cargoHash = "sha256-1jSDcivotZ0/v5AURQaetn9xjH5JyQNDeNlJ4AcwEUc=";
depsExtraArgs = {
# Older fetchCargoVendor utilities used crates.io's API download endpoint,