fix: connect embedded runtime input lifecycle

This commit is contained in:
Keisuke Hirata 2026-06-28 06:20:34 +09:00
parent 18526ee362
commit 9069b03504
No known key found for this signature in database
4 changed files with 437 additions and 39 deletions

View File

@ -11,7 +11,8 @@ use crate::error::RuntimeError;
use crate::execution::{ use crate::execution::{
WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef, WorkerExecutionBackend, WorkerExecutionBackendKind, WorkerExecutionBackendRef,
WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult, WorkerExecutionHandle, WorkerExecutionOperation, WorkerExecutionResult,
WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult, WorkerExecutionStatus, WorkerExecutionRunState, WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult,
WorkerExecutionStatus,
}; };
#[cfg(feature = "fs-store")] #[cfg(feature = "fs-store")]
use crate::fs_store::{ use crate::fs_store::{
@ -723,9 +724,13 @@ impl Runtime {
let mut state = self.lock()?; let mut state = self.lock()?;
state.ensure_worker_ref(worker_ref)?; state.ensure_worker_ref(worker_ref)?;
let transcript_sequence = state.project_protocol_event_to_transcript(worker_ref, &payload); let transcript_sequence = state.project_protocol_event_to_transcript(worker_ref, &payload);
let execution_state_changed =
state.project_protocol_event_to_execution(worker_ref, &payload);
let event = state.push_worker_observation_event(worker_ref.clone(), payload); let event = state.push_worker_observation_event(worker_ref.clone(), payload);
if let Some(sequence) = transcript_sequence { if transcript_sequence.is_some() || execution_state_changed {
state.persist_worker(&worker_ref.worker_id)?; state.persist_worker(&worker_ref.worker_id)?;
}
if let Some(sequence) = transcript_sequence {
state.persist_transcript_entry(&worker_ref.worker_id, sequence)?; state.persist_transcript_entry(&worker_ref.worker_id, sequence)?;
} }
Ok(event) Ok(event)
@ -1324,6 +1329,51 @@ impl RuntimeState {
} }
} }
#[cfg(feature = "ws-server")]
fn project_protocol_event_to_execution(
&mut self,
worker_ref: &WorkerRef,
event: &protocol::Event,
) -> bool {
let Some(worker) = self.workers.get_mut(&worker_ref.worker_id) else {
return false;
};
let status = &mut worker.execution;
let next_run_state = match event {
protocol::Event::Status {
status: protocol::WorkerStatus::Running,
} => Some(WorkerExecutionRunState::Busy),
protocol::Event::Status {
status: protocol::WorkerStatus::Idle,
} => Some(WorkerExecutionRunState::Idle),
protocol::Event::Status {
status: protocol::WorkerStatus::Paused,
} => Some(WorkerExecutionRunState::Busy),
protocol::Event::Snapshot { status, .. } => match status {
protocol::WorkerStatus::Running => Some(WorkerExecutionRunState::Busy),
protocol::WorkerStatus::Idle => Some(WorkerExecutionRunState::Idle),
protocol::WorkerStatus::Paused => Some(WorkerExecutionRunState::Busy),
},
protocol::Event::RunEnd { result } => match result {
protocol::RunResult::Finished | protocol::RunResult::RolledBack => {
Some(WorkerExecutionRunState::Idle)
}
protocol::RunResult::Paused => Some(WorkerExecutionRunState::Busy),
protocol::RunResult::LimitReached => Some(WorkerExecutionRunState::Errored),
},
protocol::Event::Error { .. } => Some(WorkerExecutionRunState::Errored),
_ => None,
};
let Some(next_run_state) = next_run_state else {
return false;
};
if status.run_state == next_run_state {
return false;
}
status.run_state = next_run_state;
true
}
fn push_diagnostic( fn push_diagnostic(
&mut self, &mut self,
severity: DiagnosticSeverity, severity: DiagnosticSeverity,

View File

@ -123,6 +123,24 @@ impl ProfileRuntimeWorkerFactory {
request.worker_ref.worker_id.to_string() request.worker_ref.worker_id.to_string()
} }
fn runtime_profile_value(
profile: &worker_runtime::catalog::ProfileSelector,
) -> Option<std::borrow::Cow<'_, str>> {
match profile {
worker_runtime::catalog::ProfileSelector::RuntimeDefault => None,
worker_runtime::catalog::ProfileSelector::Named(name) => {
Some(std::borrow::Cow::Borrowed(name.as_str()))
}
worker_runtime::catalog::ProfileSelector::Builtin(name) => {
if name.starts_with("builtin:") {
Some(std::borrow::Cow::Borrowed(name.as_str()))
} else {
Some(std::borrow::Cow::Owned(format!("builtin:{name}")))
}
}
}
}
fn runtime_profile<'a>( fn runtime_profile<'a>(
&'a self, &'a self,
request: &'a WorkerExecutionSpawnRequest, request: &'a WorkerExecutionSpawnRequest,
@ -130,15 +148,7 @@ impl ProfileRuntimeWorkerFactory {
if let Some(profile) = self.profile.as_deref() { if let Some(profile) = self.profile.as_deref() {
return Some(std::borrow::Cow::Borrowed(profile)); return Some(std::borrow::Cow::Borrowed(profile));
} }
match &request.request.profile { Self::runtime_profile_value(&request.request.profile)
worker_runtime::catalog::ProfileSelector::RuntimeDefault => None,
worker_runtime::catalog::ProfileSelector::Named(name) => {
Some(std::borrow::Cow::Borrowed(name.as_str()))
}
worker_runtime::catalog::ProfileSelector::Builtin(name) => {
Some(std::borrow::Cow::Owned(format!("builtin:{name}")))
}
}
} }
} }
@ -302,6 +312,7 @@ where
operation: WorkerExecutionOperation, operation: WorkerExecutionOperation,
worker: WorkerHandle, worker: WorkerHandle,
method: Method, method: Method,
accepted_run_state: WorkerExecutionRunState,
) -> WorkerExecutionResult { ) -> WorkerExecutionResult {
self.run_on_adapter_runtime(async move { self.run_on_adapter_runtime(async move {
worker worker
@ -309,7 +320,7 @@ where
.await .await
.map_err(|err| format!("failed to send Worker method: {err}")) .map_err(|err| format!("failed to send Worker method: {err}"))
}) })
.map(|_| WorkerExecutionResult::accepted(operation, WorkerExecutionRunState::Idle)) .map(|_| WorkerExecutionResult::accepted(operation, accepted_run_state))
.unwrap_or_else(|message| WorkerExecutionResult::errored(operation, message)) .unwrap_or_else(|message| WorkerExecutionResult::errored(operation, message))
} }
} }
@ -448,6 +459,7 @@ where
Method::Run { Method::Run {
input: vec![Segment::text(content)], input: vec![Segment::text(content)],
}, },
WorkerExecutionRunState::Busy,
); );
if result.outcome != worker_runtime::execution::WorkerExecutionOutcome::Accepted { if result.outcome != worker_runtime::execution::WorkerExecutionOutcome::Accepted {
busy.store(false, Ordering::SeqCst); busy.store(false, Ordering::SeqCst);
@ -463,7 +475,12 @@ where
return result; return result;
} }
}; };
self.send_method(WorkerExecutionOperation::Stop, worker, Method::Shutdown) self.send_method(
WorkerExecutionOperation::Stop,
worker,
Method::Shutdown,
WorkerExecutionRunState::Stopped,
)
} }
fn cancel_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult { fn cancel_worker(&self, handle: &WorkerExecutionHandle) -> WorkerExecutionResult {
@ -474,7 +491,12 @@ where
return result; return result;
} }
}; };
self.send_method(WorkerExecutionOperation::Cancel, worker, Method::Cancel) self.send_method(
WorkerExecutionOperation::Cancel,
worker,
Method::Cancel,
WorkerExecutionRunState::Idle,
)
} }
} }
@ -611,6 +633,24 @@ mod tests {
} }
} }
#[test]
fn builtin_profile_selector_is_not_double_prefixed() {
assert_eq!(
ProfileRuntimeWorkerFactory::runtime_profile_value(
&worker_runtime::catalog::ProfileSelector::Builtin("coder".to_string())
)
.as_deref(),
Some("builtin:coder")
);
assert_eq!(
ProfileRuntimeWorkerFactory::runtime_profile_value(
&worker_runtime::catalog::ProfileSelector::Builtin("builtin:coder".to_string())
)
.as_deref(),
Some("builtin:coder")
);
}
#[test] #[test]
fn adapter_dispatches_user_input_through_worker_run_lifecycle() { fn adapter_dispatches_user_input_through_worker_run_lifecycle() {
let client = MockClient::new(simple_text_events()); let client = MockClient::new(simple_text_events());

View File

@ -13,6 +13,7 @@ use worker_runtime::catalog::{
}; };
use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary}; use worker_runtime::config_bundle::{ConfigBundle, ConfigBundleAvailability, ConfigBundleSummary};
use worker_runtime::error::RuntimeError as EmbeddedRuntimeError; use worker_runtime::error::RuntimeError as EmbeddedRuntimeError;
use worker_runtime::execution::WorkerExecutionRunState;
use worker_runtime::http_server::{ use worker_runtime::http_server::{
RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest, RuntimeHttpConfigBundleAvailabilityResponse, RuntimeHttpConfigBundleSyncRequest,
RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse, RuntimeHttpErrorResponse, RuntimeHttpSummaryResponse, RuntimeHttpTranscriptResponse,
@ -903,6 +904,7 @@ pub struct EmbeddedWorkerRuntime {
runtime_id: String, runtime_id: String,
host_id: String, host_id: String,
runtime: worker_runtime::Runtime, runtime: worker_runtime::Runtime,
execution_enabled: bool,
} }
impl EmbeddedWorkerRuntime { impl EmbeddedWorkerRuntime {
@ -931,7 +933,9 @@ impl EmbeddedWorkerRuntime {
}, },
backend, backend,
)?; )?;
Ok(Self::from_runtime(workspace_id, runtime)) let mut embedded = Self::from_runtime(workspace_id, runtime);
embedded.execution_enabled = true;
Ok(embedded)
} }
pub fn from_runtime(workspace_id: impl AsRef<str>, runtime: worker_runtime::Runtime) -> Self { pub fn from_runtime(workspace_id: impl AsRef<str>, runtime: worker_runtime::Runtime) -> Self {
@ -944,6 +948,7 @@ impl EmbeddedWorkerRuntime {
runtime_id, runtime_id,
host_id: host_id_for_embedded_workspace(workspace_id.as_ref()), host_id: host_id_for_embedded_workspace(workspace_id.as_ref()),
runtime, runtime,
execution_enabled: false,
} }
} }
@ -954,6 +959,20 @@ impl EmbeddedWorkerRuntime {
)) ))
} }
fn can_accept_embedded_input(
&self,
status: EmbeddedWorkerStatus,
run_state: WorkerExecutionRunState,
) -> bool {
self.execution_enabled
&& status == EmbeddedWorkerStatus::Running
&& run_state != WorkerExecutionRunState::Busy
}
fn can_stop_embedded_worker(&self, status: EmbeddedWorkerStatus) -> bool {
self.execution_enabled && status == EmbeddedWorkerStatus::Running
}
fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary { fn map_worker_summary(&self, summary: worker_runtime::catalog::WorkerSummary) -> WorkerSummary {
WorkerSummary { WorkerSummary {
runtime_id: self.runtime_id.clone(), runtime_id: self.runtime_id.clone(),
@ -967,15 +986,16 @@ impl EmbeddedWorkerRuntime {
identity: "runtime_registry_worker".to_string(), identity: "runtime_registry_worker".to_string(),
}, },
state: embedded_worker_status_label(summary.status).to_string(), state: embedded_worker_status_label(summary.status).to_string(),
status: embedded_worker_status_label(summary.status).to_string(), status: embedded_worker_execution_status_label(summary.status, summary.execution.run_state)
.to_string(),
last_seen_at: None, last_seen_at: None,
implementation: WorkerImplementationSummary { implementation: WorkerImplementationSummary {
kind: "embedded_worker_runtime".to_string(), kind: "embedded_worker_runtime".to_string(),
display_hint: "backend-internal worker-runtime Worker".to_string(), display_hint: "backend-internal worker-runtime Worker".to_string(),
}, },
capabilities: WorkerCapabilitySummary { capabilities: WorkerCapabilitySummary {
can_accept_input: false, can_accept_input: self.can_accept_embedded_input(summary.status, summary.execution.run_state),
can_stop: false, can_stop: self.can_stop_embedded_worker(summary.status),
can_spawn_followup: false, can_spawn_followup: false,
}, },
diagnostics: vec![diagnostic( diagnostics: vec![diagnostic(
@ -999,15 +1019,16 @@ impl EmbeddedWorkerRuntime {
identity: "runtime_registry_worker".to_string(), identity: "runtime_registry_worker".to_string(),
}, },
state: embedded_worker_status_label(detail.status).to_string(), state: embedded_worker_status_label(detail.status).to_string(),
status: embedded_worker_status_label(detail.status).to_string(), status: embedded_worker_execution_status_label(detail.status, detail.execution.run_state)
.to_string(),
last_seen_at: None, last_seen_at: None,
implementation: WorkerImplementationSummary { implementation: WorkerImplementationSummary {
kind: "embedded_worker_runtime".to_string(), kind: "embedded_worker_runtime".to_string(),
display_hint: "backend-internal worker-runtime Worker".to_string(), display_hint: "backend-internal worker-runtime Worker".to_string(),
}, },
capabilities: WorkerCapabilitySummary { capabilities: WorkerCapabilitySummary {
can_accept_input: false, can_accept_input: self.can_accept_embedded_input(detail.status, detail.execution.run_state),
can_stop: false, can_stop: self.can_stop_embedded_worker(detail.status),
can_spawn_followup: false, can_spawn_followup: false,
}, },
diagnostics: vec![diagnostic( diagnostics: vec![diagnostic(
@ -1037,7 +1058,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
status: "unavailable".to_string(), status: "unavailable".to_string(),
source: RuntimeSourceSummary::embedded_worker_runtime(), source: RuntimeSourceSummary::embedded_worker_runtime(),
host_ids: Vec::new(), host_ids: Vec::new(),
capabilities: embedded_runtime_capabilities(limit, false), capabilities: embedded_runtime_capabilities(limit, false, false),
diagnostics, diagnostics,
}; };
} }
@ -1057,7 +1078,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
} else { } else {
vec![self.host_id.clone()] vec![self.host_id.clone()]
}, },
capabilities: embedded_runtime_capabilities(limit, true), capabilities: embedded_runtime_capabilities(limit, true, self.execution_enabled),
diagnostics, diagnostics,
} }
} }
@ -1075,7 +1096,7 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
status: "available".to_string(), status: "available".to_string(),
observed_at: Utc::now().to_rfc3339(), observed_at: Utc::now().to_rfc3339(),
last_seen_at: None, last_seen_at: None,
capabilities: embedded_runtime_capabilities(limit, true), capabilities: embedded_runtime_capabilities(limit, true, self.execution_enabled),
diagnostics: vec![diagnostic( diagnostics: vec![diagnostic(
"embedded_runtime_host_boundary", "embedded_runtime_host_boundary",
DiagnosticSeverity::Info, DiagnosticSeverity::Info,
@ -1254,6 +1275,94 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
} }
} }
fn stop_worker(
&self,
worker_id: &str,
request: WorkerLifecycleRequest,
) -> WorkerLifecycleResult {
if !self.execution_enabled {
return embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_execution_unavailable",
DiagnosticSeverity::Info,
format!("worker stop for '{worker_id}' requires an embedded execution backend"),
),
);
}
let Some(worker_ref) = self.worker_ref(worker_id) else {
return embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_id_invalid",
DiagnosticSeverity::Warning,
"Worker id was empty and cannot be resolved".to_string(),
),
);
};
match self.runtime.stop_worker(&worker_ref, request.reason) {
Ok(ack) => WorkerLifecycleResult {
state: WorkerOperationState::Accepted,
runtime_id: self.runtime_id.clone(),
worker_id: worker_id.to_string(),
event_id: Some(ack.event_id),
diagnostics: Vec::new(),
},
Err(error) => embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
embedded_runtime_diagnostic(&error),
),
}
}
fn cancel_worker(
&self,
worker_id: &str,
request: WorkerLifecycleRequest,
) -> WorkerLifecycleResult {
if !self.execution_enabled {
return embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_execution_unavailable",
DiagnosticSeverity::Info,
format!(
"worker cancel for '{worker_id}' requires an embedded execution backend"
),
),
);
}
let Some(worker_ref) = self.worker_ref(worker_id) else {
return embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_id_invalid",
DiagnosticSeverity::Warning,
"Worker id was empty and cannot be resolved".to_string(),
),
);
};
match self.runtime.cancel_worker(&worker_ref, request.reason) {
Ok(ack) => WorkerLifecycleResult {
state: WorkerOperationState::Accepted,
runtime_id: self.runtime_id.clone(),
worker_id: worker_id.to_string(),
event_id: Some(ack.event_id),
diagnostics: Vec::new(),
},
Err(error) => embedded_lifecycle_rejected(
&self.runtime_id,
worker_id,
embedded_runtime_diagnostic(&error),
),
}
}
fn observation_source( fn observation_source(
&self, &self,
worker_id: &str, worker_id: &str,
@ -1272,17 +1381,53 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime {
)) ))
} }
fn send_input(&self, worker_id: &str, _request: WorkerInputRequest) -> WorkerInputResult { fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult {
embedded_input_rejected( if !self.execution_enabled {
&self.runtime_id, return embedded_input_rejected(
worker_id, &self.runtime_id,
diagnostic( worker_id,
"embedded_worker_execution_unavailable", diagnostic(
DiagnosticSeverity::Error, "embedded_worker_execution_unavailable",
"Embedded Worker input is disabled until an execution backend is connected" DiagnosticSeverity::Info,
.to_string(), format!(
"worker input for '{worker_id}' requires an embedded execution backend"
),
),
);
}
let Some(worker_ref) = self.worker_ref(worker_id) else {
return embedded_input_rejected(
&self.runtime_id,
worker_id,
diagnostic(
"embedded_worker_id_invalid",
DiagnosticSeverity::Warning,
"Worker id was empty and cannot be resolved".to_string(),
),
);
};
let input = EmbeddedWorkerInput {
kind: match request.kind {
WorkerInputKind::User => EmbeddedWorkerInputKind::User,
WorkerInputKind::System => EmbeddedWorkerInputKind::System,
},
content: request.content,
};
match self.runtime.send_input(&worker_ref, input) {
Ok(ack) => WorkerInputResult {
state: WorkerOperationState::Accepted,
runtime_id: self.runtime_id.clone(),
worker_id: worker_id.to_string(),
transcript_sequence: Some(ack.transcript_sequence),
event_id: Some(ack.event_id),
diagnostics: Vec::new(),
},
Err(error) => embedded_input_rejected(
&self.runtime_id,
worker_id,
embedded_runtime_diagnostic(&error),
), ),
) }
} }
fn transcript( fn transcript(
@ -1865,14 +2010,18 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime {
} }
} }
fn embedded_runtime_capabilities(limit: usize, available: bool) -> RuntimeCapabilitySummary { fn embedded_runtime_capabilities(
limit: usize,
available: bool,
execution_enabled: bool,
) -> RuntimeCapabilitySummary {
RuntimeCapabilitySummary { RuntimeCapabilitySummary {
can_list_hosts: true, can_list_hosts: true,
can_list_workers: available, can_list_workers: available,
can_get_worker: available, can_get_worker: available,
can_spawn_worker: available, can_spawn_worker: available,
can_stop_worker: false, can_stop_worker: available && execution_enabled,
can_accept_input: false, can_accept_input: available && execution_enabled,
has_workspace_fs: false, has_workspace_fs: false,
has_shell: false, has_shell: false,
has_git: false, has_git: false,
@ -1900,6 +2049,24 @@ fn embedded_worker_status_label(status: EmbeddedWorkerStatus) -> &'static str {
} }
} }
fn embedded_worker_execution_status_label(
status: EmbeddedWorkerStatus,
run_state: WorkerExecutionRunState,
) -> &'static str {
match status {
EmbeddedWorkerStatus::Stopped => "stopped",
EmbeddedWorkerStatus::Cancelled => "cancelled",
EmbeddedWorkerStatus::Running => match run_state {
WorkerExecutionRunState::Idle => "idle",
WorkerExecutionRunState::Busy => "running",
WorkerExecutionRunState::Stopped => "stopped",
WorkerExecutionRunState::Rejected => "rejected",
WorkerExecutionRunState::Errored => "errored",
WorkerExecutionRunState::Unconnected => "unconnected",
},
}
}
fn embedded_create_intent(intent: &WorkerSpawnIntent) -> WorkerIntent { fn embedded_create_intent(intent: &WorkerSpawnIntent) -> WorkerIntent {
match intent { match intent {
WorkerSpawnIntent::WorkspaceCompanion => WorkerIntent::Role { WorkerSpawnIntent::WorkspaceCompanion => WorkerIntent::Role {
@ -1984,6 +2151,20 @@ fn remote_input_rejected(
} }
} }
fn embedded_lifecycle_rejected(
runtime_id: &str,
worker_id: &str,
diagnostic: RuntimeDiagnostic,
) -> WorkerLifecycleResult {
WorkerLifecycleResult {
state: WorkerOperationState::Rejected,
runtime_id: runtime_id.to_string(),
worker_id: worker_id.to_string(),
event_id: None,
diagnostics: vec![diagnostic],
}
}
fn remote_lifecycle_rejected( fn remote_lifecycle_rejected(
runtime_id: &str, runtime_id: &str,
worker_id: &str, worker_id: &str,
@ -2394,9 +2575,10 @@ pub fn placeholder_spawn_response(host_id: impl Into<String>) -> WorkerSpawnResu
mod tests { mod tests {
use super::*; use super::*;
use serde_json::json; use serde_json::json;
use std::collections::HashMap;
use std::io::{Read as _, Write as _}; use std::io::{Read as _, Write as _};
use std::net::TcpListener; use std::net::TcpListener;
use std::sync::Arc; use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
fn test_config_bundle() -> ConfigBundle { fn test_config_bundle() -> ConfigBundle {
@ -2425,6 +2607,74 @@ mod tests {
.with_computed_digest() .with_computed_digest()
} }
#[derive(Default)]
struct AcceptingExecutionBackend {
contexts:
Mutex<HashMap<EmbeddedWorkerRef, worker_runtime::execution::WorkerExecutionContext>>,
}
impl worker_runtime::execution::WorkerExecutionBackend for AcceptingExecutionBackend {
fn backend_id(&self) -> &str {
"workspace-server-test-backend"
}
fn spawn_worker(
&self,
request: worker_runtime::execution::WorkerExecutionSpawnRequest,
) -> worker_runtime::execution::WorkerExecutionSpawnResult {
self.contexts
.lock()
.unwrap()
.insert(request.worker_ref.clone(), request.context);
worker_runtime::execution::WorkerExecutionSpawnResult::Connected {
handle: worker_runtime::execution::WorkerExecutionHandle::new(
request.worker_ref,
self.backend_id(),
),
run_state: WorkerExecutionRunState::Idle,
}
}
fn dispatch_input(
&self,
handle: &worker_runtime::execution::WorkerExecutionHandle,
input: EmbeddedWorkerInput,
) -> worker_runtime::execution::WorkerExecutionResult {
let context = self
.contexts
.lock()
.unwrap()
.get(handle.worker_ref())
.cloned();
let Some(context) = context else {
return worker_runtime::execution::WorkerExecutionResult::rejected(
worker_runtime::execution::WorkerExecutionOperation::Input,
"missing test context",
);
};
let content = input.content;
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(10));
let _ = context.publish_protocol_event(protocol::Event::Status {
status: protocol::WorkerStatus::Running,
});
let _ = context.publish_protocol_event(protocol::Event::TextDone {
text: format!("echo: {content}"),
});
let _ = context.publish_protocol_event(protocol::Event::RunEnd {
result: protocol::RunResult::Finished,
});
let _ = context.publish_protocol_event(protocol::Event::Status {
status: protocol::WorkerStatus::Idle,
});
});
worker_runtime::execution::WorkerExecutionResult::accepted(
worker_runtime::execution::WorkerExecutionOperation::Input,
WorkerExecutionRunState::Busy,
)
}
}
#[derive(Clone)] #[derive(Clone)]
struct FixtureRuntime { struct FixtureRuntime {
runtime_id: String, runtime_id: String,
@ -2598,6 +2848,64 @@ mod tests {
)); ));
} }
#[test]
fn embedded_runtime_with_execution_backend_routes_input_and_projects_transcript() {
let runtime = EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test",
Arc::new(AcceptingExecutionBackend::default()),
)
.expect("test backend should connect");
let spawned = runtime.spawn_worker(WorkerSpawnRequest {
intent: WorkerSpawnIntent::TicketRole {
ticket_id: "00001KVZSGT0Q".to_string(),
role: TicketWorkerRole::Coder,
},
requested_worker_name: None,
acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted {
expected_segments: 0,
},
profile: None,
config_bundle: None,
requested_capabilities: Vec::new(),
});
assert_eq!(spawned.state, WorkerOperationState::Accepted);
let worker = spawned.worker.expect("created embedded worker");
assert!(worker.capabilities.can_accept_input);
assert!(worker.capabilities.can_stop);
let input = runtime.send_input(
&worker.worker_id,
WorkerInputRequest {
kind: WorkerInputKind::User,
content: "hello".to_string(),
},
);
assert_eq!(input.state, WorkerOperationState::Accepted);
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
loop {
let detail = runtime
.worker(&worker.worker_id)
.worker
.expect("worker detail");
let transcript = runtime.transcript(&worker.worker_id, 0, 10);
if detail.status == "idle"
&& transcript
.items
.iter()
.any(|entry| entry.role == "assistant" && entry.content == "echo: hello")
{
assert!(detail.capabilities.can_accept_input);
break;
}
assert!(
std::time::Instant::now() < deadline,
"timed out waiting for embedded execution projection"
);
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
#[test] #[test]
fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() { fn embedded_runtime_registers_routes_input_and_transcript_without_internal_leaks() {
let registry = let registry =

View File

@ -1149,7 +1149,7 @@ mod tests {
.find(|worker| worker["role"] == "workspace_companion") .find(|worker| worker["role"] == "workspace_companion")
.expect("companion worker is visible through runtime worker API"); .expect("companion worker is visible through runtime worker API");
assert_eq!(companion_worker["runtime_id"], "embedded-worker-runtime"); assert_eq!(companion_worker["runtime_id"], "embedded-worker-runtime");
assert_eq!(companion_worker["capabilities"]["can_stop"], false); assert_eq!(companion_worker["capabilities"]["can_stop"], true);
let companion_status = get_json(app.clone(), "/api/companion/status").await; let companion_status = get_json(app.clone(), "/api/companion/status").await;
assert_eq!(companion_status["state"], "ready"); assert_eq!(companion_status["state"], "ready");