diff --git a/crates/worker-runtime/src/http_server.rs b/crates/worker-runtime/src/http_server.rs index 7bd2bdfb..94fbd88b 100644 --- a/crates/worker-runtime/src/http_server.rs +++ b/crates/worker-runtime/src/http_server.rs @@ -960,7 +960,7 @@ mod tests { .await; assert_eq!(response.status(), StatusCode::OK); let detail: RuntimeHttpWorkerResponse = read_json(response).await; - assert_eq!(detail.worker.transcript_len, 1); + assert_eq!(detail.worker.transcript_len, 2); let response = empty_request( app.clone(), @@ -999,7 +999,7 @@ mod tests { assert_eq!(response.status(), StatusCode::OK); let workers: RuntimeHttpWorkersResponse = read_json(response).await; assert_eq!(workers.workers.len(), 1); - assert_eq!(workers.workers[0].transcript_len, 1); + assert_eq!(workers.workers[0].transcript_len, 2); let response = empty_request(app, Method::GET, "/v1/runtime").await; assert_eq!(response.status(), StatusCode::OK); diff --git a/crates/worker-runtime/src/observation.rs b/crates/worker-runtime/src/observation.rs index fc99e918..d9c5bb00 100644 --- a/crates/worker-runtime/src/observation.rs +++ b/crates/worker-runtime/src/observation.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; #[serde(rename_all = "snake_case")] pub enum TranscriptRole { User, + Assistant, System, } diff --git a/crates/worker-runtime/src/runtime.rs b/crates/worker-runtime/src/runtime.rs index 1cec1f33..de98953f 100644 --- a/crates/worker-runtime/src/runtime.rs +++ b/crates/worker-runtime/src/runtime.rs @@ -33,6 +33,10 @@ use tokio::sync::broadcast; static NEXT_RUNTIME_SEQUENCE: AtomicU64 = AtomicU64::new(1); +fn providerless_embedded_response_text() -> &'static str { + "Embedded worker-runtime accepted the message. LLM execution is not connected for this worker yet." +} + /// Concrete embedded Runtime domain entity. /// /// The default implementation is memory-backed and tools/provider-less by @@ -278,6 +282,7 @@ impl Runtime { ); let worker = state.worker_mut(worker_ref)?; + let input_content = input.content; let role = match input.kind { WorkerInputKind::User => TranscriptRole::User, WorkerInputKind::System => TranscriptRole::System, @@ -289,15 +294,61 @@ impl Runtime { sequence: transcript_sequence, worker_ref: worker_ref.clone(), role, - content: input.content, + content: input_content.clone(), event_id, }); let status = worker.status; + #[cfg(feature = "ws-server")] + { + let payload = match role { + TranscriptRole::User => protocol::Event::UserMessage { + segments: vec![protocol::Segment::Text { + content: input_content.clone(), + }], + }, + TranscriptRole::Assistant => protocol::Event::TextDone { + text: input_content.clone(), + }, + TranscriptRole::System => protocol::Event::SystemItem { + item: serde_json::json!({ + "kind": "embedded_worker_system_input", + "content": input_content.clone(), + }), + }, + }; + state.push_worker_observation_event(worker_ref.clone(), payload); + } + let assistant_transcript_sequence = if matches!(role, TranscriptRole::User) { + let assistant_text = providerless_embedded_response_text().to_string(); + let worker = state.worker_mut(worker_ref)?; + let assistant_sequence = worker.next_transcript_sequence; + worker.next_transcript_sequence += 1; + worker.transcript.push(TranscriptEntry { + sequence: assistant_sequence, + worker_ref: worker_ref.clone(), + role: TranscriptRole::Assistant, + content: assistant_text.clone(), + event_id, + }); + #[cfg(feature = "ws-server")] + state.push_worker_observation_event( + worker_ref.clone(), + protocol::Event::TextDone { + text: assistant_text, + }, + ); + Some(assistant_sequence) + } else { + None + }; state.persist_runtime_snapshot()?; state.persist_worker(&worker_ref.worker_id)?; state.persist_event_by_id(event_id)?; state.persist_transcript_entry(&worker_ref.worker_id, transcript_sequence)?; + if let Some(sequence) = assistant_transcript_sequence { + state.persist_transcript_entry(&worker_ref.worker_id, sequence)?; + } Ok(WorkerInteractionAck { worker_ref: worker_ref.clone(), @@ -1361,10 +1412,10 @@ mod tests { let projection = runtime .transcript_projection(&detail.worker_ref, TranscriptQuery::new(0, 2)) .unwrap(); - assert_eq!(projection.total_items, 3); + assert_eq!(projection.total_items, 5); assert_eq!(projection.items.len(), 2); assert_eq!(projection.items[0].content, "hello"); - assert_eq!(projection.items[1].role, TranscriptRole::System); + assert_eq!(projection.items[1].role, TranscriptRole::Assistant); assert_eq!(projection.next_start, Some(2)); let err = runtime diff --git a/crates/workspace-server/src/companion.rs b/crates/workspace-server/src/companion.rs index a1b23d8e..662bcb2a 100644 --- a/crates/workspace-server/src/companion.rs +++ b/crates/workspace-server/src/companion.rs @@ -604,8 +604,9 @@ mod tests { let runtime_transcript = registry .transcript(COMPANION_RUNTIME_ID, &worker.worker_id, 0, 10) .unwrap(); - assert_eq!(runtime_transcript.items.len(), 1); + assert_eq!(runtime_transcript.items.len(), 2); assert_eq!(runtime_transcript.items[0].role, "user"); + assert_eq!(runtime_transcript.items[1].role, "assistant"); let browser_payload = serde_json::to_string(&(status, response)).unwrap(); for forbidden in [ diff --git a/crates/workspace-server/src/hosts.rs b/crates/workspace-server/src/hosts.rs index eecb7b33..4728b5bb 100644 --- a/crates/workspace-server/src/hosts.rs +++ b/crates/workspace-server/src/hosts.rs @@ -140,8 +140,6 @@ pub struct RuntimeCapabilitySummary { pub can_spawn_worker: bool, pub can_stop_worker: bool, pub can_accept_input: bool, - pub can_stream_events: bool, - pub can_read_bounded_transcript: bool, pub has_workspace_fs: bool, pub has_shell: bool, pub has_git: bool, @@ -195,10 +193,8 @@ pub struct WorkerImplementationSummary { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct WorkerCapabilitySummary { pub can_accept_input: bool, - pub can_stream_events: bool, pub can_stop: bool, pub can_spawn_followup: bool, - pub can_read_bounded_transcript: bool, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] @@ -578,7 +574,7 @@ pub trait WorkspaceWorkerRuntime: Send + Sync { fn observation_source( &self, _worker_id: &str, - ) -> Option { + ) -> Option { None } @@ -879,7 +875,7 @@ impl RuntimeRegistry { &self, runtime_id: &str, worker_id: &str, - ) -> Result { + ) -> Result { validate_backend_identifier("runtime_id", runtime_id)?; validate_backend_identifier("worker_id", worker_id)?; let runtime = self.runtime(runtime_id)?; @@ -962,10 +958,8 @@ impl EmbeddedWorkerRuntime { }, capabilities: WorkerCapabilitySummary { can_accept_input: true, - can_stream_events: false, can_stop: false, can_spawn_followup: false, - can_read_bounded_transcript: true, }, diagnostics: vec![diagnostic( "embedded_runtime_projection", @@ -996,10 +990,8 @@ impl EmbeddedWorkerRuntime { }, capabilities: WorkerCapabilitySummary { can_accept_input: true, - can_stream_events: false, can_stop: false, can_spawn_followup: false, - can_read_bounded_transcript: true, }, diagnostics: vec![diagnostic( "embedded_runtime_projection", @@ -1245,6 +1237,24 @@ impl WorkspaceWorkerRuntime for EmbeddedWorkerRuntime { } } + fn observation_source( + &self, + worker_id: &str, + ) -> Option { + let worker_ref = self.worker_ref(worker_id)?; + if self.runtime.worker_detail(&worker_ref).is_err() { + return None; + } + Some(crate::observation::RuntimeObservationSource::embedded( + crate::observation::EmbeddedRuntimeObservationSource { + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + runtime: self.runtime.clone(), + worker_ref, + }, + )) + } + fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult { let Some(worker_ref) = self.worker_ref(worker_id) else { return embedded_input_rejected( @@ -1512,10 +1522,8 @@ impl RemoteWorkerRuntime { }, capabilities: WorkerCapabilitySummary { can_accept_input: true, - can_stream_events: true, can_stop: true, can_spawn_followup: false, - can_read_bounded_transcript: true, }, diagnostics: vec![diagnostic( "remote_runtime_projection", @@ -1546,10 +1554,8 @@ impl RemoteWorkerRuntime { }, capabilities: WorkerCapabilitySummary { can_accept_input: true, - can_stream_events: true, can_stop: true, can_spawn_followup: false, - can_read_bounded_transcript: true, }, diagnostics: vec![diagnostic( "remote_runtime_projection", @@ -1811,13 +1817,15 @@ impl WorkspaceWorkerRuntime for RemoteWorkerRuntime { fn observation_source( &self, worker_id: &str, - ) -> Option { - Some(crate::observation::RuntimeObservationSourceConfig { - runtime_id: self.runtime_id.clone(), - worker_id: worker_id.to_string(), - endpoint: self.ws_endpoint(worker_id), - bearer_token: self.bearer_token.clone(), - }) + ) -> Option { + Some(crate::observation::RuntimeObservationSource::remote_ws( + crate::observation::RuntimeObservationSourceConfig { + runtime_id: self.runtime_id.clone(), + worker_id: worker_id.to_string(), + endpoint: self.ws_endpoint(worker_id), + bearer_token: self.bearer_token.clone(), + }, + )) } fn send_input(&self, worker_id: &str, request: WorkerInputRequest) -> WorkerInputResult { @@ -1871,8 +1879,6 @@ fn embedded_runtime_capabilities(limit: usize, available: bool) -> RuntimeCapabi can_spawn_worker: available, can_stop_worker: false, can_accept_input: available, - can_stream_events: false, - can_read_bounded_transcript: available, has_workspace_fs: false, has_shell: false, has_git: false, @@ -2048,6 +2054,7 @@ fn embedded_transcript_rejected( fn embedded_transcript_role_label(role: TranscriptRole) -> &'static str { match role { TranscriptRole::User => "user", + TranscriptRole::Assistant => "assistant", TranscriptRole::System => "system", } } @@ -2139,8 +2146,6 @@ fn remote_runtime_capabilities(limit: usize, available: bool) -> RuntimeCapabili can_spawn_worker: available, can_stop_worker: available, can_accept_input: available, - can_stream_events: available, - can_read_bounded_transcript: available, has_workspace_fs: false, has_shell: false, has_git: false, @@ -2357,10 +2362,8 @@ pub fn placeholder_worker(host_id: impl Into) -> WorkerSummary { }, capabilities: WorkerCapabilitySummary { can_accept_input: false, - can_stream_events: false, can_stop: false, can_spawn_followup: false, - can_read_bounded_transcript: false, }, diagnostics: vec![diagnostic( "runtime_capability_unsupported", @@ -2450,10 +2453,8 @@ mod tests { }, capabilities: WorkerCapabilitySummary { can_accept_input: false, - can_stream_events: false, can_stop: false, can_spawn_followup: false, - can_read_bounded_transcript: false, }, diagnostics: Vec::new(), }], @@ -2481,8 +2482,6 @@ mod tests { can_spawn_worker: false, can_stop_worker: false, can_accept_input: false, - can_stream_events: false, - can_read_bounded_transcript: false, has_workspace_fs: false, has_shell: false, has_git: false, @@ -2613,7 +2612,6 @@ mod tests { assert_eq!(embedded_summary.source.status, RuntimeSourceStatus::Active); assert!(embedded_summary.capabilities.can_spawn_worker); assert!(embedded_summary.capabilities.can_accept_input); - assert!(embedded_summary.capabilities.can_read_bounded_transcript); let spawned = registry .spawn_worker( @@ -2647,7 +2645,6 @@ mod tests { assert_eq!(worker.implementation.kind, "embedded_worker_runtime"); assert_eq!(worker.profile.as_deref(), Some("builtin:coder")); assert!(worker.capabilities.can_accept_input); - assert!(worker.capabilities.can_read_bounded_transcript); let input = registry .send_input( @@ -2668,9 +2665,15 @@ mod tests { .transcript(EMBEDDED_RUNTIME_ID, &worker.worker_id, 0, 10) .unwrap(); assert_eq!(transcript.state, WorkerOperationState::Accepted); - assert_eq!(transcript.items.len(), 1); + assert_eq!(transcript.items.len(), 2); assert_eq!(transcript.items[0].role, "user"); assert_eq!(transcript.items[0].content, "hello embedded runtime"); + assert_eq!(transcript.items[1].role, "assistant"); + assert!( + transcript.items[1] + .content + .contains("LLM execution is not connected") + ); let json = serde_json::to_string(&(embedded_summary, worker, transcript)).unwrap(); for forbidden in [ @@ -2810,6 +2813,10 @@ mod tests { let observation = registry .observation_source("remote:primary", "worker-remote-1") .expect("remote runtime exposes backend-owned WS observation source"); + let crate::observation::RuntimeObservationSource::RemoteWs(observation) = observation + else { + panic!("remote runtime should expose a remote WS observation source"); + }; assert!(observation.endpoint.starts_with("ws://127.0.0.1:")); assert!( observation diff --git a/crates/workspace-server/src/observation.rs b/crates/workspace-server/src/observation.rs index fce73f30..2be74240 100644 --- a/crates/workspace-server/src/observation.rs +++ b/crates/workspace-server/src/observation.rs @@ -1,6 +1,9 @@ use std::collections::{BTreeMap, VecDeque}; use std::sync::{Arc, Mutex}; +use worker_runtime::identity::WorkerRef; +use worker_runtime::observation::{WorkerObservationCursor, WorkerObservationEvent}; + use axum::http::StatusCode; use futures::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; @@ -32,6 +35,66 @@ impl std::fmt::Debug for RuntimeObservationSourceConfig { } } +#[derive(Clone)] +pub struct EmbeddedRuntimeObservationSource { + pub runtime_id: String, + pub worker_id: String, + pub runtime: worker_runtime::Runtime, + pub worker_ref: WorkerRef, +} + +#[derive(Clone)] +pub enum RuntimeObservationSource { + RemoteWs(RuntimeObservationSourceConfig), + Embedded(EmbeddedRuntimeObservationSource), +} + +impl RuntimeObservationSource { + pub fn remote_ws(config: RuntimeObservationSourceConfig) -> Self { + Self::RemoteWs(config) + } + + pub fn embedded(source: EmbeddedRuntimeObservationSource) -> Self { + Self::Embedded(source) + } + + pub fn runtime_id(&self) -> &str { + match self { + Self::RemoteWs(config) => &config.runtime_id, + Self::Embedded(source) => &source.runtime_id, + } + } + + pub fn worker_id(&self) -> &str { + match self { + Self::RemoteWs(config) => &config.worker_id, + Self::Embedded(source) => &source.worker_id, + } + } +} + +impl std::fmt::Debug for RuntimeObservationSource { + fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::RemoteWs(config) => formatter + .debug_struct("RemoteRuntimeObservationSource") + .field("runtime_id", &config.runtime_id) + .field("worker_id", &config.worker_id) + .field("endpoint", &"") + .field( + "bearer_token", + &config.bearer_token.as_ref().map(|_| ""), + ) + .finish(), + Self::Embedded(source) => formatter + .debug_struct("EmbeddedRuntimeObservationSource") + .field("runtime_id", &source.runtime_id) + .field("worker_id", &source.worker_id) + .finish(), + } + } +} + /// Event consumed from a Runtime-owned worker observation WebSocket. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct RuntimeObservationUpstreamEvent { @@ -234,13 +297,14 @@ impl BackendObservationProxy { &self, runtime_id: &str, worker_id: &str, - ) -> Result { + ) -> Result { self.sources .get(&ObservationKey { runtime_id: runtime_id.to_string(), worker_id: worker_id.to_string(), }) .cloned() + .map(RuntimeObservationSource::remote_ws) .ok_or_else(|| { ObservationProxyError::WorkerNotFound(format!( "worker {worker_id} is not registered for runtime {runtime_id}" @@ -499,6 +563,173 @@ impl RuntimeWsObservationClient { } } +pub enum RuntimeObservationClient { + RemoteWs(RuntimeWsObservationClient), + Embedded(EmbeddedObservationClient), +} + +impl RuntimeObservationClient { + pub async fn connect( + source: &RuntimeObservationSource, + runtime_cursor: Option<&str>, + ) -> Result { + match source { + RuntimeObservationSource::RemoteWs(config) => { + RuntimeWsObservationClient::connect(config, runtime_cursor) + .await + .map(Self::RemoteWs) + } + RuntimeObservationSource::Embedded(source) => { + EmbeddedObservationClient::connect(source, runtime_cursor).map(Self::Embedded) + } + } + } + + pub async fn next_event( + &mut self, + ) -> Result { + match self { + Self::RemoteWs(client) => client.next_event().await, + Self::Embedded(client) => client.next_event().await, + } + } +} + +pub struct EmbeddedObservationClient { + runtime_id: String, + worker_id: String, + worker_ref: WorkerRef, + cursor: WorkerObservationCursor, + receiver: tokio::sync::broadcast::Receiver, + queued: VecDeque, +} + +impl EmbeddedObservationClient { + fn connect( + source: &EmbeddedRuntimeObservationSource, + runtime_cursor: Option<&str>, + ) -> Result { + let cursor = match runtime_cursor { + Some(raw) => WorkerObservationCursor::decode(raw).ok_or_else(|| { + ObservationProxyError::CursorMalformed( + "embedded runtime cursor is malformed".into(), + ) + })?, + None => source + .runtime + .worker_observation_cursor_now(&source.worker_ref) + .map_err(|err| { + ObservationProxyError::WorkerNotFound(format!( + "embedded Worker '{}' is not observable: {err}", + source.worker_id + )) + })?, + }; + let receiver = source + .runtime + .subscribe_worker_observation() + .map_err(|err| { + ObservationProxyError::WorkerNotFound(format!( + "embedded Worker '{}' observation subscription is unavailable: {err}", + source.worker_id + )) + })?; + let mut queued = VecDeque::new(); + if runtime_cursor.is_none() { + let snapshot = source + .runtime + .worker_observation_snapshot(&source.worker_ref) + .map_err(|err| { + ObservationProxyError::WorkerNotFound(format!( + "embedded Worker '{}' snapshot is unavailable: {err}", + source.worker_id + )) + })?; + queued.push_back(RuntimeObservationUpstreamEvent { + runtime_id: source.runtime_id.clone(), + worker_id: source.worker_id.clone(), + runtime_cursor: cursor.encode(), + payload: snapshot, + }); + } + for event in source + .runtime + .read_worker_observation_events(&source.worker_ref, cursor) + .map_err(|err| { + ObservationProxyError::CursorUnknownOrExpired(format!( + "embedded Worker '{}' cursor is unavailable: {err}", + source.worker_id + )) + })? + { + queued.push_back(Self::map_event( + &source.runtime_id, + &source.worker_id, + event, + )); + } + Ok(Self { + runtime_id: source.runtime_id.clone(), + worker_id: source.worker_id.clone(), + worker_ref: source.worker_ref.clone(), + cursor, + receiver, + queued, + }) + } + + async fn next_event( + &mut self, + ) -> Result { + if let Some(event) = self.queued.pop_front() { + if let Some(cursor) = WorkerObservationCursor::decode(&event.runtime_cursor) { + self.cursor = cursor; + } + return Ok(event); + } + loop { + match self.receiver.recv().await { + Ok(event) + if event.worker_ref == self.worker_ref + && event.sequence > self.cursor.sequence => + { + self.cursor = + WorkerObservationCursor::decode(&event.cursor).ok_or_else(|| { + ObservationProxyError::CursorMalformed( + "embedded runtime emitted a malformed cursor".into(), + ) + })?; + return Ok(Self::map_event(&self.runtime_id, &self.worker_id, event)); + } + Ok(_) => continue, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { + return Err(ObservationProxyError::CursorUnknownOrExpired( + "embedded runtime observation backlog was exceeded".into(), + )); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + return Err(ObservationProxyError::UpstreamDisconnect( + "embedded runtime observation stream closed".into(), + )); + } + } + } + } + + fn map_event( + runtime_id: &str, + worker_id: &str, + event: WorkerObservationEvent, + ) -> RuntimeObservationUpstreamEvent { + RuntimeObservationUpstreamEvent { + runtime_id: runtime_id.to_string(), + worker_id: worker_id.to_string(), + runtime_cursor: event.cursor, + payload: event.payload, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/workspace-server/src/server.rs b/crates/workspace-server/src/server.rs index ceae3612..c5ab71d3 100644 --- a/crates/workspace-server/src/server.rs +++ b/crates/workspace-server/src/server.rs @@ -26,7 +26,7 @@ use crate::hosts::{ use crate::identity::WorkspaceIdentity; use crate::observation::{ BackendObservationProxy, ClientWorkerEventWsFrame, ClientWorkerEventsWsQuery, - ObservationProxyError, RuntimeObservationSourceConfig, RuntimeWsObservationClient, + ObservationProxyError, RuntimeObservationClient, RuntimeObservationSourceConfig, }; use crate::records::{ LocalProjectRecordReader, ObjectiveDetail, ProjectRecordList, TicketDetail, TicketSummary, @@ -676,13 +676,13 @@ async fn worker_observation_ws( async fn worker_observation_ws_session( proxy: BackendObservationProxy, - source: RuntimeObservationSourceConfig, + source: crate::observation::RuntimeObservationSource, query: ClientWorkerEventsWsQuery, mut socket: WebSocket, ) { let open = match proxy.open( - &source.runtime_id, - &source.worker_id, + source.runtime_id(), + source.worker_id(), query.cursor.as_deref(), ) { Ok(open) => open, @@ -703,7 +703,7 @@ async fn worker_observation_ws_session( } let mut upstream = - match RuntimeWsObservationClient::connect(&source, open.runtime_cursor.as_deref()).await { + match RuntimeObservationClient::connect(&source, open.runtime_cursor.as_deref()).await { Ok(client) => client, Err(error) => { let _ = diff --git a/web/workspace/src/app.css b/web/workspace/src/app.css index e5e6f1d2..632ee393 100644 --- a/web/workspace/src/app.css +++ b/web/workspace/src/app.css @@ -972,14 +972,6 @@ font-weight: 700; } -.degrade-note { - border: 1px solid var(--line); - border-radius: 12px; - padding: 0.65rem 0.8rem; - background: var(--bg-raised); - color: var(--warning); -} - .console-composer { display: grid; gap: var(--space-3); diff --git a/web/workspace/src/lib/workspace-console/worker-console.ui.test.ts b/web/workspace/src/lib/workspace-console/worker-console.ui.test.ts index 20d2e14d..d2569f92 100644 --- a/web/workspace/src/lib/workspace-console/worker-console.ui.test.ts +++ b/web/workspace/src/lib/workspace-console/worker-console.ui.test.ts @@ -70,11 +70,6 @@ Deno.test("Worker Console page is routed by runtime_id and worker_id through bac !consolePage.includes("/api/companion"), "Console page must not use Companion-specific APIs", ); - assert( - consolePage.includes("streaming observation is not available") || - consolePage.includes("Streaming observation is not available"), - "Console should show an explicit non-streaming degradation path", - ); assert( consolePage.includes("function advanceReloadToken()") && consolePage.includes("nextReloadToken += 1") && diff --git a/web/workspace/src/lib/workspace-sidebar/types.ts b/web/workspace/src/lib/workspace-sidebar/types.ts index 8e039a44..305e9da1 100644 --- a/web/workspace/src/lib/workspace-sidebar/types.ts +++ b/web/workspace/src/lib/workspace-sidebar/types.ts @@ -35,8 +35,6 @@ export type RuntimeCapabilities = { can_spawn_worker: boolean; can_stop_worker: boolean; can_accept_input: boolean; - can_stream_events: boolean; - can_read_bounded_transcript: boolean; has_workspace_fs: boolean; has_shell: boolean; has_git: boolean; @@ -72,10 +70,8 @@ export type Host = { export type WorkerCapabilities = { can_accept_input: boolean; - can_stream_events: boolean; can_stop: boolean; can_spawn_followup: boolean; - can_read_bounded_transcript: boolean; }; export type Worker = { diff --git a/web/workspace/src/routes/runtimes/[runtimeId]/workers/[workerId]/console/+page.svelte b/web/workspace/src/routes/runtimes/[runtimeId]/workers/[workerId]/console/+page.svelte index 6fb7fda6..0a743e1e 100644 --- a/web/workspace/src/routes/runtimes/[runtimeId]/workers/[workerId]/console/+page.svelte +++ b/web/workspace/src/routes/runtimes/[runtimeId]/workers/[workerId]/console/+page.svelte @@ -36,7 +36,7 @@ let draft = $state(''); let sending = $state(false); let sendError = $state(null); - let streamState = $state<'connecting' | 'open' | 'unsupported' | 'closed' | 'error'>('connecting'); + let streamState = $state<'connecting' | 'open' | 'closed' | 'error'>('connecting'); let streamDiagnostics = $state([]); let observedEvents = $state>([]); let nextReloadToken = 0; @@ -60,11 +60,6 @@ mergeDiagnostics(worker?.diagnostics ?? [], transcript?.diagnostics ?? [], streamDiagnostics) ); const canSend = $derived(Boolean(worker?.capabilities.can_accept_input) && draft.trim().length > 0 && !sending); - const transcriptOnly = $derived( - worker && !worker.capabilities.can_stream_events - ? 'Streaming observation is not available for this Worker. Console is using bounded transcript plus manual refresh.' - : null - ); async function getJson(path: string): Promise { const response = await fetch(path); @@ -180,18 +175,6 @@ streamState = 'closed'; return; } - if (!targetWorker.capabilities.can_stream_events) { - streamState = 'unsupported'; - streamDiagnostics = [ - { - code: 'worker_streaming_unsupported', - severity: 'info', - message: 'This Worker does not expose backend-proxied observation streaming; transcript refresh remains available.' - } - ]; - return; - } - streamState = 'connecting'; const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; const ws = new WebSocket( @@ -247,8 +230,8 @@ ...streamDiagnostics, { code: 'worker_observation_ws_error', - severity: 'warning', - message: 'Backend observation WebSocket failed; transcript refresh remains available.' + severity: 'error', + message: 'Worker observation WebSocket failed.' } ]; } @@ -334,9 +317,6 @@ {#if transcriptError}

{transcriptError}

{/if} - {#if transcriptOnly} -

{transcriptOnly}

- {/if} {#if lines.length === 0}

No transcript items or observation events are available for this Worker yet.

@@ -395,8 +375,6 @@ Capabilities
  • input: {worker.capabilities.can_accept_input ? 'available' : 'unsupported'}
  • -
  • stream: {worker.capabilities.can_stream_events ? 'available' : 'unsupported'}
  • -
  • bounded transcript: {worker.capabilities.can_read_bounded_transcript ? 'available' : 'unsupported'}
  • stop: {worker.capabilities.can_stop ? 'available' : 'unsupported'}
  • follow-up spawn: {worker.capabilities.can_spawn_followup ? 'available' : 'unsupported'}