diff --git a/Cargo.lock b/Cargo.lock index f81268a1..f651f90f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -471,13 +471,17 @@ checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" name = "client" version = "0.1.0" dependencies = [ + "futures", "manifest", "protocol", + "reqwest", + "serde", "serde_json", "tempfile", "thiserror 2.0.18", "ticket", "tokio", + "tokio-tungstenite 0.29.0", "uuid", ] @@ -1068,7 +1072,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -2539,7 +2543,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3490,7 +3494,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -3547,7 +3551,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -4157,7 +4161,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -5494,7 +5498,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 86950d8f..2487f248 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -8,9 +8,13 @@ license.workspace = true protocol = { workspace = true } manifest = { workspace = true } ticket = { workspace = true } +futures = { workspace = true } +reqwest = { version = "0.13", default-features = false, features = ["json", "native-tls"] } +serde = { workspace = true } serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt", "macros", "net", "io-util", "sync", "time", "process", "fs"] } +tokio-tungstenite = { workspace = true } uuid = { workspace = true } [dev-dependencies] diff --git a/crates/client/src/backend_runtime.rs b/crates/client/src/backend_runtime.rs new file mode 100644 index 00000000..9636af32 --- /dev/null +++ b/crates/client/src/backend_runtime.rs @@ -0,0 +1,694 @@ +use std::collections::VecDeque; +use std::fmt; +use std::time::Duration; + +use futures::StreamExt; +use protocol::{ErrorCode, Event, Greeting, InFlightSnapshot, Method, Segment, WorkerStatus}; +use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message as TungsteniteMessage; + +const TRANSCRIPT_SNAPSHOT_LIMIT: usize = 512; +const RECONNECT_DELAY: Duration = Duration::from_millis(500); +const MAX_RECONNECT_ATTEMPTS: usize = 3; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BackendRuntimeTarget { + /// Workspace Backend API root URL, for example `http://127.0.0.1:8787`. + /// This is intentionally the Backend endpoint, not a Runtime endpoint. + pub base_url: String, + /// Backend-owned Runtime identity used as path authority. + pub runtime_id: String, + /// Backend-owned Worker identity used as path authority. + pub worker_id: String, +} + +impl BackendRuntimeTarget { + pub fn new( + base_url: impl Into, + runtime_id: impl Into, + worker_id: impl Into, + ) -> Self { + Self { + base_url: base_url.into(), + runtime_id: runtime_id.into(), + worker_id: worker_id.into(), + } + } + + pub fn display_label(&self) -> String { + format!("{}:{}", self.runtime_id, self.worker_id) + } +} + +#[derive(Debug)] +pub struct BackendRuntimeClient { + target: BackendRuntimeTarget, + http: reqwest::Client, + events: mpsc::UnboundedReceiver, + diagnostics: VecDeque, + _observation_task: tokio::task::JoinHandle<()>, +} + +#[derive(Debug)] +pub enum BackendRuntimeClientError { + InvalidTarget(String), + Http(reqwest::Error), +} + +impl fmt::Display for BackendRuntimeClientError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::InvalidTarget(message) => f.write_str(message), + Self::Http(error) => write!(f, "{error}"), + } + } +} + +impl std::error::Error for BackendRuntimeClientError {} + +impl From for BackendRuntimeClientError { + fn from(error: reqwest::Error) -> Self { + Self::Http(error) + } +} + +impl BackendRuntimeClient { + pub async fn connect(target: BackendRuntimeTarget) -> Result { + validate_target(&target)?; + let http = reqwest::Client::new(); + let (tx, rx) = mpsc::unbounded_channel(); + + let suppress_initial_snapshot = match load_initial_transcript(&http, &target).await { + Ok(events) => { + for event in events { + let _ = tx.send(event); + } + true + } + Err(error) => { + let _ = tx.send(diagnostic_event(format!( + "Backend initial transcript unavailable for {}: {error}", + target.display_label() + ))); + false + } + }; + + let observation_target = target.clone(); + let observation_tx = tx.clone(); + let observation_task = tokio::spawn(async move { + observe_worker_events( + observation_target, + observation_tx, + suppress_initial_snapshot, + ) + .await; + }); + + Ok(Self { + target, + http, + events: rx, + diagnostics: VecDeque::new(), + _observation_task: observation_task, + }) + } + + pub fn try_next_event(&mut self) -> Option { + if let Some(event) = self.diagnostics.pop_front() { + return Some(event); + } + self.events.try_recv().ok() + } + + pub async fn next_event(&mut self) -> Option { + if let Some(event) = self.diagnostics.pop_front() { + return Some(event); + } + self.events.recv().await + } + + pub async fn send(&mut self, method: &Method) -> Result<(), BackendRuntimeClientError> { + match backend_command_from_method(method) { + BackendCommand::Input { kind, content } => { + let url = self.worker_api_url("input"); + match self + .http + .post(url) + .json(&WorkerInputRequest { kind, content }) + .send() + .await + .and_then(|response| response.error_for_status()) + { + Ok(response) => match response.json::().await { + Ok(result) => self.enqueue_operation_diagnostics( + "input", + result.state, + result.diagnostics, + ), + Err(error) => self.enqueue_diagnostic(format!( + "Backend runtime input response could not be decoded for {}: {error}", + self.target.display_label() + )), + }, + Err(error) => self.enqueue_diagnostic(format!( + "Backend runtime input failed for {}: {error}", + self.target.display_label() + )), + } + } + BackendCommand::Lifecycle { action, reason } => { + let url = self.worker_api_url(action); + match self + .http + .post(url) + .json(&WorkerLifecycleRequest { reason }) + .send() + .await + .and_then(|response| response.error_for_status()) + { + Ok(response) => match response.json::().await { + Ok(result) => self.enqueue_operation_diagnostics( + action, + result.state, + result.diagnostics, + ), + Err(error) => self.enqueue_diagnostic(format!( + "Backend runtime {action} response could not be decoded for {}: {error}", + self.target.display_label() + )), + }, + Err(error) => self.enqueue_diagnostic(format!( + "Backend runtime {action} failed for {}: {error}", + self.target.display_label() + )), + } + } + BackendCommand::Unsupported(message) => { + self.enqueue_diagnostic(message); + } + } + Ok(()) + } + + fn worker_api_url(&self, suffix: &str) -> String { + let path = format!( + "/api/runtimes/{}/workers/{}/{}", + path_segment_encode(&self.target.runtime_id), + path_segment_encode(&self.target.worker_id), + suffix + ); + join_base_and_path(&self.target.base_url, &path) + } + + fn enqueue_operation_diagnostics( + &mut self, + operation: &str, + state: String, + diagnostics: Vec, + ) { + if state != "accepted" { + self.enqueue_diagnostic(format!( + "Backend runtime {operation} was {state} for {}", + self.target.display_label() + )); + } + for diagnostic in diagnostics { + self.enqueue_diagnostic(format!( + "Backend runtime {operation} diagnostic [{}]: {}", + diagnostic.code, diagnostic.message + )); + } + } + + fn enqueue_diagnostic(&mut self, message: impl Into) { + self.diagnostics.push_back(diagnostic_event(message)); + } +} + +impl Drop for BackendRuntimeClient { + fn drop(&mut self) { + self._observation_task.abort(); + } +} + +#[derive(Debug, PartialEq, Eq)] +enum BackendCommand { + Input { + kind: WorkerInputKind, + content: String, + }, + Lifecycle { + action: &'static str, + reason: Option, + }, + Unsupported(String), +} + +fn backend_command_from_method(method: &Method) -> BackendCommand { + match method { + Method::Run { input } => BackendCommand::Input { + kind: WorkerInputKind::User, + content: Segment::flatten_to_text(input), + }, + Method::Notify { message, .. } => BackendCommand::Input { + kind: WorkerInputKind::System, + content: message.clone(), + }, + Method::Cancel => BackendCommand::Lifecycle { + action: "cancel", + reason: Some("requested from TUI Backend Runtime API client".to_string()), + }, + Method::Shutdown => BackendCommand::Lifecycle { + action: "stop", + reason: Some("requested from TUI Backend Runtime API client".to_string()), + }, + Method::Pause => BackendCommand::Unsupported( + "Backend Runtime API does not expose pause/resume for the TUI client yet; command was not sent".to_string(), + ), + Method::Resume => BackendCommand::Unsupported( + "Backend Runtime API does not expose resume for the TUI client yet; command was not sent".to_string(), + ), + Method::Compact => BackendCommand::Unsupported( + "Backend Runtime API does not expose compaction for the TUI client yet; command was not sent".to_string(), + ), + Method::ListCompletions { .. } => BackendCommand::Unsupported( + "Backend Runtime API does not expose completion lookup for the TUI client yet".to_string(), + ), + Method::ListRewindTargets | Method::RewindTo { .. } => BackendCommand::Unsupported( + "Backend Runtime API does not expose rewind controls for the TUI client yet; command was not sent".to_string(), + ), + Method::ListWorkers | Method::RestoreWorker { .. } | Method::RegisterPeer { .. } => { + BackendCommand::Unsupported( + "Backend Runtime API worker-management controls are not available from this Console connection".to_string(), + ) + } + Method::WorkerEvent(_) => BackendCommand::Unsupported( + "Backend Runtime API does not accept child Worker lifecycle events from this Console connection".to_string(), + ), + } +} + +async fn load_initial_transcript( + http: &reqwest::Client, + target: &BackendRuntimeTarget, +) -> Result, BackendRuntimeClientError> { + let path = format!( + "/api/runtimes/{}/workers/{}/transcript?start=0&limit={TRANSCRIPT_SNAPSHOT_LIMIT}", + path_segment_encode(&target.runtime_id), + path_segment_encode(&target.worker_id) + ); + let response = http + .get(join_base_and_path(&target.base_url, &path)) + .send() + .await? + .error_for_status()?; + let transcript: WorkerTranscriptProjection = response.json().await?; + Ok(transcript_projection_to_events(target, transcript)) +} + +fn transcript_projection_to_events( + target: &BackendRuntimeTarget, + transcript: WorkerTranscriptProjection, +) -> Vec { + let mut events = vec![Event::Snapshot { + entries: Vec::new(), + greeting: Greeting { + worker_name: target.worker_id.clone(), + cwd: String::new(), + provider: "backend-runtime-api".to_string(), + model: target.runtime_id.clone(), + scope_summary: "Backend Runtime API worker observation".to_string(), + tools: Vec::new(), + context_window: 0, + context_tokens: 0, + }, + status: WorkerStatus::Idle, + in_flight: InFlightSnapshot { blocks: Vec::new() }, + }]; + + for item in transcript.items { + match item.role.as_str() { + "user" => events.push(Event::UserMessage { + segments: vec![Segment::text(item.content)], + }), + "assistant" => { + events.push(Event::TextDelta { + text: item.content.clone(), + }); + events.push(Event::TextDone { text: item.content }); + } + role => events.push(Event::Alert(protocol::Alert { + level: protocol::AlertLevel::Warn, + source: protocol::AlertSource::Worker, + message: format!( + "Backend transcript item with role `{role}` is not rendered as chat content" + ), + timestamp_ms: 0, + })), + } + } + + for diagnostic in transcript.diagnostics { + events.push(diagnostic_event(format!( + "Backend transcript diagnostic [{}]: {}", + diagnostic.code, diagnostic.message + ))); + } + events +} + +async fn observe_worker_events( + target: BackendRuntimeTarget, + tx: mpsc::UnboundedSender, + mut suppress_next_snapshot: bool, +) { + let mut cursor: Option = None; + let mut last_sequence = 0_u64; + let mut attempts = 0_usize; + + loop { + let url = observation_ws_url(&target, cursor.as_deref()); + match connect_async(&url).await { + Ok((mut ws, _)) => { + attempts = 0; + while let Some(frame) = ws.next().await { + match frame { + Ok(TungsteniteMessage::Text(text)) => { + match serde_json::from_str::(&text) { + Ok(ClientWorkerEventWsFrame::Event { envelope }) => { + if envelope.runtime_id != target.runtime_id + || envelope.worker_id != target.worker_id + { + let _ = tx.send(diagnostic_event(format!( + "Backend observation frame target mismatch: got {}:{}, expected {}", + envelope.runtime_id, + envelope.worker_id, + target.display_label() + ))); + continue; + } + if let Some(sequence) = decode_backend_cursor(&envelope.cursor) + { + if sequence <= last_sequence { + continue; + } + last_sequence = sequence; + } else { + let _ = tx.send(diagnostic_event(format!( + "Backend observation cursor was malformed: {}", + envelope.cursor + ))); + } + cursor = Some(envelope.cursor.clone()); + if suppress_next_snapshot + && matches!(envelope.payload, Event::Snapshot { .. }) + { + suppress_next_snapshot = false; + continue; + } + let _ = tx.send(envelope.payload); + } + Ok(ClientWorkerEventWsFrame::Diagnostic { diagnostic }) => { + let message = format!( + "Backend observation diagnostic [{}]: {}", + diagnostic.code, diagnostic.message + ); + let _ = tx.send(diagnostic_event(message)); + if diagnostic.code == "backend.cursor_unknown_or_expired" { + cursor = None; + last_sequence = 0; + break; + } + } + Err(error) => { + let _ = tx.send(diagnostic_event(format!( + "Backend observation frame was not valid JSON: {error}" + ))); + } + } + } + Ok(TungsteniteMessage::Close(_)) => break, + Ok(TungsteniteMessage::Ping(_)) + | Ok(TungsteniteMessage::Pong(_)) + | Ok(TungsteniteMessage::Binary(_)) + | Ok(TungsteniteMessage::Frame(_)) => {} + Err(error) => { + let _ = tx.send(diagnostic_event(format!( + "Backend observation WebSocket error for {}: {error}", + target.display_label() + ))); + break; + } + } + } + } + Err(error) => { + let _ = tx.send(diagnostic_event(format!( + "Backend observation WebSocket connect failed for {}: {error}", + target.display_label() + ))); + } + } + + attempts += 1; + if attempts > MAX_RECONNECT_ATTEMPTS { + let _ = tx.send(diagnostic_event(format!( + "Backend observation stream for {} stopped after {MAX_RECONNECT_ATTEMPTS} reconnect attempts", + target.display_label() + ))); + break; + } + tokio::time::sleep(RECONNECT_DELAY).await; + } +} + +fn diagnostic_event(message: impl Into) -> Event { + Event::Error { + code: ErrorCode::Internal, + message: message.into(), + } +} + +fn validate_target(target: &BackendRuntimeTarget) -> Result<(), BackendRuntimeClientError> { + if target.base_url.trim().is_empty() { + return Err(BackendRuntimeClientError::InvalidTarget( + "Backend API base URL is required".to_string(), + )); + } + if !(target.base_url.starts_with("http://") || target.base_url.starts_with("https://")) { + return Err(BackendRuntimeClientError::InvalidTarget( + "Backend API base URL must start with http:// or https://".to_string(), + )); + } + if target.runtime_id.is_empty() { + return Err(BackendRuntimeClientError::InvalidTarget( + "runtime_id is required".to_string(), + )); + } + if target.worker_id.is_empty() { + return Err(BackendRuntimeClientError::InvalidTarget( + "worker_id is required".to_string(), + )); + } + Ok(()) +} + +fn observation_ws_url(target: &BackendRuntimeTarget, cursor: Option<&str>) -> String { + let path = format!( + "/api/runtimes/{}/workers/{}/events/ws", + path_segment_encode(&target.runtime_id), + path_segment_encode(&target.worker_id) + ); + let mut url = join_base_and_path(&http_base_to_ws(&target.base_url), &path); + if let Some(cursor) = cursor { + url.push_str("?cursor="); + url.push_str(&query_value_encode(cursor)); + } + url +} + +fn http_base_to_ws(base: &str) -> String { + if let Some(rest) = base.strip_prefix("https://") { + format!("wss://{rest}") + } else if let Some(rest) = base.strip_prefix("http://") { + format!("ws://{rest}") + } else { + base.to_string() + } +} + +fn join_base_and_path(base: &str, path: &str) -> String { + format!("{}{}", base.trim_end_matches('/'), path) +} + +fn decode_backend_cursor(cursor: &str) -> Option { + let encoded = cursor.strip_prefix("bo_")?; + if encoded.len() != 16 { + return None; + } + u64::from_str_radix(encoded, 16).ok() +} + +fn path_segment_encode(input: &str) -> String { + percent_encode(input, |byte| { + byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') + }) +} + +fn query_value_encode(input: &str) -> String { + percent_encode(input, |byte| { + byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') + }) +} + +fn percent_encode(input: &str, keep: impl Fn(u8) -> bool) -> String { + let mut encoded = String::with_capacity(input.len()); + for byte in input.bytes() { + if keep(byte) { + encoded.push(byte as char); + } else { + encoded.push('%'); + encoded.push_str(&format!("{byte:02X}")); + } + } + encoded +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +enum WorkerInputKind { + User, + System, +} + +#[derive(Debug, Serialize)] +struct WorkerInputRequest { + kind: WorkerInputKind, + content: String, +} + +#[derive(Debug, Serialize)] +struct WorkerLifecycleRequest { + reason: Option, +} + +#[derive(Debug, Deserialize)] +struct WorkerInputResult { + state: String, + #[serde(default)] + diagnostics: Vec, +} + +#[derive(Debug, Deserialize)] +struct WorkerLifecycleResult { + state: String, + #[serde(default)] + diagnostics: Vec, +} + +#[derive(Debug, Deserialize)] +struct BackendDiagnostic { + code: String, + message: String, +} + +#[derive(Debug, Deserialize)] +struct WorkerTranscriptProjection { + #[serde(default)] + items: Vec, + #[serde(default)] + diagnostics: Vec, +} + +#[derive(Debug, Deserialize)] +struct WorkerTranscriptItem { + role: String, + content: String, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "kind", rename_all = "snake_case")] +enum ClientWorkerEventWsFrame { + Event { + envelope: ClientWorkerEventWsEnvelope, + }, + Diagnostic { + diagnostic: ClientWorkerEventWsDiagnostic, + }, +} + +#[derive(Debug, Deserialize)] +struct ClientWorkerEventWsEnvelope { + cursor: String, + runtime_id: String, + worker_id: String, + payload: Event, +} + +#[derive(Debug, Deserialize)] +struct ClientWorkerEventWsDiagnostic { + code: String, + message: String, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn backend_command_maps_run_to_user_input_without_runtime_endpoint() { + let method = Method::Run { + input: vec![ + Segment::text("hello"), + Segment::FileRef { + path: "src/lib.rs".into(), + }, + ], + }; + assert_eq!( + backend_command_from_method(&method), + BackendCommand::Input { + kind: WorkerInputKind::User, + content: "hello@src/lib.rs".to_string(), + } + ); + } + + #[test] + fn observation_url_uses_backend_runtime_worker_identity() { + let target = + BackendRuntimeTarget::new("http://127.0.0.1:8787/", "runtime/one", "worker one"); + assert_eq!( + observation_ws_url(&target, Some("bo_0000000000000001")), + "ws://127.0.0.1:8787/api/runtimes/runtime%2Fone/workers/worker%20one/events/ws?cursor=bo_0000000000000001" + ); + } + + #[test] + fn transcript_projection_seeds_snapshot_and_chat_events() { + let target = BackendRuntimeTarget::new("http://backend", "runtime-a", "worker-b"); + let events = transcript_projection_to_events( + &target, + WorkerTranscriptProjection { + items: vec![ + WorkerTranscriptItem { + role: "user".to_string(), + content: "hi".to_string(), + }, + WorkerTranscriptItem { + role: "assistant".to_string(), + content: "hello".to_string(), + }, + ], + diagnostics: Vec::new(), + }, + ); + assert!(matches!(events[0], Event::Snapshot { .. })); + assert!(matches!(events[1], Event::UserMessage { .. })); + assert!(matches!(events[2], Event::TextDelta { .. })); + assert!(matches!(events[3], Event::TextDone { .. })); + } +} diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 13995380..cd918676 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -8,11 +8,13 @@ //! //! TUI / GUI / E2E ハーネスはこの crate に依存して protocol を喋る。 +pub mod backend_runtime; pub mod runtime_command; pub mod spawn; pub mod ticket_role; mod worker_client; +pub use backend_runtime::{BackendRuntimeClient, BackendRuntimeClientError, BackendRuntimeTarget}; pub use runtime_command::WorkerRuntimeCommand; pub use spawn::{ diff --git a/crates/tui/src/console/mod.rs b/crates/tui/src/console/mod.rs index 214aee22..a8a9516d 100644 --- a/crates/tui/src/console/mod.rs +++ b/crates/tui/src/console/mod.rs @@ -16,16 +16,16 @@ use crossterm::event::{ }; use crossterm::terminal::{EnterAlternateScreen, LeaveAlternateScreen}; use crossterm::{Command, execute}; +use protocol::{Event, Method, WorkerStatus}; #[cfg(feature = "e2e-test")] -use protocol::{Event, Greeting, RewindSummary, RewindTarget, RewindTargetId, Segment}; -use protocol::{Method, WorkerStatus}; +use protocol::{Greeting, RewindSummary, RewindTarget, RewindTargetId, Segment}; use ratatui::Terminal; use ratatui::backend::CrosstermBackend; use session_store::SegmentId; use tokio::sync::mpsc; use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD}; -use client::{WorkerClient, WorkerRuntimeCommand}; +use client::{BackendRuntimeClient, BackendRuntimeTarget, WorkerClient, WorkerRuntimeCommand}; use crate::app::{ActionbarNoticeLevel, ActionbarNoticeSource, App}; use crate::composer_keys::{ComposerEditAction, composer_edit_action}; @@ -171,6 +171,54 @@ pub(crate) async fn run_worker_name( result } +enum ConsoleConnection { + LegacySocket(WorkerClient), + BackendRuntime(BackendRuntimeClient), +} + +impl ConsoleConnection { + fn try_next_event(&mut self) -> Option { + match self { + Self::LegacySocket(client) => client.try_next_event(), + Self::BackendRuntime(client) => client.try_next_event(), + } + } + + async fn next_event(&mut self) -> Option { + match self { + Self::LegacySocket(client) => client.next_event().await, + Self::BackendRuntime(client) => client.next_event().await, + } + } + + async fn send(&mut self, method: &Method) -> Result<(), Box> { + match self { + Self::LegacySocket(client) => Ok(client.send(method).await?), + Self::BackendRuntime(client) => Ok(client.send(method).await?), + } + } +} + +pub(crate) async fn run_backend_runtime( + target: BackendRuntimeTarget, +) -> Result<(), Box> { + let worker_label = target.display_label(); + let client = BackendRuntimeClient::connect(target).await?; + let mut terminal = enter_fullscreen()?; + let workspace_root = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from(".")); + let mut app = App::new_with_persistent_input_history(worker_label, &workspace_root); + app.connected = true; + let result = run_loop( + &mut terminal, + &mut app, + ConsoleConnection::BackendRuntime(client), + None, + ) + .await; + let _ = leave_fullscreen(&mut terminal); + result +} + async fn run_connected_pod( terminal: &mut ConsoleTerminal, worker_name: String, @@ -180,7 +228,13 @@ async fn run_connected_pod( let workspace_root = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from(".")); let mut app = App::new_with_persistent_input_history(worker_name, &workspace_root); app.connected = true; - run_loop(terminal, &mut app, client, runtime_command).await + run_loop( + terminal, + &mut app, + ConsoleConnection::LegacySocket(client), + Some(runtime_command), + ) + .await } pub(crate) async fn open_from_dashboard( @@ -396,7 +450,13 @@ async fn run( app.connected = true; // The Worker sends `Event::Snapshot` automatically on connect; // no explicit method call is required to fetch history. - run_loop(terminal, &mut app, client, runtime_command).await?; + run_loop( + terminal, + &mut app, + ConsoleConnection::LegacySocket(client), + Some(runtime_command), + ) + .await?; } Err(e) => { app.push_error(format!( @@ -673,9 +733,9 @@ where async fn drain_terminal_events( app: &mut App, - client: &mut WorkerClient, + client: &mut ConsoleConnection, term_rx: &mut mpsc::UnboundedReceiver, - runtime_command: &WorkerRuntimeCommand, + runtime_command: Option<&WorkerRuntimeCommand>, ) -> Result> { let mut handled = false; for _ in 0..TERMINAL_EVENT_DRAIN_LIMIT { @@ -701,7 +761,7 @@ async fn drain_terminal_events( async fn drain_worker_events( app: &mut App, - client: &mut WorkerClient, + client: &mut ConsoleConnection, ) -> Result> { let mut handled = false; for _ in 0..POD_EVENT_DRAIN_LIMIT { @@ -721,8 +781,8 @@ async fn drain_worker_events( async fn run_loop( terminal: &mut Terminal>, app: &mut App, - mut client: WorkerClient, - runtime_command: WorkerRuntimeCommand, + mut client: ConsoleConnection, + runtime_command: Option, ) -> Result<(), Box> { let (_terminal_reader, mut term_rx) = TerminalEventReader::spawn()?; @@ -734,7 +794,7 @@ async fn run_loop( } let handled_term_event = - drain_terminal_events(app, &mut client, &mut term_rx, &runtime_command).await?; + drain_terminal_events(app, &mut client, &mut term_rx, runtime_command.as_ref()).await?; if app.quit { break; } @@ -746,7 +806,8 @@ async fn run_loop( match next_loop_input(&mut term_rx, app.connected, client.next_event()).await { LoopInput::Terminal(term_event) => { - handle_terminal_event(app, &mut client, term_event?, &runtime_command).await?; + handle_terminal_event(app, &mut client, term_event?, runtime_command.as_ref()) + .await?; } LoopInput::Worker(event) => match event { Some(ev) => { @@ -770,9 +831,9 @@ async fn run_loop( async fn handle_terminal_event( app: &mut App, - client: &mut WorkerClient, + client: &mut ConsoleConnection, event: TermEvent, - _runtime_command: &WorkerRuntimeCommand, + _runtime_command: Option<&WorkerRuntimeCommand>, ) -> Result<(), Box> { match event { TermEvent::Key(key) => { diff --git a/crates/tui/src/lib.rs b/crates/tui/src/lib.rs index f5de189e..902732df 100644 --- a/crates/tui/src/lib.rs +++ b/crates/tui/src/lib.rs @@ -33,7 +33,7 @@ use crossterm::execute; use crossterm::terminal::{LeaveAlternateScreen, disable_raw_mode, enable_raw_mode}; use session_store::SegmentId; -use client::WorkerRuntimeCommand; +use client::{BackendRuntimeTarget, WorkerRuntimeCommand}; #[derive(Debug, Clone)] pub struct LaunchOptions { @@ -55,6 +55,9 @@ pub enum LaunchMode { worker_name: String, socket_override: Option, }, + /// `yoi --backend --runtime-id --worker-id `: connect through the + /// Workspace Backend Runtime API and observe the Backend-proxied event stream. + BackendRuntime { target: BackendRuntimeTarget }, /// `yoi resume`: open the Worker picker, then attach to the selected live Worker /// or restore the selected stopped Worker by name. Without `--all`, the picker /// is scoped to the current runtime workspace. @@ -103,6 +106,7 @@ pub async fn launch(options: LaunchOptions) -> ExitCode { worker_name, socket_override, } => console::run_worker_name(worker_name, socket_override, runtime_command).await, + LaunchMode::BackendRuntime { target } => console::run_backend_runtime(target).await, LaunchMode::Resume { all } => { console::run_resume(runtime_command, workspace_root.clone(), all).await } diff --git a/crates/yoi/src/main.rs b/crates/yoi/src/main.rs index 8a7dd1c8..a8c1bb02 100644 --- a/crates/yoi/src/main.rs +++ b/crates/yoi/src/main.rs @@ -11,7 +11,7 @@ use std::fmt; use std::path::PathBuf; use std::process::{Command, ExitCode}; -use client::WorkerRuntimeCommand; +use client::{BackendRuntimeTarget, WorkerRuntimeCommand}; use memory_lint::{LintCliOptions, LintStatus}; use session_store::SegmentId; use tui::{LaunchMode, LaunchOptions}; @@ -286,6 +286,9 @@ fn parse_console_options(args: &[String]) -> Result { let mut session = None; let mut worker_name = None; let mut socket_override = None; + let mut backend_url = None; + let mut runtime_id = None; + let mut worker_id = None; let mut profile = None; let mut i = 0; @@ -329,6 +332,36 @@ fn parse_console_options(args: &[String]) -> Result { workspace_root = PathBuf::from(value); i += 2; } + "--backend" => { + let value = args + .get(i + 1) + .ok_or_else(|| ParseError("--backend requires a URL".to_string()))?; + if value.starts_with('-') || value.is_empty() { + return Err(ParseError("--backend requires a URL".to_string())); + } + backend_url = Some(value.clone()); + i += 2; + } + "--runtime-id" | "--runtime" => { + let value = args + .get(i + 1) + .ok_or_else(|| ParseError("--runtime-id requires a value".to_string()))?; + if value.starts_with('-') || value.is_empty() { + return Err(ParseError("--runtime-id requires a value".to_string())); + } + runtime_id = Some(value.clone()); + i += 2; + } + "--worker-id" => { + let value = args + .get(i + 1) + .ok_or_else(|| ParseError("--worker-id requires a value".to_string()))?; + if value.starts_with('-') || value.is_empty() { + return Err(ParseError("--worker-id requires a value".to_string())); + } + worker_id = Some(value.clone()); + i += 2; + } "--profile" => { let value = args .get(i + 1) @@ -371,6 +404,38 @@ fn parse_console_options(args: &[String]) -> Result { workspace_root = PathBuf::from(value); i += 1; } + arg if arg.starts_with("--backend=") => { + let value = arg.trim_start_matches("--backend="); + if value.is_empty() { + return Err(ParseError("--backend requires a URL".to_string())); + } + backend_url = Some(value.to_string()); + i += 1; + } + arg if arg.starts_with("--runtime-id=") => { + let value = arg.trim_start_matches("--runtime-id="); + if value.is_empty() { + return Err(ParseError("--runtime-id requires a value".to_string())); + } + runtime_id = Some(value.to_string()); + i += 1; + } + arg if arg.starts_with("--runtime=") => { + let value = arg.trim_start_matches("--runtime="); + if value.is_empty() { + return Err(ParseError("--runtime-id requires a value".to_string())); + } + runtime_id = Some(value.to_string()); + i += 1; + } + arg if arg.starts_with("--worker-id=") => { + let value = arg.trim_start_matches("--worker-id="); + if value.is_empty() { + return Err(ParseError("--worker-id requires a value".to_string())); + } + worker_id = Some(value.to_string()); + i += 1; + } arg if arg.starts_with("--profile=") => { let value = arg.trim_start_matches("--profile="); if value.is_empty() { @@ -390,6 +455,26 @@ fn parse_console_options(args: &[String]) -> Result { } } + let backend_target_present = + backend_url.is_some() || runtime_id.is_some() || worker_id.is_some(); + if backend_target_present + && (backend_url.is_none() || runtime_id.is_none() || worker_id.is_none()) + { + return Err(ParseError( + "--backend, --runtime-id, and --worker-id are required together".to_string(), + )); + } + if backend_target_present + && (session.is_some() + || worker_name.is_some() + || socket_override.is_some() + || profile.is_some()) + { + return Err(ParseError( + "Backend Runtime API target cannot be combined with --worker, --socket, --session, or --profile".to_string(), + )); + } + if profile.is_some() && (session.is_some() || socket_override.is_some()) { return Err(ParseError( "--profile can only be used for fresh spawn".to_string(), @@ -404,6 +489,19 @@ fn parse_console_options(args: &[String]) -> Result { )); } + if backend_target_present { + return Ok(Mode::Tui { + mode: LaunchMode::BackendRuntime { + target: BackendRuntimeTarget::new( + backend_url.expect("checked by backend_target_present"), + runtime_id.expect("checked by backend_target_present"), + worker_id.expect("checked by backend_target_present"), + ), + }, + workspace_root, + }); + } + if let Some(profile) = profile { return Ok(Mode::Tui { mode: LaunchMode::Spawn { @@ -901,7 +999,7 @@ fn parse_session_id(value: &str) -> Result { fn print_help() { println!( - "yoi\n\nUsage:\n yoi [OPTIONS]\n yoi resume [--workspace ] [--all]\n yoi panel [--workspace ]\n yoi keys\n yoi setup-model\n yoi worker [WORKER_OPTIONS]\n yoi worker delete [--force] [--dry-run]\n yoi worker prune --older-than [--force] [--dry-run]\n yoi objective [OPTIONS]\n yoi session analyze --json\n yoi session prune --unreferenced [--older-than ] [--force] [--dry-run]\n yoi ticket [OPTIONS]\n yoi workspace serve [OPTIONS]\n yoi plugin new [--json]\n yoi plugin check [--json]\n yoi plugin pack [--output ] [--json]\n yoi plugin list [--workspace ] [--profile ] [--json]\n yoi plugin show [--workspace ] [--profile ] [--json]\n yoi mcp list [--workspace ] [--profile ] [--json]\n yoi mcp show [--workspace ] [--profile ] [--json]\n yoi mcp tools|resources|prompts [SERVER] [--workspace ] [--profile ] [--json]\n yoi memory lint [OPTIONS]\n\nSurfaces:\n Console Single-Worker chat/client surface (default, --worker, yoi resume)\n Dashboard Workspace cockpit/action surface (yoi panel)\n TUI Terminal UI implementation umbrella for Console and Dashboard\n\nOptions:\n --workspace Runtime workspace root for default Console/--worker (defaults to cwd)\n --worker Open the Worker Console by name (attach/restore/create)\n --socket Attach a Worker Console to a specific socket with --worker\n --session Resume a specific session segment in the Worker Console\n --profile Select a reusable Profile recipe\n -h, --help Print help\n" + "yoi\n\nUsage:\n yoi [OPTIONS]\n yoi resume [--workspace ] [--all]\n yoi panel [--workspace ]\n yoi keys\n yoi setup-model\n yoi worker [WORKER_OPTIONS]\n yoi worker delete [--force] [--dry-run]\n yoi worker prune --older-than [--force] [--dry-run]\n yoi objective [OPTIONS]\n yoi session analyze --json\n yoi session prune --unreferenced [--older-than ] [--force] [--dry-run]\n yoi ticket [OPTIONS]\n yoi workspace serve [OPTIONS]\n yoi plugin new [--json]\n yoi plugin check [--json]\n yoi plugin pack [--output ] [--json]\n yoi plugin list [--workspace ] [--profile ] [--json]\n yoi plugin show [--workspace ] [--profile ] [--json]\n yoi mcp list [--workspace ] [--profile ] [--json]\n yoi mcp show [--workspace ] [--profile ] [--json]\n yoi mcp tools|resources|prompts [SERVER] [--workspace ] [--profile ] [--json]\n yoi memory lint [OPTIONS]\n\nSurfaces:\n Console Single-Worker chat/client surface (default, --worker, yoi resume, Backend Runtime target)\n Dashboard Workspace cockpit/action surface (yoi panel)\n TUI Terminal UI implementation umbrella for Console and Dashboard\n\nOptions:\n --workspace Runtime workspace root for default Console/--worker (defaults to cwd)\n --worker Open the Worker Console by name (attach/restore/create)\n --socket Attach a Worker Console to a specific socket with --worker\n --session Resume a specific session segment in the Worker Console\n --profile Select a reusable Profile recipe\n -h, --help Print help\n" ); } @@ -945,6 +1043,59 @@ mod tests { } } + #[test] + fn parse_backend_runtime_target_mode() { + match parse_args_from([ + "--backend", + "http://127.0.0.1:8787", + "--runtime-id", + "runtime-a", + "--worker-id", + "worker-b", + ]) + .unwrap() + { + Mode::Tui { + mode: LaunchMode::BackendRuntime { target }, + .. + } => { + assert_eq!(target.base_url, "http://127.0.0.1:8787"); + assert_eq!(target.runtime_id, "runtime-a"); + assert_eq!(target.worker_id, "worker-b"); + } + _ => panic!("expected BackendRuntime mode"), + } + } + + #[test] + fn parse_backend_runtime_target_requires_complete_identity() { + let err = parse_args_from(["--backend", "http://127.0.0.1:8787", "--worker-id", "w"]) + .unwrap_err(); + assert_eq!( + err.to_string(), + "--backend, --runtime-id, and --worker-id are required together" + ); + } + + #[test] + fn parse_backend_runtime_target_rejects_legacy_socket_mix() { + let err = parse_args_from([ + "--backend", + "http://127.0.0.1:8787", + "--runtime-id", + "r", + "--worker-id", + "w", + "--worker", + "agent", + ]) + .unwrap_err(); + assert_eq!( + err.to_string(), + "Backend Runtime API target cannot be combined with --worker, --socket, --session, or --profile" + ); + } + #[test] fn parse_bare_word_is_unknown_command() { let err = parse_args_from(["agent"]).unwrap_err(); diff --git a/package.nix b/package.nix index ddfeb04e..ffab8968 100644 --- a/package.nix +++ b/package.nix @@ -43,7 +43,7 @@ rustPlatform.buildRustPackage rec { filter = sourceFilter; }; - cargoHash = "sha256-/7qrJH25rQSV2tKMOVUSu6ISUuEi+4WdwuX0E94LZYg="; + cargoHash = "sha256-fdmGo/HE80wRSLE/u20YXS63G/vvHx43uoc9BivZUxQ="; depsExtraArgs = { # Older fetchCargoVendor utilities used crates.io's API download endpoint,