merge: workspace companion llm worker

This commit is contained in:
Keisuke Hirata 2026-06-28 16:18:48 +09:00
commit eb06b8a923
No known key found for this signature in database
4 changed files with 763 additions and 206 deletions

View File

@ -1,14 +1,21 @@
use std::sync::{Arc, Mutex};
use chrono::Utc;
use serde::{Deserialize, Serialize};
use worker_runtime::catalog::{CapabilityRequest, ProfileSelector};
use worker_runtime::catalog::{CapabilityRequest, ConfigBundleRef, ProfileSelector};
use worker_runtime::config_bundle::{
ConfigBundle, ConfigBundleMetadata, ConfigBundleProvenance, ConfigProfileDescriptor,
};
use crate::hosts::{
DiagnosticSeverity, RuntimeDiagnostic, RuntimeRegistry, WorkerOperationState,
WorkerSpawnAcceptanceRequirement, WorkerSpawnIntent, WorkerSpawnRequest, WorkerSummary,
DiagnosticSeverity, RuntimeDiagnostic, RuntimeRegistry, WorkerInputKind, WorkerInputRequest,
WorkerOperationState, WorkerSpawnAcceptanceRequirement, WorkerSpawnIntent, WorkerSpawnRequest,
WorkerSummary, WorkerTranscriptItem as RuntimeTranscriptItem, WorkerTranscriptProjection,
};
const COMPANION_RUNTIME_ID: &str = "embedded-worker-runtime";
const COMPANION_PROFILE_ID: &str = "builtin:companion";
const COMPANION_CONFIG_BUNDLE_ID: &str = "workspace-companion-config";
const MAX_MESSAGE_CHARS: usize = 8_000;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
@ -85,12 +92,6 @@ pub struct CompanionTranscriptItem {
pub status: String,
}
#[derive(Debug, Default)]
struct CompanionTranscript {
items: Vec<CompanionTranscriptItem>,
next_sequence: u64,
}
#[derive(Debug)]
struct CompanionWorkerState {
state: CompanionState,
@ -99,63 +100,86 @@ struct CompanionWorkerState {
}
pub struct CompanionConsole {
runtime: Arc<RuntimeRegistry>,
worker: Mutex<CompanionWorkerState>,
transcript: Mutex<CompanionTranscript>,
}
impl CompanionConsole {
pub fn new(runtime: Arc<RuntimeRegistry>) -> Self {
let initial = spawn_companion_worker(&runtime);
Self {
runtime,
worker: Mutex::new(initial),
transcript: Mutex::new(CompanionTranscript::default()),
}
}
pub fn status(&self) -> CompanionStatusResponse {
let worker = match self.worker.lock() {
Ok(worker) => worker,
Err(_) => {
return CompanionStatusResponse {
state: CompanionState::Error,
worker: None,
transport: companion_transport(),
diagnostics: vec![diagnostic(
"companion_state_unavailable",
DiagnosticSeverity::Error,
"Companion state is unavailable",
)],
};
}
};
CompanionStatusResponse {
state: worker.state,
worker: worker.worker.clone(),
transport: companion_transport(),
diagnostics: worker.diagnostics.clone(),
match self.refresh_worker_state() {
Ok(worker) => CompanionStatusResponse {
state: worker.state,
worker: worker.worker.clone(),
transport: companion_transport(worker.worker.as_ref()),
diagnostics: worker.diagnostics.clone(),
},
Err(diagnostic) => CompanionStatusResponse {
state: CompanionState::Error,
worker: None,
transport: companion_transport(None),
diagnostics: vec![diagnostic],
},
}
}
pub fn transcript(&self, start: usize, limit: usize) -> CompanionTranscriptProjection {
let transcript = match self.transcript.lock() {
Ok(transcript) => transcript,
Err(_) => {
return CompanionTranscriptProjection {
state: CompanionState::Error,
start,
limit,
total_items: 0,
next_start: None,
items: Vec::new(),
diagnostics: vec![diagnostic(
"companion_transcript_unavailable",
DiagnosticSeverity::Error,
"Companion transcript is unavailable",
)],
};
match self.current_worker() {
Ok(Some(worker)) => {
match self
.runtime
.transcript(COMPANION_RUNTIME_ID, &worker.worker_id, start, limit)
{
Ok(transcript) => project_runtime_transcript(
&transcript,
companion_state_for_worker(&worker),
Vec::new(),
),
Err(error) => CompanionTranscriptProjection {
state: CompanionState::Error,
start,
limit,
total_items: 0,
next_start: None,
items: Vec::new(),
diagnostics: vec![diagnostic(
"companion_transcript_unavailable",
DiagnosticSeverity::Error,
format!("Companion Worker transcript is unavailable: {error:?}"),
)],
},
}
}
};
project_transcript(&transcript, CompanionState::Ready, start, limit, Vec::new())
Ok(None) => CompanionTranscriptProjection {
state: CompanionState::Error,
start,
limit,
total_items: 0,
next_start: None,
items: Vec::new(),
diagnostics: vec![diagnostic(
"companion_worker_unavailable",
DiagnosticSeverity::Error,
"Workspace Companion Worker is unavailable",
)],
},
Err(diagnostic) => CompanionTranscriptProjection {
state: CompanionState::Error,
start,
limit,
total_items: 0,
next_start: None,
items: Vec::new(),
diagnostics: vec![diagnostic],
},
}
}
pub fn send_message(&self, request: CompanionMessageRequest) -> CompanionMessageResponse {
@ -175,11 +199,66 @@ impl CompanionConsole {
));
}
self.rejected_message_response(diagnostic(
"companion_llm_not_connected",
DiagnosticSeverity::Error,
"Workspace Companion input is disabled until it is connected to actual Worker/LLM execution",
))
let worker = match self.current_worker() {
Ok(Some(worker)) => worker,
Ok(None) => {
return self.rejected_message_response(diagnostic(
"companion_worker_unavailable",
DiagnosticSeverity::Error,
"Workspace Companion Worker is unavailable",
));
}
Err(diagnostic) => return self.rejected_message_response(diagnostic),
};
let response = self.runtime.send_input(
COMPANION_RUNTIME_ID,
&worker.worker_id,
WorkerInputRequest {
kind: WorkerInputKind::User,
content: content.clone(),
},
);
match response {
Ok(result) => {
let state = match result.state {
WorkerOperationState::Accepted => CompanionState::Accepted,
WorkerOperationState::Unsupported | WorkerOperationState::Rejected => {
CompanionState::Rejected
}
};
let diagnostics = if result.diagnostics.is_empty() {
Vec::new()
} else {
result.diagnostics.clone()
};
let projection = self.transcript(0, 200);
CompanionMessageResponse {
state,
worker: projection_worker(&self.status()),
user_item: projection
.items
.iter()
.rev()
.find(|item| item.role == "user" && item.content == content)
.cloned(),
assistant_item: projection
.items
.iter()
.rev()
.find(|item| item.role == "assistant")
.cloned(),
transcript: projection,
diagnostics,
}
}
Err(error) => self.rejected_message_response(diagnostic(
"companion_worker_input_failed",
DiagnosticSeverity::Error,
format!("Companion Worker input dispatch failed: {error:?}"),
)),
}
}
pub fn cancel(&self, _request: CompanionCancelRequest) -> CompanionMessageResponse {
@ -188,157 +267,267 @@ impl CompanionConsole {
DiagnosticSeverity::Info,
"Workspace Companion has no active generation to cancel",
)];
match self.transcript.lock() {
Ok(transcript) => response_from_locked_transcript(
&transcript,
CompanionState::Cancelled,
self.status().worker,
None,
None,
0,
200,
diagnostics,
),
Err(_) => CompanionMessageResponse {
state: CompanionState::Error,
worker: self.status().worker,
user_item: None,
assistant_item: None,
transcript: CompanionTranscriptProjection {
state: CompanionState::Error,
start: 0,
limit: 200,
total_items: 0,
next_start: None,
items: Vec::new(),
diagnostics: vec![diagnostic(
"companion_transcript_unavailable",
DiagnosticSeverity::Error,
"Companion transcript is unavailable",
)],
},
diagnostics,
},
let status = self.status();
let projection = self.transcript(0, 200);
CompanionMessageResponse {
state: CompanionState::Cancelled,
worker: status.worker,
user_item: None,
assistant_item: projection
.items
.iter()
.rev()
.find(|item| item.role == "assistant")
.cloned(),
transcript: projection,
diagnostics,
}
}
fn rejected_message_response(&self, diagnostic: RuntimeDiagnostic) -> CompanionMessageResponse {
match self.transcript.lock() {
Ok(transcript) => response_from_locked_transcript(
&transcript,
CompanionState::Rejected,
self.status().worker,
None,
None,
0,
200,
vec![diagnostic],
),
Err(_) => CompanionMessageResponse {
state: CompanionState::Rejected,
worker: self.status().worker,
user_item: None,
assistant_item: None,
transcript: CompanionTranscriptProjection {
state: CompanionState::Error,
start: 0,
limit: 200,
total_items: 0,
next_start: None,
items: Vec::new(),
diagnostics: vec![diagnostic.clone()],
},
diagnostics: vec![diagnostic],
},
let status = self.status();
let projection = self.transcript(0, 200);
CompanionMessageResponse {
state: CompanionState::Rejected,
worker: status.worker,
user_item: None,
assistant_item: projection
.items
.iter()
.rev()
.find(|item| item.role == "assistant")
.cloned(),
transcript: projection,
diagnostics: vec![diagnostic],
}
}
fn current_worker(&self) -> Result<Option<WorkerSummary>, RuntimeDiagnostic> {
self.refresh_worker_state()
.map(|state| state.worker.clone())
}
fn refresh_worker_state(&self) -> Result<CompanionWorkerState, RuntimeDiagnostic> {
let mut state = self.worker.lock().map_err(|_| {
diagnostic(
"companion_state_unavailable",
DiagnosticSeverity::Error,
"Companion state is unavailable",
)
})?;
let Some(worker_id) = state.worker.as_ref().map(|worker| worker.worker_id.clone()) else {
return Ok(CompanionWorkerState {
state: state.state,
worker: None,
diagnostics: state.diagnostics.clone(),
});
};
match self.runtime.worker(COMPANION_RUNTIME_ID, &worker_id) {
Ok(worker) => {
let mut diagnostics = if worker.capabilities.can_accept_input {
Vec::new()
} else {
state.diagnostics.clone()
};
if !worker.capabilities.can_accept_input
&& !diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "companion_worker_not_input_capable")
{
diagnostics.push(companion_not_input_capable_diagnostic(&worker));
}
state.state = companion_state_for_worker(&worker);
state.worker = Some(worker);
state.diagnostics = diagnostics;
}
Err(error) => {
state.state = CompanionState::Error;
state.diagnostics = vec![diagnostic(
"companion_worker_lookup_failed",
DiagnosticSeverity::Error,
format!("Companion Worker lookup failed: {error:?}"),
)];
}
}
Ok(CompanionWorkerState {
state: state.state,
worker: state.worker.clone(),
diagnostics: state.diagnostics.clone(),
})
}
}
fn projection_worker(status: &CompanionStatusResponse) -> Option<WorkerSummary> {
status.worker.clone()
}
fn spawn_companion_worker(runtime: &RuntimeRegistry) -> CompanionWorkerState {
let request = WorkerSpawnRequest {
intent: WorkerSpawnIntent::WorkspaceCompanion,
requested_worker_name: Some("workspace-companion".to_string()),
acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted {
expected_segments: 0,
},
profile: Some(ProfileSelector::RuntimeDefault),
config_bundle: None,
requested_capabilities: vec![CapabilityRequest::named("conversation")],
let selector = companion_profile_selector();
let mut diagnostics = Vec::new();
let config_bundle = companion_config_bundle();
let config_ref = ConfigBundleRef {
id: config_bundle.metadata.id.clone(),
digest: config_bundle.metadata.digest.clone(),
};
match runtime.spawn_worker(COMPANION_RUNTIME_ID, request) {
Ok(result) if result.state == WorkerOperationState::Accepted => CompanionWorkerState {
state: CompanionState::Ready,
worker: result.worker,
diagnostics: result.diagnostics,
},
Ok(result) => CompanionWorkerState {
state: CompanionState::Error,
worker: result.worker,
diagnostics: result.diagnostics,
match runtime.sync_config_bundle(COMPANION_RUNTIME_ID, config_bundle) {
Ok(result) => diagnostics.extend(result.diagnostics),
Err(error) => diagnostics.push(diagnostic(
"companion_config_bundle_sync_failed",
DiagnosticSeverity::Error,
format!("Workspace Companion config bundle sync failed: {error:?}"),
)),
}
let response = runtime.spawn_worker(
COMPANION_RUNTIME_ID,
WorkerSpawnRequest {
intent: WorkerSpawnIntent::WorkspaceCompanion,
requested_worker_name: Some("workspace-companion".to_string()),
acceptance: WorkerSpawnAcceptanceRequirement::RunAccepted {
expected_segments: 0,
},
profile: Some(selector),
config_bundle: Some(config_ref),
requested_capabilities: vec![CapabilityRequest::named("worker.input.user")],
},
);
match response {
Ok(response) => {
diagnostics.extend(response.diagnostics);
if let Some(worker) = response.worker {
if !worker.capabilities.can_accept_input {
diagnostics.push(companion_not_input_capable_diagnostic(&worker));
}
CompanionWorkerState {
state: companion_state_for_worker(&worker),
worker: Some(worker),
diagnostics,
}
} else {
diagnostics.push(diagnostic(
"companion_worker_missing",
DiagnosticSeverity::Error,
"Workspace Companion Worker spawn did not return a Worker projection",
));
CompanionWorkerState {
state: CompanionState::Error,
worker: None,
diagnostics,
}
}
}
Err(error) => CompanionWorkerState {
state: CompanionState::Error,
worker: None,
diagnostics: vec![diagnostic(
"companion_worker_spawn_failed",
DiagnosticSeverity::Error,
format!("Companion Worker spawn failed: {error:?}"),
format!("Workspace Companion Worker spawn failed: {error:?}"),
)],
},
}
}
fn response_from_locked_transcript(
transcript: &CompanionTranscript,
state: CompanionState,
worker: Option<WorkerSummary>,
user_item: Option<CompanionTranscriptItem>,
assistant_item: Option<CompanionTranscriptItem>,
start: usize,
limit: usize,
diagnostics: Vec<RuntimeDiagnostic>,
) -> CompanionMessageResponse {
CompanionMessageResponse {
state,
worker,
user_item,
assistant_item,
transcript: project_transcript(transcript, state, start, limit, diagnostics.clone()),
diagnostics,
fn companion_profile_selector() -> ProfileSelector {
ProfileSelector::Builtin(COMPANION_PROFILE_ID.to_string())
}
fn companion_config_bundle() -> ConfigBundle {
ConfigBundle {
metadata: ConfigBundleMetadata {
id: COMPANION_CONFIG_BUNDLE_ID.to_string(),
digest: String::new(),
revision: "1".to_string(),
workspace_id: "workspace-companion".to_string(),
created_at: Utc::now().to_rfc3339(),
provenance: ConfigBundleProvenance {
source: "workspace-server".to_string(),
detail: Some("workspace-companion".to_string()),
},
},
profiles: vec![ConfigProfileDescriptor {
selector: companion_profile_selector(),
label: Some("Workspace Companion".to_string()),
}],
declarations: Vec::new(),
}
.with_computed_digest()
}
fn companion_state_for_worker(worker: &WorkerSummary) -> CompanionState {
if !worker.capabilities.can_accept_input {
return CompanionState::Error;
}
match worker.status.as_str() {
"busy" | "running" | "stopping" => CompanionState::Busy,
"errored" | "error" | "stopped" | "unavailable" => CompanionState::Error,
_ => CompanionState::Ready,
}
}
fn project_transcript(
transcript: &CompanionTranscript,
fn companion_not_input_capable_diagnostic(worker: &WorkerSummary) -> RuntimeDiagnostic {
diagnostic(
"companion_worker_not_input_capable",
DiagnosticSeverity::Error,
format!(
"Workspace Companion Worker '{}' is not input-capable; check profile, provider, secret, and authority diagnostics",
worker.worker_id
),
)
}
fn project_runtime_transcript(
transcript: &WorkerTranscriptProjection,
state: CompanionState,
start: usize,
limit: usize,
diagnostics: Vec<RuntimeDiagnostic>,
) -> CompanionTranscriptProjection {
let limit = limit.min(200);
let total_items = transcript.items.len();
let end = start.saturating_add(limit).min(total_items);
let items = if start < total_items {
transcript.items[start..end].to_vec()
} else {
Vec::new()
};
CompanionTranscriptProjection {
state,
start,
limit,
total_items,
next_start: (end < total_items).then_some(end),
items,
start: transcript.start,
limit: transcript.limit,
total_items: transcript.total_items,
next_start: transcript.next_start,
items: transcript
.items
.iter()
.map(project_runtime_transcript_item)
.collect(),
diagnostics,
}
}
fn companion_transport() -> CompanionTransportSummary {
CompanionTransportSummary {
kind: "embedded_worker_runtime".to_string(),
completion: "not_connected".to_string(),
limitation: "Workspace Companion is visible as an embedded Worker, but browser input is disabled until actual Worker/LLM execution is connected.".to_string(),
fn project_runtime_transcript_item(item: &RuntimeTranscriptItem) -> CompanionTranscriptItem {
CompanionTranscriptItem {
sequence: item.sequence,
role: item.role.clone(),
content: item.content.clone(),
created_at: format!("runtime_sequence:{}", item.sequence),
source: "worker_runtime".to_string(),
status: "committed".to_string(),
}
}
fn companion_transport(worker: Option<&WorkerSummary>) -> CompanionTransportSummary {
if worker.is_some_and(|worker| worker.capabilities.can_accept_input) {
CompanionTransportSummary {
kind: "embedded_worker_runtime".to_string(),
completion: "connected".to_string(),
limitation:
"Workspace Companion input is dispatched through the normal Worker runtime path."
.to_string(),
}
} else {
CompanionTransportSummary {
kind: "embedded_worker_runtime".to_string(),
completion: "not_input_capable".to_string(),
limitation:
"Workspace Companion is a Worker but is not input-capable; inspect typed diagnostics for missing profile, provider, secret, or authority."
.to_string(),
}
}
}
@ -358,52 +547,114 @@ fn diagnostic(
mod tests {
use super::*;
use crate::hosts::{EmbeddedWorkerRuntime, RuntimeRegistry};
use std::collections::HashMap;
use std::sync::Mutex as StdMutex;
use std::thread;
use std::time::{Duration, Instant};
use worker_runtime::execution::{
WorkerExecutionBackend, WorkerExecutionContext, WorkerExecutionHandle,
WorkerExecutionOperation, WorkerExecutionResult, WorkerExecutionRunState,
WorkerExecutionSpawnRequest, WorkerExecutionSpawnResult,
};
use worker_runtime::identity::WorkerRef;
use worker_runtime::interaction::WorkerInput;
#[derive(Default)]
struct DeterministicExecutionBackend {
contexts: StdMutex<HashMap<WorkerRef, WorkerExecutionContext>>,
}
impl WorkerExecutionBackend for DeterministicExecutionBackend {
fn backend_id(&self) -> &str {
"deterministic-companion-test"
}
fn spawn_worker(&self, request: WorkerExecutionSpawnRequest) -> WorkerExecutionSpawnResult {
self.contexts
.lock()
.unwrap()
.insert(request.worker_ref.clone(), request.context);
WorkerExecutionSpawnResult::Connected {
handle: WorkerExecutionHandle::new(
request.worker_ref.clone(),
"deterministic-companion-test",
),
run_state: WorkerExecutionRunState::Idle,
}
}
fn dispatch_input(
&self,
handle: &WorkerExecutionHandle,
request: WorkerInput,
) -> WorkerExecutionResult {
let worker = handle.worker_ref().clone();
let context = self
.contexts
.lock()
.unwrap()
.get(&worker)
.cloned()
.expect("execution context");
let content = request.content.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(25));
let _ = context.publish_protocol_event(protocol::Event::TextDone {
text: format!("companion echoed: {content}"),
});
});
WorkerExecutionResult::accepted(
WorkerExecutionOperation::Input,
WorkerExecutionRunState::Idle,
)
}
}
#[test]
fn companion_spawns_visible_worker_without_fake_turn() {
fn companion_spawns_worker_with_companion_profile_and_diagnostic_when_not_input_capable() {
let registry =
RuntimeRegistry::for_workspace(EmbeddedWorkerRuntime::new_memory("local:test"));
let registry = Arc::new(registry);
let companion = CompanionConsole::new(registry.clone());
let status = companion.status();
assert_eq!(status.state, CompanionState::Ready);
let worker = status.worker.clone().expect("companion worker");
assert_eq!(worker.runtime_id, COMPANION_RUNTIME_ID);
assert_eq!(worker.role.as_deref(), Some("workspace_companion"));
assert!(!worker.capabilities.can_stop);
let workers = registry.list_workers(10);
assert!(!worker.capabilities.can_accept_input);
assert_eq!(status.transport.completion, "not_input_capable");
assert!(
workers
.items
status
.diagnostics
.iter()
.any(|item| item.worker_id == worker.worker_id)
.any(|diagnostic| diagnostic.code == "companion_worker_not_input_capable")
);
let response = companion.send_message(CompanionMessageRequest {
content: "hello".to_string(),
});
assert_eq!(response.state, CompanionState::Rejected);
assert!(response.transcript.items.is_empty());
assert!(
response
!response
.diagnostics
.iter()
.any(|diagnostic| diagnostic.code == "companion_llm_not_connected")
);
assert!(response.transcript.items.is_empty());
let runtime_transcript = registry
.transcript(COMPANION_RUNTIME_ID, &worker.worker_id, 0, 10)
.unwrap();
assert!(runtime_transcript.items.is_empty());
let worker_detail = registry
.worker(COMPANION_RUNTIME_ID, &worker.worker_id)
.expect("worker detail");
assert_eq!(worker_detail.profile.as_deref(), Some(COMPANION_PROFILE_ID));
let browser_payload = serde_json::to_string(&(status, response)).unwrap();
let browser_payload = serde_json::to_string(&(status, response, worker_detail)).unwrap();
for forbidden in [
"/workspace/project",
"metadata.json",
".jsonl",
"/run/user/",
"session",
"manifest",
] {
assert!(
!browser_payload.contains(forbidden),
@ -411,4 +662,101 @@ mod tests {
);
}
}
#[test]
fn companion_dispatches_input_and_projects_assistant_output_from_worker_runtime() {
let registry = RuntimeRegistry::for_workspace(
EmbeddedWorkerRuntime::new_memory_with_execution_backend(
"local:test",
Arc::new(DeterministicExecutionBackend::default()),
)
.expect("embedded runtime"),
);
let registry = Arc::new(registry);
let companion = CompanionConsole::new(registry.clone());
let status = companion.status();
let worker = status.worker.clone().expect("companion worker");
assert_eq!(status.transport.completion, "connected");
assert_eq!(worker.profile.as_deref(), Some(COMPANION_PROFILE_ID));
assert!(worker.capabilities.can_accept_input);
let source = registry
.observation_source(COMPANION_RUNTIME_ID, &worker.worker_id)
.expect("observation source");
let crate::observation::RuntimeObservationSource::Embedded(source) = source else {
panic!("expected embedded observation source");
};
let cursor = source
.runtime
.worker_observation_cursor_now(&source.worker_ref)
.expect("observation cursor");
let response = companion.send_message(CompanionMessageRequest {
content: "hello runtime".to_string(),
});
assert_eq!(response.state, CompanionState::Accepted);
assert!(
response
.user_item
.as_ref()
.is_some_and(|item| item.role == "user" && item.content == "hello runtime")
);
assert!(response.diagnostics.is_empty());
let deadline = Instant::now() + Duration::from_secs(2);
let observed = loop {
let observed = source
.runtime
.read_worker_observation_events(&source.worker_ref, cursor)
.expect("observation events");
if observed.iter().any(|event| {
serde_json::to_string(event)
.unwrap()
.contains("companion echoed: hello runtime")
}) {
break observed;
}
assert!(
Instant::now() < deadline,
"timed out waiting for observation event"
);
thread::sleep(Duration::from_millis(20));
};
let observed_json = serde_json::to_string(&observed).unwrap();
assert!(observed_json.contains("companion echoed: hello runtime"));
let deadline = Instant::now() + Duration::from_secs(2);
let transcript = loop {
let transcript = companion.transcript(0, 20);
if transcript.items.iter().any(|item| {
item.role == "assistant" && item.content == "companion echoed: hello runtime"
}) {
break transcript;
}
assert!(
Instant::now() < deadline,
"timed out waiting for companion assistant output: {transcript:?}"
);
thread::sleep(Duration::from_millis(20));
};
assert!(
transcript
.items
.iter()
.any(|item| item.role == "user" && item.content == "hello runtime")
);
assert!(transcript.items.iter().any(|item| {
item.role == "assistant"
&& item.source == "worker_runtime"
&& item.status == "committed"
}));
let runtime_transcript = registry
.transcript(COMPANION_RUNTIME_ID, &worker.worker_id, 0, 20)
.expect("runtime transcript");
assert!(runtime_transcript.items.iter().any(|item| {
item.role == "assistant" && item.content == "companion echoed: hello runtime"
}));
}
}

View File

@ -2160,9 +2160,10 @@ fn embedded_profile_selector(intent: &WorkerSpawnIntent) -> ProfileSelector {
WorkerSpawnIntent::TicketRole { role, .. } => {
ProfileSelector::Builtin(format!("builtin:{}", ticket_role_profile_slug(role)))
}
WorkerSpawnIntent::WorkspaceCompanion | WorkerSpawnIntent::WorkspaceOrchestrator => {
ProfileSelector::RuntimeDefault
WorkerSpawnIntent::WorkspaceCompanion => {
ProfileSelector::Builtin("builtin:companion".to_string())
}
WorkerSpawnIntent::WorkspaceOrchestrator => ProfileSelector::RuntimeDefault,
}
}

View File

@ -89,6 +89,22 @@ pub struct WorkspaceApi {
impl WorkspaceApi {
pub async fn new(config: ServerConfig, store: Arc<dyn ControlPlaneStore>) -> Result<Self> {
let execution_backend = WorkerRuntimeExecutionBackend::from_workspace(
config.workspace_root.clone(),
)
.map_err(|err| {
crate::Error::Store(format!(
"failed to initialize embedded Worker backend: {err}"
))
})?;
Self::new_with_execution_backend(config, store, Arc::new(execution_backend)).await
}
async fn new_with_execution_backend(
config: ServerConfig,
store: Arc<dyn ControlPlaneStore>,
execution_backend: Arc<dyn worker_runtime::execution::WorkerExecutionBackend>,
) -> Result<Self> {
store
.upsert_workspace(&WorkspaceRecord {
workspace_id: config.workspace_id.clone(),
@ -98,18 +114,10 @@ impl WorkspaceApi {
updated_at: config.workspace_created_at.clone(),
})
.await?;
let execution_backend = WorkerRuntimeExecutionBackend::from_workspace(
config.workspace_root.clone(),
)
.map_err(|err| {
crate::Error::Store(format!(
"failed to initialize embedded Worker backend: {err}"
))
})?;
let mut runtime = RuntimeRegistry::for_workspace(
EmbeddedWorkerRuntime::new_memory_with_execution_backend(
config.workspace_id.clone(),
Arc::new(execution_backend),
execution_backend,
)
.map_err(|err| {
crate::Error::Store(format!("invalid embedded Worker backend: {err}"))
@ -228,7 +236,6 @@ pub async fn serve(
pub struct WorkspaceResponse {
pub workspace_id: String,
pub display_name: String,
pub local_root: PathBuf,
pub record_authority: String,
pub schema_version: i64,
pub auth: AuthConfig,
@ -247,6 +254,7 @@ pub struct ExtensionPoints {
pub struct ExtensionPointState {
pub status: String,
pub note: String,
pub diagnostics: Vec<RuntimeDiagnostic>,
}
#[derive(Debug, Serialize, Deserialize)]
@ -332,10 +340,11 @@ async fn get_workspace(State(api): State<WorkspaceApi>) -> ApiResult<Json<Worksp
.as_ref()
.map(|record| record.display_name.clone())
.unwrap_or_else(|| api.config.workspace_display_name.clone());
let companion_status = api.companion.status();
let companion_console = companion_console_extension_point(&companion_status);
Ok(Json(WorkspaceResponse {
workspace_id: api.config.workspace_id.clone(),
display_name,
local_root: api.config.workspace_root.clone(),
record_authority: "local_yoi_project_records".to_string(),
schema_version,
auth: api.config.auth.clone(),
@ -344,19 +353,48 @@ async fn get_workspace(State(api): State<WorkspaceApi>) -> ApiResult<Json<Worksp
event_stream: ExtensionPointState {
status: "backend_proxy".to_string(),
note: "Worker observation streams are exposed only through the Workspace server proxy keyed by runtime_id + worker_id; browser clients never receive raw Runtime endpoints or socket paths.".to_string(),
diagnostics: Vec::new(),
},
host_worker_bridge: ExtensionPointState {
status: "runtime_registry".to_string(),
note: "Hosts and Workers are projected from the Workspace RuntimeRegistry; raw Runtime endpoints, sockets, and local metadata paths are not exposed.".to_string(),
diagnostics: Vec::new(),
},
companion_console: ExtensionPointState {
status: "not_connected".to_string(),
note: "Workspace Companion is visible as an embedded Worker, but browser input is disabled until actual Worker/LLM execution is connected.".to_string(),
},
companion_console,
},
}))
}
fn companion_console_extension_point(status: &CompanionStatusResponse) -> ExtensionPointState {
let completion = status.transport.completion.clone();
let note = match completion.as_str() {
"connected" => "Workspace Companion is input-capable and browser input is dispatched through the normal Worker runtime path.".to_string(),
"not_input_capable" => {
let diagnostic_codes = status
.diagnostics
.iter()
.map(|diagnostic| diagnostic.code.as_str())
.collect::<Vec<_>>()
.join(", ");
if diagnostic_codes.is_empty() {
"Workspace Companion is not input-capable; check provider, config, profile, secret, and authority diagnostics.".to_string()
} else {
format!(
"Workspace Companion is not input-capable; check typed diagnostics: {diagnostic_codes}."
)
}
}
other => format!(
"Workspace Companion transport reports {other}; browser input follows the Companion Worker runtime capability state."
),
};
ExtensionPointState {
status: completion,
note,
diagnostics: status.diagnostics.clone(),
}
}
async fn list_tickets(
State(api): State<WorkspaceApi>,
) -> ApiResult<Json<ListResponse<crate::records::TicketSummary>>> {
@ -1036,6 +1074,64 @@ mod tests {
const TEST_REPOSITORY_ID: &str = "local-0192f0e8-4d84-7d6e-a000-000000000001";
const TEST_CREATED_AT: &str = "2026-06-23T06:43:28Z";
#[derive(Default)]
struct DeterministicExecutionBackend {
contexts: std::sync::Mutex<
std::collections::HashMap<
worker_runtime::identity::WorkerRef,
worker_runtime::execution::WorkerExecutionContext,
>,
>,
}
impl worker_runtime::execution::WorkerExecutionBackend for DeterministicExecutionBackend {
fn backend_id(&self) -> &str {
"deterministic-workspace-server-test"
}
fn spawn_worker(
&self,
request: worker_runtime::execution::WorkerExecutionSpawnRequest,
) -> worker_runtime::execution::WorkerExecutionSpawnResult {
self.contexts
.lock()
.unwrap()
.insert(request.worker_ref.clone(), request.context);
worker_runtime::execution::WorkerExecutionSpawnResult::Connected {
handle: worker_runtime::execution::WorkerExecutionHandle::new(
request.worker_ref,
self.backend_id(),
),
run_state: worker_runtime::execution::WorkerExecutionRunState::Idle,
}
}
fn dispatch_input(
&self,
handle: &worker_runtime::execution::WorkerExecutionHandle,
input: worker_runtime::interaction::WorkerInput,
) -> worker_runtime::execution::WorkerExecutionResult {
let context = self
.contexts
.lock()
.unwrap()
.get(handle.worker_ref())
.cloned()
.expect("execution context");
let content = input.content.clone();
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_millis(25));
let _ = context.publish_protocol_event(protocol::Event::TextDone {
text: format!("server companion echoed: {content}"),
});
});
worker_runtime::execution::WorkerExecutionResult::accepted(
worker_runtime::execution::WorkerExecutionOperation::Input,
worker_runtime::execution::WorkerExecutionRunState::Idle,
)
}
}
fn test_identity() -> WorkspaceIdentity {
WorkspaceIdentity {
workspace_id: TEST_WORKSPACE_ID.to_string(),
@ -1068,6 +1164,24 @@ mod tests {
workspace["extension_points"]["host_worker_bridge"]["status"],
"runtime_registry"
);
let workspace_companion = &workspace["extension_points"]["companion_console"];
assert_ne!(workspace_companion["status"], "not_connected");
assert!(
!workspace_companion["note"]
.as_str()
.unwrap()
.contains("browser input remains disabled"),
"stale Companion Console note returned: {workspace_companion}"
);
if workspace_companion["status"] == "not_input_capable" {
assert!(
!workspace_companion["diagnostics"]
.as_array()
.unwrap()
.is_empty(),
"not_input_capable workspace companion_console lacks typed diagnostics: {workspace_companion}"
);
}
let tickets = get_json(app.clone(), "/api/tickets").await;
assert_eq!(tickets["items"][0]["id"], "00000000001J2");
@ -1161,6 +1275,7 @@ mod tests {
companion_status["transport"]["kind"],
"embedded_worker_runtime"
);
assert_ne!(companion_status["transport"]["completion"], "not_connected");
assert!(!companion_status.to_string().contains("/workspace/demo"));
let companion_message = post_json(
@ -1177,11 +1292,17 @@ mod tests {
.is_empty()
);
assert!(
companion_message["diagnostics"]
!companion_message
.to_string()
.contains("companion_llm_not_connected"),
"legacy non-execution diagnostic leaked: {companion_message}"
);
assert!(
!companion_message["diagnostics"]
.as_array()
.unwrap()
.iter()
.any(|diagnostic| diagnostic["code"] == "companion_llm_not_connected")
.is_empty(),
"missing typed diagnostic for non-input-capable Companion: {companion_message}"
);
assert!(!companion_message.to_string().contains("/workspace/demo"));
@ -1275,6 +1396,92 @@ mod tests {
);
}
#[tokio::test]
async fn legacy_companion_messages_route_dispatches_through_worker_runtime() {
let temp = tempfile::tempdir().unwrap();
let config = ServerConfig::local_dev(temp.path().join("workspace"), test_identity());
let api = WorkspaceApi::new_with_execution_backend(
config,
Arc::new(SqliteWorkspaceStore::in_memory().unwrap()),
Arc::new(DeterministicExecutionBackend::default()),
)
.await
.unwrap();
let app = build_router(api);
let workspace = get_json(app.clone(), "/api/workspace").await;
let workspace_companion = &workspace["extension_points"]["companion_console"];
assert_eq!(workspace_companion["status"], "connected");
assert!(
workspace_companion["diagnostics"]
.as_array()
.unwrap()
.is_empty()
);
assert!(
workspace_companion["note"]
.as_str()
.unwrap()
.contains("normal Worker runtime path")
);
let status = get_json(app.clone(), "/api/companion/status").await;
assert_eq!(status["transport"]["completion"], "connected");
let worker_id = status["worker"]["worker_id"].as_str().unwrap().to_string();
assert_eq!(status["worker"]["profile"], "builtin:companion");
let response = post_json(
app.clone(),
"/api/companion/messages",
serde_json::json!({ "content": "from legacy route" }),
)
.await;
assert_eq!(response["state"], "accepted");
assert_eq!(response["user_item"]["content"], "from legacy route");
assert!(!response.to_string().contains("companion_llm_not_connected"));
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2);
let transcript = loop {
let transcript = get_json(app.clone(), "/api/companion/transcript").await;
let has_assistant = transcript["items"].as_array().unwrap().iter().any(|item| {
item["role"] == "assistant"
&& item["content"] == "server companion echoed: from legacy route"
&& item["source"] == "worker_runtime"
});
if has_assistant {
break transcript;
}
assert!(
std::time::Instant::now() < deadline,
"timed out waiting for server companion transcript: {transcript}"
);
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
};
assert!(
transcript["items"]
.as_array()
.unwrap()
.iter()
.any(|item| { item["role"] == "user" && item["content"] == "from legacy route" })
);
let worker_transcript = get_json(
app,
&format!("/api/runtimes/embedded-worker-runtime/workers/{worker_id}/transcript"),
)
.await;
assert!(
worker_transcript["items"]
.as_array()
.unwrap()
.iter()
.any(|item| {
item["role"] == "assistant"
&& item["content"] == "server companion echoed: from legacy route"
})
);
}
#[tokio::test]
async fn embedded_runtime_api_routes_by_runtime_and_worker_ids_without_leaking_internals() {
let dir = tempfile::tempdir().unwrap();

View File

@ -9,6 +9,7 @@ export type { PodProtocolEvent, PodProtocolMethod, PodProtocolSegment };
export type ExtensionPoint = {
status: string;
note: string;
diagnostics: Diagnostic[];
};
export type WorkspaceResponse = {